2017-08-30 23 views
11

Im trong quá trình di chuyển một số mã đồng bộ sang asyncio bằng aiohttp. mã đồng bộ mất 15 phút để chạy, vì vậy tôi hy vọng sẽ cải thiện điều này.Python aiohttp/asyncio - cách xử lý dữ liệu trả về

Tôi có một số mã làm việc lấy dữ liệu từ một số url và trả về phần thân của mỗi url. Nhưng điều này chỉ chống lại 1 trang web trong phòng thí nghiệm, tôi có hơn 70 trang web thực tế.

Vì vậy, nếu tôi có vòng lặp để tạo danh sách tất cả các url cho tất cả các trang web sẽ tạo 700 url trong danh sách cần xử lý. Bây giờ chế biến chúng tôi không nghĩ là một vấn đề?

Nhưng làm 'công cụ' với kết quả, tôi không chắc chắn cách lập trình? Tôi có mã đã được rằng sẽ làm 'công cụ' cho mỗi kết quả được trả lại, nhưng tôi không chắc chắn làm thế nào để chương trình chống lại loại kết quả phù hợp.

Khi mã chạy nó xử lý tất cả các url và tùy thuộc vào thời gian chạy, trả lại một thứ tự không xác định?

Tôi có cần chức năng xử lý bất kỳ loại kết quả nào không?

import asyncio, aiohttp, ssl 
from bs4 import BeautifulSoup 

def page_content(page): 
    return BeautifulSoup(page, 'html.parser') 


async def fetch(session, url): 
    with aiohttp.Timeout(15, loop=session.loop): 
     async with session.get(url) as response: 
      return page_content(await response.text()) 

async def get_url_data(urls, username, password): 
    tasks = [] 
    # Fetch all responses within one Client session, 
    # keep connection alive for all requests. 
    async with aiohttp.ClientSession(auth=aiohttp.BasicAuth(username, password)) as session: 
     for i in urls: 
      task = asyncio.ensure_future(fetch(session, i)) 
      tasks.append(task) 

     responses = await asyncio.gather(*tasks) 
     # you now have all response bodies in this variable 
     for i in responses: 
      print(i.title.text) 
     return responses 


def main(): 
    username = 'monitoring' 
    password = '*********' 
    ip = '10.10.10.2' 
    urls = [ 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.10.0.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.0.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'frontend.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'planner.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.10.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.11.11.1'), 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.12.12.60'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.12.12.60'), 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'lon-dc-01.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'lon-dc-01.domain.com'), 
     ] 
    loop = asyncio.get_event_loop() 
    future = asyncio.ensure_future(get_url_data(urls,username,password)) 
    data = loop.run_until_complete(future) 
    print(data) 

if __name__ == "__main__": 
    main() 

Trả lời

2

Mã của bạn không xa nhãn hiệu. asyncio.gather trả lại kết quả theo thứ tự của các đối số, vì vậy thứ tự được giữ nguyên ở đây, nhưng page_content sẽ không được gọi theo thứ tự.

Một vài điều chỉnh:

Trước hết, bạn không cần phải ensure_future đây. Việc tạo một Task chỉ là cần thiết nếu bạn đang cố gắng để có một coroutine sống lâu hơn cha mẹ của nó, nghĩa là nếu nhiệm vụ phải tiếp tục chạy mặc dù chức năng tạo ra nó được thực hiện. Ở đây những gì bạn cần là thay vì gọi asyncio.gather trực tiếp với coroutines của bạn:

async def get_url_data(urls, username, password): 
    async with aiohttp.ClientSession(...) as session: 
     responses = await asyncio.gather(*(fetch(session, i) for i in urls)) 
    for i in responses: 
     print(i.title.text) 
    return responses 

Nhưng gọi này sẽ sắp xếp tất cả các lấy cùng một lúc, và với một số lượng lớn các URL, đây là xa tối ưu. Thay vào đó, bạn nên chọn đồng thời tối đa và đảm bảo rằng hầu hết các lần tải X đang chạy bất kỳ lúc nào. Để thực hiện điều này, bạn có thể sử dụng asyncio.Semaphore(20), semaphore này chỉ có thể được mua bởi tối đa 20 coroutines, vì vậy những người khác sẽ chờ đợi để có được cho đến khi có sẵn một điểm.

CONCURRENCY = 20 
TIMEOUT = 15 

async def fetch(session, sem, url): 
    async with sem: 
     async with session.get(url) as response: 
      return page_content(await response.text()) 

async def get_url_data(urls, username, password): 
    sem = asyncio.Semaphore(CONCURRENCY) 
    async with aiohttp.ClientSession(...) as session: 
     responses = await asyncio.gather(*(
      asyncio.wait_for(fetch(session, sem, i), TIMEOUT) 
      for i in urls 
     )) 
    for i in responses: 
     print(i.title.text) 
    return responses 

Bằng cách này, tất cả các lần tải được bắt đầu ngay lập tức, nhưng chỉ 20 trong số đó sẽ có thể có được semaphore. Những người khác sẽ chặn tại hướng dẫn async with đầu tiên và chờ cho đến khi tìm nạp khác được thực hiện.

Tôi cũng đã thay thế aiohttp.Timeout bằng asyncio chính thức tương đương tại đây.

Cuối cùng, để xử lý dữ liệu thực tế, nếu bạn bị giới hạn bởi thời gian CPU, asyncio có thể sẽ không giúp bạn nhiều. Bạn sẽ cần phải sử dụng một ProcessPoolExecutor ở đây để song song công việc thực tế với một CPU khác. run_in_executor có thể sẽ được sử dụng.

+0

Cảm ơn, tôi hiểu tất cả những gì bạn đã nói nhưng bạn đã mất tôi ở phần ProcessPoolExecutor. Tôi cần phải có một quá trình CPU riêng biệt kết quả? Làm thế nào để tôi làm điều này? và làm thế nào để xử lý chúng theo thứ tự, hoặc tôi cần một chức năng xử lý tất cả các kết quả không có vấn đề gì loại? – AlexW

2

Dưới đây là ví dụ với concurrent.futures.ProcessPoolExecutor. Nếu được tạo mà không chỉ định max_workers, việc triển khai sẽ sử dụng os.cpu_count thay thế. Cũng lưu ý rằng asyncio.wrap_future là công khai nhưng không có giấy tờ. Ngoài ra, có AbstractEventLoop.run_in_executor.

import asyncio 
from concurrent.futures import ProcessPoolExecutor 

import aiohttp 
import lxml.html 


def process_page(html): 
    '''Meant for CPU-bound workload''' 
    tree = lxml.html.fromstring(html) 
    return tree.find('.//title').text 


async def fetch_page(url, session): 
    '''Meant for IO-bound workload''' 
    async with session.get(url, timeout = 15) as res: 
     return await res.text() 


async def process(url, session, pool): 
    html = await fetch_page(url, session) 
    return await asyncio.wrap_future(pool.submit(process_page, html)) 


async def dispatch(urls): 
    pool = ProcessPoolExecutor() 
    async with aiohttp.ClientSession() as session: 
     coros = (process(url, session, pool) for url in urls) 
     return await asyncio.gather(*coros) 


def main(): 
    urls = [ 
     'https://stackoverflow.com/', 
     'https://serverfault.com/', 
     'https://askubuntu.com/', 
     'https://unix.stackexchange.com/' 
    ] 
    result = asyncio.get_event_loop().run_until_complete(dispatch(urls)) 
    print(result) 

if __name__ == '__main__': 
    main() 
Các vấn đề liên quan