2013-08-24 32 views
18

Dưới đây là các chương trình:Bộ nhớ sử dụng tiếp tục phát triển với multiprocessing.pool Python của

#!/usr/bin/python 

import multiprocessing 

def dummy_func(r): 
    pass 

def worker(): 
    pass 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(processes=16) 
    for index in range(0,100000): 
     pool.apply_async(worker, callback=dummy_func) 

    # clean up 
    pool.close() 
    pool.join() 

tôi thấy sử dụng bộ nhớ (cả VIRT và RES) tiếp tục tăng trưởng lên đến gần()/join(), là có bất kỳ giải pháp cho thoát khỏi điều này? Tôi đã thử maxtasksperchild với 2.7 nhưng nó cũng không giúp được gì.

Tôi có một chương trình phức tạp hơn gọi hàm apply_async() ~ 6M lần và tại ~ 1,5M điểm tôi đã có 6G + RES, để tránh tất cả các yếu tố khác, tôi đã đơn giản hóa chương trình thành phiên bản cao hơn.

EDIT:

Hóa ra phiên bản này hoạt động tốt hơn, nhờ sự đầu vào của tất cả mọi người:

#!/usr/bin/python 

import multiprocessing 

ready_list = [] 
def dummy_func(index): 
    global ready_list 
    ready_list.append(index) 

def worker(index): 
    return index 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(processes=16) 
    result = {} 
    for index in range(0,1000000): 
     result[index] = (pool.apply_async(worker, (index,), callback=dummy_func)) 
     for ready in ready_list: 
      result[ready].wait() 
      del result[ready] 
     ready_list = [] 

    # clean up 
    pool.close() 
    pool.join() 

Tôi không đặt bất kỳ khóa có như tôi tin rằng quá trình chính là đơn luồng (callback là nhiều hơn hoặc ít hơn như một điều hướng sự kiện cho mỗi tài liệu tôi đọc).

Tôi đã thay đổi phạm vi chỉ mục của v1 thành 1.000.000, tương tự như v2 và thực hiện một số kiểm tra - thật lạ với tôi v2 thậm chí còn nhanh hơn 10% so với v1 (33 giây so với 37 giây). v2 chắc chắn là một người chiến thắng về sử dụng bộ nhớ, nó không bao giờ vượt quá 300M (VIRT) và 50M (RES), trong khi v1 được sử dụng là 370M/120M, tốt nhất là 330M/85M. Tất cả các con số chỉ là 3 ~ 4 lần thử nghiệm, chỉ tham khảo.

+1

Chỉ cần suy đoán ở đây, nhưng xếp hàng một triệu đối tượng chiếm không gian. Có lẽ việc sắp xếp chúng sẽ giúp ích cho bạn. Các tài liệu không dứt khoát, nhưng [ví dụ] (http://pydoc.net/Python/multiprocessing/2.6.2.1/multiprocessing.examples.mp_pool/) (tìm kiếm để gọi lại Kiểm tra) cho thấy kết quả apply_async đang được chờ đợi, ngay cả khi có callbacks. Có thể cần phải chờ để xóa hàng đợi kết quả. – tdelaney

+0

Vì vậy, đa xử lý.pool có thể không phải là công cụ phù hợp với tôi, như gọi lại thực sự không làm công việc dọn dẹp, là nó có thể làm sạch trong gọi lại? Vấn đề là tôi không thể chờ đợi sau khi apply_async() gọi như trong công nhân thế giới thực() mất ~ 0,1 giây cho mỗi yêu cầu (một số yêu cầu HTTP). –

+1

Dự đoán hoang dã: 'apply_asynch' tạo một thể hiện [' AsynchResult'] (http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult). 'Pool' có thể có một số tham chiếu đến các đối tượng này, vì chúng phải có khả năng trả về kết quả khi tính toán đã hoàn thành, nhưng trong vòng lặp của bạn, bạn chỉ đơn giản là ném chúng đi. Có lẽ bạn nên gọi 'get()' hoặc 'wait()' trên kết quả asynch tại một số điểm, có thể sử dụng đối số 'callback' của' apply_asynch'. – Bakuriu

Trả lời

6

Sử dụng map_async thay vì apply_async để tránh sử dụng bộ nhớ quá mức.

Ví dụ đầu tiên của bạn, thay đổi hai dòng sau:

for index in range(0,100000): 
    pool.apply_async(worker, callback=dummy_func) 

để

pool.map_async(worker, range(100000), callback=dummy_func) 

Nó sẽ kết thúc trong nháy trước khi bạn có thể thấy sử dụng bộ nhớ của nó trong top. Thay đổi danh sách thành một danh sách lớn hơn để thấy sự khác biệt. Nhưng lưu ý map_async trước tiên sẽ chuyển đổi lặp lại bạn chuyển sang danh sách để tính toán độ dài nếu nó không có phương thức __len__. Nếu bạn có một vòng lặp của một số lượng lớn các phần tử, bạn có thể sử dụng itertools.islice để xử lý chúng theo các phần nhỏ hơn.

Tôi gặp sự cố về bộ nhớ trong chương trình thực tế với nhiều dữ liệu hơn và cuối cùng tìm ra thủ phạm là apply_async.

P.S., về mức sử dụng bộ nhớ, hai ví dụ của bạn không có sự khác biệt rõ ràng.

4

Tôi có bộ dữ liệu đám mây điểm 3d rất lớn mà tôi đang xử lý. Tôi đã thử sử dụng mô-đun đa xử lý để tăng tốc quá trình xử lý, nhưng tôi đã bắt đầu thoát khỏi lỗi bộ nhớ. Sau khi một số nghiên cứu và thử nghiệm tôi xác định rằng tôi đã điền vào hàng đợi các nhiệm vụ được xử lý nhanh hơn nhiều so với các quy trình con có thể làm trống nó. Tôi chắc chắn bằng cách chunking, hoặc sử dụng map_async hoặc một cái gì đó tôi có thể đã điều chỉnh tải, nhưng tôi không muốn thực hiện những thay đổi lớn cho logic xung quanh.

Giải pháp câm tôi nhấn vào là kiểm tra chiều dài pool._cache liên tục và nếu bộ nhớ cache quá lớn thì hãy chờ hàng đợi trống.

Trong mainloop của tôi, tôi đã có một bộ đếm và một ticker status:

# Update status 
count += 1 
if count%10000 == 0: 
    sys.stdout.write('.') 
    if len(pool._cache) > 1e6: 
     print "waiting for cache to clear..." 
     last.wait() # Where last is assigned the latest ApplyResult 

Vì vậy, mỗi chèn 10k vào hồ bơi tôi kiểm tra nếu có hơn 1 triệu hoạt động xếp hàng đợi (khoảng 1G bộ nhớ được sử dụng trong Quy trình chính). Khi hàng đợi đầy, tôi chỉ đợi công việc đã chèn cuối cùng kết thúc.

Hiện chương trình của tôi có thể chạy hàng giờ mà không hết bộ nhớ. Quá trình chính chỉ thỉnh thoảng tạm dừng trong khi công nhân tiếp tục xử lý dữ liệu.

BTW thành viên _cache được ghi chép lại các ví dụ đa mô-đun hồ bơi:

# 
# Check there are no outstanding tasks 
# 

assert not pool._cache, 'cache = %r' % pool._cache 
15

Tôi có vấn đề bộ nhớ thời gian gần đây, kể từ khi tôi được sử dụng nhiều lần so với chức năng đa xử lý, vì vậy nó giữ các quá trình sinh sản, và để lại cho họ trong ký ức.

Đây là giải pháp tôi đang sử dụng hiện nay:

def myParallelProcess(ahugearray) 
from multiprocessing import Pool 
from contextlib import closing 
with closing(Pool(15)) as p: 
    res = p.imap_unordered(simple_matching, ahugearray, 100) 
return res 

tôi ❤ với

+2

Điều này giải quyết được vấn đề của tôi sau khi dành nhiều ngày cho vấn đề này! Cảm ơn rất nhiều! Tôi đã tạo ra một hồ bơi bên trong một vòng lặp, vì vậy tôi đã kết thúc sinh sản quá nhiều quá trình, mỗi một tiêu thụ rất nhiều bộ nhớ và không bao giờ thoát. Tôi chỉ cần làm mypool.close() ở cuối vòng lặp – MohamedEzz

1

Tôi nghĩ rằng đây cũng tương tự như the question I posted, nhưng tôi không chắc chắn bạn có sự chậm trễ như vậy. Vấn đề của tôi là tôi đã sản xuất kết quả từ hồ bơi đa xử lý nhanh hơn tôi đã tiêu thụ chúng, vì vậy chúng được xây dựng trong bộ nhớ. Để tránh điều đó, tôi đã sử dụng một số semaphore để điều chỉnh các yếu tố đầu vào vào hồ bơi để chúng không đi quá xa so với các kết quả đầu ra mà tôi đã tiêu thụ.

0

Chỉ cần tạo hồ bơi trong vòng lặp của bạn và đóng nó ở cuối vòng lặp với pool.close().

Các vấn đề liên quan