Có thể gửi tin nhắn qua RabbitMQ với một số chậm trễ không? Ví dụ: tôi muốn hết hạn phiên khách hàng sau 30 phút và tôi sẽ gửi một tin nhắn sẽ được xử lý sau 30 phút.Tin nhắn bị trễ trong RabbitMQ
Trả lời
Hiện tại không thể thực hiện được. Bạn phải lưu dấu thời gian hết hạn của bạn trong cơ sở dữ liệu hoặc một cái gì đó tương tự, và sau đó có một chương trình trợ giúp đọc các dấu thời gian đó và xếp hàng một thông điệp.
Thư bị trễ là một tính năng được yêu cầu thường xuyên vì chúng hữu ích trong nhiều trường hợp. Tuy nhiên, nếu nhu cầu của bạn là hết hạn các phiên khách hàng, tôi tin rằng nhắn tin không phải là giải pháp lý tưởng cho bạn, và cách tiếp cận khác có thể hoạt động tốt hơn.
Với việc phát hành RabbitMQ V2.8, giao hàng dự kiến là bây giờ đã có nhưng như một tính năng gián tiếp: http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html
Tôi đã thử cách tiếp cận này nhưng gặp phải một số vấn đề, đề xuất bất kỳ vấn đề nào? http://blog.james-carr.org/2012/03/30/rabbitmq-sending-a-message-to-be-consumed-later/#comment-502703 –
Tôi đã tăng đột biến và đạt được một số showstoppers: 1. Tin nhắn chỉ DLQ: vi khi ở trên cùng của Q (http://www.rabbitmq.com/ttl.html - phần Caveats) Điều này có nghĩa là nếu lần đầu tiên tôi đặt thông điệp 1 hết hạn sau 4 giờ và msg2 hết hạn sau 1 giờ msg2 sẽ chỉ hết hạn sau khi msg1 hết hạn. 2. TTL cho tin nhắn được giữ bởi Rabbit vì vậy hãy nói rằng bạn sử dụng thời gian chờ ngắn là 10 giây. Nếu người tiêu dùng không thể tiêu thụ tin nhắn trong vòng 10 giây sau khi hết hạn (do tồn đọng), nó sẽ bị hủy và bị mất Ở trên đã được xác minh với Thỏ 3.0.1 Các bạn có thấy giải pháp nào không ? –
Như tôi đã không có đủ uy tín để thêm bình luận, đăng một câu trả lời mới. Đây chỉ là một bổ sung cho những gì đã được thảo luận tại http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html
Ngoại trừ thay vì đặt ttl trên tin nhắn, bạn có thể đặt nó ở mức hàng đợi. Ngoài ra, bạn có thể tránh tạo một trao đổi mới chỉ vì mục đích chuyển hướng thư đến Hàng đợi khác. Đây là mã java mẫu:
Nhà sản xuất:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
public class DelayedProducer {
private final static String QUEUE_NAME = "ParkingQueue";
private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 10000);
arguments.put("x-dead-letter-exchange", "");
arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
for (int i=0; i<5; i++) {
String message = "This is a sample message " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("message "+i+" got published to the queue!");
Thread.sleep(3000);
}
channel.close();
connection.close();
}
}
Tiêu Dùng:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
Cảm ơn bạn rất nhiều. Tôi nghĩ rằng bạn có một sai lầm nhỏ trong hàng đợi người tiêu dùng khai báo channel.queueDeclare (QUEUE_NAME, false, false, false, null); Nó phải có "DESTINATION_QUEUE_NAME" thay vì "QUEUE_NAME". Thực sự cảm ơn bạn rất nhiều –
Dường như this blog post mô tả bằng việc trao đổi thư chết và thông điệp ttl để làm điều gì đó tương tự.
Mã bên dưới sử dụng CoffeeScript và Node.JS để truy cập Thỏ và triển khai một cái gì đó tương tự.
amqp = require 'amqp'
events = require 'events'
em = new events.EventEmitter()
conn = amqp.createConnection()
key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->
conn.queue key, {
arguments:{
"x-dead-letter-exchange":"immediate"
, "x-message-ttl": 5000
, "x-expires": 6000
}
}, ->
conn.publish key, {v:1}, {contentType:'application/json'}
conn.exchange 'immediate'
conn.queue 'right.now.queue', {
autoDelete: false
, durable: true
}, (q) ->
q.bind('immediate', 'right.now.queue')
q.subscribe (msg, headers, deliveryInfo) ->
console.log msg
console.log headers
Giả sử bạn có kiểm soát đối với người tiêu dùng, bạn có thể đạt được trì hoãn trên người tiêu dùng như thế nào ?? này:
Nếu chúng tôi chắc chắn rằng thông điệp thứ n trong hàng đợi luôn luôn có một sự chậm trễ nhỏ hơn so với thông điệp n + 1 (điều này có thể đúng đối với nhiều trường hợp sử dụng): Nhà sản xuất gửi timeInformation trong nhiệm vụ truyền đạt thời gian mà công việc này cần được thực hiện (currentTime + delay). Người tiêu dùng:
1) Đọc thời gian đã lập lịch từ nhiệm vụ
2) nếu currentTime> scheduleTime go ahead.
chậm trễ khác = scheduledTime - currentTime
ngủ trong thời gian chỉ định bởi chậm trễ
Người tiêu dùng luôn được cấu hình với một tham số đồng thời. Vì vậy, các tin nhắn khác sẽ chỉ chờ trong hàng đợi cho đến khi người tiêu dùng hoàn thành công việc. Vì vậy, giải pháp này có thể hoạt động tốt mặc dù có vẻ khó xử, đặc biệt là sự chậm trễ lớn.
Nhờ câu trả lời của Norman, tôi có thể triển khai nó trong NodeJS.
Mọi thứ đều rõ ràng từ mã. Hy vọng nó sẽ tiết kiệm thời gian của ai đó.
var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});
// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
deadLetterExchange: "my_final_delayed_exchange",
messageTtl: 5000, // 5sec
}, function (err, q) {
ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});
ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
ch.bindQueue(q.queue, "my_final_delayed_exchange", '');
ch.consume(q.queue, function (msg) {
console.log("delayed - [x] %s", msg.content.toString());
}, {noAck: true});
});
Có hai phương pháp bạn có thể thử:
Cũ Cách tiếp cận: Đặt TTL (time to live) tiêu đề trong mỗi tin nhắn/hàng đợi (chính sách) và sau đó giới thiệu một DLQ để xử lý nó. khi ttl hết hạn, các tin nhắn của bạn sẽ chuyển từ DLQ sang hàng đợi chính để người nghe của bạn có thể xử lý nó.
Cách tiếp cận mới nhất: Gần đây RabbitMQ đã đưa ra RabbitMQ Chậm nhắn Plugin, sử dụng mà bạn có thể đạt được như vậy và hỗ trợ plugin này có sẵn từ RabbitMQ-3.5.8.
Bạn có thể khai báo trao đổi với loại x-delay-message và sau đó xuất bản thư có thời gian trễ x-tiêu đề tùy chỉnh thể hiện bằng mili giây thời gian trễ cho thư. Thông điệp sẽ được gửi đến hàng đợi tương ứng sau mili giây x-chậm trễ
More đây: git
- 1. Cách rút lại tin nhắn trong RabbitMQ?
- 2. RabbitMQ Đợi tin nhắn hết thời gian
- 3. Tiêu thụ không xác nhận tin nhắn từ RabbitMq
- 4. Thông báo thời gian đến của tin nhắn Rabbitmq
- 5. Tin nhắn MSMQ luôn bị trễ 3 phút trên cùng một máy
- 6. Hàng đợi công việc/tin nhắn có thể mở rộng đơn giản với độ trễ
- 7. Làm cách nào để phát lại các tin nhắn bị nhỡ khi sử dụng STOMP để kết nối với RabbitMQ?
- 8. Người tiêu dùng tin nhắn RabbitMQ ngừng tiêu thụ thông báo
- 9. Rabbitmq truy xuất nhiều tin nhắn bằng cách sử dụng một cuộc gọi đồng bộ đơn
- 10. Đợi một tin nhắn RabbitMQ duy nhất với thời gian chờ
- 11. Sử dụng EasyNetQ với RabbitMQ để xuất bản và nhận tin nhắn
- 12. Nhắn tin trễ thời gian trễ của WebSphere MQ - Liệu có API JMS (hoặc JMS thích) không?
- 13. Xử lý tin nhắn không đồng bộ của khách hàng Pika RabbitMQ
- 14. Có thể xem nội dung tin nhắn RabbitMQ trực tiếp từ dòng lệnh không?
- 15. Độ trễ thấp, tin nhắn có quy mô lớn xếp hàng
- 16. Hoạt động nhắn tin giống như thiết bị đầu cuối
- 17. Android, đẩy tin nhắn tới 1000 thiết bị nhanh
- 18. Tin nhắn GCM của Android cho một thiết bị khác
- 19. Không thể nhận tin nhắn từ thiết bị (iPhone)
- 20. RabbitMQ Queue peeking
- 21. Gửi tin nhắn SMS/Tin nhắn văn bản qua PHP
- 22. Tin nhắn bị mất trên XMPP trên thiết bị bị ngắt kết nối
- 23. WCF Lỗi - Hạn ngạch kích thước tin nhắn tối đa cho tin nhắn đến (65536) đã bị vượt quá
- 24. AMQP/RabbitMQ người gửi trong AVR (Arduino)
- 25. WCF Ngoại lệ: Hạn ngạch kích thước tin nhắn tối đa cho tin nhắn đến (65536) đã bị vượt quá
- 26. Cách theo dõi tin nhắn trong Android?
- 27. mvvm light - nhắn tin
- 28. Nhận tin nhắn WhatsApp
- 29. Xử lý tin nhắn AeroSnap trong WndProc
- 30. Vòng tin nhắn Erlang
Bạn cần phải sử dụng RabbitMQ? –
Có, tính năng này khả dụng từ ThỏMQ-3.5.8. https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/ – lambodar
Nếu bạn sử dụng Spring AMQP, có [hỗ trợ cho plugin] (https://docs.spring.io/spring-amqp/reference/htmlsingle/# delay-message-exchange). – Gruber