2016-01-10 16 views
12

Tôi đang viết một công cụ kết nối với số X ổ cắm UNIX, gửi một lệnh và lưu đầu ra vào hệ thống tệp cục bộ. Nó chạy mỗi X giây. Để thực hiện một số dọn dẹp khi công cụ nhận tín hiệu chấm dứt, tôi đăng ký một chức năng (tắt) để báo hiệu tín hiệu.SIGHUP và signal.SIGTERM. Chức năng này hủy bỏ tất cả các nhiệm vụ và sau đó đóng vòng lặp sự kiện.Cách thích hợp để tắt các tác vụ asyncio

Vấn đề của tôi là tôi có được

RuntimeError: Event loop stopped before Future completed

khi tôi gửi signal.SIGTERM (giết 'pid'). Tôi đã đọc tài liệu về việc hủy tác vụ hai lần nhưng tôi đã không phát hiện ra những gì tôi đang làm sai ở đây.

Tôi cũng nhận thấy điều gì đó lạ, khi tôi gửi tín hiệu chấm dứt chương trình đang ở chế độ ngủ và tôi thấy trong nhật ký rằng nó đánh thức pull_stats() coroutine, bạn có thể thấy điều này trong 2 dòng đầu tiên của nhật ký .

Log:

21:53:44,194 [23031] [MainThread:supervisor ] DEBUG **sleeping for 9.805s secs** 
21:53:45,857 [23031] [MainThread:pull_stats ] INFO  pull statistics 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock 
21:53:45,859 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  received stop signal, cancelling tasks... 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,860 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,860 [23031] [MainThread:shutdown ] INFO  stopping event loop 
21:53:45,860 [23031] [MainThread:shutdown ] INFO  bye, exiting... 
Traceback (most recent call last): 
    File "./pull.py", line 249, in <module> 
    main() 
    File "./pull.py", line 245, in main 
    supervisor(loop, config) 
    File "./pull.py", line 161, in supervisor 
    config['pull']['socket-dir'], storage_dir, loop)) 
    File "/usr/lib/python3.4/asyncio/base_events.py", line 274, in run_until_complete 
    raise RuntimeError('Event loop stopped before Future completed.') 
RuntimeError: Event loop stopped before Future completed. 

Đây là mã:

def shutdown(loop): 
    LOGGER.info('received stop signal, cancelling tasks...') 
    for task in asyncio.Task.all_tasks(): 
     LOGGER.info(task.cancel()) 
    LOGGER.info('stopping event loop') 
    loop.stop() 
    LOGGER.info('bye, exiting...') 


def write_file(filename, data): 
    try: 
     with open(filename, 'w') as file_handle: 
      file_handle.write(data.decode()) 
    except OSError as exc: 
     return False 
    else: 
     return True 


@asyncio.coroutine 
def get(socket_file, cmd, storage_dir, loop): 
    connect = asyncio.open_unix_connection(socket_file) 
    reader, writer = yield from asyncio.wait_for(connect, 1) 

    writer.write('{c}\n'.format(c=cmd).encode()) 
    data = yield from reader.read() 
    writer.close() 

    filename = os.path.basename(socket_file) + '_' + cmd.split()[1] 
    filename = os.path.join(storage_dir, filename) 
    result = yield from loop.run_in_executor(None, write_file, filename, data) 

    return result 


@asyncio.coroutine 
def pull_stats(socket_dir, storage_dir, loop): 
    socket_files = glob.glob(socket_dir + '/*sock*') 
    coroutines = [get(socket_file, cmd, storage_dir, loop) 
        for socket_file in socket_files 
        for cmd in CMDS] 
    status = yield from asyncio.gather(*coroutines) 

    if len(set(status)) == 1 and True in set(status): 
     return True 
    else: 
     return False 


def supervisor(loop, config): 
    dst_dir = config.get('pull', 'dst-dir') 
    tmp_dst_dir = config.get('pull', 'tmp-dst-dir') 

    while True: 
     start_time = int(time.time()) 
     storage_dir = os.path.join(tmp_dst_dir, str(start_time)) 

     try: 
      os.makedirs(storage_dir) 
     except OSError as exc: 
      msg = "failed to create directory {d}:{e}".format(d=storage_dir, 
                   e=exc) 
      LOGGER.critical(msg) 

     # Launch all connections. 
     result = loop.run_until_complete(pull_stats(
      config['pull']['socket-dir'], storage_dir, loop)) 

     if result: 
      try: 
       shutil.move(storage_dir, dst_dir) 
      except OSError as exc: 
       LOGGER.critical("failed to move %s to %s: %s", storage_dir, 
           dst_dir, exc) 
       break 
      else: 
       LOGGER.info('statistics are saved in %s', os.path.join(
        dst_dir, os.path.basename(storage_dir))) 
     else: 
      LOGGER.critical('failed to pull stats') 
      shutil.rmtree(storage_dir) 

     sleep = config.getint('pull', 'pull-interval') - (time.time() - 
                  start_time) 
     if 0 < sleep < config.getint('pull', 'pull-interval'): 
      time.sleep(sleep) 
    loop.close() 
    sys.exit(1) 


def main(): 
    args = docopt(__doc__, version=VERSION) 
    config = ConfigParser(interpolation=ExtendedInterpolation()) 
    config.read_dict(copy.copy(DEFAULT_OPTIONS)) 
    config.read(args['--file']) 

    loop = asyncio.get_event_loop() 

    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop)) 
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop)) 

    num_level = getattr(logging, config.get('pull', 'loglevel').upper(), None) 
    LOGGER.setLevel(num_level) 

    supervisor(loop, config) 

# This is the standard boilerplate that calls the main() function. 
if __name__ == '__main__': 
    main() 

Trả lời

6

Việc huỷ bỏ không phải là ngay lập tức và đòi hỏi phải chạy ioloop để được giải quyết với ngoại lệ CancelledError. Xóa ioloop.stop khỏi tắt máy và xử lý ngoại lệ trong người giám sát, để mọi thứ hoạt động. Dưới đây là ví dụ đơn giản.

Quan trọng là, tuy nhiên bạn có thể hủy Task, nó chỉ dừng xem/chờ kết thúc/kết quả và vòng lặp sẽ không xử lý thêm sự kiện cho nó. Nhưng yêu cầu/đường ống bên dưới sẽ không bị dừng lại.

Giản dụ:

import asyncio 
import functools 
import logging 
import signal 
import sys 
from concurrent.futures import CancelledError 


def shutdown(loop): 
    logging.info('received stop signal, cancelling tasks...') 
    for task in asyncio.Task.all_tasks(): 
     task.cancel() 
    logging.info('bye, exiting in a minute...')  


@asyncio.coroutine 
def get(i): 
    logging.info('sleep for %d', i) 
    yield from asyncio.sleep(i)  


@asyncio.coroutine 
def pull_stats(): 
    coroutines = [get(i) for i in range(10,20)] 
    status = yield from asyncio.gather(*coroutines) 


def supervisor(loop): 
    try: 
     while True: 
      result = loop.run_until_complete(pull_stats()) 
    except CancelledError: 
     logging.info('CancelledError') 
    loop.close() 
    sys.exit(1) 


def main(): 
    logging.getLogger().setLevel(logging.INFO) 
    loop = asyncio.get_event_loop() 
    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop)) 
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop)) 
    supervisor(loop) 


if __name__ == '__main__': 
    main() 

Lưu ý, nếu bạn hủy chỉ gather's tương lai, tất cả trẻ em sẽ được thiết lập như là hủy là tốt.

Và giấc ngủ điều

Bất kỳ nhận được tín hiệu hoặc ngắt làm cho chương trình để tiếp tục thực hiện. Vì vậy, khi quá trình nhận được SIGTERM và trình xử lý được thiết lập, python cho phép bạn xử lý nó, để làm chủ đề này được nối lại và gọi là sighandler. Do thực hiện ioloop và xử lý tín hiệu của nó, nó tiếp tục chạy sau khi thức dậy.

+0

tôi sửa đổi mã như bạn đề nghị và nó bắt ngoại lệ nhưng tôi vẫn thấy rằng pull_stats() để được đánh thức dậy khi tôi gửi tín hiệu TERM. Trong ví dụ mã của bạn, tôi không thấy điều đó xảy ra. Tôi không hoàn toàn hiểu rõ câu nói của bạn về giấc ngủ. Bạn có đề xuất rằng giấc ngủ ngăn chặn các thread được dừng lại? Hơn nữa, làm thế nào để tuyên truyền hủy bỏ trong tất cả các coroutines vì ​​vậy tôi có thể làm sạch các bước ở đó? Cảm ơn rất nhiều @ kwarunek cho câu trả lời của bạn và thời gian của bạn để cung cấp một ví dụ mã, rất nhiều đánh giá cao –

+1

Tôi đã chỉnh sửa một phần bit về SIGTERM, nó cũng không được bao gồm trong ví dụ. – kwarunek

+0

@ kwarunke, giờ đây có ý nghĩa. Vì vậy, khi hủy bỏ được gửi đi, một nhiệm vụ được nối lại ở dòng sản lượng cuối cùng nơi mà coroutine hiện đang bị đình chỉ. Trong trường hợp của tôi, tôi đang ở trong dòng ngủ, tín hiệu đi vào, chủ đề chính thức dậy từ giấc ngủ, Trong khi True khởi động tất cả các tương lai mà sau đó nhận được hủy bỏ nhưng coroutines pull_stats và được thức dậy nhưng không tiến hành khi chúng bị hủy bỏ. Tôi vẫn đang cố gắng tìm cách hủy bỏ khi chương trình đang trong giai đoạn kết nối/nhận/ghi, vì tôi muốn thực hiện một số việc làm sạch. Một lần nữa cảm ơn rất nhiều sự hỗ trợ của bạn. –

0

Cập nhật: Mã hoạt động như mong đợi trên trăn 3.4.4, xem nhận xét của tôi bên dưới. @kwarunek, khi bạn đề cập đến bình luận cuối cùng của bạn về ioloop tiếp tục chạy tôi đã không hoàn toàn nhận được nó như mã của tôi làm việc, giết chết quá trình gửi một hủy bỏ cho tất cả các nhiệm vụ được thức dậy. Nhưng, bây giờ tôi thấy điểm của bạn bởi vì hủy bỏ các nhiệm vụ không được kích hoạt với 3.4.4, với 3.4.2 là tốt.

21:28:09,004 [59441] [MainThread:supervisor] CRITICAL failed to pull stats 
<killing process> 
21:28:11,826 [59441] [MainThread:supervisor] INFO  starting while loop 
21:28:11,827 [59441] [MainThread:supervisor] INFO  launch the delegating coroutine 
21:28:11,828 [59441] [MainThread:shutdown] INFO  received stop signal 
21:28:11,828 [59441] [MainThread:shutdown] INFO  <Task finished coro=<pull_stats() done, defined at /opt/blue-python/3.4/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False> 
21:28:11,829 [59441] [MainThread:shutdown] INFO  cancelling task 
21:28:11,829 [59441] [MainThread:supervisor] INFO  delegating coroutine finished 
21:28:11,829 [59441] [MainThread:supervisor] CRITICAL failed to pull stats 
21:28:21,009 [59441] [MainThread:supervisor] INFO  starting while loop 
21:28:21,010 [59441] [MainThread:supervisor] INFO  launch the delegating coroutine 
21:28:21,011 [59441] [MainThread:supervisor] INFO  delegating coroutine finished 
2016-01-30 21:28:21,011 [59441] [MainThread:supervisor] CRITICAL failed to pull stats 

trong khi ở python 3.4.2

21:23:51,015 [10219] [MainThread:supervisor] CRITICAL failed to pull stats 
<killing process> 
21:23:55,737 [10219] [MainThread:supervisor] INFO  starting while loop 
21:23:55,737 [10219] [MainThread:supervisor] INFO  launch the delegating coroutine 
21:23:55,740 [10219] [MainThread:shutdown] INFO  received stop signal 
21:23:55,740 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,740 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,740 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,741 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,741 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,741 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,742 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,742 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,742 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<pull_stats() done, defined at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False> 
21:23:55,743 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,743 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(0)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,743 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,743 [10219] [MainThread:shutdown] INFO  <Task finished coro=<pull_stats() done, defined at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False> 
21:23:55,744 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,744 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(7)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,744 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,744 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(4)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,745 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,745 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(5)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,745 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,745 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,746 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,746 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(3)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,746 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,746 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,747 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,747 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(6)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,747 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,747 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,747 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,747 [10219] [MainThread:shutdown] INFO  <Task pending coro=<pull_stats() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:150> wait_for=<_GatheringFuture pending cb=[Task._wakeup()]> cb=[_raise_stop_error() at /usr/lib/python3.4/asyncio/base_events.py:101]> 
21:23:55,748 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,748 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(2)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,748 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,748 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,748 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,749 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,749 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,749 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,749 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,749 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,750 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,750 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,750 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,750 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,750 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,751 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,751 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,751 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,751 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,751 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,751 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,752 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,752 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,752 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(1)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,753 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,753 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,753 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,754 [10219] [MainThread:supervisor] INFO  Received CancelledError exception 
21:23:55,754 [10219] [MainThread:supervisor] INFO  waiting for threads to finish any pending IO tasks 
21:23:55,754 [10219] [MainThread:supervisor] INFO  closing our asyncio loop 
21:23:55,755 [10219] [MainThread:supervisor] INFO  exiting with status 0 

Sự khác biệt chính là khi shutdown() gửi hủy không có nhiệm vụ đánh thức dậy và kết quả là vòng lặp while không ngừng bởi hãy thử bắt khối xử lý việc hủy bỏ. Làm cách nào để giải quyết vấn đề này ngay bây giờ ?!

đây là mã

def shutdown(): 
    """Performs a clean shutdown""" 
    log.info('received stop signal') 
    for task in asyncio.Task.all_tasks(): 
     log.info(task) 
     log.info('cancelling task') 
     task.cancel() 


def write_file(filename, data): 
    """Writes data to a file. 

    Returns: 
     True if succeeds False otherwise. 
    """ 
    try: 
     with open(filename, 'w') as file_handle: 
      file_handle.write(data.decode()) 
    except OSError as exc: 
     log.critical('failed to write data %s', exc) 
     return False 
    else: 
     log.debug('data saved in %s', filename) 
     return True 


@asyncio.coroutine 
def get(socket_file, cmd, storage_dir, loop, executor, timeout): 
    """Fetches data from a UNIX socket. 

    Sends a command to HAProxy over UNIX socket, reads the response and then 
    offloads the writing of the received data to a thread, so we don't block 
    this coroutine. 

    Arguments: 
     socket_file (str): The full path of the UNIX socket file to connect to. 
     cmd (str): The command to send. 
     storage_dir (str): The full path of the directory to save the response. 
     loop (obj): A base event loop from asyncio module. 
     executor (obj): A Threader executor to execute calls asynchronously. 
     timeout (int): Timeout for the connection to the socket. 

    Returns: 
     True if statistics from a UNIX sockets are save False otherwise. 
    """ 
    # try to connect to the UNIX socket 
    connect = asyncio.open_unix_connection(socket_file) 
    log.debug('connecting to UNIX socket %s', socket_file) 
    try: 
     reader, writer = yield from asyncio.wait_for(connect, timeout) 
    except (ConnectionRefusedError, PermissionError, OSError) as exc: 
     log.critical(exc) 
     return False 
    else: 
     log.debug('connection established to UNIX socket %s', socket_file) 

    log.debug('sending command "%s" to UNIX socket %s', cmd, socket_file) 
    writer.write('{c}\n'.format(c=cmd).encode()) 
    data = yield from reader.read() 
    writer.close() 

    if len(data) == 0: 
     log.critical('received zero data') 
     return False 

    log.debug('received data from UNIX socket %s', socket_file) 

    suffix = CMD_SUFFIX_MAP.get(cmd.split()[1]) 
    filename = os.path.basename(socket_file) + suffix 
    filename = os.path.join(storage_dir, filename) 
    log.debug('going to save data to %s', filename) 
    # Offload the writing to a thread so we don't block ourselves. 
    result = yield from loop.run_in_executor(executor, 
              write_file, 
              filename, 
              data) 

    return result 


@asyncio.coroutine 
def pull_stats(config, storage_dir, loop, executor): 
    """Launches coroutines for pulling statistics from UNIX sockets. 

    This a delegating routine. 

    Arguments: 
     config (obj): A configParser object which holds configuration. 
     storage_dir (str): The absolute directory path to save the statistics. 
     loop (obj): A base event loop. 
     executor(obj): A ThreadPoolExecutor object. 

    Returns: 
     True if statistics from *all* UNIX sockets are fetched False otherwise. 
    """ 
    # absolute directory path which contains UNIX socket files. 
    socket_dir = config.get('pull', 'socket-dir') 
    timeout = config.getint('pull', 'timeout') 
    socket_files = [f for f in glob.glob(socket_dir + '/*') 
        if is_unix_socket(f)] 

    log.debug('pull statistics') 
    coroutines = [get(socket_file, cmd, storage_dir, loop, executor, timeout) 
        for socket_file in socket_files 
        for cmd in CMDS] 
    # Launch all connections. 
    status = yield from asyncio.gather(*coroutines) 

    return len(set(status)) == 1 and True in set(status) 


def supervisor(loop, config): 
    """Coordinates the pulling of HAProxy statistics from UNIX sockets. 

    This is the client routine which launches requests to all HAProxy 
    UNIX sockets for retrieving statistics and save them to file-system. 
    It runs indefinitely until main program is terminated. 

    Arguments: 
     loop (obj): A base event loop from asyncio module. 
     config (obj): A configParser object which holds configuration. 
    """ 
    dst_dir = config.get('pull', 'dst-dir') 
    tmp_dst_dir = config.get('pull', 'tmp-dst-dir') 
    executor = ThreadPoolExecutor(max_workers=config.getint('pull', 'workers')) 
    exit_code = 1 

    while True: 
     log.info('starting while loop') 
     start_time = int(time.time()) 
     # HAProxy statistics are stored in a directory and we use retrieval 
     # time(seconds since the Epoch) as a name of the directory. 
     # We first store them in a temporary place until we receive statistics 
     # from all UNIX sockets. 
     storage_dir = os.path.join(tmp_dst_dir, str(start_time)) 

     # If our storage directory can't be created we can't do much, thus 
     # abort main program. 
     try: 
      os.makedirs(storage_dir) 
     except OSError as exc: 
      msg = "failed to make directory {d}:{e}".format(d=storage_dir, 
                  e=exc) 
      log.critical(msg) 
      log.critical('a fatal error has occurred, exiting..') 
      break 

     try: 
      log.info('launch the delegating coroutine') 
      result = loop.run_until_complete(pull_stats(config, storage_dir, 
                 loop, executor)) 
      log.info('delegating coroutine finished') 
     except asyncio.CancelledError: 
      log.info('Received CancelledError exception') 
      exit_code = 0 
      break 

     # if and only if we received statistics from all sockets then move 
     # statistics to the permanent directory. 
     # NOTE: when temporary and permanent storage directory are on the same 
     # file-system the move is actual a rename, which is an atomic 
     # operation. 
     if result: 
      log.debug('move %s to %s', storage_dir, dst_dir) 
      try: 
       shutil.move(storage_dir, dst_dir) 
      except OSError as exc: 
       log.critical("failed to move %s to %s: %s", storage_dir, 
          dst_dir, exc) 
       log.critical('a fatal error has occurred, exiting..') 
       break 
      else: 
       log.info('statistics are stored in %s', os.path.join(
        dst_dir, os.path.basename(storage_dir))) 
     else: 
      log.critical('failed to pull stats') 
      log.debug('removing temporary directory %s', storage_dir) 
      shutil.rmtree(storage_dir) 

     # calculate sleep time which is interval minus elapsed time. 
     sleep = config.getint('pull', 'pull-interval') - (time.time() - 
                  start_time) 
     if 0 < sleep < config.getint('pull', 'pull-interval'): 
      log.debug('sleeping for %.3fs secs', sleep) 
      time.sleep(sleep) 

    # It is very unlikely that threads haven't finished their job by now, but 
    # they perform disk IO operations which can take some time in certain 
    # situations, thus we want to wait for them in order to perform a clean 
    # shutdown. 
    log.info('waiting for threads to finish any pending IO tasks') 
    executor.shutdown(wait=True) 
    log.info('closing our asyncio loop') 
    loop.close() 
    log.info('exiting with status %s', exit_code) 
    sys.exit(exit_code) 


def main(): 
    """Parses CLI arguments and launches main program.""" 
    args = docopt(__doc__, version=VERSION) 

    config = ConfigParser(interpolation=ExtendedInterpolation()) 
    # Set defaults for all sections 
    config.read_dict(copy.copy(DEFAULT_OPTIONS)) 
    # Load configuration from a file. NOTE: ConfigParser doesn't warn if user 
    # sets a filename which doesn't exist, in this case defaults will be used. 
    config.read(args['--file']) 

    if args['--print']: 
     for section in sorted(DEFAULT_OPTIONS): 
      print("[{}]".format(section)) 
      for key, value in sorted(DEFAULT_OPTIONS[section].items()): 
       print("{k} = {v}".format(k=key, v=value)) 
      print() 
     sys.exit(0) 
    if args['--print-conf']: 
     for section in sorted(config): 
      print("[{}]".format(section)) 
      for key, value in sorted(config[section].items()): 
       print("{k} = {v}".format(k=key, v=value)) 
      print() 
     sys.exit(0) 

    log.setLevel(getattr(logging, config.get('pull', 'loglevel').upper(), 
         None)) 
    # Setup our event loop 
    loop = asyncio.get_event_loop() 

    # Register shutdown to signals 
    loop.add_signal_handler(signal.SIGHUP, shutdown) 
    loop.add_signal_handler(signal.SIGTERM, shutdown) 

    # a temporary directory to store fetched data 
    tmp_dst_dir = config['pull']['tmp-dst-dir'] 
    # a permanent directory to move data from the temporary directory. Data are 
    # picked up by the process daemon from that directory. 
    dst_dir = config['pull']['dst-dir'] 
    for directory in dst_dir, tmp_dst_dir: 
     try: 
      os.makedirs(directory) 
     except OSError as exc: 
      # errno 17 => file exists 
      if exc.errno != 17: 
       sys.exit("failed to make directory {d}:{e}".format(d=directory, 
                    e=exc)) 
    supervisor(loop, config) 

# This is the standard boilerplate that calls the main() function. 
if __name__ == '__main__': 
    main() 
+0

Tìm thấy sự cố. trên hệ thống mà tôi đã sử dụng python 3.4.4, pull_stats coroutine không lên lịch lấy các coroutines vì ​​danh sách socket_files rỗng. Điều đó giải thích thông báo [MainThread: shutdown] INFO result = False> Do nhiệm vụ hoàn thành việc hủy bỏ không diễn ra và hậu quả là thử bắt không bao giờ nhận được ngoại lệ để gây ra sự thoát khỏi chương trình. Trên một hộp khác với 3.4.4 trong đó danh sách socket_files là * không * làm trống công việc hủy bỏ –

Các vấn đề liên quan