2010-07-25 35 views
23

Tôi đã chắc chắn có một cái gì đó như thế này trong thư viện chuẩn, nhưng có vẻ như tôi đã sai.Python: Một cái gì đó giống như `bản đồ` hoạt động trên các chủ đề

Tôi có một loạt các url mà tôi muốn urlopen song song. Tôi muốn một cái gì đó giống như các chức năng được xây dựng trong map, ngoại trừ công việc được thực hiện song song bởi một loạt các chủ đề.

Có mô-đun tốt thực hiện việc này không?

+1

Bạn có nghĩa là bạn muốn bản đồ để bắt đầu chủ đề (như câu trả lời pillmuncher của, bản đồ (urlopen, url)), hoặc bạn sẽ tự khởi động các chủ đề urlopening, và muốn một cái gì đó giống như bản đồ để hành động trên các kết quả thực hiện mỗi thread, khi họ trở nên có sẵn? – rbp

Trả lời

10

Có người đề nghị tôi sử dụng gói futures cho việc này. Tôi đã thử nó và nó có vẻ là làm việc.

http://pypi.python.org/pypi/futures

Dưới đây là một ví dụ:

"Download many URLs in parallel." 

import functools 
import urllib.request 
import futures 

URLS = ['http://www.foxnews.com/', 
     'http://www.cnn.com/', 
     'http://europe.wsj.com/', 
     'http://www.bbc.co.uk/', 
     'http://some-made-up-domain.com/'] 

def load_url(url, timeout): 
    return urllib.request.urlopen(url, timeout=timeout).read() 

with futures.ThreadPoolExecutor(50) as executor: 
    future_list = executor.run_to_futures(
      [functools.partial(load_url, url, 30) for url in URLS]) 
0

tôi muốn quấn nó lên trong một hàm (chưa được kiểm tra):

import itertools 
import threading 
import urllib2 
import Queue 

def openurl(url, queue): 
    def starter(): 
     try: 
      result = urllib2.urlopen(url) 
     except Ecxeption, exc: 
      def raiser(): 
       raise exc 
      queue.put((url, raiser)) 
     else: 
      queue.put((url, lambda:result)) 
    threadind.Thread(target=starter).start() 

myurls = ... # the list of urls 
myqueue = Queue.Queue() 

map(openurl, myurls, itertools.repeat(myqueue)) 

for each in myurls: 
    url, getresult = queue.get() 
    try: 
     result = getresult() 
    except Exception, exc: 
     print 'exception raised:' + str(exc) 
    else: 
     # do stuff with result 
34

Có một phương pháp map trong multiprocessing.Pool. Điều đó có nhiều quy trình.

Và nếu nhiều quá trình không phải là món ăn của bạn, bạn có thể sử dụng multiprocessing.dummy sử dụng chủ đề.

import urllib 
import multiprocessing.dummy 

p = multiprocessing.dummy.Pool(5) 
def f(post): 
    return urllib.urlopen('http://stackoverflow.com/questions/%u' % post) 

print p.map(f, range(3329361, 3329361 + 5)) 
+0

Điều này là tuyệt vời, nhưng nó sẽ không hoạt động trên python2.6 nếu chạy từ một luồng vì lỗi này: http://bugs.python.org/issue14881 – gregsabo

+0

Hoạt động tốt trong python 2.79 - hiện là phiên bản mới nhất của 2x, và khá tốt ở đó! – FredTheWebGuy

1

Đây là triển khai thực hiện của tôi về bản đồ ren:

from threading import Thread 
from queue import Queue 

def thread_map(f, iterable, pool=None): 
    """ 
    Just like [f(x) for x in iterable] but each f(x) in a separate thread. 
    :param f: f 
    :param iterable: iterable 
    :param pool: thread pool, infinite by default 
    :return: list if results 
    """ 
    res = {} 
    if pool is None: 
     def target(arg, num): 
      try: 
       res[num] = f(arg) 
      except: 
       res[num] = sys.exc_info() 

     threads = [Thread(target=target, args=[arg, i]) for i, arg in enumerate(iterable)] 
    else: 
     class WorkerThread(Thread): 
      def run(self): 
       while True: 
        try: 
         num, arg = queue.get(block=False) 
         try: 
          res[num] = f(arg) 
         except: 
          res[num] = sys.exc_info() 
        except Empty: 
         break 

     queue = Queue() 
     for i, arg in enumerate(iterable): 
      queue.put((i, arg)) 

     threads = [WorkerThread() for _ in range(pool)] 

    [t.start() for t in threads] 
    [t.join() for t in threads] 
    return [res[i] for i in range(len(res))] 
+0

Bạn cần nhập 'Empty' trên dòng thứ hai. – speedplane

Các vấn đề liên quan