2016-06-02 18 views
8

Tôi đang cố gắng sử dụng các SimpleConsumer trong Kafka 9 cho phép người dùng để phát lại các sự kiện từ một thời gian bù đắp - nhưng tin nhắn tôi nhận lại từ Kafka đang ở trong một mã hóa rất lạ:Kafka Java SimpleConsumer mã hóa lạ

7icf-test-testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7\�W>8������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819330373,"context":{"userid":0,"username":"testUser"}}�!}�a�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819331637,"context":{"userid":1,"username":"testUser"}}���r�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819332754,"context":{"userid":2,"username":"testUser"}}��������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819333868,"context":{"userid":3,"username":"testUser"}}�p= 
          ������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819334997,"context":{"userid":4,"username" 

Sử dụng KafkaConsumer thư này phân tích cú pháp tốt. Đây là mã tôi đang sử dụng để lấy thư bằng SimpleConsumer:

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) { 
     long currentOffset = messageAndOffset.offset(); 
     if (currentOffset < readOffset) { 
      log.debug("Found an old offset - skip"); 
      continue; 
     } 

     readOffset = messageAndOffset.nextOffset(); 

     int payloadOffset = 14 + messageAndOffset.message().keySize(); // remove first x bytes, schema Id 
     byte[] data = messageAndOffset.message().payload().array(); 
     byte[] realData = Arrays.copyOfRange(data, payloadOffset, data.length - payloadOffset); 
     log.debug("Read " + new String(realData, "UTF-8")); 
} 

tôi đã thêm đoạn code để bỏ qua là người đầu tiên x byte sau khi tôi giữ lấy UTF-32 lỗi về byte là quá cao, mà tôi cho rằng là vì Kafka chuẩn bị thông tin như kích thước tin nhắn cho tải trọng. Đây có phải là đồ tạo tác Avro không?

+0

Không giống như Avro - ít nhất không phải mã hóa Avro nhị phân. Trong mã hóa nhị phân, bạn sẽ không nhận được thông tin lược đồ trong bản ghi. –

+0

Mã của tôi hơi khác - Thay vì sử dụng 'payload(). Array()', tôi thực hiện nó giống như cách thực hiện ở đây: https://cwiki.apache.org/confluence/display/KAFKA/0.8. 0 + SimpleConsumer + Ví dụ Ví dụ: 'payload(). Get (bytes)' trong đó 'bytes' thuộc kiểu' byte [] '. Phương thức 'get()' sao chép dữ liệu, trong khi 'array()' trả về mảng thực, và trong Javadocs cho 'ByteBuffer' nó nói:" Sửa đổi nội dung của bộ đệm này sẽ làm cho nội dung của mảng được sửa đổi, và ngược lại." Có lẽ một cái gì đó như thế là những gì đang xảy ra? –

+0

@Gandalf Bạn vui lòng mở thư của mình chỉ bằng notepad ++. Nếu bạn mở nó bằng wordpad hoặc notepad khác, thì nó sẽ trông nguy hiểm. Vì vậy, mở nó trong notepad ++ và cho chúng tôi biết. – SkyWalker

Trả lời

0

tôi không bao giờ tìm thấy một câu trả lời tốt cho điều này - (... Mỗi phân vùng mặc dù việc thực hiện là nghèo) nhưng tôi chuyển sang sử dụng các SimpleConsumerđể truy vấn Kafka cho offsets tôi cần và sau đó sử dụng KafkaConsumer có nguồn gốc sử dụng seek(TopicPartition, offset) hoặc seekToBeginning(TopicPartition)để truy xuất thư. Hy vọng rằng họ sẽ thêm, cho khách hàng bản địa, khả năng truy xuất thư từ một dấu thời gian cụ thể trong bản phát hành tiếp theo.

0

Bạn đang tìm kiếm điều này?

readOffset = messageAndOffset.nextOffset(); 
ByteBuffer payload = messageAndOffset.message().payload(); 

    if(payload == null) { 
     System.err.println("Message is null : " + readOffset); 
     continue; 
    } 

final byte[] realData = new byte[payload.limit()]; 
payload.get(realData); 
System.out.println("Read " + new String(realData, "UTF-8")); 
0

Bạn có thể đăng nhập theo định kỳ phân vùng một bù đắp bạn đang cam kết với timestamp của thông điệp (có thể không từng cam kết) và sau đó bạn có thể có một số biện pháp trong tương lai để thiết lập offsets tiêu dùng của bạn. Tôi đoán đây là để gỡ lỗi sản xuất.

Tôi nghi ngờ họ sẽ thêm một tính năng như vậy, có vẻ như không thể xem xét cách thức hoạt động của Kafka, mặc dù tôi có thể bị nhầm lẫn, luôn có những điều thiên tài đang diễn ra. Tôi sẽ làm việc khai thác gỗ.