Tôi đang cố gắng sử dụng API Java người tiêu dùng cấp thấp để quản lý bù thủ công, với kafka_2.10-0.8.2.1 mới nhất. Để xác minh rằng các offset tôi cam kết/đọc từ Kafka là chính xác, tôi sử dụng công cụ kafka.tools.ConsumerOffsetChecker.Làm rõ hoạt động bù trừ Java API của Kafka
Dưới đây là một ví dụ về đầu ra cho chủ đề nhóm/tiêu dùng của tôi:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group Topic Pid Offset logSize Lag Owner
elastic_search_group my_log_topic 0 5 29 24 none
Dưới đây là giải thích của tôi về kết quả:
offset = 5 -> đây là hiện offset của người tiêu dùng 'elastic_search_group' của tôi
logSize = 29 -> đây là phần bù Mới nhất - khoảng trống của thông báo tiếp theo sẽ đến chủ đề/phân vùng này
Lag = 24 -> 29-5 - bao nhiêu thư chưa được xử lý bởi 'elastic_search_group' tiêu dùng của tôi
Pid - phân vùng ID
Q1: là thế này có đúng không?
Bây giờ, tôi muốn nhận được thông tin tương tự từ người tiêu dùng Java của tôi. Tại đây, tôi thấy rằng tôi phải sử dụng hai API khác nhau:
kafka.javaapi. OffsetRequest để nhận các lần bù sớm nhất và mới nhất, nhưng kafka.javaapi. OffsetFetchRequest để nhận bù đắp hiện tại.
Để có được sớm (hoặc mới nhất) bù đắp tôi làm:
TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];
Và để có được hiện tại bù đắp tôi phải sử dụng một API hoàn toàn khác nhau:
short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition);
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();
Q2: là nó có đúng không? tại sao có hai API khác nhau để có được một thông tin rất giống nhau?
Q3: vấn đề versionId và correlationId tôi đang sử dụng ở đây có quan trọng không? Tôi mặc dù versionId nên là 0 cho trước-0.8.2.1 kafka, và là 1 cho 0.8.2.1 và sau đó - nhưng có vẻ như nó hoạt động với 0 cho 0.8.2.1 là tốt - xem dưới đây?
Vì vậy, ví dụ các trạng thái của các chủ đề trên, và đầu ra trên các ConsumerOffsetChecker, đây là những gì tôi nhận được từ mã Java của tôi:
currentOffset = 5; earliestOffset = 29; latestOffset = 29
'currentOffset' có vẻ là Ok, 'latestOffset' cũng chính xác, nhưng 'initialOffset' sớm nhất? Tôi hy vọng nó sẽ có ít nhất là '5'?
Q4: Làm thế nào có thể xảy ra khi đặt sớm nhất cao hơn giá trị hiện tạiOffset? Nghi ngờ duy nhất của tôi là có thể tin nhắn từ chủ đề đã được xóa sạch do chính sách lưu giữ…. Bất kỳ trường hợp nào khác có thể xảy ra?
Cảm ơn, @ Shades88! Sau khi một số thử nghiệm, cho # 4 - Tôi đã đi đến kết luận tương tự, rằng tình trạng này sẽ xảy ra khi các bản ghi đã được dọn sạch do chính sách lưu giữ. Vì vậy, tôi đã thêm xử lý của trường hợp góc này vào logic người tiêu dùng của tôi - xác nhận rằng bù đắp hiện tại là> = bù đắp sớm nhất và đặt nó thành EarliestOffset nếu không. Cảm ơn! – Marina
Về 'versionId', nếu bạn chỉ định' 0', các offset được lưu trữ trong Zookeeper và nếu bạn sử dụng '1', offset được lưu trữ trong một chủ đề Kafka đặc biệt. –
Trang hữu ích http://grokbase.com/t/kafka/users/154g34g133/simpleconsumer-getoffsetsbefore-problem –