2014-09-05 21 views
11

Điều này sẽ rất đơn giản và tôi rất ngạc nhiên khi tôi không thể tìm thấy câu hỏi này đã được trả lời trên stackoverflow.Xử lý tín hiệu trong Python đa luồng

Tôi có một daemon giống như chương trình cần đáp ứng với tín hiệu SIGTERM và SIGINT để hoạt động tốt với mới nổi. Tôi đọc rằng cách tốt nhất để làm điều này là để chạy vòng lặp chính của chương trình trong một chủ đề riêng biệt từ các chủ đề chính và để cho các chủ đề chính xử lý các tín hiệu. Sau đó, khi một tín hiệu được nhận, bộ xử lý tín hiệu phải báo cho vòng lặp chính thoát ra bằng cách thiết lập một lá cờ sentinel thường xuyên được kiểm tra trong vòng lặp chính.

Tôi đã thử làm điều này nhưng nó không hoạt động theo cách tôi mong đợi. Xem mã bên dưới:

from threading import Thread 
import signal 
import time 
import sys 

stop_requested = False  

def sig_handler(signum, frame): 
    sys.stdout.write("handling signal: %s\n" % signum) 
    sys.stdout.flush() 

    global stop_requested 
    stop_requested = True  

def run(): 
    sys.stdout.write("run started\n") 
    sys.stdout.flush() 
    while not stop_requested: 
     time.sleep(2) 

    sys.stdout.write("run exited\n") 
    sys.stdout.flush() 

signal.signal(signal.SIGTERM, sig_handler) 
signal.signal(signal.SIGINT, sig_handler) 

t = Thread(target=run) 
t.start() 
t.join() 
sys.stdout.write("join completed\n") 
sys.stdout.flush() 

Tôi thử nghiệm này trong hai cách sau:

1)

$ python main.py > output.txt& 
[2] 3204 
$ kill -15 3204 

2)

$ python main.py 
ctrl+c 

Trong cả hai trường hợp tôi mong đợi viết này đến đầu ra:

run started 
handling signal: 15 
run exited 
join completed 

Trong trường hợp đầu tiên thoát khỏi chương trình nhưng tất cả tôi thấy là:

run started 

Trong trường hợp thứ hai tín hiệu SIGTERM được dường như bỏ qua khi ctrl + c được nhấn và chương trình không thoát.

Tôi thiếu gì ở đây?

+3

Thử thay thế 't.join()' bằng 'while t.is_alive(): t.join (1)'. Chủ đề chính của bạn sẽ thức dậy mỗi giây để kiểm tra tín hiệu. – roippi

+2

Đọc thêm: http://snakesthatbite.blogspot.com/2010/09/cpython-threading-interrupting.html – roippi

Trả lời

18

Vấn đề là, như đã giải thích ở Execution of Python signal handlers:

xử lý tín hiệu A Python không được thực thi bên trong xử lý tín hiệu ở mức độ thấp (C). Thay vào đó, bộ xử lý tín hiệu ở mức độ thấp đặt một lá cờ mà nói với máy ảo để thực hiện xử lý tín hiệu Python tương ứng tại một điểm sau đó (ví dụ tại các hướng dẫn bytecode tiếp theo)

...

Một tính toán lâu dài được thực hiện hoàn toàn trong C (như biểu thức chính quy khớp trên một phần lớn văn bản) có thể chạy liên tục trong một khoảng thời gian tùy ý, bất kể tín hiệu nào nhận được. Các trình xử lý tín hiệu Python sẽ được gọi khi phép tính kết thúc.

Chủ đề chính của bạn bị chặn trên threading.Thread.join, điều này cuối cùng có nghĩa là nó bị chặn trong C khi có cuộc gọi pthread_join. Tất nhiên đó không phải là một "tính toán dài hạn", nó là một khối trên một syscall ... nhưng tuy nhiên, cho đến khi cuộc gọi kết thúc, xử lý tín hiệu của bạn không thể chạy.

Và, trong khi trên một số nền tảng, pthread_join sẽ không thành công với EINTR trên tín hiệu, trên những thiết bị khác thì không. Trên Linux, tôi tin rằng nó phụ thuộc vào việc bạn chọn kiểu BSD hay mặc định là hành vi siginterrupt, nhưng mặc định là không.


Vì vậy, bạn có thể làm gì?

Vâng, tôi chắc chắn rằng changes to signal handling in Python 3.3 thực sự đã thay đổi hành vi mặc định trên Linux, do đó bạn sẽ không cần phải làm bất cứ điều gì nếu bạn nâng cấp; chỉ chạy dưới 3,3+ và mã của bạn sẽ hoạt động như bạn mong đợi. Ít nhất nó cho tôi với CPython 3.4 trên OS X và 3.3 trên Linux. (Nếu tôi sai về điều này, tôi không chắc chắn đó là một lỗi trong CPython hay không, vì vậy bạn có thể muốn nâng cao nó trên python-list hơn là mở một vấn đề ...)

Mặt khác, trước 3.3, mô-đun signal chắc chắn không hiển thị các công cụ bạn cần để tự khắc phục vấn đề này. Vì vậy, nếu bạn không thể nâng cấp lên 3,3, giải pháp là chờ một thứ gì đó có thể bị gián đoạn, như là Condition hoặc Event. Chuỗi con thông báo sự kiện ngay trước khi nó thoát, và luồng chính đợi trên sự kiện trước khi nó nối chuỗi con. Điều này chắc chắn là hacky. Và tôi không thể tìm thấy bất cứ điều gì đảm bảo nó sẽ tạo ra sự khác biệt; nó chỉ xảy ra để làm việc cho tôi trong các bản xây dựng khác nhau của CPython 2.7 và 3.2 trên OS X và 2.6 và 2.7 trên Linux…

+0

"Điều này chắc chắn là hacky" - Tôi sẽ không nói điều đó nói chung. đồng bộ hóa các luồng của bạn ở mức trừu tượng cao hơn so với việc sử dụng 'join' là hợp lý. Nếu mục tiêu của bạn là chờ cho thread thoát ra (như ví dụ cụ thể này) thì 'join' là công cụ thích hợp; nếu bạn muốn đợi cho khối lượng công việc hoàn thành, thì một 'Điều kiện' vv sẽ có ý nghĩa hơn. Khối lượng công việc có thể được thực hiện (ví dụ) trong một chuỗi gộp mà không thoát ngay lập tức, sau khi tất cả. –

8

Câu trả lời của abarnert đã được phát hiện. Tôi vẫn đang sử dụng Python 2.7 tuy nhiên. Để giải quyết vấn đề này cho bản thân mình, tôi đã viết một lớp InterruptableThread.

Hiện tại, nó không cho phép chuyển các đối số bổ sung cho mục tiêu chuỗi. Tham gia không chấp nhận tham số hết thời gian chờ. Điều này chỉ vì tôi không cần phải làm điều đó. Bạn có thể thêm nó nếu bạn muốn. Bạn có thể sẽ muốn loại bỏ các báo cáo đầu ra nếu bạn sử dụng chính nó. Họ chỉ là một cách để bình luận và thử nghiệm.

import threading 
import signal 
import sys 

class InvalidOperationException(Exception): 
    pass  

# noinspection PyClassHasNoInit 
class GlobalInterruptableThreadHandler: 
    threads = [] 
    initialized = False 

    @staticmethod 
    def initialize(): 
     signal.signal(signal.SIGTERM, GlobalInterruptableThreadHandler.sig_handler) 
     signal.signal(signal.SIGINT, GlobalInterruptableThreadHandler.sig_handler) 
     GlobalInterruptableThreadHandler.initialized = True 

    @staticmethod 
    def add_thread(thread): 
     if threading.current_thread().name != 'MainThread': 
      raise InvalidOperationException("InterruptableThread objects may only be started from the Main thread.") 

     if not GlobalInterruptableThreadHandler.initialized: 
      GlobalInterruptableThreadHandler.initialize() 

     GlobalInterruptableThreadHandler.threads.append(thread) 

    @staticmethod 
    def sig_handler(signum, frame): 
     sys.stdout.write("handling signal: %s\n" % signum) 
     sys.stdout.flush() 

     for thread in GlobalInterruptableThreadHandler.threads: 
      thread.stop() 

     GlobalInterruptableThreadHandler.threads = []  

class InterruptableThread: 
    def __init__(self, target=None): 
     self.stop_requested = threading.Event() 
     self.t = threading.Thread(target=target, args=[self]) if target else threading.Thread(target=self.run) 

    def run(self): 
     pass 

    def start(self): 
     GlobalInterruptableThreadHandler.add_thread(self) 
     self.t.start() 

    def stop(self): 
     self.stop_requested.set() 

    def is_stop_requested(self): 
     return self.stop_requested.is_set() 

    def join(self): 
     try: 
      while self.t.is_alive(): 
       self.t.join(timeout=1) 
     except (KeyboardInterrupt, SystemExit): 
      self.stop_requested.set() 
      self.t.join() 

     sys.stdout.write("join completed\n") 
     sys.stdout.flush() 

Lớp học có thể được sử dụng theo hai cách khác nhau. Bạn có thể chia nhỏ lớp học gián đoạnThread:

import time 
import sys 
from interruptable_thread import InterruptableThread 

class Foo(InterruptableThread): 
    def __init__(self): 
     InterruptableThread.__init__(self) 

    def run(self): 
     sys.stdout.write("run started\n") 
     sys.stdout.flush() 
     while not self.is_stop_requested(): 
      time.sleep(2) 

     sys.stdout.write("run exited\n") 
     sys.stdout.flush() 

sys.stdout.write("all exited\n") 
sys.stdout.flush() 

foo = Foo() 
foo2 = Foo() 
foo.start() 
foo2.start() 
foo.join() 
foo2.join() 

Hoặc bạn có thể sử dụng nó giống như cách thức hoạt động. Phương thức chạy phải lấy đối tượng InterruptableThread làm tham số.

import time 
import sys 
from interruptable_thread import InterruptableThread 

def run(t): 
    sys.stdout.write("run started\n") 
    sys.stdout.flush() 
    while not t.is_stop_requested(): 
     time.sleep(2) 

    sys.stdout.write("run exited\n") 
    sys.stdout.flush() 

t1 = InterruptableThread(run) 
t2 = InterruptableThread(run) 
t1.start() 
t2.start() 
t1.join() 
t2.join() 

sys.stdout.write("all exited\n") 
sys.stdout.flush() 

Làm theo những gì bạn muốn.

1

Tôi gặp phải vấn đề tương tự ở đây signal not handled when multiple threads join. Sau khi đọc câu trả lời của abarnert, tôi đã thay đổi thành Python 3 và giải quyết vấn đề. Nhưng tôi muốn thay đổi tất cả các chương trình của tôi để python 3. Vì vậy, tôi giải quyết chương trình của tôi bằng cách tránh gọi thread join() trước khi tín hiệu được gửi đi. Dưới đây là mã của tôi.

Nó không phải là rất tốt, nhưng giải quyết chương trình của tôi trong python 2,7. Câu hỏi của tôi đã được đánh dấu là trùng lặp, vì vậy tôi đặt giải pháp của tôi ở đây.

import threading, signal, time, os 


RUNNING = True 
threads = [] 

def monitoring(tid, itemId=None, threshold=None): 
    global RUNNING 
    while(RUNNING): 
     print "PID=", os.getpid(), ";id=", tid 
     time.sleep(2) 
    print "Thread stopped:", tid 


def handler(signum, frame): 
    print "Signal is received:" + str(signum) 
    global RUNNING 
    RUNNING=False 
    #global threads 

if __name__ == '__main__': 
    signal.signal(signal.SIGUSR1, handler) 
    signal.signal(signal.SIGUSR2, handler) 
    signal.signal(signal.SIGALRM, handler) 
    signal.signal(signal.SIGINT, handler) 
    signal.signal(signal.SIGQUIT, handler) 

    print "Starting all threads..." 
    thread1 = threading.Thread(target=monitoring, args=(1,), kwargs={'itemId':'1', 'threshold':60}) 
    thread1.start() 
    threads.append(thread1) 
    thread2 = threading.Thread(target=monitoring, args=(2,), kwargs={'itemId':'2', 'threshold':60}) 
    thread2.start() 
    threads.append(thread2) 
    while(RUNNING): 
     print "Main program is sleeping." 
     time.sleep(30) 
    for thread in threads: 
     thread.join() 

    print "All threads stopped."