2011-12-14 30 views
33

Tôi đang sử dụng Celery để quản lý các tác vụ không đồng bộ. Đôi khi, tuy nhiên, quá trình cần tây đi xuống mà nguyên nhân không có nhiệm vụ để thực hiện. Tôi muốn có thể kiểm tra tình trạng cần tây và đảm bảo mọi thứ đều hoạt động tốt và nếu tôi phát hiện bất kỳ sự cố nào hiển thị thông báo lỗi cho người dùng. Từ tài liệu của Celery Worker, có vẻ như tôi có thể sử dụng ping hoặc inspect cho việc này, nhưng ping cảm thấy bị hack và không rõ ràng cách kiểm tra được sử dụng (nếu kiểm tra(). Registered() rỗng?).Phát hiện xem cần tây có sẵn/Chạy

Bất kỳ hướng dẫn nào về điều này sẽ được đánh giá cao. Về cơ bản những gì tôi đang tìm kiếm là một phương pháp như vậy:

def celery_is_alive(): 
    from celery.task.control import inspect 
    return bool(inspect().registered()) # is this right?? 

EDIT: Nó thậm chí không giống như đăng ký() có sẵn trên cần tây 2.3.3 (mặc dù tài liệu 2.1 liệt kê nó). Có lẽ ping là câu trả lời đúng.

EDIT: Ping cũng không xuất hiện để làm những gì tôi nghĩ rằng nó sẽ làm, vì vậy vẫn không chắc chắn câu trả lời ở đây.

+0

Có phải câu trả lời dưới đây không làm việc cho bạn? Là một người có vấn đề tương tự để giải quyết, tôi sẽ yêu một số xác nhận. – kojiro

Trả lời

44

Đây là mã tôi đang sử dụng. celery.task.control.Inspect.stats() trả về một dict chứa rất nhiều chi tiết về các công nhân hiện có, Không có nếu không có nhân viên nào đang chạy hoặc tăng IOError nếu nó không thể kết nối với nhà môi giới thư. Tôi đang sử dụng RabbitMQ - có thể các hệ thống nhắn tin khác có thể hoạt động hơi khác. Điều này làm việc trong Celery 2.3.x và 2.4.x; Tôi không chắc nó quay lại bao xa.

def get_celery_worker_status(): 
    ERROR_KEY = "ERROR" 
    try: 
     from celery.task.control import inspect 
     insp = inspect() 
     d = insp.stats() 
     if not d: 
      d = { ERROR_KEY: 'No running Celery workers were found.' } 
    except IOError as e: 
     from errno import errorcode 
     msg = "Error connecting to the backend: " + str(e) 
     if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED': 
      msg += ' Check that the RabbitMQ server is running.' 
     d = { ERROR_KEY: msg } 
    except ImportError as e: 
     d = { ERROR_KEY: str(e)} 
    return d 
+0

Làm việc cho tôi :) – kojiro

+6

Tôi đã phát hiện ra rằng ở trên thêm hai hàng trả lời.celery.pidbox vào thỏmq mỗi lần nó chạy. Điều này dẫn đến sự gia tăng sử dụng bộ nhớ của thỏmq. – kojiro

2

Sau đây làm việc cho tôi:

import socket 
from kombu import Connection 

celery_broker_url = "amqp://localhost" 

try: 
    conn = Connection(celery_broker_url) 
    conn.ensure_connection(max_retries=3) 
except socket.error: 
    raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url)) 
+2

Tôi khá chắc chắn điều này sẽ thành công nếu thỏmq đang chạy bất kể tình trạng cần tây. Nhưng đây là một kiểm tra tốt để làm nếu cần tây không biết liệu sự thất bại là với thỏmq hay cái gì khác. –

4

Để kiểm tra bằng cách sử dụng dòng lệnh tương tự trong trường hợp cần tây đang chạy như daemon,

  • Kích hoạt virtualenv và đi đến dir nơi 'ứng dụng' là
  • Bây giờ chạy: celery -A [app_name] status
  • Nó sẽ hiển thị nếu cần tây có hay không cộng với không. của các nút trực tuyến

Nguồn: http://michal.karzynski.pl/blog/2014/05/18/setting-up-an-asynchronous-task-queue-for-django-using-celery-redis/

Các vấn đề liên quan