2012-03-20 31 views
5

Tôi hơi bối rối về cấu hình của tôi sẽ như thế nào để thiết lập trao đổi chủ đề.Trao đổi chủ đề với Cần tây và ThỏMQ

http://www.rabbitmq.com/tutorials/tutorial-five-python.html

Đây là những gì tôi muốn thực hiện:

Task1 -> send to QueueOne and QueueFirehose 
Task2 -> sent to QueueTwo and QueueFirehose 

thì:

Task1 -> consume from QueueOne 
Task2 -> consume from QueueTwo 
TaskFirehose -> consume from QueueFirehose 

Tôi chỉ muốn task1 để tiêu thụ từ QueueOne và Task2 để tiêu thụ từ QueueTwo.

Vấn đề đó bây giờ là khi Task1 và 2 chạy, chúng cũng thoát khỏi QueueFirehose và nhiệm vụ TaskFirehose không bao giờ thực thi.

Có vấn đề gì với cấu hình của tôi hay tôi hiểu nhầm điều gì đó?

CELERY_QUEUES = { 
    "QueueOne": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.one", 
    }, 
    "QueueTwo": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.two", 
    }, 
    "QueueFirehose": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.#", 
    }, 
} 

CELERY_ROUTES = { 
     "tasks.task1": { 
      "queue": 'QueueOne', 
      "routing_key": 'pipeline.one', 
     }, 
     "tasks.task2": { 
      "queue": 'QueueTwo', 
      "routing_key": 'pipeline.two', 
     }, 
     "tasks.firehose": { 
      'queue': 'QueueFirehose', 
      "routing_key": 'pipeline.#', 
     }, 
} 
+0

Có thể đây chỉ là thuật ngữ để làm rõ, nhưng mô tả của bạn có vẻ như bạn đang làm nhiệm vụ và công nhân. Ví dụ, bạn nói "Task2 được gửi đến Queue2" rồi sau đó nói "Task2 để tiêu thụ từ Queue2". Nhiệm vụ không tiêu thụ; chúng được tiêu thụ (bởi công nhân). Bạn cũng nói rằng "TaskFirehose tác vụ không bao giờ thực hiện" nhưng trong mô tả của bạn, không có TaskFirehose được gửi đến bất kỳ hàng đợi nào. Khái niệm cơ bản là: nhiệm vụ được gửi đến hàng đợi; và công nhân thực hiện các nhiệm vụ từ hàng đợi mà họ được chỉ định. Công việc! = Công nhân thực hiện chúng. –

Trả lời

0

Giả sử rằng bạn thực sự có nghĩa là một cái gì đó như thế này:

Task1 -> send to QueueOne 
Task2 -> sent to QueueTwo 
TaskFirehose -> send to QueueFirehose 

thì:

Worker1 -> consume from QueueOne, QueueFirehose 
Worker2 -> consume from QueueTwo, QueueFirehose 
WorkerFirehose -> consume from QueueFirehose 

này có thể không được chính xác những gì bạn có nghĩa là, nhưng tôi nghĩ rằng nó nên bao gồm nhiều kịch bản và hy vọng bạn cũng vậy Something như thế này nên làm việc:

# Advanced example starting 10 workers in the background: 
# * Three of the workers processes the images and video queue 
# * Two of the workers processes the data queue with loglevel DEBUG 
# * the rest processes the default' queue. 

$ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data 
-Q default -L:4,5 DEBUG 

Để biết thêm lựa chọn và tham khảo: http://celery.readthedocs.org/en/latest/reference/celery.bin.multi.html

Đây là trực tiếp từ tài liệu.

Tôi cũng có tình huống tương tự và tôi giải quyết nó theo một cách hơi khác. Tôi không thể sử dụng cần tây đa với người giám sát. Vì vậy, thay vào đó tôi đã tạo nhiều chương trình trong giám sát cho từng công nhân. Các công nhân sẽ được trên các quy trình khác nhau anyway, vì vậy chỉ cần cho phép giám sát chăm sóc của tất cả mọi thứ cho bạn. Các tập tin cấu hình trông giống như sau: -

; ================================== 
; celery worker supervisor example 
; ================================== 

[program:Worker1] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueOne, QueueFirehose 

directory=/path/to/project 
user=nobody 
numprocs=1 
stdout_logfile=/var/log/celery/worker1.log 
stderr_logfile=/var/log/celery/worker1.log 
autostart=true 
autorestart=true 
startsecs=10 

; Need to wait for currently executing tasks to finish at shutdown. 
; Increase this if you have very long running tasks. 
stopwaitsecs = 600 

; When resorting to send SIGKILL to the program to terminate it 
; send SIGKILL to its whole process group instead, 
; taking care of its children as well. 
killasgroup=true 

; if rabbitmq is supervised, set its priority higher 
; so it starts first 
priority=998 

Tương tự như vậy, đối với Worker2 và WorkerFirehose, chỉnh sửa các dòng tương ứng để thực hiện:

[program:Worker2] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueTwo, QueueFirehose 

[program:WorkerFirehose] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueFirehose 

bao gồm tất cả chúng trong supervisord .conf tập tin và nên làm điều đó.

Các vấn đề liên quan