2016-07-18 15 views
8

Tôi đang viết một người dùng kafka sử dụng Java. Tôi muốn giữ thời gian thực của tin nhắn, vì vậy nếu có quá nhiều tin nhắn chờ đợi để tiêu thụ, chẳng hạn như 1000 hoặc nhiều hơn, tôi nên từ bỏ các tin nhắn unconsumed và bắt đầu tiêu thụ từ bù đắp mới nhất. Đối với vấn đề này, tôi cố gắng so sánh giá trị cam kết cuối cùng và chênh lệch mới nhất của một chủ đề (chỉ có 1 phân vùng), nếu chênh lệch giữa hai số dư lớn hơn một số tiền nhất định, tôi sẽ đặt giá trị bù mới nhất của chủ đề là bù đắp tiếp theo để tôi có thể từ bỏ những thông điệp dư thừa đó.Làm thế nào tôi có thể nhận được sự bù đắp MỚI nhất của một chủ đề kafka?

Bây giờ vấn đề của tôi là làm thế nào để có được sự bù đắp mới nhất của một chủ đề, một số người nói rằng tôi có thể sử dụng người tiêu dùng cũ, nhưng nó quá phức tạp, người tiêu dùng mới có chức năng này?

Trả lời

12

Người tiêu dùng mới cũng phức tạp.

//assign the topic consumer.assign();

//seek to end of the topic consumer.seekToEnd();

//the position is the latest offset consumer.position();

+0

Điều này không có tác dụng nếu bạn chỉ muốn tính toán sự chênh lệch giữa chênh lệch hiện tại của khách hàng của bạn và bù trừ chủ đề kafka đã biết mới nhất! – hiaclibe

5

Đối với Kafka phiên bản: 0.10.1.1

// Get the diff of current position and latest offset 
Set<TopicPartition> partitions = new HashSet<TopicPartition>(); 
TopicPartition actualTopicPartition = new TopicPartition(record.topic(), record.partition()); 
partitions.add(actualTopicPartition); 
Long actualEndOffset = this.consumer.endOffsets(partitions).get(actualTopicPartition); 
long actualPosition = consumer.position(actualTopicPartition);   
System.out.println(String.format("diff: %s (actualEndOffset:%s; actualPosition=%s)", actualEndOffset -actualPosition ,actualEndOffset, actualPosition)); 
0
KafkaConsumer<String, String> consumer = ... 
consumer.subscribe(Collections.singletonList(topic)); 
TopicPartition topicPartition = new TopicPartition(topic, partition); 
consumer.poll(0); 
consumer.seekToEnd(Collections.singletonList(topicPartition)); 
long currentOffset = consumer.position(topicPartition) -1; 

Trên đoạn trả về thông báo cam kết hiện tại bù đắp cho các chủ đề nhất định và phân vùng con số.

Các vấn đề liên quan