2010-09-30 24 views
23

Tôi đang sử dụng python 2.7 và cố gắng chạy một số tác vụ nặng nề của CPU trong các quy trình của riêng họ. Tôi muốn có thể gửi tin nhắn trở lại quá trình cha mẹ để thông báo cho tình trạng hiện tại của quá trình. Hàng đợi đa xử lý có vẻ hoàn hảo cho điều này nhưng tôi không thể tìm ra cách để nó hoạt động.Tôi có thể sử dụng Hàng đợi đa xử lý trong một hàm gọi là Pool.imap không?

Vì vậy, đây là ví dụ làm việc cơ bản của tôi trừ đi việc sử dụng Hàng đợi.

import multiprocessing as mp 
import time 

def f(x): 
    return x*x 

def main(): 
    pool = mp.Pool() 
    results = pool.imap_unordered(f, range(1, 6)) 
    time.sleep(1) 

    print str(results.next()) 

    pool.close() 
    pool.join() 

if __name__ == '__main__': 
    main() 

Tôi đã cố gắng chuyển Hàng đợi bằng nhiều cách và nhận được thông báo lỗi "RuntimeError: Đối tượng xếp hàng chỉ nên được chia sẻ giữa các quá trình thông qua kế thừa". Đây là một trong những cách tôi đã thử dựa trên một câu trả lời trước đó tôi tìm thấy. (Tôi nhận được cùng một vấn đề cố gắng sử dụng Pool.map_async và Pool.imap)

import multiprocessing as mp 
import time 

def f(args): 
    x = args[0] 
    q = args[1] 
    q.put(str(x)) 
    time.sleep(0.1) 
    return x*x 

def main(): 
    q = mp.Queue() 
    pool = mp.Pool() 
    results = pool.imap_unordered(f, ([i, q] for i in range(1, 6))) 

    print str(q.get()) 

    pool.close() 
    pool.join() 

if __name__ == '__main__': 
    main() 

Cuối cùng, cách tiếp cận 0 thể dục (làm cho nó toàn cầu) không tạo ra bất kỳ tin nhắn, nó chỉ bị treo.

import multiprocessing as mp 
import time 

q = mp.Queue() 

def f(x): 
    q.put(str(x)) 
    return x*x 

def main(): 
    pool = mp.Pool() 
    results = pool.imap_unordered(f, range(1, 6)) 
    time.sleep(1) 

    print q.get() 

    pool.close() 
    pool.join() 

if __name__ == '__main__': 
    main() 

Tôi biết rằng nó có thể sẽ làm việc với multiprocessing.Process trực tiếp và rằng có những thư viện khác để thực hiện điều này, nhưng tôi ghét để sao ra khỏi chức năng thư viện tiêu chuẩn mà là rất phù hợp cho đến khi tôi m chắc chắn nó không chỉ là sự thiếu kiến ​​thức của tôi, giữ cho tôi không thể khai thác chúng.

Cảm ơn.

+0

Bạn đã cân nhắc sử dụng bình: http://luispedro.org/software/jug? – luispedro

Trả lời

41

Bí quyết là chuyển Hàng đợi làm đối số cho trình khởi tạo. Xuất hiện để làm việc với tất cả các phương thức gửi Pool.

import multiprocessing as mp 

def f(x): 
    f.q.put('Doing: ' + str(x)) 
    return x*x 

def f_init(q): 
    f.q = q 

def main(): 
    jobs = range(1,6) 

    q = mp.Queue() 
    p = mp.Pool(None, f_init, [q]) 
    results = p.imap(f, jobs) 
    p.close() 

    for i in range(len(jobs)): 
     print q.get() 
     print results.next() 

if __name__ == '__main__': 
    main() 
+3

Trình diễn rất tốt về mục đích và tính hữu ích của các đối số 'initializer' và' initargs' đối với 'multiprocessing.Pool'! –

+0

Bạn có thể giải thích, tại sao nó hoạt động? Những gì happpens khi bạn làm f.q = q? – kepkin

+2

@kepkin Trong Python, mọi hàm là một đối tượng (Xem http://docs.python.org/reference/datamodel.html#the-standard-type-hierarchy Các loại có thể gọi). Vì vậy, f.q đang thiết lập một thuộc tính có tên là q trên đối tượng hàm f. Đó chỉ là một cách nhanh chóng và gọn nhẹ để lưu đối tượng Queue để sử dụng sau này. – Olson

Các vấn đề liên quan