6

Tôi đang đối mặt với một số vấn đề nghiêm trọng khi cố gắng triển khai giải pháp cho nhu cầu của mình, liên quan đến KafkaConsumer (> = 0.9).Kafka Consumer - Hành vi thăm dò

Hãy tưởng tượng tôi có chức năng chỉ đọc n thư từ chủ đề kafka.

Ví dụ: getMsgs(5) ->nhận được 5 thư kafka tiếp theo trong chủ đề.

Vì vậy, tôi có một vòng lặp mà trông như thế này:

for (boolean exit= false;!exit;) 
{ 
    Records = consumer.poll(200); 
    for (Record r:records) { 
     processRecord(r); //do my things 
     numMss++; 
     if (numMss==maximum) //maximum=5 
      exit=true; 
    } 
} 

Đi vào tài khoản, vấn đề là phương pháp thăm dò() có thể nhận được nhiều hơn 5 tin nhắn. Ví dụ, nếu nó nhận được 10 tin nhắn, mã của tôi sẽ quên vĩnh viễn 5 tin nhắn kia, vì Kafka sẽ nghĩ rằng chúng đã được tiêu thụ.

tôi đã cố gắng commiting bù đắp nhưng dường như không làm việc:

consumer.commitSync(Collections.singletonMap(partition, 
    new OffsetAndMetadata(record.offset() + 1))); 

Ngay cả với cấu hình bù đắp, bất cứ khi nào tôi khởi động lại cho người tiêu dùng, nó sẽ không bắt đầu từ tin nhắn thứ 6 (hãy nhớ, tôi chỉ muốn 5 tin nhắn), nhưng từ 11th (kể từ khi cuộc thăm dò đầu tiên tiêu thụ 10 tin nhắn).

Có giải pháp nào cho điều này, hoặc có thể (chắc chắn nhất) tôi đang thiếu thứ gì đó?

Cảm ơn trước !!

Trả lời

3

Bạn có thể đặt max.poll.records để bất kỳ số nào bạn thích như vậy mà nhiều nhất bạn sẽ nhận được nhiều bản ghi trên mỗi cuộc thăm dò ý kiến.

Đối với trường hợp sử dụng của bạn mà bạn đã nêu trong vấn đề này, bạn không phải tự mình cam kết một cách rõ ràng. bạn chỉ có thể đặt enable.auto.commit thành true và đặt auto.offset.reset thành earliest sao cho nó sẽ khởi động khi không có người tiêu dùng group.id (các từ khác khi bạn bắt đầu đọc từ phân vùng lần đầu tiên). Một khi bạn có một nhóm.id và một số offsets của người tiêu dùng được lưu trữ trong Kafka và trong trường hợp quá trình tiêu dùng Kafka của bạn chết, nó sẽ tiếp tục từ lần bù trừ cuối cùng vì nó là hành vi mặc định bởi vì khi người tiêu dùng bắt đầu, đầu tiên sẽ tìm kiếm nếu có bất kỳ bù trừ cam kết nào và nếu có, sẽ tiếp tục từ lần bù trừ cam kết cuối cùng và auto.offset.resetsẽ không bắt đầu.

0

đặt thuộc tính auto.offset.reset là "mới nhất". Sau đó, hãy thử tiêu thụ, bạn sẽ nhận được các bản ghi tiêu thụ từ bù đắp cam kết.

Hoặc bạn sử dụng consumer.seek (TopicPartition, offset) api trước cuộc thăm dò ý kiến.

+0

auto.offset.reset nên sớm nhất và nó khởi động khi không có nhóm người tiêu dùng.id. mà không có id nhóm người ta không thể lưu trữ offsets. nếu đã có id nhóm người dùng auto.offset.reset sẽ không thực hiện bất kỳ điều gì và theo mặc định, người tiêu dùng chọn từ chênh lệch đã cam kết cuối cùng. – user1870400

0

Bạn đã tắt tự động cam kết bằng cách đặt enable.auto.commit thành false. Bạn cần phải vô hiệu hóa điều đó nếu bạn muốn tự cam kết bù đắp. Nếu không có cuộc gọi tới cuộc thăm dò ý kiến ​​() sẽ tự động cam kết bù đắp mới nhất của các tin nhắn mà bạn nhận được từ cuộc thăm dò trước đó().

0

Từ Kafka 0.9 tên tham số auto.offset.reset đã thay đổi;

Phải làm gì khi không có ban đầu bù đắp trong Kafka hoặc nếu hiện tại bù đắp không tồn tại nữa trên máy chủ (ví dụ, vì dữ liệu đã bị xóa):

earliest: automatically reset the offset to the earliest offset 

latest: automatically reset the offset to the latest offset 

none: throw exception to the consumer if no previous offset is found for the consumer's group 

anything else: throw exception to the consumer. 
Các vấn đề liên quan