2015-05-20 18 views
9

Tôi đang cố gắng sử dụng API Java người tiêu dùng cấp thấp để quản lý bù thủ công, với kafka_2.10-0.8.2.1 mới nhất. Để xác minh rằng các offset tôi cam kết/đọc từ Kafka là chính xác, tôi sử dụng công cụ kafka.tools.ConsumerOffsetChecker.Làm rõ hoạt động bù trừ Java API của Kafka

Dưới đây là một ví dụ về đầu ra cho chủ đề nhóm/tiêu dùng của tôi:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic 
Group           Topic                          Pid Offset          logSize         Lag             Owner 
elastic_search_group my_log_topic              0   5               29              24              none 

Dưới đây là giải thích của tôi về kết quả:

offset = 5 -> đây là hiện offset của người tiêu dùng 'elastic_search_group' của tôi

logSize = 29 -> đây là phần bù Mới nhất - khoảng trống của thông báo tiếp theo sẽ đến chủ đề/phân vùng này

Lag = 24 -> 29-5 - bao nhiêu thư chưa được xử lý bởi 'elastic_search_group' tiêu dùng của tôi

Pid - phân vùng ID

Q1: là thế này có đúng không?

Bây giờ, tôi muốn nhận được thông tin tương tự từ người tiêu dùng Java của tôi. Tại đây, tôi thấy rằng tôi phải sử dụng hai API khác nhau:

kafka.javaapi. OffsetRequest để nhận các lần bù sớm nhất và mới nhất, nhưng kafka.javaapi. OffsetFetchRequest để nhận bù đắp hiện tại.

Để có được sớm (hoặc mới nhất) bù đắp tôi làm:

TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition); 
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); 
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1)); 
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1)); 
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); 
OffsetResponse response = simpleConsumer.getOffsetsBefore(request); 
long[] offsets = response.offsets(topic, partition); 
long myEarliestOffset = offsets[0]; 
// OR for Latest: long myLatestOffset = offsets[0]; 

Và để có được hiện tại bù đắp tôi phải sử dụng một API hoàn toàn khác nhau:

short versionID = 0; 
int correlationId = 0; 
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();  
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition); 
topicPartitionList.add(myTopicAndPartition); 
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId); 
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq); 
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset(); 

Q2: là nó có đúng không? tại sao có hai API khác nhau để có được một thông tin rất giống nhau?

Q3: vấn đề versionId và correlationId tôi đang sử dụng ở đây có quan trọng không? Tôi mặc dù versionId nên là 0 cho trước-0.8.2.1 kafka, và là 1 cho 0.8.2.1 và sau đó - nhưng có vẻ như nó hoạt động với 0 cho 0.8.2.1 là tốt - xem dưới đây?

Vì vậy, ví dụ các trạng thái của các chủ đề trên, và đầu ra trên các ConsumerOffsetChecker, đây là những gì tôi nhận được từ mã Java của tôi:

currentOffset = 5; earliestOffset = 29; latestOffset = 29

'currentOffset' có vẻ là Ok, 'latestOffset' cũng chính xác, nhưng 'initialOffset' sớm nhất? Tôi hy vọng nó sẽ có ít nhất là '5'?

Q4: Làm thế nào có thể xảy ra khi đặt sớm nhất cao hơn giá trị hiện tạiOffset? Nghi ngờ duy nhất của tôi là có thể tin nhắn từ chủ đề đã được xóa sạch do chính sách lưu giữ…. Bất kỳ trường hợp nào khác có thể xảy ra?

Trả lời

10

Tôi đã tìm kiếm phương tiện tìm kiếm độ trễ trong phân vùng. Và điều đó liên quan đến các bước tương tự bạn đã thực hiện. Cho đến nay, từ bất cứ điều gì tôi đã học được, tôi có thể cho bạn câu trả lời.

  1. logSize trực tiếp chỉ ra số lượng thư đã được tích lũy trong phân đoạn cụ thể đó. Hoặc, nó chỉ định độ lệch tối đa của các thư trong phân vùng đó. Bù đắp là phần bù của thông điệp được tiêu thụ thành công lần cuối. Vì vậy, độ trễ chỉ là sự khác biệt giữa kích thước nhật ký và bù đắp.
  2. Có đúng. Cho đến nay, đó là hai cách duy nhất để tìm bù đắp hiện tại và bù đắp sớm nhất hoặc mới nhất
  3. Tôi không biết tại sao cần phải chỉ định versionId. Bạn có thể sử dụng kafka.api.OffsetRequest.CurrentVersion() để nhận versionId. Vì vậy, hardcoding có thể tránh được. Bạn có thể giả định một cách an toàn correlationId là 0.
  4. Điều này thật lạ. Khi tôi sử dụng EarliestTime() tôi nhận được bù đắp sớm nhất là 0 ngay cả khi bù đắp hiện tại của tôi đã tiến triển nhiều hơn nữa. Nó có nghĩa là đó là sự khởi đầu của phân vùng. Vì vậy, khi một số tin nhắn hết hạn trong một thời gian tương lai, giá trị bù lại sớm nhất này sẽ là một số khác không. Bây giờ nếu các tin nhắn đã bị xóa vì độ trễ chính sách lưu giữ nên đã được thay đổi. Tôi không chắc chắn về hành vi này. Một cách để chắc chắn sẽ là, chạy người tiêu dùng sau khi lưu ý đọc và kiểm tra trong nhật ký của nó. Nó sẽ hiển thị các dòng như thế này.

    2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo: 52 :: đặt lại mức tiêu thụ bù đắp của các yêu cầu: 2: lấy lại bù đắp = 405952: mức bù trừ được tiêu thụ = 335372 đến 335372 2015-06-09 18 : 49: 15 :: DEBUG :: PartitionTopicInfo: 52 :: reset tiêu thụ bù đắp các yêu cầu: 2: lấy bù đắp = 405.952: quả nhiều bù đắp = 335373 để 335373

Lưu ý rằng trong dòng log trên, lấy hài cốt bù đắp bù đắp tương tự và tiêu thụ đang tăng lên. Cuối cùng nó sẽ kết thúc trong

2015-06-09 18:49:16 :: DEBUG :: PartitionTopicInfo: 52 :: reset tiêu thụ bù đắp các yêu cầu: 2: lấy bù đắp = 405.952: quả nhiều bù đắp = 405.952-405.952

Sau đó, điều này có nghĩa là do bù trừ chính sách lưu giữ nhật ký từ 335372 đến 405952 đã hết hạn

+1

Cảm ơn, @ Shades88! Sau khi một số thử nghiệm, cho # 4 - Tôi đã đi đến kết luận tương tự, rằng tình trạng này sẽ xảy ra khi các bản ghi đã được dọn sạch do chính sách lưu giữ. Vì vậy, tôi đã thêm xử lý của trường hợp góc này vào logic người tiêu dùng của tôi - xác nhận rằng bù đắp hiện tại là> = bù đắp sớm nhất và đặt nó thành EarliestOffset nếu không. Cảm ơn! – Marina

+0

Về 'versionId', nếu bạn chỉ định' 0', các offset được lưu trữ trong Zookeeper và nếu bạn sử dụng '1', offset được lưu trữ trong một chủ đề Kafka đặc biệt. –

+0

Trang hữu ích http://grokbase.com/t/kafka/users/154g34g133/simpleconsumer-getoffsetsbefore-problem –

Các vấn đề liên quan