2014-11-20 12 views
7

Tôi đang tạo ứng dụng cho phép đăng ký và xóa các chủ đề kafka một cách linh hoạt. Khi một thuê bao chủ đề được thêm vào, tôi muốn chạy một lệnh batch mỗi giờ để nhận tất cả các tin nhắn mới và đẩy chúng vào một kho dữ liệu khác.Kafka - Cách đơn giản nhất để nhận số dư mới nhất

Điều tôi muốn hiểu là làm thế nào để có được bù đắp hiện tại của một chủ đề. Ngay sau khi đăng ký được thêm vào, tôi muốn công việc hàng loạt tiếp theo nhận tất cả tin nhắn kể từ thời gian đăng ký gần đúng.

Ví dụ: hãy tưởng tượng tôi có chủ đề được gọi là "TopicA" liên tục nhận tin nhắn. Nếu tôi thêm đăng ký vào lúc 7g15 chiều, khi công việc hàng loạt chạy lúc 8 giờ tối, tôi muốn tất cả các tin nhắn từ 7g15 trở lên sẽ được nhóm lại. Tôi rất vui vì thời gian gần đúng - 7,10, 7,20 v.v. 5 hoặc 10 phút ở hai bên khiến tôi không lo lắng.

Vì vậy, giải pháp dự định của tôi là để có được bù đắp hiện tại của một chủ đề thời điểm đăng ký được thêm vào. Tôi đã nhìn vào người tiêu dùng đơn giản, nhưng tôi không muốn tham gia vào tất cả các khía cạnh quản lý cụm cho trường hợp sử dụng cơ bản này.

Tôi cũng đã xem xét người tiêu dùng cấp cao. Tôi có thể có nội dung như sau:

consumer.createMessageStreamsByFilter(new Whitelist(topicName)).head.head.offset 

Điều tôi lo ngại với cách tiếp cận này là cuộc gọi đến "đầu" thực sự là luồng. Vì vậy, tôi tin rằng nó sẽ chặn chờ tin nhắn tiếp theo. Chặn là có vấn đề vì nó có thể khiến các đăng ký khác được xếp hàng đợi cho đến khi thư tiếp theo đến.

Tôi rất vui khi dành một chút thời gian để thực hiện cách tiếp cận thứ hai, nhưng nếu có cách đơn giản hơn không yêu cầu tôi viết mã đồng thời có lỗi, thì tôi không phí thời gian.

Tôi cũng sẽ cần một cách để có được tất cả nhật ký kể từ khi bù đắp đó.

Trả lời

2

Mọi câu trả lời cho yêu cầu tìm nạp sẽ trả về "HighWaterMark" đại diện cho giá trị bù mới nhất trong nhật ký phân vùng hiện đang được sử dụng. Vì vậy, về mặt lý thuyết, bạn có thể tìm nạp thông điệp sớm nhất hoặc thực sự là bất kỳ thông báo nào (giả định một thông điệp tồn tại) cho một chủ đề nhất định và kéo HighWaterMark từ phản hồi. Có thêm chi tiết về HighWaterMark đây: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse

Tất nhiên, có thể kéo HighWaterMarkOffset từ phản ứng phụ thuộc vào khách hàng của bạn làm cho rằng dữ liệu có sẵn thông qua API Kafka riêng của mình.

+0

Đây sẽ là điểm đánh dấu nước cao cho một phân vùng cụ thể. Tôi nghĩ anh ấy hỏi về thông tin "partitionId, offsetId}" của tin nhắn mới nhất. – arviman

+1

Tôi nghĩ rằng không có điều gì như một "thông điệp mới nhất" toàn cầu. Kafka sẽ không quy mô nếu nó có một số cơ chế đồng bộ hóa toàn cầu ... –

Các vấn đề liên quan