2015-02-18 18 views

Trả lời

14

Cách duy nhất để suy nghĩ về điều này từ quan điểm của người tiêu dùng là thực sự tiêu thụ các tin nhắn và đếm chúng sau đó.

Nhà môi giới Kafka cho thấy các bộ đếm JMX đối với số lượng tin nhắn nhận được kể từ khi khởi động nhưng bạn không thể biết có bao nhiêu thông báo đã bị xóa.

Trong hầu hết các trường hợp phổ biến, thư trong Kafka được xem là luồng vô hạn và nhận được giá trị rời rạc về số lượng hiện đang lưu trên đĩa không liên quan. Hơn nữa, mọi thứ trở nên phức tạp hơn khi giao dịch với một nhóm các nhà môi giới, tất cả đều có một tập hợp con các thông điệp trong một chủ đề.

1

Tôi chưa thử tự mình this nhưng có vẻ hợp lý.

Bạn cũng có thể sử dụng kafka.tools.ConsumerOffsetChecker (source).

43

Nó không phải là java, nhưng có thể hữu ích

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell 
    --broker-list <broker>: <port> 
    --topic <topic-name> --time -1 --offsets 1 
    | awk -F ":" '{sum += $3} END {print sum}' 
+4

Đây có phải là sự chênh lệch sớm nhất và mới nhất cho mỗi tổng phân vùng không? 'bash-4.3 # $ KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell - danh sách người môi giới 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum + = $ 3} END {print sum}' bash-4.3 # $ KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35 .25.95: 32774 - chủ đề kiểm tra chủ đề - thời gian -2 | awk -F ":" '{sum + = $ 3} END {print sum}' 12434609' Và sau đó sự khác biệt trả về các thư đang chờ xử lý thực tế trong chủ đề? Tôi có đúng không? – kisna

+0

Vâng, đó là sự thật. Bạn phải tính toán một sự khác biệt nếu số bù sớm nhất không bằng 0. – ssemichev

+0

Đó là những gì tôi nghĩ :). – kisna

8

Sử dụng https://prestodb.io/docs/current/connector/kafka-tutorial.html

Một động cơ SQL siêu, được cung cấp bởi Facebook, kết nối trên nhiều nguồn dữ liệu (Cassandra, Kafka, JMX, Redis. ..). PrestoDB đang chạy như một máy chủ với công nhân tùy chọn (có một chế độ độc lập mà không cần thêm công nhân), sau đó bạn sử dụng một JAR thực thi nhỏ (được gọi là presto CLI) để thực hiện truy vấn.

Một khi bạn đã cấu hình tốt máy chủ Presto, bạn có thể sử dụng SQL traditionnal:

SELECT count(*) FROM TOPIC_NAME; 
+0

công cụ này là tốt đẹp, nhưng nếu nó sẽ không hoạt động nếu chủ đề của bạn có nhiều hơn 2 chấm. – armandfp

+0

@armandfp thông tin tốt đẹp –

14

Tôi thực sự sử dụng này cho benchmarking POC tôi. Mục bạn muốn sử dụng ConsumerOffsetChecker. Bạn có thể chạy nó bằng cách sử dụng kịch bản bash như dưới đây.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup 

Và dưới đây là kết quả: enter image description here Như bạn có thể nhìn thấy trên hộp màu đỏ, 999 là số của thông điệp hiện trong chủ đề này.

Cập nhật: ConsumerOffsetChecker không được chấp nhận kể từ 0.10.0, bạn có thể muốn bắt đầu sử dụng ConsumerGroupCommand.

+0

Xin lưu ý rằng ConsumerOffsetChecker không còn được dùng nữa và sẽ bị loại bỏ trong các bản phát hành sau 0.9.0. Sử dụng ConsumerGroupCommand để thay thế. (kafka.tools.ConsumerOffsetChecker $) –

+0

Vâng, đó là những gì tôi đã nói. – Rudy

+0

Câu cuối cùng của bạn không chính xác. Lệnh trên vẫn hoạt động trong 0.10.0.1 và cảnh báo giống như bình luận trước của tôi. –

2

Apache Kafka lệnh để có được un xử lý thông điệp trên tất cả các phân vùng của một chủ đề:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group 

Prints:

Group  Topic  Pid Offset   logSize   Lag    Owner 
test_group test   0 11051   11053   2    none 
test_group test   1 10810   10812   2    none 
test_group test   2 11027   11028   1    none 

Cột 6 là những thông điệp un-xử lý. Thêm chúng lên như thế này:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} 
    END {print sum}' 

awk đọc hàng, bỏ qua dòng tiêu đề và thêm cột thứ 6 vào cuối bản in tổng.

Prints

5 
3

Để có được tất cả các tin nhắn được lưu trữ cho chủ đề, bạn có thể tìm kiếm khách hàng để đầu và kết thúc của dòng cho mỗi phân vùng và tổng hợp các kết quả

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream() 
     .map(p -> new TopicPartition(topic, p.partition())) 
     .collect(Collectors.toList()); 
    consumer.assign(partitions); 
    consumer.seekToEnd(Collections.emptySet()); 
Map<TopicPartition, Long> endPartitions = partitions.stream() 
     .collect(Collectors.toMap(Function.identity(), consumer::position)); 
    consumer.seekToBeginning(Collections.emptySet()); 
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum()); 
+0

btw, nếu bạn đã bật tính năng nén thì có thể có khoảng trống trong luồng để số lượng thư thực tế có thể thấp hơn tổng số được tính ở đây. Để có được tổng số chính xác, bạn sẽ phải phát lại các tin nhắn và đếm chúng. – AutomatedMike

0

Sử dụng Ứng dụng khách Java của Kafka 2.11-1.0.0, bạn có thể thực hiện như sau:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Collections.singletonList("test")); 
    while(true) { 
     ConsumerRecords<String, String> records = consumer.poll(100); 
     for (ConsumerRecord<String, String> record : records) { 
      System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 

      // after each message, query the number of messages of the topic 
      Set<TopicPartition> partitions = consumer.assignment(); 
      Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions); 
      for(TopicPartition partition : offsets.keySet()) { 
       System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition)); 
      } 
     } 
    } 

Đầu ra là s omething như thế này:

offset = 10, key = null, value = un 
partition test is at 13 
offset = 11, key = null, value = deux 
partition test is at 13 
offset = 12, key = null, value = trois 
partition test is at 13 
1

Trong hầu hết các phiên bản gần đây của Kafka Manager, có một cột với tiêu đề tóm tắt Hiệu số gần đây.

enter image description here

Các vấn đề liên quan