2010-12-14 33 views
23

Có thể gửi tin nhắn qua RabbitMQ với một số chậm trễ không? Ví dụ: tôi muốn hết hạn phiên khách hàng sau 30 phút và tôi sẽ gửi một tin nhắn sẽ được xử lý sau 30 phút.Tin nhắn bị trễ trong RabbitMQ

+0

Bạn cần phải sử dụng RabbitMQ? –

+1

Có, tính năng này khả dụng từ ThỏMQ-3.5.8. https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/ – lambodar

+0

Nếu bạn sử dụng Spring AMQP, có [hỗ trợ cho plugin] (https://docs.spring.io/spring-amqp/reference/htmlsingle/# delay-message-exchange). – Gruber

Trả lời

5

Hiện tại không thể thực hiện được. Bạn phải lưu dấu thời gian hết hạn của bạn trong cơ sở dữ liệu hoặc một cái gì đó tương tự, và sau đó có một chương trình trợ giúp đọc các dấu thời gian đó và xếp hàng một thông điệp.

Thư bị trễ là một tính năng được yêu cầu thường xuyên vì chúng hữu ích trong nhiều trường hợp. Tuy nhiên, nếu nhu cầu của bạn là hết hạn các phiên khách hàng, tôi tin rằng nhắn tin không phải là giải pháp lý tưởng cho bạn, và cách tiếp cận khác có thể hoạt động tốt hơn.

9

Với việc phát hành RabbitMQ V2.8, giao hàng dự kiến ​​là bây giờ đã có nhưng như một tính năng gián tiếp: http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

+0

Tôi đã thử cách tiếp cận này nhưng gặp phải một số vấn đề, đề xuất bất kỳ vấn đề nào? http://blog.james-carr.org/2012/03/30/rabbitmq-sending-a-message-to-be-consumed-later/#comment-502703 –

+5

Tôi đã tăng đột biến và đạt được một số showstoppers: 1. Tin nhắn chỉ DLQ: vi khi ở trên cùng của Q (http://www.rabbitmq.com/ttl.html - phần Caveats) Điều này có nghĩa là nếu lần đầu tiên tôi đặt thông điệp 1 hết hạn sau 4 giờ và msg2 hết hạn sau 1 giờ msg2 sẽ chỉ hết hạn sau khi msg1 hết hạn. 2. TTL cho tin nhắn được giữ bởi Rabbit vì vậy hãy nói rằng bạn sử dụng thời gian chờ ngắn là 10 giây. Nếu người tiêu dùng không thể tiêu thụ tin nhắn trong vòng 10 giây sau khi hết hạn (do tồn đọng), nó sẽ bị hủy và bị mất Ở trên đã được xác minh với Thỏ 3.0.1 Các bạn có thấy giải pháp nào không ? –

6

Như tôi đã không có đủ uy tín để thêm bình luận, đăng một câu trả lời mới. Đây chỉ là một bổ sung cho những gì đã được thảo luận tại http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

Ngoại trừ thay vì đặt ttl trên tin nhắn, bạn có thể đặt nó ở mức hàng đợi. Ngoài ra, bạn có thể tránh tạo một trao đổi mới chỉ vì mục đích chuyển hướng thư đến Hàng đợi khác. Đây là mã java mẫu:

Nhà sản xuất:

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
import java.util.HashMap; 
import java.util.Map; 

public class DelayedProducer { 
    private final static String QUEUE_NAME = "ParkingQueue"; 
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue"; 

    public static void main(String[] args) throws Exception{ 
     ConnectionFactory connectionFactory = new ConnectionFactory(); 
     connectionFactory.setHost("localhost"); 
     Connection connection = connectionFactory.newConnection(); 
     Channel channel = connection.createChannel(); 

     Map<String, Object> arguments = new HashMap<String, Object>(); 
     arguments.put("x-message-ttl", 10000); 
     arguments.put("x-dead-letter-exchange", ""); 
     arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME); 
     channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); 

     for (int i=0; i<5; i++) { 
      String message = "This is a sample message " + i; 
      channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 
      System.out.println("message "+i+" got published to the queue!"); 
      Thread.sleep(3000); 
     } 

     channel.close(); 
     connection.close(); 
    } 
} 

Tiêu Dùng:

import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 

public class Consumer { 
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue"; 

    public static void main(String[] args) throws Exception{ 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     channel.queueDeclare(QUEUE_NAME, false, false, false, null); 
     System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 

     QueueingConsumer consumer = new QueueingConsumer(channel); 
     boolean autoAck = false; 
     channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer); 

     while (true) { 
      QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
      String message = new String(delivery.getBody()); 
      System.out.println(" [x] Received '" + message + "'"); 
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
     } 

    } 
} 
+0

Cảm ơn bạn rất nhiều. Tôi nghĩ rằng bạn có một sai lầm nhỏ trong hàng đợi người tiêu dùng khai báo channel.queueDeclare (QUEUE_NAME, false, false, false, null); Nó phải có "DESTINATION_QUEUE_NAME" thay vì "QUEUE_NAME". Thực sự cảm ơn bạn rất nhiều –

5

Dường như this blog post mô tả bằng việc trao đổi thư chết và thông điệp ttl để làm điều gì đó tương tự.

Mã bên dưới sử dụng CoffeeScript và Node.JS để truy cập Thỏ và triển khai một cái gì đó tương tự.

amqp = require 'amqp' 
events = require 'events' 
em  = new events.EventEmitter() 
conn = amqp.createConnection() 

key = "send.later.#{new Date().getTime()}" 
conn.on 'ready', -> 
    conn.queue key, { 
    arguments:{ 
     "x-dead-letter-exchange":"immediate" 
    , "x-message-ttl": 5000 
    , "x-expires": 6000 
    } 
    }, -> 
    conn.publish key, {v:1}, {contentType:'application/json'} 

    conn.exchange 'immediate' 

    conn.queue 'right.now.queue', { 
     autoDelete: false 
    , durable: true 
    }, (q) -> 
    q.bind('immediate', 'right.now.queue') 
    q.subscribe (msg, headers, deliveryInfo) -> 
     console.log msg 
     console.log headers 
0

Giả sử bạn có kiểm soát đối với người tiêu dùng, bạn có thể đạt được trì hoãn trên người tiêu dùng như thế nào ?? này:

Nếu chúng tôi chắc chắn rằng thông điệp thứ n trong hàng đợi luôn luôn có một sự chậm trễ nhỏ hơn so với thông điệp n + 1 (điều này có thể đúng đối với nhiều trường hợp sử dụng): Nhà sản xuất gửi timeInformation trong nhiệm vụ truyền đạt thời gian mà công việc này cần được thực hiện (currentTime + delay). Người tiêu dùng:

1) Đọc thời gian đã lập lịch từ nhiệm vụ

2) nếu currentTime> scheduleTime go ahead.

chậm trễ khác = scheduledTime - currentTime

ngủ trong thời gian chỉ định bởi chậm trễ

Người tiêu dùng luôn được cấu hình với một tham số đồng thời. Vì vậy, các tin nhắn khác sẽ chỉ chờ trong hàng đợi cho đến khi người tiêu dùng hoàn thành công việc. Vì vậy, giải pháp này có thể hoạt động tốt mặc dù có vẻ khó xử, đặc biệt là sự chậm trễ lớn.

5

Nhờ câu trả lời của Norman, tôi có thể triển khai nó trong NodeJS.

Mọi thứ đều rõ ràng từ mã. Hy vọng nó sẽ tiết kiệm thời gian của ai đó.

var ch = channel; 
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false}); 
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false}); 

// setup intermediate queue which will never be listened. 
// all messages are TTLed so when they are "dead", they come to another exchange 
ch.assertQueue("my_intermediate_queue", { 
     deadLetterExchange: "my_final_delayed_exchange", 
     messageTtl: 5000, // 5sec 
}, function (err, q) { 
     ch.bindQueue(q.queue, "my_intermediate_exchange", ''); 
}); 

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) { 
     ch.bindQueue(q.queue, "my_final_delayed_exchange", ''); 

     ch.consume(q.queue, function (msg) { 
      console.log("delayed - [x] %s", msg.content.toString()); 
     }, {noAck: true}); 
}); 
4

Có hai phương pháp bạn có thể thử:

Cũ Cách tiếp cận: Đặt TTL (time to live) tiêu đề trong mỗi tin nhắn/hàng đợi (chính sách) và sau đó giới thiệu một DLQ để xử lý nó. khi ttl hết hạn, các tin nhắn của bạn sẽ chuyển từ DLQ sang hàng đợi chính để người nghe của bạn có thể xử lý nó.

Cách tiếp cận mới nhất: Gần đây RabbitMQ đã đưa ra RabbitMQ Chậm nhắn Plugin, sử dụng mà bạn có thể đạt được như vậy và hỗ trợ plugin này có sẵn từ RabbitMQ-3.5.8.

Bạn có thể khai báo trao đổi với loại x-delay-message và sau đó xuất bản thư có thời gian trễ x-tiêu đề tùy chỉnh thể hiện bằng mili giây thời gian trễ cho thư. Thông điệp sẽ được gửi đến hàng đợi tương ứng sau mili giây x-chậm trễ

More đây: git

Các vấn đề liên quan