Cách đề nghị để làm điều này là phải có một hồ bơi thread để Java có thể xử lý tổ chức cho bạn và cho mỗi dòng phương pháp createMessageStreamsByFilter mang đến cho bạn tiêu thụ nó trong một Runnable. Ví dụ:
int NUMBER_OF_PARTITIONS = 6;
Properties consumerConfig = new Properties();
consumerConfig.put("zk.connect", "zookeeper.mydomain.com:2181");
consumerConfig.put("backoff.increment.ms", "100");
consumerConfig.put("autooffset.reset", "largest");
consumerConfig.put("groupid", "java-consumer-example");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig));
TopicFilter sourceTopicFilter = new Whitelist("mytopic|myothertopic");
List<KafkaStream<Message>> streams = consumer.createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_PARTITIONS);
ExecutorService executor = Executors.newFixedThreadPool(streams.size());
for(final KafkaStream<Message> stream: streams){
executor.submit(new Runnable() {
public void run() {
for (MessageAndMetadata<Message> msgAndMetadata: stream) {
ByteBuffer buffer = msgAndMetadata.message().payload();
byte [] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
//Do something with the bytes you just got off Kafka.
}
}
});
}
Trong ví dụ này, tôi hỏi 6 chủ đề cho mỗi chủ đề và tôi liệt kê hai chủ đề trong danh sách trắng của mình. Một khi chúng ta có các chốt của các luồng đến, chúng ta có thể lặp qua nội dung của chúng, đó là các đối tượng MessageAndMetadata. Siêu dữ liệu thực sự chỉ là tên chủ đề và bù đắp. Khi bạn phát hiện ra bạn có thể làm điều đó trong một chuỗi nếu bạn yêu cầu 1 luồng thay vì, trong ví dụ 6 của tôi, nhưng nếu bạn yêu cầu xử lý song song, cách tốt nhất là khởi chạy trình thực thi với một luồng cho mỗi luồng được trả về.
Sử dụng SimpleConsumer không phải là một lựa chọn? –