Tôi đang cố gắng sử dụng các SimpleConsumer trong Kafka 9 cho phép người dùng để phát lại các sự kiện từ một thời gian bù đắp - nhưng tin nhắn tôi nhận lại từ Kafka đang ở trong một mã hóa rất lạ:Kafka Java SimpleConsumer mã hóa lạ
7icf-test-testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7\�W>8������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819330373,"context":{"userid":0,"username":"testUser"}}�!}�a�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819331637,"context":{"userid":1,"username":"testUser"}}���r�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819332754,"context":{"userid":2,"username":"testUser"}}��������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819333868,"context":{"userid":3,"username":"testUser"}}�p=
������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819334997,"context":{"userid":4,"username"
Sử dụng KafkaConsumer thư này phân tích cú pháp tốt. Đây là mã tôi đang sử dụng để lấy thư bằng SimpleConsumer:
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffset) {
log.debug("Found an old offset - skip");
continue;
}
readOffset = messageAndOffset.nextOffset();
int payloadOffset = 14 + messageAndOffset.message().keySize(); // remove first x bytes, schema Id
byte[] data = messageAndOffset.message().payload().array();
byte[] realData = Arrays.copyOfRange(data, payloadOffset, data.length - payloadOffset);
log.debug("Read " + new String(realData, "UTF-8"));
}
tôi đã thêm đoạn code để bỏ qua là người đầu tiên x byte sau khi tôi giữ lấy UTF-32 lỗi về byte là quá cao, mà tôi cho rằng là vì Kafka chuẩn bị thông tin như kích thước tin nhắn cho tải trọng. Đây có phải là đồ tạo tác Avro không?
Không giống như Avro - ít nhất không phải mã hóa Avro nhị phân. Trong mã hóa nhị phân, bạn sẽ không nhận được thông tin lược đồ trong bản ghi. –
Mã của tôi hơi khác - Thay vì sử dụng 'payload(). Array()', tôi thực hiện nó giống như cách thực hiện ở đây: https://cwiki.apache.org/confluence/display/KAFKA/0.8. 0 + SimpleConsumer + Ví dụ Ví dụ: 'payload(). Get (bytes)' trong đó 'bytes' thuộc kiểu' byte [] '. Phương thức 'get()' sao chép dữ liệu, trong khi 'array()' trả về mảng thực, và trong Javadocs cho 'ByteBuffer' nó nói:" Sửa đổi nội dung của bộ đệm này sẽ làm cho nội dung của mảng được sửa đổi, và ngược lại." Có lẽ một cái gì đó như thế là những gì đang xảy ra? –
@Gandalf Bạn vui lòng mở thư của mình chỉ bằng notepad ++. Nếu bạn mở nó bằng wordpad hoặc notepad khác, thì nó sẽ trông nguy hiểm. Vì vậy, mở nó trong notepad ++ và cho chúng tôi biết. – SkyWalker