17

Tôi có ứng dụng đa luồng python. Tôi muốn chạy một vòng lặp asyncio trong một thread và post calbacks và coroutines cho nó từ một thread khác. Nên dễ dàng nhưng tôi không thể có được đầu của tôi xung quanh các công cụ asyncio.python asyncio, cách tạo và hủy tác vụ từ một chủ đề khác

tôi đã đưa ra các giải pháp sau đây mà không một nửa những gì tôi muốn, cảm thấy tự do bình luận về bất cứ điều gì:

import asyncio 
from threading import Thread 

class B(Thread): 
    def __init__(self): 
     Thread.__init__(self) 
     self.loop = None 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) #why do I need that?? 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def add_task(self, coro): 
     """this method should return a task object, that I 
      can cancel, not a handle""" 
     f = functools.partial(self.loop.create_task, coro) 
     return self.loop.call_soon_threadsafe(f) 

    def cancel_task(self, xx): 
     #no idea 

@asyncio.coroutine 
def test(): 
    while True: 
     print("running") 
     yield from asyncio.sleep(1) 

b.start() 
time.sleep(1) #need to wait for loop to start 
t = b.add_task(test()) 
time.sleep(10) 
#here the program runs fine but how can I cancel the task? 

b.stop() 

Vì vậy, bắt đầu và stoping vòng lặp hoạt động tốt. Tôi đã nghĩ về việc tạo nhiệm vụ bằng cách sử dụng create_task, nhưng phương thức đó không phải là luồng an toàn nên tôi đã bọc nó trong call_soon_threadsafe. Nhưng tôi muốn có thể nhận được đối tượng nhiệm vụ để có thể hủy nhiệm vụ. Tôi có thể làm một công cụ phức tạp bằng cách sử dụng Tương lai và Điều kiện, nhưng phải có một cách đơn giản hơn, không phải vậy?

Trả lời

13

Tôi nghĩ rằng bạn có thể cần phải làm cho phương pháp add_task của bạn biết liệu có được gọi từ một chuỗi khác với vòng lặp sự kiện hay không. Bằng cách đó, nếu nó được gọi từ cùng một luồng, bạn chỉ có thể gọi trực tiếp asyncio.async, nếu không, nó có thể thực hiện thêm một số công việc để truyền nhiệm vụ từ chuỗi của vòng lặp tới chuỗi đang gọi. Dưới đây là một ví dụ:

import time 
import asyncio 
import functools 
from threading import Thread, current_thread, Event 
from concurrent.futures import Future 

class B(Thread): 
    def __init__(self, start_event): 
     Thread.__init__(self) 
     self.loop = None 
     self.tid = None 
     self.event = start_event 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) 
     self.tid = current_thread() 
     self.loop.call_soon(self.event.set) 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def add_task(self, coro): 
     """this method should return a task object, that I 
      can cancel, not a handle""" 
     def _async_add(func, fut): 
      try: 
       ret = func() 
       fut.set_result(ret) 
      except Exception as e: 
       fut.set_exception(e) 

     f = functools.partial(asyncio.async, coro, loop=self.loop) 
     if current_thread() == self.tid: 
      return f() # We can call directly if we're not going between threads. 
     else: 
      # We're in a non-event loop thread so we use a Future 
      # to get the task from the event loop thread once 
      # it's ready. 
      fut = Future() 
      self.loop.call_soon_threadsafe(_async_add, f, fut) 
      return fut.result() 

    def cancel_task(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 


@asyncio.coroutine 
def test(): 
    while True: 
     print("running") 
     yield from asyncio.sleep(1) 

event = Event() 
b = B(event) 
b.start() 
event.wait() # Let the loop's thread signal us, rather than sleeping 
t = b.add_task(test()) # This is a real task 
time.sleep(10) 
b.stop() 

tiên, chúng ta lưu các thread id của vòng lặp sự kiện trong phương pháp run, vì vậy chúng ta có thể hình dung ra nếu cuộc gọi đến add_task đang đến từ chủ đề khác sau này. Nếu add_task được gọi từ chuỗi vòng lặp không phải sự kiện, chúng tôi sử dụng call_soon_threadsafe để gọi hàm sẽ lập lịch biểu coroutine và sau đó sử dụng concurrent.futures.Future để chuyển tác vụ trở lại chuỗi đang gọi, chờ kết quả của Future.

Lưu ý khi hủy tác vụ: Bạn khi bạn gọi cancel trên Task, một CancelledError sẽ được nâng lên trong coroutine vào lần tiếp theo vòng lặp sự kiện chạy. Điều này có nghĩa rằng coroutine mà Nhiệm vụ đang gói sẽ bị hủy bỏ do ngoại lệ trong lần tiếp theo nó đạt đến điểm sản lượng - trừ khi coroutine bắt được CancelledError và ngăn cản việc hủy bỏ nó. Cũng lưu ý rằng điều này chỉ hoạt động nếu chức năng được bọc thực sự là một coroutine gián đoạn; Ví dụ: asyncio.Future được trả lại bởi BaseEventLoop.run_in_executor, thực sự không thể bị hủy vì nó thực sự được bao quanh một số concurrent.futures.Future và không thể bị hủy khi chức năng cơ bản của chúng thực sự bắt đầu thực thi. Trong những trường hợp đó, asyncio.Future sẽ nói rằng nó bị hủy, nhưng chức năng thực sự chạy trong trình thực thi sẽ tiếp tục chạy.

Chỉnh sửa: Cập nhật ví dụ đầu tiên sử dụng concurrent.futures.Future, thay vì queue.Queue, theo đề xuất của Andrew Svetlov.

Lưu ý: asyncio.async không còn được dùng vì phiên bản 3.4.4 sử dụng asyncio.ensure_future thay thế.

+0

Cảm ơn ví dụ đã giúp tôi khắc phục một số vấn đề tôi gặp phải. Btw Tôi cũng đã để instanciate tương lai với tương lai (loop = self.loop), nếu không trong một số trường hợp trong tương lai sẽ có vòng lặp sai –

+0

@OlivierRD Bạn nên sử dụng 'concurrent.futures.Future', không phải là 'asyncio.Future'. 'concurrent.futures.Future' không dùng từ khóa' loop'. – dano

+0

tài liệu dường như có nghĩa là: https://docs.python.org/3/library/asyncio-task.html#asyncio.Future –

6

Bạn làm mọi thứ đúng. Đối với nhiệm vụ dừng làm phương pháp

class B(Thread): 
    # ... 
    def cancel(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 

BTW bạn để thiết lập một vòng lặp sự kiện cho các chủ đề đã tạo một cách rõ ràng bởi

self.loop = asyncio.new_event_loop() 
asyncio.set_event_loop(self.loop) 

asyncio tạo vòng lặp sự kiện ngầm chỉ dành cho chủ đề chính.

+0

Các mảnh còn thiếu ở đây là làm thế nào để có được xử lý với 'task' ở nơi đầu tiên. Vì OP cần sử dụng 'call_soon_threadsafe (self.loop.create_task)' trong phương thức 'add_task', nên anh ta không thực sự có một xử lý cho nhiệm vụ sau khi thêm nó vào vòng lặp. – dano

+1

OK. Bạn đúng rồi. @dano BTW bạn có thể sử dụng concurrent.futures.Future thay vì Queue trong câu trả lời của bạn. Tôi nghĩ nó sạch hơn. –

+0

Có, tôi đồng ý rằng việc sử dụng 'Tương lai' là đẹp hơn' Hàng đợi'. Tôi đã cập nhật câu trả lời của mình để phản ánh điều đó. Cảm ơn! – dano

5

chỉ để tham khảo ở đây mã tôi cuối cùng đã triển khai dựa trên trợ giúp tôi có trên trang web này, nó đơn giản hơn vì tôi không cần tất cả các tính năng. cảm ơn một lần nữa!

class B(Thread): 
    def __init__(self): 
     Thread.__init__(self) 
     self.loop = None 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def _add_task(self, future, coro): 
     task = self.loop.create_task(coro) 
     future.set_result(task) 

    def add_task(self, coro): 
     future = Future() 
     p = functools.partial(self._add_task, future, coro) 
     self.loop.call_soon_threadsafe(p) 
     return future.result() #block until result is available 

    def cancel(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 
2

Kể từ phiên bản 3.4.4 asyncio cung cấp một chức năng gọi là run_coroutine_threadsafe nộp một đối tượng coroutine từ một thread để một vòng lặp sự kiện. Nó trả về một concurrent.futures.Future để truy cập kết quả hoặc hủy tác vụ.

Sử dụng ví dụ của bạn:

@asyncio.coroutine 
def test(loop): 
    try: 
     while True: 
      print("Running") 
      yield from asyncio.sleep(1, loop=loop) 
    except asyncio.CancelledError: 
     print("Cancelled") 
     loop.stop() 
     raise 

loop = asyncio.new_event_loop() 
thread = threading.Thread(target=loop.run_forever) 
future = asyncio.run_coroutine_threadsafe(test(loop), loop) 

thread.start() 
time.sleep(5) 
future.cancel() 
thread.join() 
+0

Để ngăn chặn tình trạng chạy đua hoặc bế tắc, đừng gọi trực tiếp 'future.cancel()'. Sử dụng 'loop.call_soon_threadsafe (future.cancel)' để thay thế. Xem [tại đây] (https://docs.python.org/3.4/library/asyncio-dev.html#concurrency-and-multithreading). – changyuheng

+1

@ ChangYu-heng Điều này đúng với [asyncio.Future] (https://docs.python.org/3.4/library/asyncio-task.html#asyncio.Future) tương lai, nhưng [run_coroutine_threadsafe] (https: // docs.python.org/3.4/library/asyncio-task.html#asyncio.run_coroutine_threadsafe) trả về [concurrent.futures.Future] (https://docs.python.org/3.4/library/concurrent.futures.html# concurrent.futures.Future) là chuỗi an toàn và không phụ thuộc vào bất kỳ vòng lặp sự kiện nào. – Vincent

+0

@Vicent Xin lỗi tôi đã không đọc kỹ câu hỏi gốc. Vì vậy, một chú thích bổ sung cho điều đó sẽ là: sử dụng 'loop.call_soon_threadsafe (future.cancel)' nếu bạn định thực thi 'future.cancel()' từ luồng không phải là vòng lặp sự kiện đang sinh sống. – changyuheng

Các vấn đề liên quan