2016-12-14 12 views
6

Tôi có Java Object tùy chỉnh của tôi và muốn tận dụng JVM trong serialization xây dựng để gửi nó đến một chủ đề Kafka, nhưng serialization thất bại với lỗi dưới đâyGửi Tuỳ chỉnh Java Objects để Kafka Topic

org.apache.kafka. common.errors.SerializationException: không thể chuyển đổi giá trị của lớp com.spring.kafka.Payload đến lớp org.apache.kafka.common.serialization.ByteArraySerializer quy định tại value.serializer

Payload. java

public class Payload implements Serializable { 

    private static final long serialVersionUID = 123L; 

    private String name="vinod"; 

    private int anInt = 5; 

    private Double aDouble = new Double("5.0"); 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 

    public int getAnInt() { 
     return anInt; 
    } 

    public void setAnInt(int anInt) { 
     this.anInt = anInt; 
    } 

    public Double getaDouble() { 
     return aDouble; 
    } 

    public void setaDouble(Double aDouble) { 
     this.aDouble = aDouble; 
    } 

} 

Trong quá trình tạo của tôi về nhà sản xuất, tôi có các thuộc tính sau thiết

<entry key="key.serializer" 
         value="org.apache.kafka.common.serialization.ByteArraySerializer" /> 
       <entry key="value.serializer" 
         value="org.apache.kafka.common.serialization.ByteArraySerializer" /> 

gửi My invoke như sau

kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload())); 

cách chính xác để gửi cho một đối tượng tùy chỉnh java là gì thông qua một nhà sản xuất cho một chủ đề kafka?

+0

Tùy chọn khác là để chuyển đổi sang định dạng JSON và gửi – ravthiru

Trả lời

8

Chúng tôi có 2 tùy chọn được liệt kê dưới đây

1) Nếu chúng ta có ý định gửi đối tượng tùy chỉnh java để sản xuất, Chúng tôi cần phải tạo ra một serializer mà thực hiện org.apache.kafka.common.serialization.Serializer và vượt qua mà lớp serializer trong quá trình tạo của nhà sản xuất của bạn

chiếu mã dưới đây

public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer { 

    public void configure(Map map, boolean b) { 

    } 

    public byte[] serialize(String s, Object o) { 

     try { 
      ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
      ObjectOutputStream oos = new ObjectOutputStream(baos); 
      oos.writeObject(o); 
      oos.close(); 
      byte[] b = baos.toByteArray(); 
      return b; 
     } catch (IOException e) { 
      return new byte[0]; 
     } 
    } 

    public void close() { 

    } 
} 

Và thiết lập các giá trị phù hợp serializer

<entry key="value.serializer" 
         value="com.spring.kafka.PayloadSerializer" /> 

2) Không cần phải tạo lớp serializer tùy chỉnh. Sử dụng các ByteArraySerializer hiện có, nhưng trong quá trình gửi theo quá trình

Java Object -> String (Ưu tiên sử JSON represenation thay vì toString) -> ByteArray

3

Vì bạn đang sử dụng ByteArraySerializer, bạn cần khởi tạo nhà sản xuất byte [].

Producer<byte[],byte[]> producer = new KafkaProducer<>(props); 

và sau đó trong khi sản xuất vượt qua byte [] sau khi serializing hoặc một số phương pháp khác, ví dụ,

producer.send(new ProducerRecord<byte[],byte[]>("test", new Payload().toString().getBytes())); 

Nếu bạn đang đi qua chỉ là một Object Payload cho nhà sản xuất sau đó nó sẽ được tốt hơn để có serializer chính và serializer giá trị như bất cứ điều gì bạn có ý định vượt qua và trong khi đọc bạn cần phải đọc từ dữ liệu đó.

Thực tiễn tốt là sử dụng Serializable và ByteArraySerializer/ByteArrayDeserializer.