Tôi có một đơn giản Kafka tiêu dùng trong java với đoạn mã sauKafka Consumer treo tại .hasNext trong java
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext()&& !done){
try {
System.out.println("Parsing data");
byte[] data = it.next().message();
System.out.println("Found data: "+data);
values.add(data); // array list
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
done = true;
}
Khi một thông điệp được đăng tải, dữ liệu được đọc thành công, tuy nhiên khi nó đi lại để kiểm tra nó .hasNext(), nó vẫn đang chờ xử lý và không bao giờ quay lại.
Điều gì có thể trì hoãn điều này?
m_stream là một KafkaStream thu được như sau:
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(a_numThreads);
for (final KafkaStream stream : streams) {
// m_stream is one of these streams
}