2010-07-09 36 views
36

Tôi muốn một quy trình chạy dài để trả lại tiến trình của nó trên một Hàng đợi (hoặc một thứ gì đó tương tự) mà tôi sẽ chuyển đến hộp thoại thanh tiến trình. Tôi cũng cần kết quả khi quá trình hoàn tất. Ví dụ kiểm tra ở đây không thành công với số RuntimeError: Queue objects should only be shared between processes through inheritance.Làm thế nào để bạn chuyển một tham chiếu Queue đến một hàm được quản lý bởi pool.map_async()?

import multiprocessing, time 

def task(args): 
    count = args[0] 
    queue = args[1] 
    for i in xrange(count): 
     queue.put("%d mississippi" % i) 
    return "Done" 

def main(): 
    q = multiprocessing.Queue() 
    pool = multiprocessing.Pool() 
    result = pool.map_async(task, [(x, q) for x in range(10)]) 
    time.sleep(1) 
    while not q.empty(): 
     print q.get() 
    print result.get() 

if __name__ == "__main__": 
    main() 

tôi đã có thể để có được điều này để làm việc sử dụng đối tượng Process cá nhân (nơi tôi đang alowed để vượt qua một tham chiếu Queue) nhưng sau đó tôi không có một hồ bơi để quản lý các quá trình nhiều Tôi muốn để khởi chạy. Bất kỳ tư vấn về một mô hình tốt hơn cho điều này?

+0

Nó không phải là câu trả lời cho câu hỏi của bạn, nhưng hãy thử thư viện 'execnet' để ánh xạ đa tiến trình. Việc xử lý 'multiprocessing' được tích hợp sẵn có một số vấn đề vẫn được sửa (xem bộ theo dõi Python). Bên cạnh đó mã nguồn của nó khá lớn và phức tạp. Thư viện 'execnet' trông tốt hơn tôi nhiều so với' multiprocessing'. –

Trả lời

43

Các mã sau đây dường như làm việc:

import multiprocessing, time 

def task(args): 
    count = args[0] 
    queue = args[1] 
    for i in xrange(count): 
     queue.put("%d mississippi" % i) 
    return "Done" 


def main(): 
    manager = multiprocessing.Manager() 
    q = manager.Queue() 
    pool = multiprocessing.Pool() 
    result = pool.map_async(task, [(x, q) for x in range(10)]) 
    time.sleep(1) 
    while not q.empty(): 
     print q.get() 
    print result.get() 

if __name__ == "__main__": 
    main() 

Lưu ý rằng Queue được nhận được từ một manager.Queue() chứ không phải là multiprocessing.Queue(). Cảm ơn Alex đã chỉ cho tôi theo hướng này.

+0

+1 và chỉ cần lưu ý nhanh rằng câu hỏi của bạn đã giúp tôi trong một vấn đề tôi đã có ngày hôm nay. Tôi đã tìm thấy phiên bản Manager của hàng đợi, nhưng mã của tôi không hoạt động vì tôi đã dựa vào toàn cầu. Nó cần phải được thông qua như một tham số, giống như bạn đang làm. – winwaed

+0

Ngoài ra +1 cho 'manager.Queue' - rất hữu ích. – fantabolous

8

Làm qtoàn cầu công trình ...:

import multiprocessing, time 

q = multiprocessing.Queue() 

def task(count): 
    for i in xrange(count): 
     q.put("%d mississippi" % i) 
    return "Done" 

def main(): 
    pool = multiprocessing.Pool() 
    result = pool.map_async(task, range(10)) 
    time.sleep(1) 
    while not q.empty(): 
     print q.get() 
    print result.get() 

if __name__ == "__main__": 
    main() 

Nếu bạn cần nhiều hàng đợi, ví dụ để tránh trộn lẫn tiến độ của các quy trình nhóm khác nhau, danh sách hàng đợi toàn cầu sẽ hoạt động (tất nhiên, mỗi quá trình sẽ cần biết chỉ số chỉ số trong danh sách để sử dụng, nhưng đó là OK để vượt qua đối số;).

+0

Tác vụ này có hoạt động nếu "tác vụ" được định nghĩa trong một mô-đun hoặc gói khác không? Mã ví dụ rất đơn giản. Chương trình thực sự có một kiến ​​trúc MVC nơi một đường ống dẫn người tiêu dùng sản xuất được thiết lập trên nhiều lõi (mô hình) và nó cần gửi các bản cập nhật tiến trình tới GUI wxPython (khung nhìn). – David

+2

@David, bạn có thể thử; nếu mã thực sự của bạn không hoạt động theo cách đơn giản này, bạn sẽ cần phải di chuyển lên một mức độ phức tạp và đi đến một Trình quản lý (có thể cung cấp cho bạn các proxy đến Hàng đợi, v.v.). –

+0

Điều này dường như không hoạt động chút nào. q không bao giờ trả về bất cứ điều gì q.empty() luôn đúng trên máy của tôi. Ngay cả khi tôi tăng cuộc gọi giấc ngủ lên 10 giây, thời gian quá nhiều để công việc đặt một vài thông điệp lên hàng đợi, q.empty luôn trả về True. – David

Các vấn đề liên quan