2011-06-23 32 views
7

Tôi mới đến HornetQ vì vậy hãy chịu với tôi. Trước tiên, hãy để tôi cho bạn biết các yêu cầu của mình:Tin nhắn HornetQ vẫn còn trong hàng đợi sau khi sử dụng lõi api

Tôi cần một thông báo xếp hàng trung gian có thể truyền tin nhắn, có kích thước khoảng 1k, giữa quá trình khác nhau với độ trễ thấp và độ bền thấp (tức là nó sẽ tồn tại treo hệ thống). Tôi sẽ có nhiều quy trình bằng văn bản cho cùng một hàng đợi và tương tự như nhiều quá trình đọc từ cùng một hàng đợi.

Đối với điều này tôi đã chọn HornetQ vì nó có xếp hạng tốt nhất cho thông điệp đi qua với sự kiên trì.

Tôi hiện đang usung Hornetq v2.2.2Final như đứng một mình máy chủ.
tôi có thể thành công tạo/hàng đợi không bền bền sử dụng lõi api(ClientSession), và thành công gửi thông điệp đến xếp hàng (ClientProducer).
Tương tự, tôi có thể đọc các tin nhắn từ hàng đợi bằng cách sử dụng lõi api (ClientConsumer).

Sự cố xảy ra sau khi ứng dụng khách đã đọc tin nhắn, tin nhắn vẫn còn trong hàng đợi, tức là số số lượng tin nhắn trong hàng đợi vẫn không đổi. Có lẽ tôi nhận được điều này sai nhưng tôi đã theo ấn tượng này rằng một khi tin nhắn được tiêu thụ (đọc + ack), nó được loại bỏ khỏi hàng đợi.Bằng điều này không xảy ra trong trường hợp của tôi, và các thông điệp tương tự đang được đọc đi đọc lại.

Ngoài ra, tôi muốn nói rằng tôi đã thử sử dụng hàng đợi không bền với các thông báo không bền. nhưng vấn đề vẫn là.

Mã cho sản xuất mà tôi đang sử dụng:

public class HQProducer implements Runnable { 

    private ClientProducer producer; 
    private boolean killme; 
    private ClientSession session; 
    private boolean durableMsg; 

    public HQProducer(String host, int port, String address, String queueName, 
      boolean deleteQ, boolean durable, boolean durableMsg, int pRate) { 
     this.durableMsg = durableMsg; 
     try { 
      HashMap map = new HashMap(); 
      map.put("host", host); 
      map.put("port", port); 

      TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map); 

      ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config); 

      ClientSessionFactory factory = locator.createSessionFactory(); 

      session = factory.createSession(); 

      if (queueExists(queueName)) { 
       if (deleteQ) { 
        System.out.println("Deleting existing queue :: " + queueName); 
        session.deleteQueue(queueName); 
        System.out.println("Creating queue :: " + queueName); 
        session.createQueue(address, queueName, true); 
       } 
      } else { 
       System.out.println("Creating new queue :: " + queueName); 
       session.createQueue(address, queueName, durable); 
      } 
      producer = session.createProducer(SimpleString.toSimpleString(address), pRate); 

      killme = false; 
     } catch (Exception ex) { 
      Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    @Override 
    public void run() { 
     long time = System.currentTimeMillis(); 
     int cnt = 0; 
     long timediff; 
     while (!killme) { 
      try { 
       ClientMessage message = session.createMessage(durableMsg); 

       message.getBodyBuffer().writeString("Hello world"); 

       producer.send(message); 
       cnt++; 
       timediff = ((System.currentTimeMillis() - time)/1000); 
       if (timediff >= 1) { 
        System.out.println("Producer tps :: " + cnt); 
        cnt = 0; 
        time = System.currentTimeMillis(); 
       } 
      } catch (HornetQException ex) { 
       Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } 
     try { 
      session.close(); 
     } catch (HornetQException ex) { 
      Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    public void setKillMe(boolean killme) { 
     this.killme = killme; 
    } 

    private boolean queueExists(String qname) { 
     boolean res = false; 
     try { 
      //ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname)); 
      QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname)); 
      if (queueQuery.isExists()) { 
       res = true; 
      } 
     } catch (HornetQException ex) { 
      res = false; 
     } 
     return res; 
    } 
} 

Ngoài ra mã cho người tiêu dùng là:

public class HQConsumer implements Runnable { 

    private ClientSession session; 
    private ClientConsumer consumer; 
    private boolean killMe; 

    public HQConsumer(String host, int port, String queueName, boolean browseOnly) { 
     try { 
      HashMap map = new HashMap(); 
      map.put("host", host); 
      map.put("port", port); 

      TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map); 

      ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config); 

      ClientSessionFactory factory = locator.createSessionFactory(); 

      session = factory.createSession(); 

      session.start(); 

      consumer = session.createConsumer(queueName, "",0,-1,browseOnly); 

      killMe = false; 
     } catch (Exception ex) { 
      Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    @Override 
    public void run() { 
     long time = System.currentTimeMillis(); 
     int cnt = 0; 
     long timediff; 
     while (!killMe) { 
      try { 
       ClientMessage msgReceived = consumer.receive(); 
       msgReceived.acknowledge(); 
       //System.out.println("message = " + msgReceived.getBodyBuffer().readString()); 
       cnt++; 
       timediff = ((System.currentTimeMillis() - time)/1000); 
       if (timediff >= 1) { 
        System.out.println("ConSumer tps :: " + cnt); 
        cnt = 0; 
        time = System.currentTimeMillis(); 
       } 
      } catch (HornetQException ex) { 
       Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } 
     try { 
      session.close(); 
     } catch (HornetQException ex) { 
      Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    public void setKillMe(boolean killMe) { 
     this.killMe = killMe; 
    } 
} 

HornetQ máy chủ cấu hình ::

<configuration xmlns="urn:hornetq" 
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
       xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd"> 

    <paging-directory>${data.dir:../data}/paging</paging-directory> 

    <bindings-directory>${data.dir:../data}/bindings</bindings-directory> 

    <journal-directory>${data.dir:../data}/journal</journal-directory> 

    <journal-min-files>10</journal-min-files> 

    <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory> 

    <connectors> 
     <connector name="netty"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.port:5445}"/> 
     </connector> 

     <connector name="netty-throughput"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/> 
     <param key="batch-delay" value="50"/> 
     </connector> 
    </connectors> 

    <acceptors> 
     <acceptor name="netty"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.port:5445}"/> 
     </acceptor> 

     <acceptor name="netty-throughput"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/> 
     <param key="batch-delay" value="50"/> 
     <param key="direct-deliver" value="false"/> 
     </acceptor> 
    </acceptors> 

    <security-settings> 
     <security-setting match="#"> 
     <permission type="createNonDurableQueue" roles="guest"/> 
     <permission type="deleteNonDurableQueue" roles="guest"/> 
     <permission type="createDurableQueue" roles="guest"/> 
     <permission type="deleteDurableQueue" roles="guest"/> 
     <permission type="consume" roles="guest"/> 
     <permission type="send" roles="guest"/> 
     </security-setting> 
    </security-settings> 

    <address-settings> 
     <!--default for catch all--> 
     <address-setting match="#"> 
     <dead-letter-address>jms.queue.DLQ</dead-letter-address> 
     <expiry-address>jms.queue.ExpiryQueue</expiry-address> 
     <redelivery-delay>0</redelivery-delay> 
     <max-size-bytes>10485760</max-size-bytes>  
     <message-counter-history-day-limit>10</message-counter-history-day-limit> 
     <address-full-policy>BLOCK</address-full-policy> 
     </address-setting> 
    </address-settings> 

</configuration> 
+0

a/c tới [này] (http: // docs.jboss.org/hornetq/2.2.2.Final/user-manual/en/html/messaging-concepts.html#d0e354) bạn cần phải xác nhận thông báo sau khi xử lý bạn đang làm như vậy? –

Trả lời

13

Với api hornetq lõi bạn phải nhắn tin một cách rõ ràng. Tôi không nhìn thấy nơi mà đang xảy ra trong thử nghiệm của bạn.

Nếu bạn không cố định, đây là lý do tại sao thư của bạn bị chặn. Tôi sẽ cần phải xem ví dụ hoàn chỉnh của bạn để cung cấp cho bạn một câu trả lời hoàn chỉnh.

Ngoài ra: Bạn nên xác định createSession của bạn với: createSession (true, true, 0)

API lõi có một tùy chọn để ACK hàng loạt. Bạn không sử dụng phiên giao dịch, vì vậy bạn sẽ không gửi acks đến máy chủ cho đến khi bạn đạt được ackBatchSize được cấu hình tại máy chủ của bạnLocator. Với điều này tại chỗ, bất kỳ ack sẽ được gửi đến máy chủ ngay sau khi bạn gọi xác nhận() tại thông điệp của bạn.

Tùy chọn bạn hiện đang sử dụng tương đương với JMS DUPS_OK ​​với một DUPS_SIZE nhất định.

(bài viết thay đổi nội dung câu trả lời ban đầu của tôi sau một số lần lặp với bạn)

+1

'ClientMessage msgReceived = consumer.receive(); msgReceived.acknowledge(); '.. Tôi thừa nhận mã số –

+0

API lõi có tùy chọn cho các đợt ACK. Bạn không sử dụng phiên giao dịch, vì vậy bạn sẽ không gửi acks đến máy chủ cho đến khi bạn đạt được ackBatchSize được cấu hình tại máy chủ của bạnLocator. Bạn nên xác định createSession của mình với: createSession (true, true, 0); Với điều này tại chỗ, bất kỳ ack nào sẽ được gửi đến máy chủ ngay sau khi bạn gọi xác nhận() tại thông báo của bạn –

+1

Bạn không quay lại chủ đề này. Vì vậy, tôi giả sử bạn nhận được vấn đề của bạn cố định? –

2

Thiết lập ackbatchsize giúp tôi khắc phục vấn đề .. Thanks for the help

+2

bạn có lẽ nên bỏ phiếu ở một trong những câu trả lời ở đây. –

Các vấn đề liên quan