2014-09-24 18 views
8

Trong ThreadPoolExecutor (TPE), gọi lại luôn được đảm bảo để chạy trong cùng một luồng với chức năng đã gửi?Python ThreadPoolExecutor - là cuộc gọi lại được bảo đảm để chạy trong cùng một luồng như được gửi func?

Ví dụ: tôi đã thử nghiệm mã này bằng mã sau. Tôi chạy nó nhiều lần và có vẻ như funccallback luôn chạy trong cùng một chuỗi.

import concurrent.futures 
import random 
import threading 
import time 

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) 

def func(x): 
    time.sleep(random.random()) 
    return threading.current_thread().name 

def callback(future): 
    time.sleep(random.random()) 
    x = future.result() 
    cur_thread = threading.current_thread().name 
    if (cur_thread != x): 
     print(cur_thread, x) 

print('main thread: %s' % threading.current_thread()) 
for i in range(10000): 
    future = executor.submit(func, i) 
    future.add_done_callback(callback) 

Tuy nhiên, nó dường như thất bại khi tôi loại bỏ các time.sleep(random.random()) báo cáo, ví dụ ít nhất một vài func chức năng và callbackskhông chạy trong cùng một thread.

Đối với một dự án mà tôi đang làm việc, cuộc gọi lại phải luôn chạy trên cùng một luồng với chức năng được gửi, vì vậy tôi muốn đảm bảo rằng điều này được đảm bảo bởi TPE. (Và cũng là kết quả của bài kiểm tra mà không có giấc ngủ ngẫu nhiên có vẻ khó hiểu).

Tôi đã xem source code for executors và có vẻ như chúng tôi không chuyển chuỗi sang chuỗi chính trước khi chúng tôi chạy gọi lại. Nhưng chỉ muốn chắc chắn.

Trả lời

6

Gọi lại cho Future được gửi đến ThreadPoolExecutor sẽ được chạy trong cùng một chuỗi với nhiệm vụ đang chạy, nhưng chỉ khi gọi lại được thêm vào Future trước khi tác vụ hoàn tất. Nếu bạn thêm gọi lại sau khi Future hoàn tất, gọi lại sẽ thực hiện trong bất cứ chủ đề mà bạn gọi là add_done_callback trong Bạn có thể thấy điều này bằng cách nhìn vào add_done_callback nguồn:.

def add_done_callback(self, fn): 
    """Attaches a callable that will be called when the future finishes. 

    Args: 
     fn: A callable that will be called with this future as its only 
      argument when the future completes or is cancelled. The callable 
      will always be called by a thread in the same process in which 
      it was added. If the future has already completed or been 
      cancelled then the callable will be called immediately. These 
      callables are called in the order that they were added. 
    """ 
    with self._condition: 
     if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: 
      self._done_callbacks.append(fn) 
      return 
    fn(self) 

Nếu trạng thái của Future cho thấy nhằm hủy bỏ hoặc kết thúc, fn ngay lập tức được gọi trong luồng hiện tại của việc thực thi. Nếu không, nó sẽ được thêm vào danh sách gọi lại nội bộ để chạy khi số Future hoàn tất.

Ví dụ:

>>> def func(*args): 
... time.sleep(5) 
... print("func {}".format(threading.current_thread())) 
>>> def cb(a): print("cb {}".format(threading.current_thread())) 
... 
>>> fut = ex.submit(func) 
>>> func <Thread(Thread-1, started daemon 140084551563008)> 
>>> fut = e.add_done_callback(cb) 
cb <_MainThread(MainThread, started 140084622018368)> 
+0

Và những gì về 'ProcessPoolExecutor'? Callback của 'get_ident' cũng khác nhau – Winand

Các vấn đề liên quan