2012-06-24 31 views
19

Tôi đang sử dụng multiprocessing.imap_unordered để thực hiện một tính toán trên một danh sách các giá trị:đa Python và bộ nhớ

def process_parallel(fnc, some_list): 
    pool = multiprocessing.Pool() 
    for result in pool.imap_unordered(fnc, some_list): 
     for x in result: 
      yield x 
    pool.terminate() 

Mỗi cuộc gọi đến fnc trả về một đối tượng HUGE kết quả là do thiết kế. Tôi có thể lưu trữ N trường hợp của đối tượng như vậy trong RAM, trong đó N ~ cpu_count, nhưng không nhiều hơn (không phải hàng trăm).

Hiện tại, việc sử dụng chức năng này chiếm quá nhiều bộ nhớ. Bộ nhớ là hoàn toàn chi tiêu trong quá trình chính, không phải trong công nhân.

Làm thế nào để imap_unordered lưu trữ kết quả đã hoàn thành? Tôi có nghĩa là các kết quả đã được trả về bởi công nhân nhưng chưa được chuyển cho người dùng. Tôi nghĩ rằng nó là thông minh và chỉ tính chúng "lười biếng" khi cần thiết, nhưng dường như không.

Có vẻ như tôi không thể tiêu thụ kết quả của process_parallel đủ nhanh, hồ bơi tiếp tục xếp hàng các vật thể khổng lồ này từ fnc ở đâu đó, bên trong và sau đó thổi lên. Có cách nào để tránh điều này không? Hạn chế hàng đợi nội bộ của nó bằng cách nào đó?


Tôi đang sử dụng Python2.7. Chúc mừng.

+0

Cũng từ những gì tôi nhìn thấy 'yield' là trong quá trình chính, không phải bên trong' fnc' (tức là, chức năng được thực hiện bởi các công nhân). là 'fnc' chính nó làm đánh giá lười biếng? – Felix

+0

@ FelixBonkoski Không, 'fnc' lấy một mục duy nhất từ' some_list', và tính toán và trả về một đối tượng khổng lồ từ nó. – user124114

+0

Chỉ giới hạn tốc độ dựa trên bộ nhớ khả dụng. –

Trả lời

10

Như bạn có thể thấy bằng cách xem xét tệp nguồn tương ứng (python2.7/multiprocessing/pool.py), IMapUnorderedIterator sử dụng cá thể collections.deque để lưu trữ kết quả. Nếu một mục mới xuất hiện, nó sẽ được thêm vào và bị loại bỏ trong lần lặp lại.

Như bạn đã đề xuất, nếu một đối tượng lớn khác xuất hiện trong khi luồng chính vẫn đang xử lý đối tượng, thì cũng sẽ được lưu trữ trong bộ nhớ.

gì bạn có thể thử là một cái gì đó như thế này:

it = pool.imap_unordered(fnc, some_list) 
for result in it: 
    it._cond.acquire() 
    for x in result: 
     yield x 
    it._cond.release() 

này nên gây ra các nhiệm vụ do-thu-thread để có được chặn trong khi bạn xử lý một mục nếu nó đang cố gắng để đưa các đối tượng kế tiếp vào deque. Do đó không nên có nhiều hơn hai đối tượng khổng lồ trong bộ nhớ. Nếu điều đó phù hợp với trường hợp của bạn, tôi không biết;)

+0

Tôi không làm theo ví dụ này, không phải là 'nó' chỉ đơn giản là một máy phát và vì vậy nó sẽ không có phương thức' _cond.acquire() 'và' release'? Nếu bạn cần tự viết chúng, thì loại đối tượng nào cần '._cond'? – Hooked

+0

Âm thanh như người dùng quan tâm đến hiệu suất tại sao giới hạn nó thành một số nhỏ với một khóa đơn giản? –

+0

@Hooked: 'imap_unordered' trả về một' IMapUnorderedIterator', có các chức năng này có thể được nhìn thấy bằng cách nhìn vào mã nguồn tương ứng. Vì kết quả thu-chủ đề sẽ (khi nhận được một kết quả) yêu cầu khóa để nhập kết quả vào deque, điều này sẽ ngăn chặn các chủ đề và ngăn chặn nó từ tiêu thụ nhiều bộ nhớ hơn. – rumpel

2

Giải pháp đơn giản nhất tôi có thể nghĩ là thêm một đóng để bọc chức năng fnc sử dụng một semaphore để kiểm soát tổng số công việc đồng thời thực thi có thể thực hiện cùng một lúc (tôi giả định quá trình/luồng chính sẽ tăng dần semaphore). Giá trị semaphore có thể được tính toán dựa trên kích thước công việc và bộ nhớ có sẵn.

Các vấn đề liên quan