2008-09-29 37 views
9

Lọc bộ nhận tin nhắn JMS bằng JMSCorrelationID

Làm thế nào tôi có thể khởi tạo trình nghe xếp hàng JMS trong java (JRE/JDK/J2EE 1.4) mà chỉ nhận được các tin nhắn khớp với một JMSCorrelationID đã cho? Các thông điệp mà tôi đang tìm kiếm để nhận được đã được xuất bản cho một hàng đợi và không phải là một chủ đề, mặc dù điều đó có thể thay đổi nếu cần thiết.

Dưới đây là đoạn code mà tôi đang sử dụng để đưa thông điệp trong hàng đợi:

/** 
* publishResponseToQueue publishes Requests to the Queue. 
* 
* @param jmsQueueFactory    -Name of the queue-connection-factory 
* @param jmsQueue     -The queue name for the request 
* @param response      -A response object that needs to be published 
* 
* @throws ServiceLocatorException  -An exception if a request message 
*          could not be published to the Topic 
*/ 
private void publishResponseToQueue(String jmsQueueFactory, 
            String jmsQueue, 
            Response response) 
     throws ServiceLocatorException { 

    if (logger.isInfoEnabled()) { 
     logger.info("Begin publishRequestToQueue: " + 
         jmsQueueFactory + "," + jmsQueue + "," + response); 
    } 
    logger.assertLog(jmsQueue != null && !jmsQueue.equals(""), 
         "jmsQueue cannot be null"); 
    logger.assertLog(jmsQueueFactory != null && !jmsQueueFactory.equals(""), 
         "jmsQueueFactory cannot be null"); 
    logger.assertLog(response != null, "Request cannot be null"); 

    try { 

     Queue queue = (Queue)_context.lookup(jmsQueue); 

     QueueConnectionFactory factory = (QueueConnectionFactory) 
      _context.lookup(jmsQueueFactory); 

     QueueConnection connection = factory.createQueueConnection(); 
     connection.start(); 
     QueueSession session = connection.createQueueSession(false, 
            QueueSession.AUTO_ACKNOWLEDGE); 

     ObjectMessage objectMessage = session.createObjectMessage(); 

     objectMessage.setJMSCorrelationID(response.getID()); 

     objectMessage.setObject(response); 

     session.createSender(queue).send(objectMessage); 

     session.close(); 
     connection.close(); 

    } catch (Exception e) { 
     //XC3.2 Added/Modified BEGIN 
     logger.error("ServiceLocator.publishResponseToQueue - Could not publish the " + 
         "Response to the Queue - " + e.getMessage()); 
     throw new ServiceLocatorException("ServiceLocator.publishResponseToQueue " + 
              "- Could not publish the " + 
         "Response to the Queue - " + e.getMessage()); 
     //XC3.2 Added/Modified END 
    } 

    if (logger.isInfoEnabled()) { 
     logger.info("End publishResponseToQueue: " + 
         jmsQueueFactory + "," + jmsQueue + response); 
    } 

} // end of publishResponseToQueue method 

Trả lời

10

Hàng đợi thiết lập kết nối là như nhau, nhưng một khi bạn có QueueSession, bạn đặt chọn khi tạo một máy thu.

QueueReceiver receiver = session.createReceiver(myQueue, "JMSCorrelationID='theid'"); 

sau đó

receiver.receive() 

hoặc

receiver.setListener(myListener); 
+0

Gần đây tôi đã đọc cùng một chủ đề và có câu hỏi như sau: người nhận vẫn nhận được những tin nhắn không chứa id tương quan bắt buộc và thả chúng một cách im lặng để xử lý hoặc sẽ Bản thân nhà cung cấp JMS không cung cấp các thông điệp như vậy cho người nhận, để họ vẫn còn trên hàng đợi? Tôi cảm thấy thứ hai là cách tiếp cận đúng, nhưng muốn xác minh. Cảm ơn. – shrini1000

+0

@ shrini1000 bạn là chính xác. – Trying

5

BTW trong khi không phải là câu hỏi thực tế của nó bạn hỏi - nếu bạn đang cố gắng để thực hiện đáp ứng yêu cầu trên JMS tôi khuyên bạn nên reading this article như JMS API khá phức tạp hơn một chút so với những gì bạn có thể tưởng tượng và làm việc này hiệu quả hơn nhiều so với vẻ bề ngoài.

Đặc biệt to use JMS efficiently bạn nên cố gắng tránh tạo ra người tiêu dùng đối với một thông điệp duy nhất, vv

Cũng vì API JMS là như vậy rất phức tạp để sử dụng một cách chính xác và hiệu quả - đặc biệt là với tổng hợp, giao dịch và xử lý đồng thời - Tôi khuyên bạn nên folks hide the middleware from their application code như qua sử dụng Apache Camel's Spring Remoting implementation for JMS

+0

Tôi đã tự cứu mình rất nhiều bánh xe, tôi đã biết về Camel cách đây vài năm. –

0
String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'"; 
QueueReceiver receiver = session.createReceiver(queue, filter); 

Ở đây người nhận sẽ nhận được những thông điệp mà JMSCorrelationID bằng MessageID. điều này rất hữu ích trong mô hình yêu cầu/đáp ứng.

hoặc bạn có thể trực tiếp thiết lập này cho bất kỳ giá trị:

QueueReceiver receiver = session.createReceiver(queue, "JMSCorrelationID ='"+id+"'";); 

hơn bạn có thể làm một trong hai Hope receiver.receive(2000); hoặc receiver.setMessageListener(this);

2

điều này sẽ giúp. Tôi đã sử dụng Open MQ.

package com.MQueues; 

import java.util.UUID; 

import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.jms.QueueConnection; 
import javax.jms.QueueReceiver; 
import javax.jms.QueueSession; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import com.sun.messaging.BasicQueue; 
import com.sun.messaging.QueueConnectionFactory; 

public class HelloProducerConsumer { 

public static String queueName = "queue0"; 
public static String correlationId; 

public static String getCorrelationId() { 
    return correlationId; 
} 

public static void setCorrelationId(String correlationId) { 
    HelloProducerConsumer.correlationId = correlationId; 
} 

public static String getQueueName() { 
    return queueName; 
} 

public static void sendMessage(String threadName) { 
    correlationId = UUID.randomUUID().toString(); 
    try { 

     // Start connection 
     QueueConnectionFactory cf = new QueueConnectionFactory(); 
     QueueConnection connection = cf.createQueueConnection(); 
     QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 
     BasicQueue destination = (BasicQueue) session.createQueue(threadName); 
     MessageProducer producer = session.createProducer(destination); 
     connection.start(); 

     // create message to send 
     TextMessage message = session.createTextMessage(); 
     message.setJMSCorrelationID(correlationId); 
     message.setText(threadName + "(" + System.currentTimeMillis() 
       + ") " + correlationId +" from Producer"); 

     System.out.println(correlationId +" Send from Producer"); 
     producer.send(message); 

     // close everything 
     producer.close(); 
     session.close(); 
     connection.close(); 

    } catch (JMSException ex) { 
     System.out.println("Error = " + ex.getMessage()); 
    } 
} 

public static void receivemessage(final String correlationId) { 
    try { 

     QueueConnectionFactory cf = new QueueConnectionFactory(); 
     QueueConnection connection = cf.createQueueConnection(); 
     QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 

     BasicQueue destination = (BasicQueue) session.createQueue(getQueueName()); 

     connection.start(); 

     System.out.println("\n"); 
     System.out.println("Start listen " + getQueueName() + " " + correlationId +" Queue from receivemessage"); 
     long now = System.currentTimeMillis(); 

     // receive our message 
     String filter = "JMSCorrelationID = '" + correlationId + "'"; 
     QueueReceiver receiver = session.createReceiver(destination, filter); 
     TextMessage m = (TextMessage) receiver.receive(); 
     System.out.println("Received message = " + m.getText() + " timestamp=" + m.getJMSTimestamp()); 

     System.out.println("End listen " + getQueueName() + " " + correlationId +" Queue from receivemessage"); 

     session.close(); 
     connection.close(); 

    } catch (JMSException ex) { 
     System.out.println("Error = " + ex.getMessage()); 
    } 
} 

public static void main(String args[]) { 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId1 = getCorrelationId(); 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId2 = getCorrelationId(); 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId3 = getCorrelationId(); 


    HelloProducerConsumer.receivemessage(correlationId2); 

    HelloProducerConsumer.receivemessage(correlationId1); 

    HelloProducerConsumer.receivemessage(correlationId3); 
} 
} 
Các vấn đề liên quan