2010-05-10 27 views
6

Tôi muốn gửi tin nhắn đến một máy chủ RabbitMQ và sau đó chờ tin nhắn trả lời (trên hàng đợi "trả lời"). Tất nhiên, tôi không muốn chờ đợi mãi mãi trong trường hợp ứng dụng xử lý các tin nhắn này là xuống - cần phải có một thời gian chờ. Nghe có vẻ như một nhiệm vụ rất cơ bản, nhưng tôi không thể tìm ra cách để làm điều này. Bây giờ tôi đã gặp sự cố này với cả hai py-amqplibRabbitMQ .NET client.Đợi một tin nhắn RabbitMQ duy nhất với thời gian chờ

Giải pháp tốt nhất mà tôi đã có cho đến nay là để thăm dò ý kiến ​​sử dụng basic_get với sleep ở giữa, nhưng điều này là khá xấu xí:

def _wait_for_message_with_timeout(channel, queue_name, timeout): 
    slept = 0 
    sleep_interval = 0.1 

    while slept < timeout: 
     reply = channel.basic_get(queue_name) 
     if reply is not None: 
      return reply 

     time.sleep(sleep_interval) 
     slept += sleep_interval 

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout) 

Chắc chắn có một số cách tốt hơn?

Trả lời

8

Tôi vừa thêm hỗ trợ hết thời gian chờ cho amqplib trong carrot.

Đây là một lớp con của amqplib.client0_8.Connection:

http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

wait_multi là một phiên bản của channel.wait thể nhận được trên một số tùy ý của kênh.

Tôi đoán điều này có thể được hợp nhất ngược dòng tại một số thời điểm.

+1

Bây giờ đây là những gì tôi gọi là "câu trả lời tuyệt vời": "nó đã được sửa"! Chấp nhận - với hy vọng rằng * * được hợp nhất thành amqplib. – EMP

+0

@EMP haha ​​:) funny :) –

1

Điều này dường như phá vỡ toàn bộ ý tưởng về xử lý không đồng bộ, nhưng nếu bạn phải tôi nghĩ đúng cách để làm điều đó là sử dụng RpcClient.

+0

Mặc dù RpcClient không hữu ích cho tôi, xem xét triển khai của nó cho thấy cách tiếp cận để sử dụng: tạo 'QueueingBasicConsumer' và chờ đợi trên hàng đợi của nó, hỗ trợ một thời gian chờ. Điều này không phức tạp như .NET. – EMP

2

Có ví dụ here sử dụng qpid với msg = q.get(timeout=1) nên làm những gì bạn muốn. Xin lỗi, tôi không biết thư viện máy khách AMQP nào khác thực hiện hết thời gian chờ (và đặc biệt tôi không biết hai thư viện cụ thể mà bạn đã đề cập).

+0

Nhìn vào nguồn của qpid nó có vẻ sử dụng cách tiếp cận chính xác giống như máy khách .NET: 'basic_consume' với một hàng đợi và chờ đợi trên hàng đợi với thời gian chờ. Có vẻ như đó là những gì tôi sẽ phải làm. – EMP

8

Đây là những gì tôi đã kết thúc làm trong các khách hàng NET:

protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs) 
{ 
    var consumer = new QueueingBasicConsumer(Channel); 
    var tag = Channel.BasicConsume(queueName, true, null, consumer); 
    try 
    { 
     object result; 
     if (!consumer.Queue.Dequeue(timeoutMs, out result)) 
      throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs/1000.0)); 

     return ((BasicDeliverEventArgs)result).Body; 
    } 
    finally 
    { 
     Channel.BasicCancel(tag); 
    } 
} 

Thật không may, tôi không thể làm tương tự với py-amqplib, bởi vì phương pháp basic_consume của nó không gọi gọi lại trừ khi bạn gọi channel.wait()channel.wait() không hỗ trợ hết thời gian chờ! Giới hạn ngớ ngẩn này (mà tôi tiếp tục chạy vào) có nghĩa là nếu bạn không bao giờ nhận được một thông điệp khác, luồng của bạn sẽ bị đóng băng mãi mãi.

1

Thỏ hiện cho phép bạn thêm các sự kiện hết thời gian chờ. Đơn giản chỉ cần quấn mã của bạn trong một thử bắt và sau đó ném ngoại lệ trong các trình xử lý TimeOut và Disconnect:

try{ 
    using (IModel channel = rabbitConnection.connection.CreateModel()) 
    { 
     client = new SimpleRpcClient(channel, "", "", queue); 
     client.TimeoutMilliseconds = 5000; // 5 sec. defaults to infinity 
     client.TimedOut += RpcTimedOutHandler; 
     client.Disconnected += RpcDisconnectedHandler; 
     byte[] replyMessageBytes = client.Call(message); 
     return replyMessageBytes; 
    } 
} 
catch (Exception){ 
    //Handle timeout and disconnect here 
} 
private void RpcDisconnectedHandler(object sender, EventArgs e) 
{ 
    throw new Exception("RPC disconnect exception occured."); 
} 

private void RpcTimedOutHandler(object sender, EventArgs e) 
{ 
    throw new Exception("RPC timeout exception occured."); 
} 
Các vấn đề liên quan