2009-10-08 24 views
15

Tôi muốn kết xuất một số multiprocessing.Queue vào danh sách. Đối với tác vụ đó, tôi đã viết hàm sau:Dumping a multiprocessing.Queue vào danh sách

import Queue 

def dump_queue(queue): 
    """ 
    Empties all pending items in a queue and returns them in a list. 
    """ 
    result = [] 

    # START DEBUG CODE 
    initial_size = queue.qsize() 
    print("Queue has %s items initially." % initial_size) 
    # END DEBUG CODE 

    while True: 
     try: 
      thing = queue.get(block=False) 
      result.append(thing) 
     except Queue.Empty: 

      # START DEBUG CODE 
      current_size = queue.qsize() 
      total_size = current_size + len(result) 
      print("Dumping complete:") 
      if current_size == initial_size: 
       print("No items were added to the queue.") 
      else: 
       print("%s items were added to the queue." % \ 
         (total_size - initial_size)) 
      print("Extracted %s items from the queue, queue has %s items \ 
      left" % (len(result), current_size)) 
      # END DEBUG CODE 

      return result 

Nhưng vì lý do nào đó, nó không hoạt động.

Quan sát phiên vỏ sau:

>>> import multiprocessing 
>>> q = multiprocessing.Queue() 
>>> for i in range(100): 
...  q.put([range(200) for j in range(100)]) 
... 
>>> q.qsize() 
100 
>>> l=dump_queue(q) 
Queue has 100 items initially. 
Dumping complete: 
0 items were added to the queue. 
Extracted 1 items from the queue, queue has 99 items left 
>>> l=dump_queue(q) 
Queue has 99 items initially. 
Dumping complete: 
0 items were added to the queue. 
Extracted 3 items from the queue, queue has 96 items left 
>>> l=dump_queue(q) 
Queue has 96 items initially. 
Dumping complete: 
0 items were added to the queue. 
Extracted 1 items from the queue, queue has 95 items left 
>>> 

gì đang xảy ra ở đây? Tại sao không phải tất cả các mặt hàng đều được bán?

Trả lời

20

Hãy thử điều này:

import Queue 
import time 

def dump_queue(queue): 
    """ 
    Empties all pending items in a queue and returns them in a list. 
    """ 
    result = [] 

    for i in iter(queue.get, 'STOP'): 
     result.append(i) 
    time.sleep(.1) 
    return result 

import multiprocessing 
q = multiprocessing.Queue() 
for i in range(100): 
    q.put([range(200) for j in range(100)]) 
q.put('STOP') 
l=dump_queue(q) 
print len(l) 

hàng đợi Đa có một bộ đệm bên trong đó có một sợi đường ống kéo làm việc ra một bộ đệm và xóa nó vào ống. Nếu không phải tất cả các đối tượng đã được xả, tôi có thể thấy một trường hợp mà Empty được nâng lên sớm. Sử dụng một sentinel để cho biết kết thúc của hàng đợi là an toàn (và đáng tin cậy). Ngoài ra, việc sử dụng thành ngữ lặp (get, sentinel) chỉ tốt hơn là dựa vào Empty.

Tôi không thích rằng nó có thể tăng trống do thời gian xả (tôi đã thêm time.sleep (.1) để cho phép chuyển ngữ cảnh sang luồng feeder, bạn có thể không cần nó, nó hoạt động mà không có nó - đó là thói quen giải phóng GIL).

+3

Ý tưởng chung Jesse, nhưng thậm chí an toàn hơn và đáng tin cậy hơn sẽ được sử dụng như sentinel một chuỗi 'uuid' (hoặc cho luồng hơn là đa xử lý, cụ thể' sentinel = object()), chứ không phải là một chuỗi chung. Thậm chí sau đó bạn có thể gặp rắc rối nếu một số thread khác là nhận được cùng một lúc; cách duy nhất thực sự _safe_ là người dựa vào nội bộ của Queue, than ôi! -) –

+0

Bạn nói đúng. Tôi đã đi cho các giải pháp 'nhanh chóng' bằng cách sử dụng một chuỗi sentinel, nhưng điều đó chỉ hoạt động trong trường hợp cụ thể này. Tôi đang bắt đầu tự hỏi nếu mp.queue cần một số hỗ trợ sentinel được xây dựng vào của Hàng đợi – jnoller

+0

Cảm ơn câu trả lời này. Tôi đã có một vấn đề tương tự ngày hôm nay mà phản ứng này đã giúp tôi giải quyết. Ghi đầy đủ vấn đề: http://www.bryceboe.com/2011/01/28/the-python-multiprocessing-queue-and-large-objects/ – bboe

3

Trong một số trường hợp, chúng tôi đã tính toán mọi thứ và chúng tôi chỉ muốn chuyển đổi Hàng đợi.

shared_queue = Queue() 
shared_queue_list = [] 
... 
join() #All process are joined 
while shared_queue.qsize() != 0: 
    shared_queue_list.append(shared_queue.get()) 

Bây giờ shared_queue_list có kết quả được chuyển đổi thành danh sách.

Các vấn đề liên quan