2015-06-11 20 views
9

Tôi đang cố gắng tạo một ứng dụng khởi động mùa xuân đơn giản với khởi động mùa xuân để "sản xuất" tin nhắn đến một chuỗi/hàng đợi thỏmq và một ứng dụng khởi động mẫu khác "tiêu thụ" những tin nhắn này. Vì vậy, tôi có hai ứng dụng (hoặc microservices nếu bạn muốn). 1) "nhà sản xuất" microservice 2) "người tiêu dùng" microservicespring boot rabbitmq MappingJackson2MessageConverter chuyển đổi đối tượng tùy chỉnh

"nhà sản xuất" có 2 đối tượng miền. Foo và Bar cần được chuyển thành json và gửi tới rabbitmq. "Người tiêu dùng" sẽ nhận và chuyển đổi thông điệp json thành miền Foo và Bar tương ứng. Vì một lý do nào đó tôi không thể thực hiện nhiệm vụ đơn giản này. Không có nhiều ví dụ về điều này. Đối với bộ chuyển đổi thông điệp mà tôi muốn sử dụng org.springframework.messaging.converter.MappingJackson2MessageConverter

Dưới đây là những gì tôi có cho đến nay:

NHÀ SẢN XUẤT MICROSERVICE

package demo.producer; 

import org.springframework.amqp.core.Binding; 
import org.springframework.amqp.core.BindingBuilder; 
import org.springframework.amqp.core.Queue; 
import org.springframework.amqp.core.TopicExchange; 
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.CommandLineRunner; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.context.annotation.Bean; 
import org.springframework.messaging.converter.MappingJackson2MessageConverter; 
import org.springframework.stereotype.Service; 

@SpringBootApplication 
public class ProducerApplication implements CommandLineRunner { 

    public static void main(String[] args) { 
     SpringApplication.run(ProducerApplication.class, args); 
    } 

    @Bean 
    Queue queue() { 
     return new Queue("queue", false); 
    } 

    @Bean 
    TopicExchange exchange() { 
     return new TopicExchange("exchange"); 
    } 

    @Bean 
    Binding binding(Queue queue, TopicExchange exchange) { 
     return BindingBuilder.bind(queue).to(exchange).with("queue"); 
    } 

    @Bean 
    public MappingJackson2MessageConverter jackson2Converter() { 
     MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); 
     return converter; 
    } 

    @Autowired 
    private Sender sender; 

    @Override 
    public void run(String... args) throws Exception { 
     sender.sendToRabbitmq(new Foo(), new Bar()); 
    } 
} 

@Service 
class Sender { 

    @Autowired 
    private RabbitMessagingTemplate rabbitMessagingTemplate; 
    @Autowired 
    private MappingJackson2MessageConverter mappingJackson2MessageConverter; 

    public void sendToRabbitmq(final Foo foo, final Bar bar) { 

     this.rabbitMessagingTemplate.setMessageConverter(this.mappingJackson2MessageConverter); 

     this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", foo); 
     this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", bar); 

    } 
} 

class Bar { 
    public int age = 33; 
} 

class Foo { 
    public String name = "gustavo"; 
} 

TIÊU DÙNG MICROSERVICE

package demo.consumer; 

import org.springframework.amqp.rabbit.annotation.EnableRabbit; 
import org.springframework.amqp.rabbit.annotation.RabbitListener; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.CommandLineRunner; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.stereotype.Service; 

@SpringBootApplication 
@EnableRabbit 
public class ConsumerApplication implements CommandLineRunner { 

    public static void main(String[] args) { 
     SpringApplication.run(ConsumerApplication.class, args); 
    } 

    @Autowired 
    private Receiver receiver; 

    @Override 
    public void run(String... args) throws Exception { 

    } 

} 

@Service 
class Receiver { 
    @RabbitListener(queues = "queue") 
    public void receiveMessage(Foo foo) { 
     System.out.println("Received <" + foo.name + ">"); 
    } 

    @RabbitListener(queues = "queue") 
    public void receiveMessage(Bar bar) { 
     System.out.println("Received <" + bar.age + ">"); 
    } 
} 

class Foo { 
    public String name; 
} 

class Bar { 
    public int age; 
} 

Và đây là ngoại lệ tôi nhận được:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message 
Endpoint handler details: 
Method [public void demo.consumer.Receiver.receiveMessage(demo.consumer.Bar)] 
Bean [[email protected]] 
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:116) 
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:93) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:83) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:170) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1257) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1021) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1005) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:83) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1119) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message 
    ... 13 common frames omitted 
Caused by: org.springframework.messaging.converter.MessageConversionException: No converter found to convert to class demo.consumer.Bar, message=GenericMessage [payload=byte[10], headers={amqp_receivedRoutingKey=queue, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=queue, amqp_redelivered=false, id=87cf7e06-a78a-ddc1-71f5-c55066b46b11, amqp_consumerTag=amq.ctag-msWSwB4bYGWVO2diWSAHlw, contentType=application/json;charset=UTF-8, timestamp=1433989934574}] 
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:115) 
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:77) 
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:127) 
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:100) 
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:113) 
    ... 12 common frames omitted 

Trường hợp ngoại lệ nói không có chuyển đổi, và đó là sự thật, vấn đề của tôi là tôi không có ý tưởng làm thế nào để thiết lập các MappingJackson2MessageConverter chuyển đổi ở phía người tiêu dùng (xin lưu ý rằng tôi muốn sử dụng org.springframework .messaging.converter.MappingJackson2MessageConverter và không org.springframework.amqp.support.converter.JsonMessageConverter)

Bất kỳ suy nghĩ?

Chỉ trong trường hợp, bạn có thể ngã ba dự án mẫu này tại địa chỉ: https://github.com/gustavoorsi/rabbitmq-consumer-receiver

+0

Hãy xem ở đây: http://stackoverflow.com/questions/29337550/using-rabbitlistener-with-jackson2jsonmessageconverter – stalet

+0

Trong đó dụ nó sử dụng ** org.springframework.amqp.support.converter.Jackson2JsonMessageConverter ** (mà thuộc về phụ thuộc spring-amqp), trong trường hợp của tôi, tôi muốn sử dụng ** org.springframework.messaging.converter.MappingJackson2MessageConverter ** (thuộc về tin nhắn mùa xuân). – Gustavo

Trả lời

12

Ok, cuối cùng tôi đã làm việc này.

Spring sử dụng một PayloadArgumentResolver để trích xuất, chuyển đổi và thiết lập các thông điệp chuyển đổi sang các tham số phương pháp chú thích với @RabbitListener. Bằng cách nào đó, chúng tôi cần đặt mappingJackson2MessageConverter vào đối tượng này.

Vì vậy, trong ứng dụng NGƯỜI TIÊU DÙNG, chúng tôi cần triển khai RabbitListenerConfigurer.Bằng cách ghi đè configureRabbitListeners (RabbitListenerEndpointRegistrar công ty đăng ký) chúng ta có thể thiết lập một tùy chỉnh DefaultMessageHandlerMethodFactory, để nhà máy này chúng tôi đặt bộ chuyển đổi tin nhắn, và nhà máy sẽ tạo của chúng tôi PayloadArgumentResolver với sự chuyển đổi chính xác.

Đây là đoạn mã, tôi cũng đã cập nhật git project.

ConsumerApplication.java

package demo.consumer; 

import org.springframework.amqp.rabbit.annotation.EnableRabbit; 
import org.springframework.amqp.rabbit.annotation.RabbitListener; 
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; 
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.context.annotation.Bean; 
import org.springframework.messaging.converter.MappingJackson2MessageConverter; 
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; 
import org.springframework.stereotype.Service; 

@SpringBootApplication 
@EnableRabbit 
public class ConsumerApplication implements RabbitListenerConfigurer { 

    public static void main(String[] args) { 
     SpringApplication.run(ConsumerApplication.class, args); 
    } 

    @Bean 
    public MappingJackson2MessageConverter jackson2Converter() { 
     MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); 
     return converter; 
    } 

    @Bean 
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { 
     DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); 
     factory.setMessageConverter(jackson2Converter()); 
     return factory; 
    } 

    @Override 
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { 
     registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); 
    } 

    @Autowired 
    private Receiver receiver; 

} 

@Service 
class Receiver { 
    @RabbitListener(queues = "queue") 
    public void receiveMessage(Foo foo) { 
     System.out.println("Received <" + foo.name + ">"); 
    } 

    @RabbitListener(queues = "queue") 
    public void receiveMessage(Bar bar) { 
     System.out.println("Received <" + bar.age + ">"); 
    } 
} 

class Foo { 
    public String name; 
} 

class Bar { 
    public int age; 
} 

Vì vậy, nếu bạn chạy microservice Nhà sản xuất nó sẽ thêm 2 thông điệp trong hàng đợi. Một đại diện cho một đối tượng Foo và một đối tượng đại diện cho một đối tượng Bar. Bằng cách chạy dịch vụ microsd người tiêu dùng, bạn sẽ thấy cả hai được tiêu thụ theo phương pháp tương ứng trong lớp Máy thu.


vấn đề Cập nhật:

Có một vấn đề khái niệm về xếp hàng từ phía tôi, tôi nghĩ. Những gì tôi muốn đạt được không thể thực hiện được bằng cách khai báo 2 phương thức được chú thích với @RabbitListener trỏ đến cùng một hàng đợi. Giải pháp trên không hoạt động đúng cách. Nếu bạn gửi đến rabbitmq, giả sử, 6 tin nhắn Foo và 3 tin nhắn Bar, họ sẽ không nhận được 6 lần bởi người nghe với tham số Foo. Dường như người nghe được viện dẫn song song nên không có cách nào để phân biệt người nghe nào sẽ gọi dựa trên kiểu đối số của phương thức. Giải pháp của tôi (và tôi không chắc đây có phải là cách tốt nhất, tôi mở lời đề xuất ở đây) là tạo hàng đợi cho mỗi thực thể. Vì vậy, bây giờ, tôi có queue.barqueue.foo, và cập nhật @RabbitListener (hàng đợi = "queue.foo") Một lần nữa, tôi đã cập nhật mã và bạn có thể kiểm tra xem nó ra trong số git repository của tôi.

+1

1.5.0 (hiện tại là mốc 1) hỗ trợ [cấp lớp '@ RabbitListener'] (https://spring.io/blog/2015/05/08/spring-amqp-1-4-5-release-and -1-5-0-m1-available) cùng với '@ RabbitHandler' trên các phương thức riêng lẻ để hỗ trợ trường hợp sử dụng này. –

+0

đó là điều tốt đẹp để biết. cảm ơn ! – Gustavo

0

đã không làm điều này bản thân mình nhưng nó có vẻ như bạn cần phải đăng ký chuyển đổi thích hợp bằng cách thiết lập một RabbitTemplate. Hãy xem phần 3.6.2 trong this Spring documentation. Tôi biết nó được cấu hình bằng cách sử dụng các lớp AMQP nhưng nếu lớp nhắn tin bạn đang đề cập là tương thích thì không có lý do gì bạn không thể thay thế nó. Có vẻ như this reference giải thích cách bạn có thể làm điều đó bằng cách sử dụng cấu hình Java thay vì XML. Tôi đã không thực sự sử dụng Rabbit vì vậy tôi không có bất kỳ kinh nghiệm cá nhân nhưng tôi rất thích nghe những gì bạn tìm ra.

Các vấn đề liên quan