2015-05-19 12 views
5

Tôi đang cố di chuyển từ cấu hình XML amqp mùa xuân sang chú thích java dựa trên chú thích vì nó đơn giản hơn. Không chắc chắn những gì tôi đang làm sai cấu hình XML hoạt động tốt nhưng java @Configurable ném một "Gây ra bởi: java.net.SocketException: kết nối thiết lập lại" ngoại lệ.Cố gắng di chuyển qua từ <rabbit:> cấu hình không gian tên xml vào java @Configurable không thể sao chép

XML cấu hình (chỉ hoạt động một cách hoàn hảo):

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
     xmlns:context="http://www.springframework.org/schema/context" 
     xmlns:util="http://www.springframework.org/schema/util" 
     xsi:schemaLocation="http://www.springframework.org/schema/rabbit 
      http://www.springframework.org/schema/rabbit/spring-rabbit.xsd 
      http://www.springframework.org/schema/context 
      http://www.springframework.org/schema/context/spring-context.xsd 
      http://www.springframework.org/schema/util 
      http://www.springframework.org/schema/util/spring-util.xsd 
      http://www.springframework.org/schema/beans 
      http://www.springframework.org/schema/beans/spring-beans.xsd"> 

    <!-- define which properties files will be used --> 
    <context:property-placeholder location="classpath:*.properties" /> 

    <rabbit:connection-factory id="connectionFactory" 
           addresses='${rabbitmq.hostname}' 
           username='${rabbitmq.username}' 
           password='${rabbitmq.password}' 
           virtual-host='${rabbitmq.virtual_host}' 
           cache-mode='${rabbitmq.cache_mode}'        
           channel-cache-size='${rabbitmq.channel_cache_size}'/> 

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
     <property name="corePoolSize" value="3"/> 
     <property name="maxPoolSize" value="5"/> 
     <property name="queueCapacity" value="15"/>       
    </bean>        


    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /> 
    <rabbit:admin connection-factory="connectionFactory"/> 
     <rabbit:queue name="${rabbitmq.queue_name}" /> 
<rabbit:topic-exchange name="${rabbitmq.topic_exchange_name}"> 
    <rabbit:bindings> 
     <rabbit:binding queue="${rabbitmq.queue_name}" pattern="${rabbitmq.topic_exchange_pattern}"/> 
    </rabbit:bindings> 
</rabbit:topic-exchange> 

    <bean id="listener" class="com.my.package.path.worker.DefaultMessageListener"/> 


    <rabbit:listener-container id="listenerContainer" connection-factory="connectionFactory" task-executor="taskExecutor"> 
      <rabbit:listener ref="listener" queues="notification.main" /> 

    </rabbit:listener-container> 
</beans> 

Java cấu hình:

@Configurable 
@PropertySource("classpath:rabbitmq.properties") 
public class RabbitMQConfig { 

@Value("${rabbitmq.hostname}") 
private String hostname; 

@Value("${rabbitmq.port}") 
private String port; 

@Value("${rabbitmq.username}") 
private String username; 

@Value("${rabbitmq.password}") 
private String password; 

@Value("${rabbitmq.virtual_host}") 
private String virtualHost; 

//@Value("${rabbitmq.cache_mode}") 
//private String cacheMode; 

@Value("${rabbitmq.channel_cache_size}") 
private String channelCacheSize; 

@Value("${rabbitmq.topic_exchange_name}") 
private String topicExchangeName; 

@Value("${rabbitmq.topic_exchange_pattern}") 
private String topicExchangePattern; 

@Value("${rabbitmq.queue_name}") 
private String queueName; 

@Autowired 
private ConnectionFactory cachingConnectionFactory; 

@Bean(name="cachingConnectionFactory") 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(hostname,Integer.valueOf(port)); 

    connectionFactory.setUsername(username); 
    connectionFactory.setPassword(password); 

    //connectionFactory.setCacheMode(CacheMode.valueOf(cacheMode)); 
    connectionFactory.setChannelCacheSize(Integer.valueOf(channelCacheSize)); 

    return connectionFactory; 
} 

@Bean(name="taskExecutor") 
public ThreadPoolTaskExecutor threadPoolTaskExecutor() { 
    ThreadPoolTaskExecutor tpte = new ThreadPoolTaskExecutor(); 
    tpte.setCorePoolSize(3); 
    tpte.setMaxPoolSize(5); 
    tpte.setQueueCapacity(15); 
    return tpte; 
} 

@Bean 
public AmqpTemplate AmqpTemplate() { 
    RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory); 

    return template; 
} 


@Bean 
public AmqpAdmin amqpAdmin() { 
    RabbitAdmin amqpAdmin = new RabbitAdmin(cachingConnectionFactory); 

    return amqpAdmin; 
} 

@Bean 
public Queue queue() { 
    return new Queue(queueName); 
} 

@Bean 
public TopicExchange topicExchange() { 
    TopicExchange topicExchange = new TopicExchange(topicExchangeName); 
    return topicExchange; 
} 

@Bean 
public Binding dataBinding(TopicExchange topicExchange, Queue queue) { 
    return BindingBuilder.bind(queue).to(topicExchange).with(topicExchangePattern); 
} 

@Bean 
public DefaultMessageListener defaultMessageListener() { 
    return new DefaultMessageListener(); 
} 

@Bean 
public SimpleMessageListenerContainer container(DefaultMessageListener defaultMessageListener) { 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(cachingConnectionFactory); 
    container.setQueueNames(queueName); 
    container.setAutoStartup(true); 
    container.setMessageListener(defaultMessageListener); 
    //container.setTaskExecutor(taskExecutor); 
    return container; 
} 

@Bean 
public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() { 
    return new PropertySourcesPlaceholderConfigurer(); 
} 

java cấu hình lỗi:

INFO : org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647 
DEBUG: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - No global properties bean 
DEBUG: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Starting Rabbit listener container. 
ERROR: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Failed to check/redeclare auto-delete queue(s). 
org.springframework.amqp.AmqpIOException: java.io.IOException 
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63) 
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:217) 
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:444) 
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80) 
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130) 
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1004) 
    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:254) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:963) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:83) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1081) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.IOException 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) 
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:376) 
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:603) 
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:637) 
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:208) 
    ... 12 more 
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error 
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) 
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) 
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348) 
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221) 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) 
    ... 16 more 
Caused by: java.net.SocketException: Connection reset 
    at java.net.SocketInputStream.read(SocketInputStream.java:209) 
    at java.net.SocketInputStream.read(SocketInputStream.java:141) 
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) 
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265) 
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) 
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) 
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) 
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) 
    ... 1 more 

Tôi bước vào mã amqp Spring và thủ phạm là phương thức RabbitAdmin # getQueueProperties. Trong cấu hình XML nó thực hiện tốt ... nhưng ngay sau khi nó thực hiện với cấu hình java nó ném ngoại lệ trên? Những gì tôi đang làm là khác nhau? Cả hai cấu hình đều giống với tôi.

package org.springframework.amqp.rabbit.core; 

public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, InitializingBean { 
//... 
    @Override 
    public Properties getQueueProperties(final String queueName) { 
     Assert.hasText(queueName, "'queueName' cannot be null or empty"); 
     return this.rabbitTemplate.execute(new ChannelCallback<Properties>() { 
      @Override 
      public Properties doInRabbit(Channel channel) throws Exception { 
       try { 
        DeclareOk declareOk = channel.queueDeclarePassive(queueName); 
        Properties props = new Properties(); 
        props.put(QUEUE_NAME, declareOk.getQueue()); 
        props.put(QUEUE_MESSAGE_COUNT, declareOk.getMessageCount()); 
        props.put(QUEUE_CONSUMER_COUNT, declareOk.getConsumerCount()); 
        return props; 
       } 
       catch (Exception e) { 
        if (logger.isDebugEnabled()) { 
         logger.debug("Queue '" + queueName + "' does not exist"); 
        } 
        return null; 
       } 
      } 
     }); 
    } 
} 

Cả hai cấu hình đều sử dụng cùng một tập tin rabbitmq.properties trên đường dẫn lớp. Tôi thậm chí đã kiểm tra các thuộc tính của các lớp RabbitAdmin và RabbitTemplate khi chạy cho cả hai cấu hình và chúng trông giống hệt nhau ...

Trả lời

1

Tôi không sử dụng máy chủ ảo gốc '/'. Tôi đã có giá trị tùy chỉnh của riêng mình cho virtual_host. Mặc dù, tôi đã tiêm thuộc tính này thông qua trò chơi vào cấu hình java của tôi, tôi đã không đặt rõ ràng nó trên connectionFactory.

connectionFactory.setVirtualHost(virtualHost); 

Nhờ @Gary Russell đã giúp tôi giải quyết sự cố.

@Bean(name="cachingConnectionFactory") 
public ConnectionFactory connectionFactory() { 

     CachingConnectionFactory connectionFactory = new CachingConnectionFactory(hostname,Integer.valueOf(port)); 

     connectionFactory.setUsername(username); 
     connectionFactory.setPassword(password); 

     connectionFactory.setVirtualHost(virtualHost); 
     connectionFactory.setChannelCacheSize(Integer.valueOf(channelCacheSize)); 

     return connectionFactory; 
    } 
4

Bạn nên sử dụng @Configuration, không phải @Configurable.

EDIT:

Hình như RabbitMQ-server được đóng kết nối:

Caused by: java.net.SocketException: Connection reset 

Look trong nhật ký máy chủ; nếu điều đó không giúp được gì; đăng nhật ký DEBUG đầy đủ cho org.springframework ở đâu đó (có thể quá lớn đối với ở đây).

EDIT2:

Bạn có một vấn đề xác thực ...

{handshake_error,opening,0, 
      {amqp_error,access_refused, 
         "access to vhost '/' refused for user 'gggdw'", 
         'connection.open'}} 

... kiểm tra username và pasword (và vhost) của bạn.

+0

điểm công bằng nhưng vẫn nhận được lỗi tương tự – Selwyn

+0

Russel http://pastebin.com/p0KzJv3g Full chồng không phải là rất hở hang (với tôi) tôi bước vào mã mùa xuân AMQP và RabbitAdmin # getQueueProperties là phương pháp mà ném một ngoại lệ trong cấu hình java nhưng thực hiện tốt trong cấu hình XML – Selwyn

+1

Một số vấn đề với thông tin đăng nhập. –

Các vấn đề liên quan