2017-05-22 20 views
9

Các giải pháp trong RabbitMQ Wait for a message with a timeoutWait for a single RabbitMQ message with a timeout dường như không hoạt động vì không có phương thức phân phối tiếp theo nào trong thư viện C# chính thức và QueueingBasicConsumer bị depricated, vì vậy nó chỉ ném NotSupportedException ở mọi nơi.C# RabbitMQ đợi một tin nhắn cho thời gian chờ được chỉ định?

Làm cách nào để chờ một tin nhắn từ hàng đợi cho thời gian chờ được chỉ định?

PS

Nó có thể được thực hiện thông qua Basic.Get(), vâng, nhưng tốt, nó là giải pháp xấu để kéo các thông điệp trong khoảng specififed (giao thông dư thừa, dư thừa CPU).

Cập nhật

EventingBasicConsumer bởi implmenetation KHÔNG HỖ TRỢ hủy ngay lập tức. Ngay cả khi bạn gọi BasicCancel tại một số điểm, ngay cả khi bạn chỉ định tìm nạp trước thông qua BasicQos - nó vẫn sẽ tìm nạp trong Khung và các khung đó có thể chứa nhiều thư. Vì vậy, nó không phải là tốt cho thực hiện nhiệm vụ duy nhất. Đừng bận tâm - nó không hoạt động với các tin nhắn đơn lẻ.

Trả lời

5

Có nhiều cách để thực hiện việc này. Ví dụ, bạn có thể sử dụng EventingBasicConsumer cùng với ManualResetEvent, như thế này (đó là chỉ dành riêng cho mục đích trình diễn - sử dụng tốt hơn một trong những phương thức dưới đây):

var factory = new ConnectionFactory(); 
using (var connection = factory.CreateConnection()) { 
    using (var channel = connection.CreateModel()) { 
     // setup signal 
     using (var signal = new ManualResetEvent(false)) { 
      var consumer = new EventingBasicConsumer(channel); 
      byte[] messageBody = null;       
      consumer.Received += (sender, args) => { 
       messageBody = args.Body; 
       // process your message or store for later 
       // set signal 
       signal.Set(); 
      };    
      // start consuming 
      channel.BasicConsume("your.queue", false, consumer); 
      // wait until message is received or timeout reached 
      bool timeout = !signal.WaitOne(TimeSpan.FromSeconds(10)); 
      // cancel subscription 
      channel.BasicCancel(consumer.ConsumerTag); 
      if (timeout) { 
       // timeout reached - do what you need in this case 
       throw new Exception("timeout"); 
      } 

      // at this point messageBody is received 
     } 
    } 
} 

Như bạn đã nêu trong ý kiến ​​- nếu bạn mong đợi nhiều tin nhắn trong cùng một hàng đợi , đó không phải là cách tốt nhất. Vâng, nó không phải là cách tốt nhất trong mọi trường hợp, tôi đã bao gồm nó chỉ để chứng minh việc sử dụng ManualResetEvent trong trường hợp thư viện chính nó không cung cấp hỗ trợ timeout.

Nếu bạn đang thực hiện RPC (gọi thủ tục từ xa, yêu cầu trả lời) - bạn có thể sử dụng SimpleRpcClient cùng với SimpleRpcServer ở phía máy chủ. phía khách hàng sẽ trông như thế này:

var client = new SimpleRpcClient(channel, "your.queue"); 
client.TimeoutMilliseconds = 10 * 1000; 
client.TimedOut += (sender, args) => { 
    // do something on timeout 
};      
var reply = client.Call(myMessage); // will return reply or null if timeout reached 

Thậm chí đơn giản hơn cách: sử dụng lớp cơ bản Subscription (nó sử dụng cùng một EventingBasicConsumer nội bộ, nhưng hỗ trợ timeout do đó bạn không cần phải thực hiện chính mình), như thế này:

var sub = new Subscription(channel, "your.queue"); 
BasicDeliverEventArgs reply; 
if (!sub.Next(10 * 1000, out reply)) { 
    // timeout 
} 
+0

Giải pháp đầu tiên không hợp lệ. BasicConsume không được bảo đảm để ngừng tiêu thụ trên BasicCancel, nó có thể làm điều này một chút sau đó do thực hiện bay của thỏ (chỉ cố gắng tiêu thụ một tin nhắn duy nhất cho mỗi yêu cầu và bạn sẽ thấy rằng trong một số trường hợp bạn gán messageBody vài lần). Bạn sẽ vẫn cần phải requeue tin nhắn dư thừa. Đối với phần thứ hai và thứ ba, tôi sẽ thử ngay bây giờ =) – eocron

+0

Mặc dù phần trên của bạn không liên quan, lớp Đăng ký chính xác là những gì tôi muốn! Cảm ơn bạn, nó hoạt động tuyệt vời! Bạn có thể chỉnh sửa câu trả lời để những người khác biết rằng người cuối cùng đã làm việc không? – eocron

+0

Tuy nhiên, nó lưu trữ một loạt các thông điệp bên trong bằng cách thực hiện, trong khi tôi chỉ cần một =/ – eocron

Các vấn đề liên quan