Tôi đang đối mặt với một số vấn đề nghiêm trọng khi cố gắng triển khai giải pháp cho nhu cầu của mình, liên quan đến KafkaConsumer (> = 0.9).Kafka Consumer - Hành vi thăm dò
Hãy tưởng tượng tôi có chức năng chỉ đọc n thư từ chủ đề kafka.
Ví dụ: getMsgs(5)
->nhận được 5 thư kafka tiếp theo trong chủ đề.
Vì vậy, tôi có một vòng lặp mà trông như thế này:
for (boolean exit= false;!exit;)
{
Records = consumer.poll(200);
for (Record r:records) {
processRecord(r); //do my things
numMss++;
if (numMss==maximum) //maximum=5
exit=true;
}
}
Đi vào tài khoản, vấn đề là phương pháp thăm dò() có thể nhận được nhiều hơn 5 tin nhắn. Ví dụ, nếu nó nhận được 10 tin nhắn, mã của tôi sẽ quên vĩnh viễn 5 tin nhắn kia, vì Kafka sẽ nghĩ rằng chúng đã được tiêu thụ.
tôi đã cố gắng commiting bù đắp nhưng dường như không làm việc:
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(record.offset() + 1)));
Ngay cả với cấu hình bù đắp, bất cứ khi nào tôi khởi động lại cho người tiêu dùng, nó sẽ không bắt đầu từ tin nhắn thứ 6 (hãy nhớ, tôi chỉ muốn 5 tin nhắn), nhưng từ 11th (kể từ khi cuộc thăm dò đầu tiên tiêu thụ 10 tin nhắn).
Có giải pháp nào cho điều này, hoặc có thể (chắc chắn nhất) tôi đang thiếu thứ gì đó?
Cảm ơn trước !!
auto.offset.reset nên sớm nhất và nó khởi động khi không có nhóm người tiêu dùng.id. mà không có id nhóm người ta không thể lưu trữ offsets. nếu đã có id nhóm người dùng auto.offset.reset sẽ không thực hiện bất kỳ điều gì và theo mặc định, người tiêu dùng chọn từ chênh lệch đã cam kết cuối cùng. – user1870400