Theo tài liệu được cung cấp here, tôi đang thử POC để nhận tin nhắn vào người nghe như được đề cập trong same documentation, Dưới đây là cách tôi đã viết cấu hình.Spring Integration Kafka Consumer Listener không nhận được tin nhắn
@Configuration
public class KafkaConsumerConfig {
public static final String TEST_TOPIC_ID = "record-stream";
@Value("${kafka.topic:" + TEST_TOPIC_ID + "}")
private String topic;
@Value("${kafka.address:localhost:9092}")
private String brokerAddress;
/*
@Bean public KafkaMessageDrivenChannelAdapter<String, String> adapter(
KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String>
kafkaMessageDrivenChannelAdapter = new
KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); return
kafkaMessageDrivenChannelAdapter; }
@Bean public QueueChannel received() { return new QueueChannel(); }
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(30000);
return factory;
}
/*
* @Bean public KafkaMessageListenerContainer<String, String> container()
* throws Exception { ContainerProperties properties = new
* ContainerProperties(this.topic); // set more properties return new
* KafkaMessageListenerContainer<>(consumerFactory(), properties); }
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest
// smallest
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
}
và Listener là như dưới đây,
@Service
public class Listener {
private Logger log = Logger.getLogger(Listener.class);
@KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID, containerFactory = "kafkaListenerContainerFactory")
public void process(String message/* , Acknowledgment ack */) {
Gson gson = new Gson();
Record record = gson.fromJson(message, Record.class);
log.info(record.getId() + " " + record.getName());
// ack.acknowledge();
}
}
Mặc dù tôi đang sản xuất các tin nhắn đến cùng một chủ đề và tiêu dùng này đang làm việc trên cùng một chủ đề, Listener không thực hiện.
Tôi đang chạy Kafka 0.10.0.1 và đây là pom hiện tại của tôi. Người tiêu dùng này đang làm việc như một ứng dụng web khởi động mùa xuân không giống như nhiều dòng lệnh mẫu.
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Tôi đã dành nhiều thời gian để tìm ra lý do tại sao người nghe này không bị ảnh hưởng khi chủ đề có tin nhắn, tôi đang làm gì sai.
Tôi biết rằng tôi có thể nhận tin nhắn bằng kênh (Tôi đã nhận xét phần cấu hình của điều đó trong mã), Nhưng tại đây đồng thời được xử lý sạch sẽ.
Loại triển khai này có thể thực hiện với mức tiêu thụ tin nhắn không đồng bộ hay không.
Tôi nghĩ EnableIntegration đang chăm sóc cho rằng, trong sản xuất của tôi, tôi không có EnableKafka. Hãy để tôi thử điều này và quay lại với bạn –
'@ EnableIntegration' dành cho Spring Integration, nhưng chúng ta nói ở đây về Spring Kafka. Họ là những dự án độc lập hoàn toàn khác nhau. Nhìn '' KafkaListener' nằm ngoài Spring Integration Kafka. Cũng giống như nhớ có '@ EnableJms',' @ EnableRabbit' và nhiều '@Enable ...' khác. Chỉ '@ EnableIntegration' không thể quan tâm đến tất cả chúng. Đó không phải là trách nhiệm của nó. –
Cảm ơn rất nhiều vì cái nhìn sâu sắc, bạn đã làm việc như một sự quyến rũ. Sẽ ghi nhớ điều này. –