2013-09-05 17 views
5

Tôi đang đối mặt với một số vấn đề đang cố gắng sử dụng lại XAConnection và XASession trên nhiều công nhân trong ứng dụng JBoss của tôi. Tôi đã quản lý đơn giản hóa vấn đề xuống chỉ bằng một phương pháp duy nhất. Bạn có thể cả hai số Sản xuấtNgười tiêu dùng một thông báo sử dụng cùng một kết nối và phiên. Hiện tại, ứng dụng của tôi có rất nhiều hàng đợi và công nhân, trong đó mỗi nhân viên hiện đang bắt đầu và bắt đầu từng kết nối và phiên riêng, thay vì chia sẻ nó. Điều đó không nên có thể?HornetQ: Cách sử dụng lại XAConnection và XASession

Dưới đây là ví dụ mã của tôi:

import org.apache.log4j.Logger; 
import javax.annotation.PostConstruct; 
import javax.annotation.PreDestroy; 
import javax.ejb.Singleton; 
import javax.ejb.Startup; 
import javax.jms.*; 
import javax.jms.Queue; 
import javax.naming.InitialContext; 

@Singleton 
@Startup 
public class QueueTest { 

    private Logger logger = Logger.getLogger(QueueTest.class); 

    @PostConstruct 
    public void startup() { 
     try { 
      String queue = "queue/Queue1"; 
      String message = "test"; 

      //setting up connection 
      InitialContext iniCtx = new InitialContext(); 
      XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA"); 
      XAConnection connection = qcf.createXAConnection(); 
      connection.start(); 
      logger.debug("creating connection at " + new java.util.Date()); 

      //setting up session 
      XASession session = connection.createXASession(); 
      logger.debug("creating session at " + new java.util.Date()); 

      //find the queue 
      Object queueObj = iniCtx.lookup(queue); 
      Queue jmsQueue = (javax.jms.Queue)queueObj; 

      //adding message to queue 
      javax.jms.MessageProducer producer = session.createProducer(jmsQueue); 
      javax.jms.TextMessage textMessage = session.createTextMessage(message); 
      producer.send(textMessage); 
      producer.close(); 
      logger.debug("Message added to queue"); 

      //receiving message from queue 
      javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue); 
      javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000); 

      if (messageReceived==null) 
       throw new Exception("No message reveived"); 

      logger.debug("Got message:"+messageReceived.getText()); 
      consumer.close(); 
     } 
     catch(Exception e) { 
      logger.debug("Error: " + e.getMessage(), e); 
     } 
    } 

    @PreDestroy 
    public void shutdown() { 

    } 
} 

Nó dẫn đến kết quả này:

11:47:17,905 DEBUG [QueueTest] (MSC service thread 1-8) creating connection at Thu Sep 05 11:47:17 CEST 2013 
11:47:18,041 DEBUG [QueueTest] (MSC service thread 1-8) creating session at Thu Sep 05 11:47:18 CEST 2013 
11:47:18,065 DEBUG [QueueTest] (MSC service thread 1-8) Message added to queue 
11:47:23,081 DEBUG [QueueTest] (MSC service thread 1-8) Error: No message reveived 

Như bạn có thể thấy, không có thông báo được nhận bởi các Consumer. Tại sao?

EDIT 1:

package dk.energimidt.uapi.zigbee.services; 

import org.apache.log4j.Logger; 

import javax.ejb.Stateless; 
import javax.ejb.TransactionAttribute; 
import javax.ejb.TransactionAttributeType; 
import javax.jms.Queue; 
import javax.jms.XAConnection; 
import javax.jms.XAConnectionFactory; 
import javax.jms.XASession; 
import javax.naming.InitialContext; 

@TransactionAttribute(TransactionAttributeType.REQUIRED) 
@Stateless 
public class QueueTestWorkerBean implements QueueTestWorker { 

    private Logger logger = Logger.getLogger(QueueTestWorkerBean.class); 

    public void run() { 
     try { 
      String queue = "queue/Queue1"; 
      String message = "test"; 

      //setting up connection 
      InitialContext iniCtx = new InitialContext(); 
      XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA"); 
      XAConnection connection = qcf.createXAConnection(); 
      connection.start(); 
      logger.debug("creating connection at " + new java.util.Date()); 

      //setting up session 
      XASession session = connection.createXASession(); 
      logger.debug("creating session at " + new java.util.Date()); 

      //find the queue 
      Object queueObj = iniCtx.lookup(queue); 
      Queue jmsQueue = (javax.jms.Queue)queueObj; 

      //adding message to queue 
      javax.jms.MessageProducer producer = session.createProducer(jmsQueue); 
      javax.jms.TextMessage textMessage = session.createTextMessage(message); 
      producer.send(textMessage); 
      producer.close(); 
      session.commit(); 
      logger.debug("Message added to queue"); 

      //receiving message from queue 
      javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue); 
      javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000); 

      if (messageReceived==null) 
       throw new Exception("No message reveived"); 

      logger.debug("Got message:"+messageReceived.getText()); 
      consumer.close(); 

      connection.close(); 
     } 
     catch(Exception e) { 
      logger.debug("Error: " + e.getMessage(), e); 
     } 
    } 
} 

Bây giờ tôi nhận được một ngoại lệ trên Session.Commit():

10:46:03,697 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating connection at Tue Sep 17 10:46:03 CEST 2013 
10:46:04,343 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating session at Tue Sep 17 10:46:04 CEST 2013 
10:46:04,355 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) Error: XA connection: javax.jms.TransactionInProgressException: XA connection 
    at org.hornetq.ra.HornetQRASession.commit(HornetQRASession.java:386) 
    at QueueTestWorkerBean.run(QueueTestWorkerBean.java:45) [library-1.0.0.jar:] 

Trả lời

2

tôi nhìn thấy một vài (trên thực tế 2) thăng hỗn hợp có:

i - bạn đang sử dụng một phiên XA nhưng bạn không khai báo bất kỳ ranh giới giao dịch ... mà thường được thực hiện trên Session Beans và MDB. Tôi không chắc chắn bạn có thể làm điều đó trên Stateless này.

nếu bạn không sử dụng bất kỳ Giao dịch khai báo nào, bạn sẽ phải tranh thủ XID theo cách thủ công.

ii - jmsXA là nhà máy kết nối bộ điều hợp nguồn tài nguyên mặc định. Nó đã có một hồ bơi trên nó rồi. Vì vậy, bất cứ khi nào bạn tạo một phiên mới bạn đang lấy ra khỏi hồ bơi. Khi bạn đóng nó, bạn sẽ trả lại nó cho hồ bơi.

Bạn có thể sử dụng Nhà máy kết nối thông thường. Chỉ trong InVMConnectionFactory (hoặc bất kỳ thứ gì bạn đã định nghĩa trên standalone của mình, bên ngoài PooledConnectionFactories giả sử bạn đang sử dụng JBoss ... và sau đó chỉ cần sử dụng JMS thông thường. trường hợp bạn sẽ cần phải chắc chắn rằng bạn tranh thủ nó sử dụng api Manager của giao dịch trực tiếp.

nếu bạn sử dụng máy kết nối thường xuyên, bạn có thể sau đó chỉ cần giữ cho nó kết nối tới chừng nào bạn muốn.

Xin vui lòng cho tôi biết làm thế nào nó đi và tôi sẽ giúp bạn Tôi biết bạn bắt đầu một tiền thưởng ..nhưng tôi đã trả lời miễn phí :)

Tôi không thể tìm thấy bất kỳ ví dụ nào về Hướng dẫn EJB về cách sử dụng Giao dịch với Singleton.

Tôi khuyên bạn nên sử dụng nó thông qua Statless hoặc Stateful Session Bean và sau đó áp dụng @TransactionAttribute cho Bean.

Java EE 6 Hướng dẫn có một số thông tin tốt về nó:

http://docs.oracle.com/javaee/6/tutorial/doc/bncij.html

thông báo rằng một thông báo sẽ không có sẵn cho đến khi bạn cam kết. Vì vậy, nếu bạn gửi một tin nhắn trong một giao dịch, bạn sẽ không thể nhận được tin nhắn đó trên cùng một giao dịch.

Trên ví dụ edit1 của bạn, bạn đang gửi một tin nhắn và tiêu thụ nó trên cùng một giao dịch. Điều đó sẽ không hoạt động như trước tiên bạn cần phải cam kết phương pháp sản xuất trước khi bạn có thể tiêu thụ nó. Bạn sẽ cần hai giao dịch trong trường hợp này, vì vậy Edit1 bị hỏng.

Ngoài ra: đảm bảo bạn đóng Kết nối ở cuối. Vì bạn đang sử dụng JmsXA (hoặc một nhà máy kết nối gộp), bạn sẽ có Máy chủ ứng dụng tự động thực hiện bỏ phiếu.

+0

Cảm ơn câu trả lời của bạn. Tôi sẽ cố gắng xem xét nó. Bạn có thể cho tôi một ví dụ nhỏ về giải pháp được mô tả trong phần 1 không? – dhrm

+0

Tôi không có bất kỳ ví dụ nào để sử dụng trong một lớp đơn lẻ. bạn nên tìm chú thích giao dịch trên EE6 –

+0

Tôi đã đăng bài chỉnh sửa với nhiều thông tin hơn –

1

Trước khi bạn thực sự có thể nhận được những thông điệp bạn cần phải cam kết đối tượng phiên. Sau câu lệnh producer.send, bạn cần thêm session.commit.

Ngoài ra, tôi khuyên bạn nên đóng nhà sản xuất ở cuối.

Điều khác có vẻ sai là bạn tạo người tiêu dùng sau khi nhà sản xuất đã bị hủy.

+0

Cảm ơn bạn đã trả lời. Thêm cam kết trong dòng sau khi producer.send, dẫn đến kết nối * XA: javax.jms.TransactionInProgressException *. – dhrm

+0

Tiền thưởng được đặt vào câu trả lời sai. Tôi xin lôi. – dhrm

+0

Tôi đã thu hồi tiền thưởng của bạn, @Dennis. Bạn có thể cung cấp lại và tặng thưởng như mong muốn. Điều này cho phép * cả hai * câu trả lời khác bắn vào nó. – Shog9

Các vấn đề liên quan