2010-09-21 39 views
8

Tôi muốn gửi tin nhắn đến máy chủ RabbitMQ và sau đó chờ tin nhắn trả lời (trên hàng đợi "trả lời"). Tất nhiên, tôi không muốn chờ đợi mãi mãi trong trường hợp ứng dụng xử lý các tin nhắn này là xuống - cần phải có một thời gian chờ. Nghe có vẻ như một nhiệm vụ rất cơ bản, nhưng tôi không thể tìm ra cách để làm điều này. Bây giờ tôi đã gặp phải vấn đề này với Java API.RabbitMQ Đợi tin nhắn hết thời gian

Trả lời

0

com.rabbitmq.client.QueueingConsumer có một phương pháp nextDelivery(long timeout), mà sẽ làm những gì bạn muốn. Tuy nhiên, điều này đã không được chấp nhận. Viết thời gian chờ của riêng bạn không quá khó, mặc dù có thể tốt hơn để có một chuỗi liên tục và danh sách các số nhận dạng trong thời gian, thay vì thêm và xóa người tiêu dùng và các chuỗi thời gian chờ liên quan.

Chỉnh sửa để thêm: Đã chú ý ngày này sau khi trả lời!

0

Tôi đã tiếp cận vấn đề này bằng cách sử dụng C# bằng cách tạo một đối tượng để theo dõi phản hồi cho một thư cụ thể. Nó thiết lập hàng đợi trả lời duy nhất cho một tin nhắn và đăng ký nó. Nếu không nhận được phản hồi trong khung thời gian đã chỉ định, bộ đếm thời gian đếm ngược sẽ hủy đăng ký, việc này sẽ xóa hàng đợi. Riêng biệt, tôi có các phương thức có thể đồng bộ từ chuỗi chính của tôi (sử dụng semaphore) hoặc không đồng bộ (sử dụng gọi lại) để sử dụng chức năng này.

Về cơ bản, việc thực hiện trông như thế này:

//Synchronous case: 
//Throws TimeoutException if timeout happens 
var msg = messageClient.SendAndWait(theMessage); 

//Asynchronous case 
//myCallback receives an exception message if there is a timeout 
messageClient.SendAndCallback(theMessage, myCallback); 
2

Thư viện client RabbitMQ Java tại supports a timeout argument to its QueueConsumer.nextDelivery() method.

Ví dụ, hướng dẫn RPC sử dụng đoạn mã sau:

channel.basicPublish("", requestQueueName, props, message.getBytes()); 

while (true) { 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
    if (delivery.getProperties().getCorrelationId().equals(corrId)) { 
     response = new String(delivery.getBody()); 
     break; 
    } 
} 

Bây giờ, bạn có thể sử dụng consumer.nextDelivery(1000) phải chờ tối đa một giây. Nếu hết thời gian chờ, phương thức trả về null.

channel.basicPublish("", requestQueueName, props, message.getBytes()); 

while (true) { 
    // Use a timeout of 1000 milliseconds 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000); 

    // Test if delivery is null, meaning the timeout was reached. 
    if (delivery != null && 
     delivery.getProperties().getCorrelationId().equals(corrId)) { 
     response = new String(delivery.getBody()); 
     break; 
    } 
} 
Các vấn đề liên quan