2012-03-07 27 views
6

Tôi khá mới với python. Tôi đang sử dụng mô-đun đa xử lý để đọc các dòng văn bản trên stdin, chuyển đổi chúng theo cách nào đó và viết chúng vào cơ sở dữ liệu. Dưới đây là một đoạn mã của tôi:python pool apply_async và map_async không chặn trên hàng đợi đầy đủ

batch = [] 
pool = multiprocessing.Pool(20) 
i = 0 
for i, content in enumerate(sys.stdin): 
    batch.append(content) 
    if len(batch) >= 10000: 
     pool.apply_async(insert, args=(batch,i+1)) 
     batch = [] 
pool.apply_async(insert, args=(batch,i)) 
pool.close() 
pool.join() 

Bây giờ tất cả hoạt động tốt, cho đến khi tôi nhận được để xử lý tập tin đầu vào rất lớn (hàng trăm triệu dòng) mà tôi ống vào chương trình python của tôi. Tại một số thời điểm, khi cơ sở dữ liệu của tôi chậm hơn, tôi thấy bộ nhớ đầy.

Sau khi phát một số, hóa ra là pool.apply_async cũng như pool.map_async không bao giờ chặn, để hàng đợi các cuộc gọi được xử lý ngày càng lớn.

Cách tiếp cận chính xác cho vấn đề của tôi là gì? Tôi sẽ mong đợi một tham số mà tôi có thể thiết lập, điều đó sẽ chặn cuộc gọi pool.apply_async, ngay khi đạt đến độ dài hàng đợi nhất định. AFAIR trong Java có thể cung cấp cho ThreadPoolExecutor một BlockingQueue với độ dài cố định cho mục đích đó.

Cảm ơn!

+1

_ "nó bật ra pool.apply_async đó cũng như pool.map_async không bao giờ chặn" _ - tất cả mọi thứ tôi đang tìm kiếm – leon

Trả lời

2

apply_async trả về một đối tượng AsyncResult, mà bạn có thể wait trên:

if len(batch) >= 10000: 
    r = pool.apply_async(insert, args=(batch, i+1)) 
    r.wait() 
    batch = [] 

Mặc dù nếu bạn muốn làm điều này một cách sạch hơn, bạn nên sử dụng một multiprocessing.Queue với một maxsize của 10000, và lấy được một Worker lớp học từ multiprocessing.Process tìm nạp từ hàng đợi đó.

+1

cũng chờ đợi trên asyncResult không giúp như vấn đề của tôi là hàng đợi trong Hồ bơi phát triển lớn. Tôi tự hỏi nếu tôi có thể kiểm soát kích thước của hàng đợi nội bộ trong hồ bơi? – konstantin

+0

@konstantin: Tôi không chắc tôi hiểu. Trong khi bạn đang chờ đợi 'AsyncResult', quy trình tổng thể không thể lấp đầy lô tiếp theo, phải không? –

9

Chỉ trong trường hợp một số kết thúc ở đây, đây là cách tôi giải quyết vấn đề: Tôi đã ngừng sử dụng đa xử lý. Dưới đây là cách tôi làm điều đó bây giờ:

#set amount of concurrent processes that insert db data 
processes = multiprocessing.cpu_count() * 2 

#setup batch queue 
queue = multiprocessing.Queue(processes * 2) 

#start processes 
for _ in range(processes): multiprocessing.Process(target=insert, args=(queue,)).start() 

#fill queue with batches  
batch=[] 
for i, content in enumerate(sys.stdin): 
    batch.append(content) 
    if len(batch) >= 10000: 
     queue.put((batch,i+1)) 
     batch = [] 
if batch: 
    queue.put((batch,i+1)) 

#stop processes using poison-pill 
for _ in range(processes): queue.put((None,None)) 

print "all done." 

trong phương pháp chèn quá trình xử lý của từng lô được bọc trong một vòng lặp mà kéo từ hàng đợi cho đến khi nó nhận được thuốc độc:

while True: 
    batch, end = queue.get() 
    if not batch and not end: return #poison pill! complete! 
    [process the batch] 
print 'worker done.' 
+0

Ví dụ đơn giản đẹp. Đa xử lý của hồ bơi là thường xuyên nhiều rắc rối hơn nó có giá trị, đặc biệt là kể từ khi tạo hồ bơi quá trình của riêng bạn là khá đơn giản. – travc

8

Các apply_asyncmap_async chức năng được thiết kế để không chặn quá trình chính. Để làm như vậy, các Pool duy trì một nội bộ Queue kích thước tiếc là không thể thay đổi.

Cách có thể giải quyết vấn đề bằng cách sử dụng số Semaphore được khởi tạo với kích thước bạn muốn xếp hàng. Bạn có được và phát hành semaphore trước khi cho ăn hồ bơi và sau khi một công nhân đã hoàn thành nhiệm vụ.

Dưới đây là ví dụ làm việc với Python 2.6 trở lên.

from threading import Semaphore 
from multiprocessing import Pool 

def task_wrapper(f): 
    """Python2 does not allow a callback for method raising exceptions, 
    this wrapper ensures the code run into the worker will be exception free. 

    """ 
    try: 
     return f() 
    except: 
     return None 

def TaskManager(object): 
    def __init__(self, processes, queue_size): 
     self.pool = Pool(processes=processes) 
     self.workers = Semaphore(processes + queue_size) 

    def new_task(self, f): 
     """Start a new task, blocks if queue is full.""" 
     self.workers.acquire() 
     self.pool.apply_async(task_wrapper, args=(f,), callback=self.task_done)) 

    def task_done(self): 
     """Called once task is done, releases the queue is blocked.""" 
     self.workers.release() 
Các vấn đề liên quan