2012-10-23 20 views
9

Tôi muốn kiểm tra xem Người tiêu dùng/Công nhân có mặt để tiêu thụ một Thông báo Tôi sắp gửi hay không.Trong Pika hoặc RabbitMQ, Làm cách nào để kiểm tra xem có người tiêu dùng nào hiện đang tiêu thụ không?

Nếu không có bất kỳ Worker, tôi sẽ bắt đầu một số công nhân (cả người tiêu dùng và các nhà xuất bản đang ở trên một máy duy nhất) và sau đó đi về xuất bản Tin nhắn.

Nếu có một chức năng như connection.check_if_has_consumers, tôi sẽ thực hiện nó một chút như thế này -

import pika 
import workers 

# code for publishing to worker queue 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
channel = connection.channel() 

# if there are no consumers running (would be nice to have such a function) 
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""): 
    # start the workers in other processes, using python's `multiprocessing` 
    workers.start_workers() 

# now, publish with no fear of your queues getting filled up 
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True) 
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin", 
          properties=pika.BasicProperties(delivery_mode=2)) 
connection.close() 

Nhưng tôi không thể tìm thấy bất kỳ chức năng với check_if_has_consumers chức năng trong pika.

Có cách nào để hoàn thành việc này không, sử dụng pika? hoặc có thể, bằng cách nói đến Con thỏ trực tiếp?

Tôi không hoàn toàn chắc chắn, nhưng tôi thực sự nghĩ rằng RabbitMQ sẽ nhận thức được số lượng người tiêu dùng đăng ký hàng đợi khác nhau, vì nó không cử điệp với họ và chấp nhận ack

Tôi chỉ đã bắt đầu với RabbitMQ 3 giờ trước ... bất cứ sự giúp đỡ được chào đón ...

đây là mã workers.py tôi đã viết, nếu có sự giúp đỡ của nó ....

import multiprocessing 
import pika 


def start_workers(num=3): 
    """start workers as non-daemon processes""" 
    for i in xrange(num):  
     process = WorkerProcess() 
     process.start() 


class WorkerProcess(multiprocessing.Process): 
    """ 
    worker process that waits infinitly for task msgs and calls 
    the `callback` whenever it gets a msg 
    """ 
    def __init__(self): 
     multiprocessing.Process.__init__(self) 
     self.stop_working = multiprocessing.Event() 

    def run(self): 
     """ 
     worker method, open a channel through a pika connection and 
     start consuming 
     """ 
     connection = pika.BlockingConnection(
           pika.ConnectionParameters(host='localhost') 
        ) 
     channel = connection.channel() 
     channel.queue_declare(queue='worker_queue', auto_delete=False, 
                durable=True) 

     # don't give work to one worker guy until he's finished 
     channel.basic_qos(prefetch_count=1) 
     channel.basic_consume(callback, queue='worker_queue') 

     # do what `channel.start_consuming()` does but with stopping signal 
     while len(channel._consumers) and not self.stop_working.is_set(): 
      channel.transport.connection.process_data_events() 

     channel.stop_consuming() 
     connection.close() 
     return 0 

    def signal_exit(self): 
     """exit when finished with current loop""" 
     self.stop_working.set() 

    def exit(self): 
     """exit worker, blocks until worker is finished and dead""" 
     self.signal_exit() 
     while self.is_alive(): # checking `is_alive()` on zombies kills them 
      time.sleep(1) 

    def kill(self): 
     """kill now! should not use this, might create problems""" 
     self.terminate() 
     self.join() 


def callback(channel, method, properties, body): 
    """pika basic consume callback""" 
    print 'GOT:', body 
    # do some heavy lifting here 
    result = save_to_database(body) 
    print 'DONE:', result 
    channel.basic_ack(delivery_tag=method.delivery_tag) 

EDIT:

tôi phải di chuyển về phía trước vì vậy đây là một cách giải quyết mà tôi sẽ mất, trừ khi một cách tiếp cận tốt hơn đến cùng,

Vì vậy, RabbitMQ đã những HTTP management apis, chúng hoạt động sau khi bạn đã bật management plugin và ở giữa trang HTTP apis có

/api/connections - Danh sách tất cả các kết nối đang mở.

/api/connections/name - Kết nối riêng lẻ. XÓA nó sẽ đóng kết nối.

Vì vậy, nếu tôi kết nối tôi Workerstôi Tạo cả bởi khác nhau kết nối tên/người dùng, tôi sẽ có thể kiểm tra xem Worker kết nối mở cửa ... (có có thể là vấn đề khi công nhân chết ...)

sẽ chờ giải pháp tốt hơn ...

EDIT:

chỉ thấy điều này trong các tài liệu RabbitMQ, nhưng điều này sẽ hacky để làm trong python:

[email protected]:~$ sudo rabbitmqctl -p vhostname list_queues name consumers 
Listing queues ... 
worker_queue 0 
...done. 

vì vậy tôi có thể làm một cái gì đó như thế nào,

subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'") 

hacky ... vẫn hy vọng pika có một số chức năng trăn để làm điều này ...

Cảm ơn,

Trả lời

7

Tôi cũng đang xem xét điều này. Sau khi đọc qua nguồn và tài liệu, tôi đã xem xét những điều sau đây trong channel.py:

@property 
def consumer_tags(self): 
    """Property method that returns a list of currently active consumers 

    :rtype: list 

    """ 
    return self._consumers.keys() 

Thử nghiệm của riêng tôi đã thành công. Tôi đã sử dụng các mục sau đây nơi đối tượng kênh của tôi là self._channel:

if len(self._channel.consumer_tags) == 0: 
     LOGGER.info("Nobody is listening. I'll come back in a couple of minutes.") 
     ... 
0

Tôi thực sự đã tìm thấy sự cố này khi tìm sự cố khác, nhưng một điều có thể giúp bạn ở chức năng Basic_Publish, có thông số "Ngay lập tức" được đặt mặc định thành Sai.

Một ý tưởng bạn có thể làm là đặt Cờ ngay lập tức thành True, sẽ yêu cầu nó ngay lập tức được người tiêu dùng tiêu thụ thay vì ngồi trong hàng đợi. Nếu một nhân viên không có sẵn để tiêu thụ tin nhắn, nó sẽ trả lại một lỗi, yêu cầu bạn bắt đầu một nhân viên khác.

Tùy thuộc vào thông lượng của hệ thống của bạn, điều này sẽ hoặc là sinh sản nhiều lao động thêm, hoặc sinh sản công nhân để thay thế công nhân đã chết. Đối với vấn đề cũ, bạn có thể viết một hệ thống giống như quản trị viên, chỉ cần theo dõi công nhân thông qua hàng đợi kiểm soát, nơi bạn có thể nói "Runner" giống như quy trình giết các quy trình của công nhân hiện không còn cần thiết nữa.

Các vấn đề liên quan