2016-10-06 25 views
5

Theo tài liệu được cung cấp here, tôi đang thử POC để nhận tin nhắn vào người nghe như được đề cập trong same documentation, Dưới đây là cách tôi đã viết cấu hình.Spring Integration Kafka Consumer Listener không nhận được tin nhắn

@Configuration 
public class KafkaConsumerConfig { 

    public static final String TEST_TOPIC_ID = "record-stream"; 

    @Value("${kafka.topic:" + TEST_TOPIC_ID + "}") 
    private String topic; 

    @Value("${kafka.address:localhost:9092}") 
    private String brokerAddress; 


    /* 
     @Bean public KafkaMessageDrivenChannelAdapter<String, String> adapter(
     KafkaMessageListenerContainer<String, String> container) { 
     KafkaMessageDrivenChannelAdapter<String, String> 
     kafkaMessageDrivenChannelAdapter = new 
     KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record); 
     kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); return 
     kafkaMessageDrivenChannelAdapter; } 

     @Bean public QueueChannel received() { return new QueueChannel(); } 
    */ 

    @Bean 
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { 

     ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setConcurrency(3); 
     factory.getContainerProperties().setPollTimeout(30000); 
     return factory; 

    } 

    /* 
    * @Bean public KafkaMessageListenerContainer<String, String> container() 
    * throws Exception { ContainerProperties properties = new 
    * ContainerProperties(this.topic); // set more properties return new 
    * KafkaMessageListenerContainer<>(consumerFactory(), properties); } 
    */ 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); 
     // props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup"); 
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest 
                     // smallest 
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     return new DefaultKafkaConsumerFactory<>(props); 
    } 

} 

và Listener là như dưới đây,

@Service 
public class Listener { 

    private Logger log = Logger.getLogger(Listener.class); 


    @KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID, containerFactory = "kafkaListenerContainerFactory") 
    public void process(String message/* , Acknowledgment ack */) { 
     Gson gson = new Gson(); 
     Record record = gson.fromJson(message, Record.class); 

     log.info(record.getId() + " " + record.getName()); 
     // ack.acknowledge(); 
    } 

} 

Mặc dù tôi đang sản xuất các tin nhắn đến cùng một chủ đề và tiêu dùng này đang làm việc trên cùng một chủ đề, Listener không thực hiện.

Tôi đang chạy Kafka 0.10.0.1 và đây là pom hiện tại của tôi. Người tiêu dùng này đang làm việc như một ứng dụng web khởi động mùa xuân không giống như nhiều dòng lệnh mẫu.

<dependencies> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter</artifactId> 

     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-web</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.integration</groupId> 
      <artifactId>spring-integration-kafka</artifactId> 
      <version>2.1.0.RELEASE</version> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.integration</groupId> 
      <artifactId>spring-integration-java-dsl</artifactId> 

     </dependency> 

     <dependency> 
      <groupId>com.google.code.gson</groupId> 
      <artifactId>gson</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-test</artifactId> 
      <scope>test</scope> 
     </dependency> 

    </dependencies> 

Tôi đã dành nhiều thời gian để tìm ra lý do tại sao người nghe này không bị ảnh hưởng khi chủ đề có tin nhắn, tôi đang làm gì sai.

Tôi biết rằng tôi có thể nhận tin nhắn bằng kênh (Tôi đã nhận xét phần cấu hình của điều đó trong mã), Nhưng tại đây đồng thời được xử lý sạch sẽ.

Loại triển khai này có thể thực hiện với mức tiêu thụ tin nhắn không đồng bộ hay không.

Trả lời

3

Bạn phải thêm @EnableKafka cùng với @Configuration.

Sẽ sớm add một số mô tả.

Trong khi đó:

@Configuration 
@EnableKafka 
public class KafkaConsumerConfig { 
+0

Tôi nghĩ EnableIntegration đang chăm sóc cho rằng, trong sản xuất của tôi, tôi không có EnableKafka. Hãy để tôi thử điều này và quay lại với bạn –

+0

'@ EnableIntegration' dành cho Spring Integration, nhưng chúng ta nói ở đây về Spring Kafka. Họ là những dự án độc lập hoàn toàn khác nhau. Nhìn '' KafkaListener' nằm ngoài Spring Integration Kafka. Cũng giống như nhớ có '@ EnableJms',' @ EnableRabbit' và nhiều '@Enable ...' khác. Chỉ '@ EnableIntegration' không thể quan tâm đến tất cả chúng. Đó không phải là trách nhiệm của nó. –

+0

Cảm ơn rất nhiều vì cái nhìn sâu sắc, bạn đã làm việc như một sự quyến rũ. Sẽ ghi nhớ điều này. –

Các vấn đề liên quan