2014-07-16 11 views
23

Tôi đang cố gắng hiểu một chút về những gì đang diễn ra đằng sau hậu trường khi sử dụng phương thức apply_sync của một nhóm đa xử lý.Ai chạy cuộc gọi lại khi sử dụng phương thức apply_async của một nhóm đa xử lý?

Ai chạy phương thức gọi lại? Đây có phải là quá trình chính được gọi là apply_async không?

Giả sử tôi gửi toàn bộ các lệnh apply_async có gọi lại và sau đó tiếp tục với chương trình của tôi. Chương trình của tôi vẫn đang hoạt động khi bắt đầu kết thúc áp dụng. Làm thế nào để gọi lại có được chạy của tôi "quá trình chính" trong khi quá trình chính vẫn còn bận rộn với kịch bản?

Đây là một ví dụ.

import multiprocessing 
import time 

def callback(x): 
    print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x) 

def func(x): 
    print '{} running func with arg {}'.format(multiprocessing.current_process().name, x) 
    return x 

pool = multiprocessing.Pool() 

args = range(20) 

for a in args: 
    pool.apply_async(func, (a,), callback=callback) 

print '{} going to sleep for a minute'.format(multiprocessing.current_process().name) 

t0 = time.time() 
while time.time() - t0 < 60: 
    pass 

print 'Finished with the script' 

Đầu ra là một cái gì đó giống như

PoolWorker-1 chạy func với arg 0

PoolWorker-2 chạy func với arg 1

PoolWorker-3 chạy func với arg 2

MainProcess sẽ ngủ trong một phút < - quá trình chính là bận rộn

PoolWorker-4 chạy func với arg 3

PoolWorker-1 chạy func với arg 4

PoolWorker-2 chạy func với arg 5

PoolWorker-3 chạy func với arg 6

PoolWorker-4 chạy func với arg 7

MainProcess running callback w ith arg 0 < - quá trình chính chạy gọi lại trong khi nó vẫn còn trong vòng lặp while !!

MainProcess chạy callback với arg 1

MainProcess chạy callback với arg 2

MainProcess chạy callback với arg 3

MainProcess chạy callback với arg 4

PoolWorker-1 chạy func với arg 8

...

Đã kết thúc với kịch bản

như thế nào MainProcess chạy callback khi nó ở giữa trong khi vòng lặp ??

Có tuyên bố này về cuộc gọi lại trong tài liệu cho multiprocessing.Pool có vẻ như gợi ý nhưng tôi không hiểu.

apply_async (func [, args [, kwds [, gọi lại]]])

Một biến thể của phương pháp được áp dụng() trả về một đối tượng kết quả.

Nếu gọi lại được chỉ định thì nó phải là một cuộc gọi có thể chấp nhận một đối số duy nhất. Khi kết quả trở thành sẵn sàng gọi lại được áp dụng cho nó (trừ khi cuộc gọi không thành công). gọi lại sẽ hoàn thành ngay lập tức vì nếu không thì luồng xử lý kết quả sẽ bị chặn.

Trả lời

25

Có thực sự là một gợi ý trong tài liệu:

callback nên hoàn thành ngay lập tức kể từ khác thread mà xử lý các kết quả sẽ bị chặn.

Các cuộc gọi lại được xử lý trong quá trình chính, nhưng chúng được chạy trong chủ đề riêng biệt. Khi bạn tạo một Pool nó thực sự tạo ra một vài Thread đối tượng trong nội bộ:

class Pool(object): 
    Process = Process 

    def __init__(self, processes=None, initializer=None, initargs=(), 
       maxtasksperchild=None): 
     self._setup_queues() 
     self._taskqueue = Queue.Queue() 
     self._cache = {} 
     ... # stuff we don't care about 
     self._worker_handler = threading.Thread(
      target=Pool._handle_workers, 
      args=(self,) 
      ) 
     self._worker_handler.daemon = True 
     self._worker_handler._state = RUN 
     self._worker_handler.start() 

     self._task_handler = threading.Thread(
      target=Pool._handle_tasks, 
      args=(self._taskqueue, self._quick_put, self._outqueue, 
        self._pool, self._cache) 
      ) 
     self._task_handler.daemon = True 
     self._task_handler._state = RUN 
     self._task_handler.start() 

     self._result_handler = threading.Thread(
      target=Pool._handle_results, 
      args=(self._outqueue, self._quick_get, self._cache) 
      ) 
     self._result_handler.daemon = True 
     self._result_handler._state = RUN 
     self._result_handler.start() 

Các chủ đề thú vị đối với chúng tôi là _result_handler; chúng ta sẽ sớm giải thích tại sao.

Switching bánh răng trong một giây, khi bạn chạy apply_async, nó tạo ra một đối tượng ApplyResult trong nội bộ để quản lý nhận được kết quả từ các đứa trẻ:

def apply_async(self, func, args=(), kwds={}, callback=None): 
    assert self._state == RUN 
    result = ApplyResult(self._cache, callback) 
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) 
    return result 

class ApplyResult(object): 

    def __init__(self, cache, callback): 
     self._cond = threading.Condition(threading.Lock()) 
     self._job = job_counter.next() 
     self._cache = cache 
     self._ready = False 
     self._callback = callback 
     cache[self._job] = self 


    def _set(self, i, obj): 
     self._success, self._value = obj 
     if self._callback and self._success: 
      self._callback(self._value) 
     self._cond.acquire() 
     try: 
      self._ready = True 
      self._cond.notify() 
     finally: 
      self._cond.release() 
     del self._cache[self._job] 

Như bạn có thể thấy, phương pháp _set là một trong những mục đích thực sự thực hiện các callback được thông qua, giả sử nhiệm vụ đã thành công. Cũng lưu ý rằng nó tự thêm vào một số toàn cầu cache dict ở cuối __init__.

Bây giờ, quay lại đối tượng chuỗi _result_handler. đối tượng mà các cuộc gọi _handle_results chức năng, mà trông như thế này:

while 1: 
     try: 
      task = get() 
     except (IOError, EOFError): 
      debug('result handler got EOFError/IOError -- exiting') 
      return 

     if thread._state: 
      assert thread._state == TERMINATE 
      debug('result handler found thread._state=TERMINATE') 
      break 

     if task is None: 
      debug('result handler got sentinel') 
      break 

     job, i, obj = task 
     try: 
      cache[job]._set(i, obj) # Here is _set (and therefore our callback) being called! 
     except KeyError: 
      pass 

     # More stuff 

Đó là một vòng lặp mà chỉ kéo kết quả từ trẻ em ra khỏi hàng đợi, tìm entry cho nó trong cache, và kêu gọi _set, mà thực hiện callback của chúng tôi. Nó có thể chạy mặc dù bạn đang ở trong một vòng lặp vì nó không chạy trong chủ đề chính.

+0

Cảm ơn Dano vì đã dành thời gian viết câu trả lời chi tiết như vậy! Nếu tôi hiểu chính xác, hồ bơi sẽ tạo một chuỗi * đơn * mới (result_handler) mà công việc của nó là chờ đợi xung quanh để apply_async hoàn thành và sau đó gọi lại callback trong chuỗi của result_handler (là một phần của MainProcess). Các cuộc gọi lại (đối với một đối tượng đơn lẻ) có được gọi tuần tự không? I E. Một loạt các apply_async có thể kết thúc với nhau nhưng các callbacks sẽ được chạy từng cái một trong serial bởi result_handler? – Alex

+1

Một câu hỏi khác. Điều gì sẽ xảy ra nếu chức năng gọi lại và tập lệnh chính cả hai lẫn lộn với cùng một đối tượng (trong MainProcess)? Có thể có hành vi không thể đoán trước? I E. nếu gọi lại và một cái gì đó sau này trong kịch bản chính cả hai cố gắng ghi vào cùng một tập tin hoặc sửa đổi cùng một mảng. Khi gọi lại thực sự được chạy những người biết những gì kịch bản chính sẽ được làm tại thời điểm đó. – Alex

+4

@Alex Có, các cuộc gọi lại sẽ được thực hiện một cách bình thường.Chuỗi '_result_handler' kéo một tác vụ đã hoàn thành ra khỏi hàng đợi, gọi' _set' (chạy lệnh gọi lại), sau đó chuyển sang bước tiếp theo. Đây là lý do tại sao tài liệu nói để đảm bảo cuộc gọi lại hoàn tất ngay lập tức; thực thi các cuộc gọi lại chặn các kết quả khác không được xử lý. – dano

Các vấn đề liên quan