Tôi đang sử dụng apache kafka để nhắn tin. Tôi đã triển khai nhà sản xuất và người tiêu dùng trong Java. Làm thế nào chúng ta có thể nhận được số lượng tin nhắn trong một chủ đề?Java, Cách nhận số lượng tin nhắn trong một chủ đề trong apache kafka
Trả lời
Cách duy nhất để suy nghĩ về điều này từ quan điểm của người tiêu dùng là thực sự tiêu thụ các tin nhắn và đếm chúng sau đó.
Nhà môi giới Kafka cho thấy các bộ đếm JMX đối với số lượng tin nhắn nhận được kể từ khi khởi động nhưng bạn không thể biết có bao nhiêu thông báo đã bị xóa.
Trong hầu hết các trường hợp phổ biến, thư trong Kafka được xem là luồng vô hạn và nhận được giá trị rời rạc về số lượng hiện đang lưu trên đĩa không liên quan. Hơn nữa, mọi thứ trở nên phức tạp hơn khi giao dịch với một nhóm các nhà môi giới, tất cả đều có một tập hợp con các thông điệp trong một chủ đề.
Nó không phải là java, nhưng có thể hữu ích
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list <broker>: <port>
--topic <topic-name> --time -1 --offsets 1
| awk -F ":" '{sum += $3} END {print sum}'
Sử dụng https://prestodb.io/docs/current/connector/kafka-tutorial.html
Một động cơ SQL siêu, được cung cấp bởi Facebook, kết nối trên nhiều nguồn dữ liệu (Cassandra, Kafka, JMX, Redis. ..). PrestoDB đang chạy như một máy chủ với công nhân tùy chọn (có một chế độ độc lập mà không cần thêm công nhân), sau đó bạn sử dụng một JAR thực thi nhỏ (được gọi là presto CLI) để thực hiện truy vấn.
Một khi bạn đã cấu hình tốt máy chủ Presto, bạn có thể sử dụng SQL traditionnal:
SELECT count(*) FROM TOPIC_NAME;
công cụ này là tốt đẹp, nhưng nếu nó sẽ không hoạt động nếu chủ đề của bạn có nhiều hơn 2 chấm. – armandfp
@armandfp thông tin tốt đẹp –
Tôi thực sự sử dụng này cho benchmarking POC tôi. Mục bạn muốn sử dụng ConsumerOffsetChecker. Bạn có thể chạy nó bằng cách sử dụng kịch bản bash như dưới đây.
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup
Và dưới đây là kết quả: Như bạn có thể nhìn thấy trên hộp màu đỏ, 999 là số của thông điệp hiện trong chủ đề này.
Cập nhật: ConsumerOffsetChecker không được chấp nhận kể từ 0.10.0, bạn có thể muốn bắt đầu sử dụng ConsumerGroupCommand.
Xin lưu ý rằng ConsumerOffsetChecker không còn được dùng nữa và sẽ bị loại bỏ trong các bản phát hành sau 0.9.0. Sử dụng ConsumerGroupCommand để thay thế. (kafka.tools.ConsumerOffsetChecker $) –
Vâng, đó là những gì tôi đã nói. – Rudy
Câu cuối cùng của bạn không chính xác. Lệnh trên vẫn hoạt động trong 0.10.0.1 và cảnh báo giống như bình luận trước của tôi. –
Apache Kafka lệnh để có được un xử lý thông điệp trên tất cả các phân vùng của một chủ đề:
kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group
Prints:
Group Topic Pid Offset logSize Lag Owner
test_group test 0 11051 11053 2 none
test_group test 1 10810 10812 2 none
test_group test 2 11027 11028 1 none
Cột 6 là những thông điệp un-xử lý. Thêm chúng lên như thế này:
kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group 2>/dev/null | awk 'NR>1 {sum += $6}
END {print sum}'
awk đọc hàng, bỏ qua dòng tiêu đề và thêm cột thứ 6 vào cuối bản in tổng.
Prints
5
Để có được tất cả các tin nhắn được lưu trữ cho chủ đề, bạn có thể tìm kiếm khách hàng để đầu và kết thúc của dòng cho mỗi phân vùng và tổng hợp các kết quả
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
.map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
.collect(Collectors.toMap(Function.identity(), consumer::position));
consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
btw, nếu bạn đã bật tính năng nén thì có thể có khoảng trống trong luồng để số lượng thư thực tế có thể thấp hơn tổng số được tính ở đây. Để có được tổng số chính xác, bạn sẽ phải phát lại các tin nhắn và đếm chúng. – AutomatedMike
Sử dụng Ứng dụng khách Java của Kafka 2.11-1.0.0, bạn có thể thực hiện như sau:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// after each message, query the number of messages of the topic
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
for(TopicPartition partition : offsets.keySet()) {
System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
}
}
}
Đầu ra là s omething như thế này:
offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13
- 1. Tải chủ đề từ tin nhắn kafka
- 2. tải chủ đề từ tin nhắn kafka trong tia lửa
- 3. Làm thế nào để tạo chủ đề trong apache kafka?
- 4. Apache Kafka - KafkaStream về chủ đề/phân vùng
- 5. Xóa tin nhắn sau khi tiêu thụ nó trong KAFKA
- 6. Nhận số lượng tin nhắn chưa đọc
- 7. Spring Integration Kafka Consumer Listener không nhận được tin nhắn
- 8. Giới hạn tên chủ đề của Apache Kafka là gì?
- 9. Nhận số lượng chủ đề RTS trong chương trình Haskell?
- 10. GCM - Hiếm khi nhận được tin nhắn chủ đề
- 11. Làm thế nào để bạn nhận được số lượng tin nhắn cho đăng ký chủ đề Azure?
- 12. ActiveMQ nhận được số lượng người tiêu dùng nghe một chủ đề từ java
- 13. Truy vấn hệ thống nhắn tin để nhận tin nhắn cuối cùng, số lượng tin nhắn chưa đọc và mảng người dùng trong cuộc hội thoại
- 14. cách tốt nhất để gửi tin nhắn cho chủ đề
- 15. Cách nhận tất cả tin nhắn bằng ThreadID trong android
- 16. Cách tạo Kafka ZKStringSerializer trong Java?
- 17. Kafka - Docker - Lỗi khi gửi tin nhắn từ Máy chủ đến Vùng chứa (Lô hết hạn)
- 18. Hai loại tin nhắn trong một hàng đợi tin nhắn
- 19. nhắn Strange về chủ đề trong C#
- 20. Thay đổi số lượng chủ đề cho máy chủ Jenkins
- 21. Nhận tin nhắn Adium trong Applescript
- 22. sự khác biệt giữa phân vùng và bản sao của một chủ đề trong cụm kafka
- 23. Truy xuất số lượng tin nhắn JMS thử lại
- 24. Đăng tin nhắn chúc mừng từ Chủ đề
- 25. Ansible: Nhận số lượng máy chủ trong nhóm
- 26. Nhận tin nhắn cuối cùng từ Kafka tiêu dùng giao diện điều khiển kịch bản
- 27. Làm thế nào tôi có thể nhận được sự bù đắp MỚI nhất của một chủ đề kafka?
- 28. Đảm bảo gửi nhiều tin nhắn đến Kafka cluster
- 29. Có thể lấy được khoảng trống tin nhắn cụ thể trong Kafka + SparkStreaming không?
- 30. Cách gửi một tin nhắn bằng Nhắn tin Firebase
Đây có phải là sự chênh lệch sớm nhất và mới nhất cho mỗi tổng phân vùng không? 'bash-4.3 # $ KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell - danh sách người môi giới 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum + = $ 3} END {print sum}' bash-4.3 # $ KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35 .25.95: 32774 - chủ đề kiểm tra chủ đề - thời gian -2 | awk -F ":" '{sum + = $ 3} END {print sum}' 12434609' Và sau đó sự khác biệt trả về các thư đang chờ xử lý thực tế trong chủ đề? Tôi có đúng không? – kisna
Vâng, đó là sự thật. Bạn phải tính toán một sự khác biệt nếu số bù sớm nhất không bằng 0. – ssemichev
Đó là những gì tôi nghĩ :). – kisna