2010-04-01 30 views
12

Tôi có một chức năng có tác dụng phụ miễn phí. Tôi muốn chạy nó cho mọi phần tử trong một mảng và trả về một mảng với tất cả các kết quả.Có một hàm map() đa luồng không?

Python có gì để tạo tất cả các giá trị không?

+0

bạn có nghĩa là bản đồ như trong bản đồ-giảm? bạn có thể đưa ra ví dụ về đầu vào và hàng đầu ra không? – Simon

+0

Không, tôi không có nghĩa là giảm bản đồ. Vì tôi muốn tất cả dữ liệu từ mỗi hàm riêng lẻ đều được trả về. Chỉ là mỗi giá trị có thể được tính độc lập với nhau. Mặc dù, vì tôi nghĩ rằng tôi muốn tối đa tập hợp, có lẽ tôi có thể sử dụng bản đồ giảm ở đây ... – Sandro

+0

map() sẽ làm những gì bạn nói, vận hành trên từng phần tử độc lập (với lời nhắc GIL được đề cập bên dưới) –

Trả lời

16

Hãy thử chức năng Pool.map từ đa:

http://docs.python.org/library/multiprocessing.html#using-a-pool-of-workers

Nó không đa luồng cho mỗi gia nhập, nhưng đó là thực sự tốt vì đa luồng là tê liệt nghiêm trọng bằng Python của GIL.

+0

Rất tuyệt, dòng cuối cùng đã bán cho tôi. Tôi đã thấy thư viện đa xử lý này trước đây nhưng tôi nghĩ rằng nó quá nặng đối với nhu cầu của tôi. Tôi nghĩ rằng tôi thấy ánh sáng bây giờ :) Cảm ơn bạn. – Sandro

+0

Câu trả lời này có cập nhật/vẫn liên quan đến đa luồng/GIL không? – Moberg

+0

Có, đối với một số giá trị "nghiêm trọng."Tôi nghĩ rằng đó là vấn đề của ý kiến ​​xấu như thế nào, nhưng tôi vẫn thích nhiều quy trình trên Linux, nơi các quá trình không quá nặng. – samtregar

-1

Tôi nghĩ sẽ không có lý do gì để có một chức năng như vậy. Tất cả các luồng Python phải thực thi trên cùng một CPU. Giả sử chức năng bản đồ của bạn không có thành phần I/O, bạn sẽ không thấy bất kỳ tăng tốc nào trong quá trình xử lý (và có thể sẽ thấy sự chậm lại do chuyển đổi ngữ cảnh).

Các áp phích khác đã đề cập đa xử lý - đó có thể là ý tưởng hay hơn.

+0

Vì vậy, là python đa luồng thực sự là xấu? Bạn có bất kỳ nguồn nào trên thông tin này? – Sandro

+1

Nó là không phải là một câu hỏi "xấu" và "tốt" - tôi đã đề cập đến một cái gì đó cụ thể và cụ thể, đó là trình thông dịch Python sẽ không cho phép các luồng thực thi đồng thời trên nhiều bộ vi xử lý. tại một thời điểm, là các hoạt động CPU từ các luồng khác nhau phải được xen kẽ trên một bộ xử lý đơn lẻ. thực sự sẽ nhanh hơn, như trong trường hợp đầu tiên có – danben

+0

sẽ chỉ là một công tắc ngữ cảnh đơn lẻ) – danben

0

Có thể thử triển khai Unladen Swallow Python 3? Đó có thể là một dự án lớn, và không được đảm bảo ổn định, nhưng nếu bạn nghiêng nó có thể hoạt động. Sau đó, list or set comprehensions có vẻ như cấu trúc chức năng thích hợp để sử dụng.

0

Dưới đây là chức năng map_parallel tôi. Nó hoạt động giống như map, ngoại trừ nó có thể chạy từng phần tử song song trong một luồng riêng biệt (nhưng hãy xem lưu ý bên dưới). Câu trả lời này xây dựng theo số another SO answer.

import threading 
import logging 
def map_parallel(f, iter, max_parallel = 10): 
    """Just like map(f, iter) but each is done in a separate thread.""" 
    # Put all of the items in the queue, keep track of order. 
    from queue import Queue, Empty 
    total_items = 0 
    queue = Queue() 
    for i, arg in enumerate(iter): 
     queue.put((i, arg)) 
     total_items += 1 
    # No point in creating more thread objects than necessary. 
    if max_parallel > total_items: 
     max_parallel = total_items 

    # The worker thread. 
    res = {} 
    errors = {} 
    class Worker(threading.Thread): 
     def run(self): 
      while not errors: 
       try: 
        num, arg = queue.get(block = False) 
        try: 
         res[num] = f(arg) 
        except Exception as e: 
         errors[num] = sys.exc_info() 
       except Empty: 
        break 

    # Create the threads. 
    threads = [Worker() for _ in range(max_parallel)] 
    # Start the threads. 
    [t.start() for t in threads] 
    # Wait for the threads to finish. 
    [t.join() for t in threads] 

    if errors: 
     if len(errors) > 1: 
      logging.warning("map_parallel multiple errors: %d:\n%s"%(
       len(errors), errors)) 
     # Just raise the first one. 
     item_i = min(errors.keys()) 
     type, value, tb = errors[item_i] 
     # Print the original traceback 
     logging.info("map_parallel exception on item %s/%s:\n%s"%(
      item_i, total_items, "\n".join(traceback.format_tb(tb)))) 
     raise value 
    return [res[i] for i in range(len(res))] 

LƯU Ý: Một điều cần lưu ý là Ngoại lệ. Giống như bình thường map, hàm trên làm tăng một ngoại lệ nếu một trong số đó là tiểu đề xuất ra một ngoại lệ và sẽ dừng lặp lại. Tuy nhiên, do tính chất song song, không có gì đảm bảo rằng yếu tố đầu tiên sẽ làm tăng ngoại lệ đầu tiên.