2012-05-15 41 views
10

Tôi đang cố gắng tìm trọng lượng tối đa khoảng 6,1 tỷ (mục tùy chỉnh) và tôi muốn thực hiện việc này với xử lý song song. Đối với ứng dụng cụ thể của tôi có những thuật toán tốt hơn mà không yêu cầu của tôi iterating hơn 6.1 tỷ mục, nhưng sách giáo khoa giải thích họ là trên đầu của tôi và ông chủ của tôi muốn điều này được thực hiện trong 4 ngày. Tôi nghĩ rằng tôi có một shot tốt hơn với máy chủ ưa thích của công ty tôi và xử lý song song. Tuy nhiên, mọi thứ tôi biết về xử lý song song đều đến từ việc đọc số Pythondocumentation. Mà là để nói rằng tôi khá bị mất ...Tránh điều kiện cuộc đua trong Hàng đợi đa xử lý của Python 3

Lý thuyết hiện tại của tôi là thiết lập quy trình nạp, hàng đợi đầu vào, toàn bộ bó (ví dụ, 30) quy trình công nhân và hàng đợi đầu ra (tìm phần tử tối đa trong hàng đợi đầu ra sẽ không quan trọng). Những gì tôi không hiểu là làm thế nào quá trình feeder có thể nói với các quy trình công nhân khi dừng đợi các vật phẩm đi qua hàng đợi đầu vào.

Tôi đã nghĩ về việc sử dụng multiprocessing.Pool.map_async trên các mục 6.1E9 có thể lặp lại của tôi, nhưng phải mất gần 10 phút để lặp qua các mục mà không làm bất cứ điều gì với chúng. Trừ khi tôi hiểu nhầm điều gì đó ..., có map_async lặp qua chúng để gán chúng cho các quy trình có thể được thực hiện trong khi các quy trình bắt đầu công việc của chúng. (Pool cũng cung cấp imap nhưng documentation nói nó tương tự như map, mà không xuất hiện để làm việc không đồng bộ Tôi muốn đồng bộ, đúng.?)

câu hỏi liên quan: Tôi muốn sử dụng concurrent.futures thay vì multiprocessing ? Tôi không thể là người đầu tiên thực hiện một hệ thống hai hàng đợi (đó chính xác là cách mà các hãng hàng không ở Mỹ làm việc ...) vậy thì có cách nào khác để tạo ra điều này cho Pythonic/built-in?

Đây là bộ xương của những gì tôi đang cố gắng làm. Xem khối nhận xét ở giữa.

import multiprocessing as mp 
import queue 

def faucet(items, bathtub): 
    """Fill bathtub, a process-safe queue, with 6.1e9 items""" 
    for item in items: 
     bathtub.put(item) 
    bathtub.close() 

def drain_filter(bathtub, drain): 
    """Put maximal item from bathtub into drain. 
    Bathtub and drain are process-safe queues. 
    """ 
    max_weight = 0 
    max_item = None 
    while True: 
     try: 
      current_item = bathtub.get() 
     # The following line three lines are the ones that I can't 
     # quite figure out how to trigger without a race condition. 
     # What I would love is to trigger them AFTER faucet calls 
     # bathtub.close and the bathtub queue is empty. 
     except queue.Empty: 
      drain.put((max_weight, max_item)) 
      return 
     else: 
      bathtub.task_done() 
     if not item.is_relevant(): 
      continue 
     current_weight = item.weight 
     if current_weight > max_weight: 
      max_weight = current_weight 
      max_item = current_item 

def parallel_max(items, nprocs=30): 
    """The elements of items should have a method `is_relevant` 
    and an attribute `weight`. `items` itself is an immutable 
    iterator object. 
    """ 
    bathtub_q = mp.JoinableQueue() 
    drain_q = mp.Queue() 

    faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q)) 
    worker_procs = mp.Pool(processes=nprocs) 

    faucet_proc.start() 
    worker_procs.apply_async(drain_filter, bathtub_q, drain_q) 

    finalists = [] 
    for i in range(nprocs): 
     finalists.append(drain_q.get()) 

    return max(finalists) 


Đây là Trả lời

Tôi tìm thấy một câu trả lời rất kỹ lưỡng để câu hỏi của tôi, và một giới thiệu nhẹ nhàng để đa nhiệm của đạo diễn truyền Python Foundation Doug Hellman. Những gì tôi muốn là mẫu "thuốc độc". Hãy khám phá tại đây: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html

Đạo cụ để @MRAB đăng hạt nhân của khái niệm đó.

+0

Tại sao bạn 'nhập hàng đợi' nếu bạn đang sử dụng' multiprocessing.Queue'? –

+0

Tôi sử dụng nó để bắt một ngoại lệ 'queue.Empty' khi nhân viên nhìn vào hàng đợi đầu vào của anh ta. Suy nghĩ mơ hồ của tôi là ngoại lệ đó có thể bị ném nếu và chỉ khi hàng đợi đã bị đóng và nó cũng trống rỗng. Lưu ý rằng theo phương pháp @ MRAB trong câu trả lời của mình, việc nhập 'hàng đợi' sẽ không cần thiết. – wkschwartz

Trả lời

3

Bạn có thể đặt một mục chấm dứt đặc biệt, chẳng hạn như Không, vào hàng đợi. Khi một nhân viên nhìn thấy nó, nó có thể đặt nó trở lại cho các công nhân khác để xem, và sau đó chấm dứt. Ngoài ra, bạn có thể đặt một mục chấm dứt đặc biệt cho mỗi công nhân vào hàng đợi.

+1

Đó là một câu trả lời thực sự tốt. Tôi sẽ để câu hỏi này mở lâu hơn một chút trong trường hợp bất cứ ai cũng có thể trả lời các câu hỏi liên quan. – wkschwartz

+0

Vì vậy, tôi đặt 'bathtub.put (None)' vào cuối 'faucet()' trước khi đóng hàng đợi, nhưng tôi phải làm gì với các cuộc gọi 'task_done()'?Hay tôi chỉ cần chuyển từ sử dụng 'JoinableQueue' thành' Queue' thông thường và loại bỏ tất cả các lệnh 'task_done()'? – wkschwartz

Các vấn đề liên quan