Tôi đã làm việc với JMS và ActiveMQ. Tất cả mọi thứ đang làm việc kỳ diệu. Tôi không sử dụng mùa xuân, cũng không thể I.Báo hiệu một lần khôi phục từ một JMS MessageListener
Giao diện javax.jms.MessageListener
chỉ có một phương pháp, onMessage
. Từ bên trong triển khai, có một cơ hội là một ngoại lệ sẽ được ném. Nếu trong thực tế, một ngoại lệ được ném ra, sau đó tôi nói rằng thông điệp không được xử lý đúng cách và cần phải được thử lại. Vì vậy, tôi cần ActiveMQ để chờ một chút và sau đó, thử lại. tức là tôi cần ngoại lệ được ném để khôi phục giao dịch JMS.
Làm cách nào để hoàn thành hành vi như vậy?
Có thể có một số cấu hình trong ActiveMQ mà tôi không thể tìm thấy.
Hoặc ... có lẽ có thể loại bỏ đăng ký MessageListener
s cho người tiêu dùng và tiêu thụ các thông điệp bản thân mình, trong một một vòng lặp như:
while (true) {
// ... some administrative stuff like ...
session = connection.createSesstion(true, SESSION_TRANSACTED)
try {
Message m = receiver.receive(queue, 1000L);
theMessageListener.onMessage(m);
session.commit();
} catch (Exception e) {
session.rollback();
Thread.sleep(someTimeDefinedSomewhereElse);
}
// ... some more administrative stuff
}
trong một vài chủ đề, thay vì đăng ký người nghe.
Hoặc ... Tôi có thể bằng cách nào đó trang trí/AOP/byte thao tác các MessageListener
s để tự làm điều này.
Bạn sẽ đi theo lộ trình nào và tại sao?
lưu ý: Tôi không có toàn quyền kiểm soát mã của MessageListener
.
EDIT Một thử nghiệm cho bằng chứng của khái niệm:
@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception {
final AtomicInteger atomicInteger = new AtomicInteger(0);
BrokerService brokerService = new BrokerService();
String bindAddress = "vm://localhost";
brokerService.addConnector(bindAddress);
brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService.setUseJmx(false);
brokerService.start();
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(500);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(2);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
activeMQConnectionFactory.setUseRetroactiveConsumer(true);
activeMQConnectionFactory.setClientIDPrefix("ID");
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
pooledConnectionFactory.start();
Connection connection = pooledConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue helloQueue = session.createQueue("Hello");
MessageConsumer consumer = session.createConsumer(helloQueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
switch (atomicInteger.getAndIncrement()) {
case 0:
System.out.println("OK, first message received " + textMessage.getText());
message.acknowledge();
break;
case 1:
System.out.println("NOPE, second must be retried " + textMessage.getText());
throw new RuntimeException("I failed, aaaaah");
case 2:
System.out.println("OK, second message received " + textMessage.getText());
message.acknowledge();
}
} catch (JMSException e) {
e.printStackTrace(System.out);
}
}
});
connection.start();
{
// A client sends two messages...
Connection connection1 = pooledConnectionFactory.createConnection();
Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();
MessageProducer producer = session1.createProducer(helloQueue);
producer.send(session1.createTextMessage("Hello World 1"));
producer.send(session1.createTextMessage("Hello World 2"));
producer.close();
session1.close();
connection1.stop();
connection1.close();
}
JOptionPane.showInputDialog("I will wait, you watch the log...");
consumer.close();
session.close();
connection.stop();
connection.close();
pooledConnectionFactory.stop();
brokerService.stop();
assertEquals(3, atomicInteger.get());
}
Cảm ơn bạn rất nhiều cá voi và @Ammar cho câu trả lời. Tôi upvoting cả hai kể từ khi cả hai đã chỉ cho tôi vào đúng hướng. Nhưng chưa chọn câu trả lời đúng. Vì cần nhiều thử nghiệm hơn. –