Có nhiều cách để thực hiện việc này. Ví dụ, bạn có thể sử dụng EventingBasicConsumer
cùng với ManualResetEvent
, như thế này (đó là chỉ dành riêng cho mục đích trình diễn - sử dụng tốt hơn một trong những phương thức dưới đây):
var factory = new ConnectionFactory();
using (var connection = factory.CreateConnection()) {
using (var channel = connection.CreateModel()) {
// setup signal
using (var signal = new ManualResetEvent(false)) {
var consumer = new EventingBasicConsumer(channel);
byte[] messageBody = null;
consumer.Received += (sender, args) => {
messageBody = args.Body;
// process your message or store for later
// set signal
signal.Set();
};
// start consuming
channel.BasicConsume("your.queue", false, consumer);
// wait until message is received or timeout reached
bool timeout = !signal.WaitOne(TimeSpan.FromSeconds(10));
// cancel subscription
channel.BasicCancel(consumer.ConsumerTag);
if (timeout) {
// timeout reached - do what you need in this case
throw new Exception("timeout");
}
// at this point messageBody is received
}
}
}
Như bạn đã nêu trong ý kiến - nếu bạn mong đợi nhiều tin nhắn trong cùng một hàng đợi , đó không phải là cách tốt nhất. Vâng, nó không phải là cách tốt nhất trong mọi trường hợp, tôi đã bao gồm nó chỉ để chứng minh việc sử dụng ManualResetEvent
trong trường hợp thư viện chính nó không cung cấp hỗ trợ timeout.
Nếu bạn đang thực hiện RPC (gọi thủ tục từ xa, yêu cầu trả lời) - bạn có thể sử dụng SimpleRpcClient
cùng với SimpleRpcServer
ở phía máy chủ. phía khách hàng sẽ trông như thế này:
var client = new SimpleRpcClient(channel, "your.queue");
client.TimeoutMilliseconds = 10 * 1000;
client.TimedOut += (sender, args) => {
// do something on timeout
};
var reply = client.Call(myMessage); // will return reply or null if timeout reached
Thậm chí đơn giản hơn cách: sử dụng lớp cơ bản Subscription
(nó sử dụng cùng một EventingBasicConsumer
nội bộ, nhưng hỗ trợ timeout do đó bạn không cần phải thực hiện chính mình), như thế này:
var sub = new Subscription(channel, "your.queue");
BasicDeliverEventArgs reply;
if (!sub.Next(10 * 1000, out reply)) {
// timeout
}
Giải pháp đầu tiên không hợp lệ. BasicConsume không được bảo đảm để ngừng tiêu thụ trên BasicCancel, nó có thể làm điều này một chút sau đó do thực hiện bay của thỏ (chỉ cố gắng tiêu thụ một tin nhắn duy nhất cho mỗi yêu cầu và bạn sẽ thấy rằng trong một số trường hợp bạn gán messageBody vài lần). Bạn sẽ vẫn cần phải requeue tin nhắn dư thừa. Đối với phần thứ hai và thứ ba, tôi sẽ thử ngay bây giờ =) – eocron
Mặc dù phần trên của bạn không liên quan, lớp Đăng ký chính xác là những gì tôi muốn! Cảm ơn bạn, nó hoạt động tuyệt vời! Bạn có thể chỉnh sửa câu trả lời để những người khác biết rằng người cuối cùng đã làm việc không? – eocron
Tuy nhiên, nó lưu trữ một loạt các thông điệp bên trong bằng cách thực hiện, trong khi tôi chỉ cần một =/ – eocron