2012-05-19 36 views
8

Đây là phần mở rộng của câu hỏi gần đây của tôi Avoiding race conditions in Python 3's multiprocessing Queues. Hy vọng rằng phiên bản này của câu hỏi cụ thể hơn.Đa xử lý hiệu quả lực tối đa lớn, brute trong Python 3

TL; DR: Trong mô hình xử lý đa nơi quy trình của nhân viên được cấp từ hàng đợi bằng cách sử dụng multiprocessing.Queue, tại sao nhân viên của tôi xử lý quá nhàn rỗi? Mỗi quá trình có hàng đợi đầu vào của riêng chúng để chúng không đánh nhau với nhau cho khóa của hàng đợi được chia sẻ, nhưng hàng đợi dành nhiều thời gian thực sự trống. Quá trình chính đang chạy một chuỗi I/O-ràng buộc - là làm chậm việc lấp đầy CPU của hàng đợi đầu vào?

Tôi đang cố gắng tìm phần tử tối đa của sản phẩm Descartes của N đặt từng phần tử M_i (cho 0 < = i < N) theo một ràng buộc nhất định. Nhớ lại rằng các phần tử của sản phẩm Descartes là các bộ N có các phần tử của các bộ N. Tôi sẽ gọi những kết hợp của bộ dữ liệu này để nhấn mạnh sự thật rằng tôi đang lặp lại mọi sự kết hợp của các bộ gốc. Kết hợp đáp ứng ràng buộc khi chức năng của tôi is_feasible trả lại True. Trong vấn đề của tôi, tôi đang cố gắng để tìm sự kết hợp có yếu tố có trọng lượng lớn nhất: sum(element.weight for element in combination).

Kích thước vấn đề của tôi lớn, nhưng máy chủ của công ty cũng vậy. Tôi đang cố gắng viết lại thuật toán nối tiếp sau đây như một thuật toán song song.

from operator import itemgetter 
from itertools import product # Cartesian product function from the std lib 
def optimize(sets): 
    """Return the largest (total-weight, combination) tuple from all 
    possible combinations of the elements in the several sets, subject 
    to the constraint that is_feasible(combo) returns True.""" 
    return max(
       map(
        lambda combination: (
         sum(element.weight for element in combination), 
         combination 
        ), 
        filter(
         is_feasible, # Returns True if combo meets constraint 
         product(*sets) 
        ) 
       ), 
       key=itemgetter(0) # Only maximize based on sum of weight 
      ) 

Cách tiếp cận đa xử lý hiện tại của tôi là tạo quy trình công nhân và kết hợp chúng với hàng nhập liệu. Khi người lao động nhận được poison pill họ đặt kết hợp tốt nhất mà họ đã thấy trên hàng đợi đầu ra và thoát. Tôi điền vào hàng đợi đầu vào từ luồng chính của quá trình chính. Một ưu điểm của kỹ thuật này là tôi có thể sinh ra một luồng thứ cấp từ tiến trình chính để chạy một công cụ giám sát (chỉ là một REPL mà tôi có thể sử dụng để xem có bao nhiêu kết hợp đã được xử lý cho đến nay và hàng đợi đầy đủ).

    +-----------+ 
      in_q0 | worker0 |----\ 
      /-------+-----------+  \ 
+-----------+ in_q1 +-----------+ \ out_q +-----------+ 
| main |-----------| worker1 |-----------| main | 
+-----------+   +-----------+/  +-----------+ 
      \-------+-----------+ /
      in_q2 | worker2 |----/ 
        +-----------+ 

Ban đầu tôi có tất cả công nhân đọc từ một hàng đợi đầu vào nhưng thấy rằng không ai trong số họ đã truy cập CPU. Nhận thấy rằng họ đã dành tất cả thời gian của họ chờ đợi queue.get() để mở khóa, tôi đã cho họ hàng đợi của mình. Đó là áp lực gia tăng trên CPU, vì vậy tôi đã tìm ra các công nhân hoạt động thường xuyên hơn. Tuy nhiên, hàng đợi dành phần lớn thời gian của họ trống rỗng! (Tôi biết điều này từ REPL theo dõi tôi đã đề cập). Điều này cho thấy rằng vòng lặp chính lấp đầy hàng đợi là chậm. Đây là vòng lặp đó:

from itertools import cycle 
main(): 
    # (Create workers, each with its own input queue) 
    # Cycle through each worker's queue and add a combination to that queue 
    for combo, worker in zip(product(*sets), cycle(workers)): 
     worker.in_q.put(combo) 
    # (Collect results and return) 

Tôi đoán nút cổ chai là worker.in_q.put(). Làm thế nào để làm cho nhanh hơn? Bản năng đầu tiên của tôi là làm cho người lao động chậm hơn, nhưng điều đó không có ý nghĩa ... Có phải vấn đề mà sợi màn hình đang dừng vòng lặp quá thường xuyên không? Làm thế nào tôi có thể nói?

Ngoài ra, có cách nào khác để thực hiện điều này mà không liên quan đến quá nhiều chờ đợi trên ổ khóa không?

Trả lời

4

Các yếu tố của bạn trông như thế nào? Nó có thể là tẩy họ để đặt chúng trong hàng đợi là chậm, mà rõ ràng sẽ là một nút cổ chai. Lưu ý rằng mỗi phần tử đang được chọn độc lập hơn và hơn và hơn nữa.

Nếu đây là trường hợp, phương pháp này có thể giúp:

  • Chọn một bộ với cardinality> = số điện thoại của người lao động. Lý tưởng nhất, nó sẽ nhiều hơn số lượng công nhân.Gọi bộ này là A và gán các tập con A bằng nhau cho mỗi công nhân. Truyền tập hợp con đó cho từng công nhân.
  • Phân phối toàn bộ nội dung của tất cả các bộ khác nhau cho mỗi công nhân (có thể thông qua pickle.dumps một lần và sau đó truyền cùng một chuỗi cho mỗi công nhân hoặc có thể thông qua bộ nhớ dùng chung hoặc bất kỳ thứ gì khác).
  • Sau đó, mỗi nhân viên có toàn bộ thông tin cần thiết để thực hiện tập con của nó. Nó có thể bắt đầu trên con đường vui vẻ của nó trên product(my_A_subset, *other_sets) (có thể được đặt hàng khác nhau), bỏ phiếu cho một số loại tín hiệu dừng giữa mỗi công việc (hoặc mỗi ba công việc hoặc bất cứ điều gì). Điều này không cần phải thông qua hàng đợi, giá trị bộ nhớ chia sẻ một bit hoạt động tốt.
+0

Tôi _think_ Tôi thấy những gì bạn đang nói và nếu có yêu cầu nhiều lần viết lại. Bạn chắc chắn có một điểm về việc 'dump'ing' và 'load'ing liên tục của các phần tử (đó là các cá thể của một lớp con của' collections.Counter'). Nó sẽ giúp ích nếu tôi _pre_-pickled chúng? Ví dụ, trước vòng lặp 'for' trong' main', có một dòng như 'sets_of_pickles = map (lambda s: map (dumps, s), sets)' – wkschwartz

+1

Điều đó có lẽ sẽ giúp phần nào: nó sẽ tiết kiệm chi phí tẩy , nhưng bạn vẫn sẽ truyền các chuỗi bị cắt qua IPC hơn và hơn thay vì chỉ một lần cho mỗi công nhân. – Dougal

Các vấn đề liên quan