2011-08-27 31 views
12

Tôi đã làm việc với JMS và ActiveMQ. Tất cả mọi thứ đang làm việc kỳ diệu. Tôi không sử dụng mùa xuân, cũng không thể I.Báo hiệu một lần khôi phục từ một JMS MessageListener

Giao diện javax.jms.MessageListener chỉ có một phương pháp, onMessage. Từ bên trong triển khai, có một cơ hội là một ngoại lệ sẽ được ném. Nếu trong thực tế, một ngoại lệ được ném ra, sau đó tôi nói rằng thông điệp không được xử lý đúng cách và cần phải được thử lại. Vì vậy, tôi cần ActiveMQ để chờ một chút và sau đó, thử lại. tức là tôi cần ngoại lệ được ném để khôi phục giao dịch JMS.

Làm cách nào để hoàn thành hành vi như vậy?

Có thể có một số cấu hình trong ActiveMQ mà tôi không thể tìm thấy.

Hoặc ... có lẽ có thể loại bỏ đăng ký MessageListener s cho người tiêu dùng và tiêu thụ các thông điệp bản thân mình, trong một một vòng lặp như:

while (true) { 
    // ... some administrative stuff like ... 
    session = connection.createSesstion(true, SESSION_TRANSACTED) 
    try { 
     Message m = receiver.receive(queue, 1000L); 
     theMessageListener.onMessage(m); 
     session.commit(); 
    } catch (Exception e) { 
     session.rollback(); 
     Thread.sleep(someTimeDefinedSomewhereElse); 
    } 
    // ... some more administrative stuff 
} 

trong một vài chủ đề, thay vì đăng ký người nghe.

Hoặc ... Tôi có thể bằng cách nào đó trang trí/AOP/byte thao tác các MessageListener s để tự làm điều này.

Bạn sẽ đi theo lộ trình nào và tại sao?

lưu ý: Tôi không có toàn quyền kiểm soát mã của MessageListener.

EDIT Một thử nghiệm cho bằng chứng của khái niệm:

@Test 
@Ignore("Interactive test, just a proof of concept") 
public void transaccionConListener() throws Exception { 
    final AtomicInteger atomicInteger = new AtomicInteger(0); 

    BrokerService brokerService = new BrokerService(); 

    String bindAddress = "vm://localhost"; 
    brokerService.addConnector(bindAddress); 
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); 
    brokerService.setUseJmx(false); 
    brokerService.start(); 

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress); 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(500); 
    redeliveryPolicy.setBackOffMultiplier(2); 
    redeliveryPolicy.setUseExponentialBackOff(true); 
    redeliveryPolicy.setMaximumRedeliveries(2); 

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQConnectionFactory.setUseRetroactiveConsumer(true); 
    activeMQConnectionFactory.setClientIDPrefix("ID"); 
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); 

    pooledConnectionFactory.start(); 

    Connection connection = pooledConnectionFactory.createConnection(); 
    Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); 
    Queue helloQueue = session.createQueue("Hello"); 
    MessageConsumer consumer = session.createConsumer(helloQueue); 
    consumer.setMessageListener(new MessageListener() { 

     @Override 
     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       switch (atomicInteger.getAndIncrement()) { 
        case 0: 
         System.out.println("OK, first message received " + textMessage.getText()); 
         message.acknowledge(); 
         break; 
        case 1: 
         System.out.println("NOPE, second must be retried " + textMessage.getText()); 
         throw new RuntimeException("I failed, aaaaah"); 
        case 2: 
         System.out.println("OK, second message received " + textMessage.getText()); 
         message.acknowledge(); 
       } 
      } catch (JMSException e) { 
       e.printStackTrace(System.out); 
      } 
     } 
    }); 
    connection.start(); 

    { 
     // A client sends two messages... 
     Connection connection1 = pooledConnectionFactory.createConnection(); 
     Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     connection1.start(); 

     MessageProducer producer = session1.createProducer(helloQueue); 
     producer.send(session1.createTextMessage("Hello World 1")); 
     producer.send(session1.createTextMessage("Hello World 2")); 

     producer.close(); 
     session1.close(); 
     connection1.stop(); 
     connection1.close(); 
    } 
    JOptionPane.showInputDialog("I will wait, you watch the log..."); 

    consumer.close(); 
    session.close(); 
    connection.stop(); 
    connection.close(); 
    pooledConnectionFactory.stop(); 

    brokerService.stop(); 

    assertEquals(3, atomicInteger.get()); 
} 
+0

Cảm ơn bạn rất nhiều cá voi và @Ammar cho câu trả lời. Tôi upvoting cả hai kể từ khi cả hai đã chỉ cho tôi vào đúng hướng. Nhưng chưa chọn câu trả lời đúng. Vì cần nhiều thử nghiệm hơn. –

Trả lời

10

Nếu bạn muốn sử dụng SESSION_TRANSACTED làm chế độ xác nhận, bạn cần thiết lập RedeliveryPolicy on your Connection/ConnectionFactory. This page on ActiveMQ's website cũng chứa một số thông tin tốt về những gì bạn có thể cần làm.

Vì bạn đang không sử dụng Spring, bạn có thể thiết lập một RedeliveryPolicy với một cái gì đó tương tự như đoạn mã sau (lấy từ một trong các liên kết ở trên):

RedeliveryPolicy policy = connection.getRedeliveryPolicy(); 
policy.setInitialRedeliveryDelay(500); 
policy.setBackOffMultiplier(2); 
policy.setUseExponentialBackOff(true); 
policy.setMaximumRedeliveries(2); 

Sửa Lấy của bạn đoạn mã được thêm vào câu trả lời, phần sau đây cho biết cách hoạt động của giao dịch này với các giao dịch. Hãy thử mã này với phương thức Session.rollback() đã nhận xét và bạn sẽ thấy rằng sử dụng SESION_TRANSACTED và Session.cam kết/rollback làm việc như mong đợi:

@Test 
public void test() throws Exception { 
    final AtomicInteger atomicInteger = new AtomicInteger(0); 

    BrokerService brokerService = new BrokerService(); 

    String bindAddress = "vm://localhost"; 
    brokerService.addConnector(bindAddress); 
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); 
    brokerService.setUseJmx(false); 
    brokerService.start(); 

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress); 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(500); 
    redeliveryPolicy.setBackOffMultiplier(2); 
    redeliveryPolicy.setUseExponentialBackOff(true); 
    redeliveryPolicy.setMaximumRedeliveries(2); 

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQConnectionFactory.setUseRetroactiveConsumer(true); 
    activeMQConnectionFactory.setClientIDPrefix("ID"); 

    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); 

    pooledConnectionFactory.start(); 

    Connection connection = pooledConnectionFactory.createConnection(); 
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); 
    Queue helloQueue = session.createQueue("Hello"); 
    MessageConsumer consumer = session.createConsumer(helloQueue); 
    consumer.setMessageListener(new MessageListener() { 

     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       switch (atomicInteger.getAndIncrement()) { 
        case 0: 
         System.out.println("OK, first message received " + textMessage.getText()); 
         session.commit(); 
         break; 
        case 1: 
         System.out.println("NOPE, second must be retried " + textMessage.getText()); 
         session.rollback(); 
         throw new RuntimeException("I failed, aaaaah"); 
        case 2: 
         System.out.println("OK, second message received " + textMessage.getText()); 
         session.commit(); 
       } 
      } catch (JMSException e) { 
       e.printStackTrace(System.out); 
      } 
     } 
    }); 
    connection.start(); 

    { 
     // A client sends two messages... 
     Connection connection1 = pooledConnectionFactory.createConnection(); 
     Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     connection1.start(); 

     MessageProducer producer = session1.createProducer(helloQueue); 
     producer.send(session1.createTextMessage("Hello World 1")); 
     producer.send(session1.createTextMessage("Hello World 2")); 

     producer.close(); 
     session1.close(); 
     connection1.stop(); 
     connection1.close(); 
    } 
    JOptionPane.showInputDialog("I will wait, you watch the log..."); 

    consumer.close(); 
    session.close(); 
    connection.stop(); 
    connection.close(); 
    pooledConnectionFactory.stop(); 

    assertEquals(3, atomicInteger.get()); 
} 

}

+0

Điều đó không hiệu quả. Nhưng chỉ cho tôi đi đúng hướng. Tôi sẽ rời khỏi DUPS_OK_ACKNOWLEDGE vì nó có vẻ là một trong những công trình mà tôi phải làm việc ít nhất. –

+0

Bạn cần dán toàn bộ mã của mình, vì bạn không làm điều gì đó chính xác với Phiên của mình. DUPS_OK_ACKNOWLEDGE chỉ xuất hiện để được làm việc kể từ khi khách hàng thừa nhận là lười biếng và các nhà môi giới sẽ chỉ giữ gửi lại tin nhắn cho đến khi khách hàng cuối cùng không ack. – whaley

+0

Tôi đã dán một bằng chứng về khái niệm. Tôi chỉ có thể làm cho nó hoạt động với DUPS_OK_ACKNOWLEDGE và message.acknowledgement dường như không tạo ra sự khác biệt. –

2

Bạn cần phải thiết lập các chế độ thừa nhận để Session.CLIENT_ACKNOWLEDGE, khách hàng thừa nhận một thông điệp tiêu thụ bằng cách gọi phương pháp ghi nhận của thông điệp.

QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

Sau đó, sau khi xử lý thông điệp tới cần phải gọi phương thức Message.acknowledge() để loại bỏ thông điệp đó.

Message message = ...; 
// Processing message 

message.acknowledge(); 
+0

Nó không hoạt động. _onMessage_ vẫn được gọi một lần, ngay cả khi _message.acknowledge() _ không bao giờ được gọi. –

+0

Bạn đã đặt chế độ xác nhận đúng chưa? Nó phải được đặt thành Session.CLIENT_ACKNOWLEDGE! – Ammar

+0

Nhưng nó hoạt động với (sai, Session.DUPS_OK_ACKNOWLEDGE) ... message.acknowledge() dường như không làm các trick. –

0

Nếu phiên của bạn được giao dịch, sau đó "acknowledgeMode" bị bỏ qua anyways..So, chỉ để lại phiên giao dịch của bạn và sử dụng session.rollback và session.commit cam kết hoặc quay lại giao dịch của bạn.

+1

Tôi nghĩ vấn đề (của tôi) là phiên không thể truy cập trong MessageListener.onMessage (Thư). –

Các vấn đề liên quan