2015-04-28 14 views
5

Tôi đang thiết lập cấu hình JMS bằng Spring Integration và ActiveMQ Message Broker. Kênh gửi đi (outbound channel) của tôi phải được hỗ trợ bởi một JDBC Message Store để tránh mất dữ liệu, ví dụ khi broker hoặc ứng dụng của tôi ngoại tuyến. Tiêu đề: Spring Integration với JMS + ActiveMQ: thông điệp vẫn còn trong JDBC Message Store sau khi kết nối lại

Cấu hình của tôi cho đến nay có vẻ hoạt động, tuy nhiên JDBC Message Store không hoạt động như tôi mong đợi. Nếu tôi ngắt kết nối với broker, các thông điệp gửi đến kênh gửi đi được lưu lại như dự định, nhưng sau khi kết nối lại chúng vẫn nằm trong DB và không được gửi đến hàng đợi. Tuy nhiên, những thông điệp tôi gửi sau khi kết nối lại thì vẫn đến được hàng đợi, và nếu tôi khởi động lại ứng dụng thì các thông điệp đã lưu cuối cùng cũng được gửi đi...

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:jms="http://www.springframework.org/schema/jms" 
    xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-jms="http://www.springframework.org/schema/integration/jms" 
    xmlns:oxm="http://www.springframework.org/schema/oxm" 
    xmlns:j2ee="http://www.springframework.org/schema/jee" 
    xmlns:amq="http://activemq.apache.org/schema/core" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm.xsd 
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd 
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd 
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
    http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 
    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> 

<!-- ActiveMQ Configuration -->
<!-- Switches on annotation processing for both the Spring JMS namespace and
     the Spring Integration namespace. -->
<jms:annotation-driven></jms:annotation-driven>
<int:annotation-config></int:annotation-config>

<!-- ActiveMQ connection factory obtained from JNDI.
     The lookup is not performed at startup (lookup-on-startup="false"); callers
     receive a javax.jms.ConnectionFactory proxy, and the resolved object is
     cached (cache="true"). -->
<j2ee:jndi-lookup
        id="jmsConnectionFactory"
        jndi-name="java:comp/env/jms/connectionFactory"
        resource-ref="true"
        expected-type="org.apache.activemq.ActiveMQConnectionFactory"
        proxy-interface="javax.jms.ConnectionFactory"
        lookup-on-startup="false"
        cache="true"/>


<!-- Connection pool wrapped around the JNDI-provided factory.
     start()/stop() are tied to the bean lifecycle through init-method/destroy-method. -->
<bean id="connectionFactory"
      class="org.apache.activemq.jms.pool.PooledConnectionFactory"
      init-method="start"
      destroy-method="stop">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
    <property name="reconnectOnException" value="true"/>
    <property name="maxConnections" value="8"/>
</bean>


<!-- spring-oxm JAXB2 marshaller for the XML payload of messages;
     the JAXB context is built from the de.br.ecomx package. -->
<oxm:jaxb2-marshaller
        id="jax2bMarshaller"
        context-path="de.br.ecomx"/>

<!-- JMS message converter; the same JAXB2 marshaller instance is used for
     both marshalling and unmarshalling. -->
<bean id="messageConverter"
      class="org.springframework.jms.support.converter.MarshallingMessageConverter">
    <property name="unmarshaller" ref="jax2bMarshaller"/>
    <property name="marshaller" ref="jax2bMarshaller"/>
</bean>

<!-- Message store implementation.
     Persists messages temporarily in the DB to prevent data loss on server shutdown.
     It is not applied automatically: every queue channel that should be persistent
     has to reference it via its message-store attribute.
     NOTE(review): usingIdCache is enabled. According to the JavaDoc of
     JdbcChannelMessageStore#setUsingIdCache, the IDs of polled messages are kept in
     an in-memory cache until removeFromIdCache(..) is called, so the polling
     transaction has to evict them after commit/rollback; otherwise rolled-back
     messages are not picked up again. Confirm against the poller configuration. -->
<bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="tablePrefix" value="BWH_"/>
    <property name="region" value="BWH"/>
    <property name="usingIdCache" value="true"/>
    <property name="channelMessageStoreQueryProvider">
        <bean id="queryProvider"
              class="org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider"/>
    </property>
</bean>

<!-- Transaction synchronization for the default poller.
     The JDBC message store ("store") runs with usingIdCache="true": the ID of every
     polled message is remembered in an in-memory cache and excluded from later polls.
     If the polling transaction rolls back (e.g. the broker is unreachable), the row
     stays in the DB but its ID is still cached, so the message would never be polled
     again until the application restarts. Evicting the ID after commit AND after
     rollback makes rolled-back messages eligible for the next poll.
     For messages that did not come from the JDBC store the call has nothing to evict. -->
<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())"/>
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<!-- default poller for (non-message-driven) channel-adapters:
     polls every 500 ms (fixed delay) inside a transaction and applies the
     synchronization factory above when that transaction completes. -->
<int:poller id="defaultPoller" default="true" fixed-delay="500">
    <int:transactional propagation="REQUIRED"
                       isolation="DEFAULT"
                       transaction-manager="transactionManager"
                       synchronization-factory="syncFactory"/>
</int:poller>

<!-- LOGGING --> 
<!--<int:logging-channel-adapter id="logger" level="DEBUG" log-full-message="true"/>--> 


<!-- CHANNEL DEFINITIONS --> 

<!-- intermediate channel for message enrichment -->
<int:channel id="output_enricher_channel">
    <int:queue/>
</int:channel>

<!-- general output channel; the specific channel is chosen by a router -->
<int:channel id="output_router">
    <int:queue/>
</int:channel>

<!-- channel definitions connected to consumer queues -->
<int:channel id="bwh_articleMasterData_input_channel">
    <int:queue/>
</int:channel>
<int:channel id="bwh_transferOrder_input_channel">
    <int:queue/>
</int:channel>
<!-- add your channel here... -->

<!-- channel definitions connected to producer queues;
     this queue is backed by the JDBC message store bean "store" -->
<int:channel id="ax_transferOrderPacked_input_channel">
    <int:queue message-store="store"/>
</int:channel>
<!-- add your channel here... --> 


<!-- MESSAGE HANDLING --> 

<!-- Copies the 'id' header into the payload's 'messageId' property and passes
     the message on to the router channel. -->
<int:enricher
        input-channel="output_enricher_channel"
        output-channel="output_router">
    <int:property expression="headers.id" name="messageId"/>
</int:enricher>

<!-- Routes outgoing messages to the channel mapped to their payload type. -->
<int:payload-type-router input-channel="output_router">
    <int:mapping
            type="de.br.ecomx.TransferOrdersPacked"
            channel="ax_transferOrderPacked_input_channel"/>
</int:payload-type-router>


<!-- ENDPOINTS --> 

<!-- endpoint configurations connecting JMS queues with channels -->

<!-- inbound: article master data queue to bwh_articleMasterData_input_channel -->
<int-jms:message-driven-channel-adapter
        id="articleMasterDataConsumerChannelAdapter"
        connection-factory="connectionFactory"
        destination-name="${queue.bwh.articlemasterdata.in}"
        channel="bwh_articleMasterData_input_channel"
        message-converter="messageConverter"
        acknowledge="client"
        max-concurrent-consumers="10"
        auto-startup="false"/>

<!-- inbound: transfer order queue to bwh_transferOrder_input_channel -->
<int-jms:message-driven-channel-adapter
        id="transferOrderConsumerChannelAdapter"
        connection-factory="connectionFactory"
        destination-name="${queue.bwh.transferorder.in}"
        channel="bwh_transferOrder_input_channel"
        message-converter="messageConverter"
        acknowledge="client"
        max-concurrent-consumers="5"
        auto-startup="false"/>

<!-- outbound: ax_transferOrderPacked_input_channel to the packed transfer order queue -->
<int-jms:outbound-channel-adapter
        id="transferOrderPackedProducerChannelAdapter"
        connection-factory="connectionFactory"
        destination-name="${queue.ax.transferorderpacked.in}"
        channel="ax_transferOrderPacked_input_channel"
        message-converter="messageConverter"
        delivery-persistent="true"
        session-transacted="true"
        auto-startup="false"/>

<!-- add your endpoint here... --> 
<!-- REMEMBER: If auto-startup is set to false (which is recommended for all channel adapters) --> 
<!--   the channel adapter has to be registered in ecomxConsumerChannelAdapterContainer below!!! --> 


<!-- Gateway usually injected into producer services; messages sent through it
     enter the flow on output_enricher_channel and end up at an outbound channel adapter. -->
<int:gateway
        service-interface="de.lo.bwh.jms.gateway.EcomxProducerGateway"
        default-request-channel="output_enricher_channel"/>

<!-- Hands messages from the consumer channels to the consuming service beans. -->
<int:service-activator
        ref="ecomxArticleMasterDataConsumer"
        input-channel="bwh_articleMasterData_input_channel"/>
<int:service-activator
        ref="ecomxTransferOrderConsumer"
        input-channel="bwh_transferOrder_input_channel"/>


<!-- Container holding the consumer endpoints so that all registered
     consumers can be started/stopped at once. -->
<bean id="ecomxConsumerChannelAdapterContainer"
      class="de.lo.bwh.jms.EcomxConsumerChannelAdapterContainer">
    <property name="endpoints">
        <list value-type="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
            <ref bean="articleMasterDataConsumerChannelAdapter"/>
            <ref bean="transferOrderConsumerChannelAdapter"/>
        </list>
    </property>
</bean>

<!-- Container holding the producer endpoints (polling consumers of the outbound channels). -->
<bean id="ecomxProducerChannelAdapterContainer"
      class="de.lo.bwh.jms.EcomxProducerChannelAdapterContainer">
    <property name="endpoints">
        <list value-type="org.springframework.integration.endpoint.PollingConsumer">
            <ref bean="transferOrderPackedProducerChannelAdapter"/>
        </list>
    </property>
</bean>

</beans> 

context.xml

<!-- Container-managed JNDI resource that exposes the ActiveMQ connection factory
     under jms/connectionFactory. -->
<Resource
        name="jms/connectionFactory"
        type="org.apache.activemq.ActiveMQConnectionFactory"
        factory="org.apache.activemq.jndi.JNDIReferenceFactory"
        auth="Container"
        description="JMS Connection Factory"
        brokerName="myActiveMQBroker"
        brokerURL="tcp://my.url.net:61616"/>

Tôi đang sử dụng EcomxProducerGateway để gửi thông điệp. Gateway này gửi các thông điệp đến output_enricher_channel, từ đó chúng được chuyển tiếp đến output_router, nơi định tuyến thông điệp đến đúng kênh gửi đi theo kiểu payload.

Khi ngắt kết nối với broker, một số ngoại lệ được ghi vào log sau vài giây. Stacktrace sau đây có thể hữu ích:

[WARN 08:37:07] SingleConnectionFactory.onException(322) | Encountered a JMSException - resetting the underlying JMS Connection 
javax.jms.JMSException: Operation timed out 
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54) 
at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1997) 
at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:2016) 
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101) 
at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126) 
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101) 
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101) 
at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:160) 
at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:314) 
at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:96) 
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:200) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.SocketException: Operation timed out 
at java.net.SocketInputStream.socketRead0(Native Method) 
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
at java.net.SocketInputStream.read(SocketInputStream.java:170) 
at java.net.SocketInputStream.read(SocketInputStream.java:141) 
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50) 
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:609) 
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58) 
at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:594) 
at java.io.DataInputStream.readInt(DataInputStream.java:387) 
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:258) 
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221) 
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213) 
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196) 
... 1 more 

[ERROR 08:37:07] LoggingHandler.handleMessageInternal(145) | org.springframework.jms.connection.SynchedLocalTransactionFailedException: Local JMS transaction failed to commit; nested exception is javax.jms.JMSException: Operation timed out 
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:424) 
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:404) 
at org.springframework.transaction.support.ResourceHolderSynchronization.afterCommit(ResourceHolderSynchronization.java:85) 
at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit(TransactionSynchronizationUtils.java:133) 
at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerAfterCommit(TransactionSynchronizationUtils.java:121) 
at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCommit(AbstractPlatformTransactionManager.java:954) 
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:799) 
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:726) 
at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:515) 
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:291) 
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) 
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) 
at com.sun.proxy.$Proxy484.call(Unknown Source) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298) 
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) 
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) 
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292) 
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: javax.jms.JMSException: Operation timed out 
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54) 
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1420) 
at org.apache.activemq.TransactionContext.syncSendPacketWithInterruptionHandling(TransactionContext.java:761) 
at org.apache.activemq.TransactionContext.commit(TransactionContext.java:327) 
at org.apache.activemq.ActiveMQSession.commit(ActiveMQSession.java:574) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at org.springframework.jms.connection.CachingConnectionFactory$CachedSessionInvocationHandler.invoke(CachingConnectionFactory.java:384) 
at com.sun.proxy.$Proxy602.commit(Unknown Source) 
at org.springframework.jms.connection.JmsResourceHolder.commitAll(JmsResourceHolder.java:184) 
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:421) 
... 27 more 
Caused by: java.net.SocketException: Operation timed out 
at java.net.SocketInputStream.socketRead0(Native Method) 
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
at java.net.SocketInputStream.read(SocketInputStream.java:170) 
at java.net.SocketInputStream.read(SocketInputStream.java:141) 
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50) 
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:609) 
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58) 
at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:594) 
at java.io.DataInputStream.readInt(DataInputStream.java:387) 
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:258) 
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221) 
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213) 
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196) 
... 1 more 

[ERROR 08:37:08] LoggingHandler.handleMessageInternal(145) | org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler#0]; nested exception is org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Could not connect to broker URL: tcp://my.url.net:61616. Reason: java.net.UnknownHostException: my.url.net 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84) 
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:74) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:149) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146) 
at sun.reflect.GeneratedMethodAccessor464.invoke(Unknown Source) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) 
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) 
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) 
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99) 
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:281) 
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) 
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) 
at com.sun.proxy.$Proxy484.call(Unknown Source) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298) 
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) 
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) 
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292) 
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Could not connect to broker URL: tcp://my.url.net:61616. Reason: java.net.UnknownHostException: my.url.net 
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316) 
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169) 
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:496) 
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:579) 
at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:705) 
at org.springframework.integration.jms.JmsSendingMessageHandler.send(JmsSendingMessageHandler.java:145) 
at org.springframework.integration.jms.JmsSendingMessageHandler.handleMessageInternal(JmsSendingMessageHandler.java:115) 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) 
... 31 more 
Caused by: javax.jms.JMSException: Could not connect to broker URL: tcp://my.url.net:61616. Reason: java.net.UnknownHostException: my.url.net 
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36) 
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:360) 
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:305) 
at org.apache.activemq.ActiveMQConnectionFactory.createConnection(ActiveMQConnectionFactory.java:245) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) 
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:201) 
at com.sun.proxy.$Proxy156.createConnection(Unknown Source) 
at org.springframework.jms.connection.SingleConnectionFactory.doCreateConnection(SingleConnectionFactory.java:365) 
at org.springframework.jms.connection.SingleConnectionFactory.initConnection(SingleConnectionFactory.java:305) 
at org.springframework.jms.connection.SingleConnectionFactory.getConnection(SingleConnectionFactory.java:283) 
at org.springframework.jms.connection.SingleConnectionFactory.createConnection(SingleConnectionFactory.java:224) 
at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180) 
at org.springframework.jms.core.JmsTemplate.access$600(JmsTemplate.java:90) 
at org.springframework.jms.core.JmsTemplate$JmsTemplateResourceFactory.createConnection(JmsTemplate.java:1205) 
at org.springframework.jms.connection.ConnectionFactoryUtils.doGetTransactionalSession(ConnectionFactoryUtils.java:312) 
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:480) 
... 36 more 
Caused by: java.net.UnknownHostException: my.url.net 
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184) 
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
at java.net.Socket.connect(Socket.java:589) 
at org.apache.activemq.transport.tcp.TcpTransport.connect(TcpTransport.java:501) 
at org.apache.activemq.transport.tcp.TcpTransport.doStart(TcpTransport.java:464) 
at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55) 
at org.apache.activemq.transport.AbstractInactivityMonitor.start(AbstractInactivityMonitor.java:138) 
at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58) 
at org.apache.activemq.transport.WireFormatNegotiator.start(WireFormatNegotiator.java:72) 
at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58) 
at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58) 
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:340) 
... 54 more 

Trả lời

3

Đúng vậy, cơ chế này không hoạt động đối với các thông điệp bị rollback vì usingIdCache="true". Các thông điệp được trả lại DB, nhưng lần poll tiếp theo không lấy chúng ra, vì id của chúng vẫn còn nằm trong idCache trong bộ nhớ.

Chúng tôi khuyến nghị cấu hình sau để xử lý cả trường hợp thông điệp được commit lẫn trường hợp bị rollback:

<!-- Evicts the message ID from the store's ID cache when the polling transaction
     ends, whether it was committed or rolled back. -->
<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
            expression="@store.removeFromIdCache(headers.id.toString())"/>
    <int:after-rollback
            expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

... 

<!-- Default poller: transactional, with the synchronization factory "syncFactory"
     attached to the polling transaction. -->
<int:poller id="defaultPoller" default="true" fixed-delay="500">
    <int:transactional
            transaction-manager="transactionManager"
            synchronization-factory="syncFactory"
            propagation="REQUIRED"
            isolation="DEFAULT"/>
</int:poller>

Với cấu hình đó, bất kể kết quả của giao dịch (TX) là gì, id của thông điệp đều được xóa khỏi idCache. Và trong trường hợp của bạn, thông điệp sẽ sẵn sàng để được poll lại một lần nữa.

Xem thêm thông tin trong Reference Manual. JavaDocs của JdbcChannelMessageStore#setUsingIdCache cũng mô tả điều tương tự.

+0

Hoạt động, cảm ơn bạn! –

Các vấn đề liên quan