2012-02-29 25 views
12

Tôi đang sử dụng Pika để xử lý dữ liệu từ RabbitMQ. Khi tôi dường như chạy vào các loại vấn đề khác nhau, tôi quyết định viết một ứng dụng thử nghiệm nhỏ để xem cách tôi có thể xử lý việc ngắt kết nối.RabbitMQ, Pika và chiến lược kết nối lại

tôi đã viết ứng dụng này kiểm tra mà sau:

  1. Connect để môi giới, thử lại cho đến khi thành công
  2. Khi kết nối tạo ra một hàng đợi.
  3. Tiêu thụ hàng đợi này và đặt kết quả vào một Queue Queue.Queue (0)
  4. Lấy mục từ Queue.Queue (0) và đưa nó trở lại hàng đợi nhà môi giới.

Những gì tôi nhận thấy là 2 vấn đề:

  1. Khi tôi chạy kịch bản của tôi từ một máy chủ kết nối với RabbitMQ trên máy chủ khác (bên trong một vm) sau đó kịch bản này hiện hữu trên những khoảnh khắc ngẫu nhiên mà không tạo ra một lỗi.
  2. Khi tôi chạy tập lệnh của mình trên cùng một máy chủ mà RabbitMQ được cài đặt, nó chạy tốt và tiếp tục chạy.

Điều này có thể được giải thích do sự cố mạng, gói bị xóa mặc dù tôi thấy kết nối không thực sự mạnh mẽ.

Khi kịch bản chạy cục bộ trên máy chủ RabbitMQ và tôi giết RabbitMQ sau đó kịch bản thoát với lỗi: "ERROR pika SelectConnection: Socket Error on 3: 104"

Vì vậy, có vẻ như tôi không thể có được chiến lược kết nối lại hoạt động như mong muốn. Ai đó có thể xem mã để xem tôi đang làm gì sai?

Cảm ơn,

Jay

#!/bin/python 
import logging 
import threading 
import Queue 
import pika 
from pika.reconnection_strategies import SimpleReconnectionStrategy 
from pika.adapters import SelectConnection 
import time 
from threading import Lock 

class Broker(threading.Thread): 
    def __init__(self): 
     threading.Thread.__init__(self) 
     self.logging = logging.getLogger(__name__) 
     self.to_broker = Queue.Queue(0) 
     self.from_broker = Queue.Queue(0) 
     self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True) 
     self.srs = SimpleReconnectionStrategy() 
     self.properties = pika.BasicProperties(delivery_mode=2) 

     self.connection = None 
     while True: 
      try: 
       self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs) 
       break 
      except Exception as err: 
       self.logging.warning('Cant connect. Reason: %s' % err) 
       time.sleep(1) 

     self.daemon=True 
    def run(self): 
     while True: 
      self.submitData(self.from_broker.get(block=True)) 
     pass 
    def on_connected(self,connection): 
     connection.channel(self.on_channel_open) 
    def on_channel_open(self,new_channel): 
     self.channel = new_channel 
     self.channel.queue_declare(queue='sandbox', durable=True) 
     self.channel.basic_consume(self.processData, queue='sandbox')  
    def processData(self, ch, method, properties, body): 
     self.logging.info('Received data from broker') 
     self.channel.basic_ack(delivery_tag=method.delivery_tag) 
     self.from_broker.put(body) 
    def submitData(self,data): 
     self.logging.info('Submitting data to broker.') 
     self.channel.basic_publish(exchange='', 
        routing_key='sandbox', 
        body=data, 
        properties=self.properties) 
if __name__ == '__main__': 
    format=('%(asctime)s %(levelname)s %(name)s %(message)s') 
    logging.basicConfig(level=logging.DEBUG, format=format) 
    broker=Broker() 
    broker.start() 
    try: 
     broker.connection.ioloop.start() 
    except Exception as err: 
     print err 

Trả lời

17

Vấn đề chính với kịch bản của bạn là nó tương tác với một kênh duy nhất của cả hai chủ đề chính của bạn (nơi ioloop đang chạy) và "môi giới" chuỗi (gọi submitData trong vòng lặp). Đây là not safe.

Ngoài ra, SimpleReconnectionStrategy dường như không làm gì hữu ích. Nó không gây ra kết nối lại nếu kết nối bị gián đoạn. Tôi tin rằng đây là lỗi trong Pika: https://github.com/pika/pika/issues/120

Tôi đã cố gắng cấu trúc lại mã của bạn để làm cho nó hoạt động như tôi nghĩ bạn muốn, nhưng gặp sự cố khác. Pika dường như không có cách nào để phát hiện lỗi chuyển phát, có nghĩa là dữ liệu có thể bị mất nếu kết nối bị ngắt. Điều này có vẻ như một yêu cầu rõ ràng như vậy! Làm thế nào có thể không có cách nào để phát hiện rằng basic_publish không thành công? Tôi đã thử tất cả các loại công cụ bao gồm giao dịch và add_on_return_callback (tất cả đều có vẻ phức tạp và quá phức tạp), nhưng không có gì. Nếu thực sự không có cách nào thì pika chỉ có vẻ hữu ích trong các tình huống có thể chịu đựng mất dữ liệu được gửi tới RabbitMQ, hoặc trong các chương trình chỉ cần tiêu thụ từ RabbitMQ.

Đây không phải là đáng tin cậy, nhưng để tham khảo, đây là một số mã có thể giải quyết vấn đề của bạn multi-thread:

import logging 
import pika 
import Queue 
import sys 
import threading 
import time 
from functools import partial 
from pika.adapters import SelectConnection, BlockingConnection 
from pika.exceptions import AMQPConnectionError 
from pika.reconnection_strategies import SimpleReconnectionStrategy 

log = logging.getLogger(__name__) 

DEFAULT_PROPERTIES = pika.BasicProperties(delivery_mode=2) 


class Broker(object): 

    def __init__(self, parameters, on_channel_open, name='broker'): 
     self.parameters = parameters 
     self.on_channel_open = on_channel_open 
     self.name = name 

    def connect(self, forever=False): 
     name = self.name 
     while True: 
      try: 
       connection = SelectConnection(
        self.parameters, self.on_connected) 
       log.debug('%s connected', name) 
      except Exception: 
       if not forever: 
        raise 
       log.warning('%s cannot connect', name, exc_info=True) 
       time.sleep(10) 
       continue 

      try: 
       connection.ioloop.start() 
      finally: 
       try: 
        connection.close() 
        connection.ioloop.start() # allow connection to close 
       except Exception: 
        pass 

      if not forever: 
       break 

    def on_connected(self, connection): 
     connection.channel(self.on_channel_open) 


def setup_submitter(channel, data_queue, properties=DEFAULT_PROPERTIES): 
    def on_queue_declared(frame): 
     # PROBLEM pika does not appear to have a way to detect delivery 
     # failure, which means that data could be lost if the connection 
     # drops... 
     channel.confirm_delivery(on_delivered) 
     submit_data() 

    def on_delivered(frame): 
     if frame.method.NAME in ['Confirm.SelectOk', 'Basic.Ack']: 
      log.info('submission confirmed %r', frame) 
      # increasing this value seems to cause a higher failure rate 
      time.sleep(0) 
      submit_data() 
     else: 
      log.warn('submission failed: %r', frame) 
      #data_queue.put(...) 

    def submit_data(): 
     log.info('waiting on data queue') 
     data = data_queue.get() 
     log.info('got data to submit') 
     channel.basic_publish(exchange='', 
        routing_key='sandbox', 
        body=data, 
        properties=properties, 
        mandatory=True) 
     log.info('submitted data to broker') 

    channel.queue_declare(
     queue='sandbox', durable=True, callback=on_queue_declared) 


def blocking_submitter(parameters, data_queue, 
     properties=DEFAULT_PROPERTIES): 
    while True: 
     try: 
      connection = BlockingConnection(parameters) 
      channel = connection.channel() 
      channel.queue_declare(queue='sandbox', durable=True) 
     except Exception: 
      log.error('connection failure', exc_info=True) 
      time.sleep(1) 
      continue 
     while True: 
      log.info('waiting on data queue') 
      try: 
       data = data_queue.get(timeout=1) 
      except Queue.Empty: 
       try: 
        connection.process_data_events() 
       except AMQPConnectionError: 
        break 
       continue 
      log.info('got data to submit') 
      try: 
       channel.basic_publish(exchange='', 
          routing_key='sandbox', 
          body=data, 
          properties=properties, 
          mandatory=True) 
      except Exception: 
       log.error('submission failed', exc_info=True) 
       data_queue.put(data) 
       break 
      log.info('submitted data to broker') 


def setup_receiver(channel, data_queue): 
    def process_data(channel, method, properties, body): 
     log.info('received data from broker') 
     data_queue.put(body) 
     channel.basic_ack(delivery_tag=method.delivery_tag) 

    def on_queue_declared(frame): 
     channel.basic_consume(process_data, queue='sandbox') 

    channel.queue_declare(
     queue='sandbox', durable=True, callback=on_queue_declared) 


if __name__ == '__main__': 
    if len(sys.argv) != 2: 
     print 'usage: %s RABBITMQ_HOST' % sys.argv[0] 
     sys.exit() 

    format=('%(asctime)s %(levelname)s %(name)s %(message)s') 
    logging.basicConfig(level=logging.DEBUG, format=format) 

    host = sys.argv[1] 
    log.info('connecting to host: %s', host) 
    parameters = pika.ConnectionParameters(host=host, heartbeat=True) 
    data_queue = Queue.Queue(0) 
    data_queue.put('message') # prime the pump 

    # run submitter in a thread 

    setup = partial(setup_submitter, data_queue=data_queue) 
    broker = Broker(parameters, setup, 'submitter') 
    thread = threading.Thread(target= 
     partial(broker.connect, forever=True)) 

    # uncomment these lines to use the blocking variant of the submitter 
    #thread = threading.Thread(target= 
    # partial(blocking_submitter, parameters, data_queue)) 

    thread.daemon = True 
    thread.start() 

    # run receiver in main thread 
    setup = partial(setup_receiver, data_queue=data_queue) 
    broker = Broker(parameters, setup, 'receiver') 
    broker.connect(forever=True) 
+0

Cảm ơn đã dành thời gian đi qua mã và tìm kiếm tất cả các vấn đề liên quan đến nó. Tôi hiện đang sử dụng http://barryp.org/software/py-amqplib/ đó là imo một thư viện cơ bản/đơn giản hơn nhưng phù hợp với nhu cầu của tôi hoàn toàn. Kết hợp với gevent tôi có một số kết quả thực sự tốt đẹp. Tôi không còn bận tâm với Pika những ngày này nữa. –

+1

bạn có thể sử dụng Channel.confirm_delivery() để chờ ack sau khi xuất bản, khi kết nối đã đóng, nó sẽ hết thời gian sau đó bạn sẽ biết thông báo không được gửi tới người môi giới –

Các vấn đề liên quan