2016-01-05 13 views
6

Tôi hoàn toàn mới đối với Kafka và avro và cố gắng sử dụng gói hợp lý. Chúng tôi có các POJO hiện có mà chúng tôi sử dụng cho JPA và tôi muốn có thể đơn giản tạo ra một cá thể POJO của tôi mà không phải phản ánh từng giá trị vào một bản ghi chung theo cách thủ công. Tôi dường như bị thiếu như thế nào điều này được thực hiện trong tài liệu hướng dẫn.Chuyển đổi pojos thành hồ sơ chung trong confluent.io để gửi qua KafkaProducer

Các ví dụ sử dụng một kỷ lục chung và thiết lập giá trị của từng cái một như vậy:

String key = "key1"; 
String userSchema = "{\"type\":\"record\"," + 
        "\"name\":\"myrecord\"," + 
        "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"; 
Schema.Parser parser = new Schema.Parser(); 
Schema schema = parser.parse(userSchema); 
GenericRecord avroRecord = new GenericData.Record(schema); 
avroRecord.put("f1", "value1"); 

record = new ProducerRecord<Object, Object>("topic1", key, avroRecord); 
try { 
    producer.send(record); 
} catch(SerializationException e) { 
    // may need to do something with it 
} 

Có rất nhiều ví dụ để có được một sơ đồ từ một lớp học và tôi thấy các chú thích để thay đổi mà giản đồ khi cần thiết. Bây giờ làm thế nào để tôi lấy một thể hiện của một POJO và chỉ cần gửi nó đến serializer như là và có thư viện làm công việc của phù hợp lên lược đồ từ lớp và sau đó sao chép các giá trị vào một bản ghi chung? Tôi đang đi về điều này tất cả sai? Những gì tôi muốn kết thúc làm là một cái gì đó như thế này:

String key = "key1"; 
Schema schema = ReflectData.get().getSchema(myObject.getClass()); 
GenericRecord avroRecord = ReflectData.get().getRecord(myObject, schema); 

record = new ProducerRecord<Object, Object>("topic1", key, avroRecord); 
try { 
    producer.send(record); 
} catch(SerializationException e) { 
    // may need to do something with it 
} 

Cảm ơn!

Trả lời

1

Tôi sẽ tạo trình tự sắp xếp của riêng mình trong trường hợp này:

public class KafkaAvroReflectionSerializer extends KafkaAvroSerializer { 
    private final EncoderFactory encoderFactory = EncoderFactory.get(); 

    @Override 
    protected byte[] serializeImpl(String subject, Object object) throws SerializationException { 
     //TODO: consider caching schemas 
     Schema schema = null; 

     if(object == null) { 
     return null; 
     } else { 
     try { 
      schema = ReflectData.get().getSchema(object.getClass()); 
      int e = this.schemaRegistry.register(subject, schema); 
      ByteArrayOutputStream out = new ByteArrayOutputStream(); 
      out.write(0); 
      out.write(ByteBuffer.allocate(4).putInt(e).array()); 

      BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null); 
      DatumWriter<Object> writer = new ReflectDatumWriter<>(schema); 
      writer.write(object, encoder); 
      encoder.flush(); 
      out.close(); 

      byte[] bytes = out.toByteArray(); 
      return bytes; 
     } catch (IOException ioe) { 
      throw new SerializationException("Error serializing Avro message", ioe); 
     } catch (RestClientException rce) { 
      throw new SerializationException("Error registering Avro schema: " + schema, rce); 
     } catch (RuntimeException re) { 
      throw new SerializationException("Error serializing Avro message", re); 
     } 
     } 
    } 
} 
Các vấn đề liên quan