2011-06-28 18 views
9

Tôi đang cố gắng để có được thời gian chờ để làm việc trong python3.2 bằng cách sử dụng mô-đun concurrent.futures. Tuy nhiên khi nó hết thời gian chờ, nó không thực sự dừng việc thực thi. Tôi đã thử với cả hai chủ đề và quá trình thực thi hồ bơi không phải của họ ngừng nhiệm vụ, và chỉ cho đến khi nó hoàn thành không một thời gian chờ trở nên lớn lên. Vì vậy, không ai biết nếu nó có thể để có được điều này làm việc?Làm thế nào để sử dụng concurrent.futures với timeouts?

import concurrent.futures 
import time 
import datetime 

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000] 

def run_loop(max_number): 
    print("Started:", datetime.datetime.now(), max_number) 
    last_number = 0; 
    for i in range(1, max_number + 1): 
     last_number = i * i 
    return last_number 

def main(): 
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor: 
     try: 
      for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1): 
       print(future.result(timeout=1)) 
     except concurrent.futures._base.TimeoutError: 
      print("This took to long...") 

if __name__ == '__main__': 
    main() 

Trả lời

15

Theo tôi có thể biết, TimeoutError thực sự được tăng lên khi bạn mong đợi và không sau khi hoàn thành nhiệm vụ.

Tuy nhiên, chính chương trình của bạn sẽ tiếp tục chạy cho đến khi tất cả các tác vụ đang chạy được hoàn thành. Điều này là bởi vì hiện đang thực hiện nhiệm vụ (trong trường hợp của bạn, có lẽ tất cả các nhiệm vụ của bạn gửi, như kích thước hồ bơi của bạn bằng số lượng các nhiệm vụ), không thực sự "giết".

TimeoutError được nâng lên, để bạn có thể chọn không đợi cho đến khi tác vụ được hoàn thành (và thực hiện một việc khác), nhưng nhiệm vụ sẽ tiếp tục chạy cho đến khi hoàn thành. Và python sẽ không thoát ra chừng nào còn có các nhiệm vụ chưa hoàn thành trong các luồng/các tiến trình con của Executor của bạn.

Theo như tôi biết, không thể chỉ "ngừng" hiện đang thực hiện Hợp đồng tương lai, bạn chỉ có thể "hủy" các tác vụ theo lịch chưa bắt đầu. Trong trường hợp của bạn, sẽ không có bất kỳ, nhưng hãy tưởng tượng rằng bạn có hồ bơi của 5 chủ đề/quy trình, và bạn muốn xử lý 100 mặt hàng. Tại một số điểm, có thể có 20 nhiệm vụ hoàn thành, 5 nhiệm vụ đang chạy và 75 tác vụ được lên lịch. Trong trường hợp này, bạn sẽ có thể hủy bỏ 76 tác vụ được lên lịch đó, nhưng 4 tác vụ đang chạy sẽ tiếp tục cho đến khi hoàn thành, cho dù bạn có đợi kết quả hay không.

Mặc dù không thể thực hiện theo cách đó, tôi đoán có các cách để đạt được kết quả mong muốn cuối cùng của bạn. Có lẽ phiên bản này có thể giúp bạn trên con đường (không chắc chắn nếu nó thực hiện chính xác những gì bạn muốn, nhưng nó có thể là của một số sử dụng):

import concurrent.futures 
import time 
import datetime 

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000] 

class Task: 
    def __init__(self, max_number): 
     self.max_number = max_number 
     self.interrupt_requested = False 

    def __call__(self): 
     print("Started:", datetime.datetime.now(), self.max_number) 
     last_number = 0; 
     for i in xrange(1, self.max_number + 1): 
      if self.interrupt_requested: 
       print("Interrupted at", i) 
       break 
      last_number = i * i 
     print("Reached the end") 
     return last_number 

    def interrupt(self): 
     self.interrupt_requested = True 

def main(): 
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor: 
     tasks = [Task(num) for num in max_numbers] 
     for task, future in [(i, executor.submit(i)) for i in tasks]: 
      try: 
       print(future.result(timeout=1)) 
      except concurrent.futures.TimeoutError: 
       print("this took too long...") 
       task.interrupt() 


if __name__ == '__main__': 
    main() 

Bằng cách tạo ra một đối tượng có thể được gọi cho mỗi "nhiệm vụ", và đưa ra những cho người thi hành thay vì chỉ là một hàm đơn giản, bạn có thể cung cấp một cách để "làm gián đoạn" nhiệm vụ. Mẹo: loại bỏ các dòng task.interrupt() và xem những gì sẽ xảy ra, nó có thể làm cho nó dễ dàng hơn để hiểu lời giải thích dài của tôi ở trên ;-)

4

Gần đây tôi cũng trúng vấn đề này, và cuối cùng tôi đưa ra các giải pháp sau đây sử dụng ProcessPoolExecutor:


def main(): 
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor: 
     try: 
      for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1): 
       print(future.result(timeout=1)) 
     except concurrent.futures._base.TimeoutError: 
      print("This took to long...") 
      stop_process_pool(executor) 

def stop_process_pool(executor): 
    for pid, processes in executor._processes.items(): 
     process.terminate() 
    executor.shutdown() 
+0

txmc bạn có thể chỉ cần giết một trong các quy trình không? Hay bạn phải giết tất cả? –

Các vấn đề liên quan