Tôi đang sử dụng Pika để xử lý dữ liệu từ RabbitMQ. Khi tôi dường như chạy vào các loại vấn đề khác nhau, tôi quyết định viết một ứng dụng thử nghiệm nhỏ để xem cách tôi có thể xử lý việc ngắt kết nối.RabbitMQ, Pika và chiến lược kết nối lại
tôi đã viết ứng dụng này kiểm tra mà sau:
- Connect để môi giới, thử lại cho đến khi thành công
- Khi kết nối tạo ra một hàng đợi.
- Tiêu thụ hàng đợi này và đặt kết quả vào một Queue Queue.Queue (0)
- Lấy mục từ Queue.Queue (0) và đưa nó trở lại hàng đợi nhà môi giới.
Những gì tôi nhận thấy là 2 vấn đề:
- Khi tôi chạy kịch bản của tôi từ một máy chủ kết nối với RabbitMQ trên máy chủ khác (bên trong một vm) sau đó kịch bản này hiện hữu trên những khoảnh khắc ngẫu nhiên mà không tạo ra một lỗi.
- Khi tôi chạy tập lệnh của mình trên cùng một máy chủ mà RabbitMQ được cài đặt, nó chạy tốt và tiếp tục chạy.
Điều này có thể được giải thích do sự cố mạng, gói bị xóa mặc dù tôi thấy kết nối không thực sự mạnh mẽ.
Khi kịch bản chạy cục bộ trên máy chủ RabbitMQ và tôi giết RabbitMQ sau đó kịch bản thoát với lỗi: "ERROR pika SelectConnection: Socket Error on 3: 104"
Vì vậy, có vẻ như tôi không thể có được chiến lược kết nối lại hoạt động như mong muốn. Ai đó có thể xem mã để xem tôi đang làm gì sai?
Cảm ơn,
Jay
#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock
class Broker(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.logging = logging.getLogger(__name__)
self.to_broker = Queue.Queue(0)
self.from_broker = Queue.Queue(0)
self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
self.srs = SimpleReconnectionStrategy()
self.properties = pika.BasicProperties(delivery_mode=2)
self.connection = None
while True:
try:
self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs)
break
except Exception as err:
self.logging.warning('Cant connect. Reason: %s' % err)
time.sleep(1)
self.daemon=True
def run(self):
while True:
self.submitData(self.from_broker.get(block=True))
pass
def on_connected(self,connection):
connection.channel(self.on_channel_open)
def on_channel_open(self,new_channel):
self.channel = new_channel
self.channel.queue_declare(queue='sandbox', durable=True)
self.channel.basic_consume(self.processData, queue='sandbox')
def processData(self, ch, method, properties, body):
self.logging.info('Received data from broker')
self.channel.basic_ack(delivery_tag=method.delivery_tag)
self.from_broker.put(body)
def submitData(self,data):
self.logging.info('Submitting data to broker.')
self.channel.basic_publish(exchange='',
routing_key='sandbox',
body=data,
properties=self.properties)
if __name__ == '__main__':
format=('%(asctime)s %(levelname)s %(name)s %(message)s')
logging.basicConfig(level=logging.DEBUG, format=format)
broker=Broker()
broker.start()
try:
broker.connection.ioloop.start()
except Exception as err:
print err
Cảm ơn đã dành thời gian đi qua mã và tìm kiếm tất cả các vấn đề liên quan đến nó. Tôi hiện đang sử dụng http://barryp.org/software/py-amqplib/ đó là imo một thư viện cơ bản/đơn giản hơn nhưng phù hợp với nhu cầu của tôi hoàn toàn. Kết hợp với gevent tôi có một số kết quả thực sự tốt đẹp. Tôi không còn bận tâm với Pika những ngày này nữa. –
bạn có thể sử dụng Channel.confirm_delivery() để chờ ack sau khi xuất bản, khi kết nối đã đóng, nó sẽ hết thời gian sau đó bạn sẽ biết thông báo không được gửi tới người môi giới –