2012-06-15 43 views
9

Khi chạy một số lượng lớn các tác vụ (với các tham số lớn) bằng cách sử dụng Pool.apply_async, các tiến trình được cấp phát và chuyển sang trạng thái chờ, và không có giới hạn về số lượng quy trình chờ. Điều này có thể kết thúc bằng cách ăn tất cả bộ nhớ, như trong ví dụ bên dưới:Đa xử lý Python: làm cách nào để giới hạn số lượng quy trình chờ?

import multiprocessing 
import numpy as np 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(): 

    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 

Tôi đang tìm cách giới hạn hàng chờ, theo cách chỉ có một số lượng giới hạn chờ đợi, và Pool.apply_async bị chặn trong khi hàng chờ đợi đã đầy.

+0

Ví dụ hay (+1). – mgilson

Trả lời

6

multiprocessing.Pool có một thành viên _taskqueue loại multiprocessing.Queue, có tham số maxsize tùy chọn; tiếc là nó xây dựng nó mà không cần thiết lập thông số maxsize.

Tôi muốn đề xuất phân lớp multiprocessing.Pool với bản sao của dán multiprocessing.Pool.__init__ mà vượt qua maxsize đến _taskqueue hàm tạo.

Khỉ-vá các đối tượng (hoặc hồ bơi hoặc hàng đợi) cũng sẽ làm việc, nhưng bạn sẽ phải monkeypatch pool._taskqueue._maxsizepool._taskqueue._sem vì vậy nó sẽ khá giòn:

pool._taskqueue._maxsize = maxsize 
pool._taskqueue._sem = BoundedSemaphore(maxsize) 
+1

Tôi đang sử dụng Python 2.7.3 và _taskqueue thuộc loại Queue.Queue. Nó có nghĩa là nó là một hàng đợi đơn giản, và không phải là một multiprocessing.Queue. Subclassing multiprocessing.Pool và overriding __init__ hoạt động tốt, nhưng khỉ-patching đối tượng không hoạt động như mong đợi. Tuy nhiên, đây là hack mà tôi đã tìm kiếm, cảm ơn. –

0

Bạn có thể thêm Queue rõ ràng với tham số maxsize và sử dụng queue.put() thay vì pool.apply_async() trong trường hợp này. Sau đó, quá trình công nhân có thể:

for a, b in iter(queue.get, sentinel): 
    # process it 

Nếu bạn muốn hạn chế số lượng đối số đầu vào tạo ra/kết quả có trong bộ nhớ để xấp xỉ số quá trình lao động tích cực sau đó bạn có thể sử dụng pool.imap*() phương pháp:

#!/usr/bin/env python 
import multiprocessing 
import numpy as np 

def f(a_b): 
    return np.linalg.solve(*a_b) 

def main(): 
    args = ((np.random.rand(1000,1000), np.random.rand(1000)) 
      for _ in range(1000)) 
    p = multiprocessing.Pool() 
    for result in p.imap_unordered(f, args, chunksize=1): 
     pass 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    main() 
+0

Sử dụng 'imap' không tạo ra sự khác biệt nào. Hàng đợi đầu vào vẫn không giới hạn và sử dụng giải pháp này sẽ kết thúc việc ăn tất cả bộ nhớ. – Radim

+0

@Radim: mã 'imap' trong câu trả lời hoạt động ngay cả khi bạn cho nó một trình tạo vô hạn. – jfs

+0

Không có trong Python 2, thật không may (chưa xem mã trong py3). Đối với một số công việc xung quanh, xem [câu trả lời SO này] (http://stackoverflow.com/questions/5318936/python-multiprocessing-pool-lazy-iteration). – Radim

1

Đợi nếu pool._taskqueue vượt quá kích thước mong muốn:

import multiprocessing 
import numpy as np 
import time 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(max_apply_size=100): 
    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 

     while pool._taskqueue.qsize() > max_apply_size: 
      time.sleep(1) 

    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 
+0

Chỉ muốn thêm rằng tôi thấy đây là giải pháp dễ nhất cho các vấn đề về bộ nhớ của tôi với tính năng đa xử lý. Tôi đã sử dụng max_apply_size = 10 và hoạt động tốt cho vấn đề của tôi, đó là chuyển đổi tệp chậm. Sử dụng một semaphore như @ecatmur cho thấy có vẻ như một giải pháp mạnh mẽ hơn nhưng có thể là quá mức cần thiết cho các kịch bản đơn giản. – Nate

Các vấn đề liên quan