2011-10-27 24 views
15

Làm thế nào để thanh lọc tất cả các nhiệm vụ được lập lịch và chạy của một hàng cụ thể với cần tây trong python? Những câu hỏi dường như straigtforward khá, nhưng để thêm tôi không tìm kiếm các mã dòng lệnhLàm thế nào để thanh lọc tất cả các nhiệm vụ của một hàng đợi cụ thể với cần tây trong python?

Tôi có dòng sau, trong đó xác định các que và muốn tẩy mà que để quản lý các nhiệm vụ:

CELERY_ROUTES = {"socialreport.tasks.twitter_save": {"queue": "twitter_save"}} 

Tại 1 điểm trong thời gian tôi muốn thanh lọc tất cả các nhiệm vụ trong twitter_save que với mã python, có thể với một chức năng phát sóng? Tôi không thể tìm thấy tài liệu về điều này. Điều này có thể không?

Trả lời

33

chỉ để cập nhật @Sam Stoelinga câu trả lời cho cần tây 3.1, bây giờ nó có thể được thực hiện như thế này trên một thiết bị đầu cuối:

celery amqp queue.purge <QUEUE_NAME> 

Đối với Django hãy chắc chắn để bắt đầu nó từ tập tin manage.py:

./manage.py celery amqp queue.purge <QUEUE_NAME> 

Nếu không, đảm bảo cần tây có thể trỏ chính xác đến người môi giới bằng cách đặt cờ --broker=.

+0

Cảm ơn thao tác này cho Celery 3.1 – Gourneau

+0

Tôi cần sử dụng đối số '--broker = ...' để trỏ tới URL AMQP hợp lệ, vì lý do nào đó giá trị được định cấu hình trong cài đặt Django.py không được chọn . Có lẽ một sự khác biệt trong thiết lập của tôi. – RichVel

+0

@RichVel là bạn đang chạy nó từ './Manage.py'? đối số url môi giới phải được lấy từ tệp 'settings.py'. – Hassek

6

Rất dễ dàng, hy vọng ai đó có thể giúp tôi.

from celery.bin.camqadm import camqadm 
camqadm('queue.purge', queue_name_as_string) 

Vấn đề duy nhất với điều này tôi vẫn cần phải ngăn chặn celeryd trước khi tẩy các que, sau khi tẩy tôi cần phải chạy celeryd một lần nữa để xử lý công việc cho hàng đợi. Sẽ cập nhật câu hỏi này nếu tôi thành công.

Tôi đã thành công, nhưng hãy sửa tôi nếu đây không phải là một phương pháp tốt để ngăn chặn cần tây, thanh trừng que và bắt đầu lại. Tôi biết tôi đang sử dụng thuật ngữ, bởi vì tôi thực sự muốn nó được chấm dứt nhiệm vụ.

kill_command = "ps auxww | grep 'celeryd -n twitter_save' | awk '{print $2}' | xargs kill -9" 
subprocess.call(kill_command, shell=True) 

camqadm('queue.purge', 'twitter_save') 
rerun_command = "/home/samos/Software/virt_env/twittersyncv1/bin/python %s/manage.py celeryd -n twitter_save -l info -Q twitter_save" % settings.PROJECT_ROOT 

os.popen(rerun_command+' &') 
send_task("socialreport.tasks.twitter_save") 
6

Câu trả lời gốc không hoạt động cho Celery 3.1. Bản cập nhật của Hassek là lệnh đúng nếu bạn muốn thực hiện nó từ dòng lệnh. Nhưng nếu bạn muốn làm điều đó lập trình, làm điều này:

Giả sử bạn chạy ứng dụng cần tây của bạn như:

celery_app = Celery(...) 

Sau đó:

import celery.bin.amqp 
amqp = celery.bin.amqp.amqp(app = celery_app) 
amqp.run('queue.purge', 'name_of_your_queue') 

Đây là tiện dụng cho trường hợp bạn' đã enqueued một loạt các nhiệm vụ, và một nhiệm vụ gặp phải một điều kiện chết người mà bạn biết sẽ ngăn chặn phần còn lại của các nhiệm vụ từ thực hiện.

Ví dụ: bạn enqueued một loạt các nhiệm vụ trình thu thập web, và ở giữa nhiệm vụ của bạn địa chỉ IP của máy chủ của bạn bị chặn. Không có vấn đề gì trong việc thực hiện các nhiệm vụ còn lại. Vì vậy, trong trường hợp đó, nhiệm vụ của bạn tự nó có thể thanh lọc hàng đợi của chính nó.

Các vấn đề liên quan