2013-07-09 26 views
5

Bộ điều hợp lốc xoáy hỗ trợ thư viện Pika, here là một ví dụ về cách xuất bản thư bằng bộ điều hợp Không đồng bộ.Cách liên lạc RabbitMQ (thư viện Pika) trong ứng dụng lốc xoáy

Tôi muốn sử dụng ứng dụng pika trong lốc xoáy, chỉ là một ví dụ, tôi muốn đưa dữ liệu yêu cầu lốc xoáy vào RabbitMQ, Nhưng không biết cách thực hiện.

Hai câu hỏi không biết cách giải quyết.

1 Pika sử dụng bộ chuyển đổi cơn lốc xoáy có ioloop riêng của mình,

self._connection = pika.SelectConnection(pika.URLParameters(self._url), 
             self.on_connection_open) 
self._connection.ioloop.start() 

ứng dụng Tornado có ioloop riêng của mình,

tornado.ioloop.IOLoop.instance().start() 

Làm thế nào để kết hợp hai ioloop?

2 Ví dụ Pika xuất bản cùng một tin nhắn lặp đi lặp lại, nhưng tôi muốn xuất bản dữ liệu yêu cầu, cách chuyển dữ liệu yêu cầu sang phương pháp xuất bản?

Trả lời

6

Khi tìm kiếm chính xác điều tương tự, tôi đã tìm thấy số này blog post of Kevin Jing Qiu.

Tôi đã đi lỗ thỏm hơn một chút để cung cấp cho mỗi websocket bộ kênh và hàng đợi của riêng mình.

Trích xuất từ ​​dự án của tôi có thể được tìm thấy bên dưới. Ứng dụng lốc xoáy liên kết với RabbitMQ bao gồm các phần sau:

  1. Ứng dụng Tornado sẽ xử lý yêu cầu web. Tôi chỉ thấy các websockets sống lâu ở đây, nhưng bạn có thể làm như vậy với các yêu cầu http ngắn ngủi.
  2. A (một) kết nối RabbitMQ do PikaClient Instance
  3. kết nối web xác định kênh, hàng đợi và trao đổi khi phương thức mở được kích hoạt.

Giờ đây, kết nối websocket có thể nhận dữ liệu từ lốc xoáy (dữ liệu từ trình duyệt) qua on_message và gửi tới RabbitMQ.

Kết nối websocket sẽ nhận dữ liệu từ RabbitMQ qua basic_consume.

Tính năng này không đầy đủ chức năng, nhưng bạn nên có ý tưởng.

class PikaClient(object): 

    def __init__(self, io_loop): 
     logger.info('PikaClient: __init__') 
     self.io_loop = io_loop 

     self.connected = False 
     self.connecting = False 
     self.connection = None 
     self.channel = None 
     self.message_count = 0 
    """ 
    Pika-Tornado connection setup 
    The setup process is a series of callback methods. 
    connect:connect to rabbitmq and build connection to tornado io loop -> 
    on_connected: create a channel to rabbitmq -> 
    on_channel_open: declare queue tornado, bind that queue to exchange 
        chatserver_out and start consuming messages. 
    """ 

    def connect(self): 
     if self.connecting: 
      #logger.info('PikaClient: Already connecting to RabbitMQ') 
      return 

     #logger.info('PikaClient: Connecting to RabbitMQ') 
     self.connecting = True 

     cred = pika.PlainCredentials('guest', 'guest') 
     param = pika.ConnectionParameters(
      host='localhost', 
      port=5672, 
      virtual_host='/', 
      credentials=cred 
     ) 
     self.connection = TornadoConnection(param, 
      on_open_callback=self.on_connected,stop_ioloop_on_close=False) 
     self.connection.add_on_close_callback(self.on_closed) 

    def on_connected(self, connection): 
     logger.info('PikaClient: connected to RabbitMQ') 
     self.connected = True 
     self.connection = connection 
     # now you are able to call the pika api to do things 
     # this could be exchange setup for websocket connections to 
     # basic_publish to later. 
     self.connection.channel(self.on_channel_open) 

    def on_channel_open(self, channel): 
     logger.info('PikaClient: Channel %s open, Declaring exchange' % channel) 
     self.channel = channel 

    def on_closed(self, connection): 
     logger.info('PikaClient: rabbit connection closed') 
     self.io_loop.stop() 


class MyWebSocketHandler(websocket.WebSocketHandler): 
    def __init__(self): 
     self.status = 'not connected yet' 

    def open(self, *args, **kwargs): 
     self.status = "ws open" 
     self.rabbit_connect() # connect this websocket object to rabbitmq 

    def rabbit_connect(): 
     self.application.pc.connection.channel(self.rabbit_channel_in_ok) 

    def rabbit_channel_in_ok(self,channel): 
     self.channel_in = channel 
     self.channel_in.queue_declare(self.rabbit_declare_ok, 
             exclusive=True,auto_delete=True) 


# and so on... 


handlers = [ your_definitions_here_like_websockets_or_such ] 
settings = { your_settings_here } 
application = tornado.web.Application(handlers,**settings) 

def main(): 
    io_loop = tornado.ioloop.IOLoop.instance() 
    # PikaClient is our rabbitmq consumer 
    pc = PikaClient(io_loop) 
    application.pc = pc 
    application.pc.connect() 
    application.listen(config.tornadoport) 
    try: 
     io_loop.start() 
    except KeyboardInterrupt: 
     io_loop.stop() 

if __name__ == '__main__': 
    main() 
+0

Liên kết được cung cấp đã cũ. – FactualHarmony

+0

cảm ơn. và cố định. – itsafire

Các vấn đề liên quan