2015-02-11 16 views
9

Tôi có một đơn giản Kafka tiêu dùng trong java với đoạn mã sauKafka Consumer treo tại .hasNext trong java

public void run() { 
     ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
     while (it.hasNext()&& !done){ 
      try { 
       System.out.println("Parsing data"); 
       byte[] data = it.next().message(); 
       System.out.println("Found data: "+data); 
       values.add(data); // array list 
      } catch (InvalidProtocolBufferException e) { 
       e.printStackTrace(); 
      } 
     } 
     done = true; 
    } 

Khi một thông điệp được đăng tải, dữ liệu được đọc thành công, tuy nhiên khi nó đi lại để kiểm tra nó .hasNext(), nó vẫn đang chờ xử lý và không bao giờ quay lại.

Điều gì có thể trì hoãn điều này?

m_stream là một KafkaStream thu được như sau:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
topicCountMap.put(topic, new Integer(a_numThreads)); 
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 
executor = Executors.newFixedThreadPool(a_numThreads); 
for (final KafkaStream stream : streams) { 
    // m_stream is one of these streams 
} 

Trả lời

11

Giải pháp là thêm thuộc tính

"consumer.timeout.ms"

Bây giờ khi thời gian chờ đạt được một ConsumerTimeoutException là ném

Các vấn đề liên quan