2012-07-15 37 views
6

Tôi bắt đầu chơi với Kafka. Tôi đã thiết lập một cấu hình sở thú và tôi đã quản lý để gửi và tiêu thụ các tin nhắn String. Bây giờ tôi đang cố gắng để vượt qua một đối tượng (trong java), nhưng từ một số lý do, khi phân tích cú pháp tin nhắn trong người tiêu dùng tôi có vấn đề tiêu đề. Tôi đã thử một số tùy chọn serialization (bằng cách sử dụng Decoder/Encoder), và tất cả trở lại cùng một vấn đề tiêu đề.Nối tiếp Kafka của một đối tượng

Đây là mã của tôi Nhà sản xuất:

 Properties props = new Properties(); 
     props.put("zk.connect", "localhost:2181"); 
     props.put("serializer.class", "com.inneractive.reporter.kafka.EventsDataSerializer"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer<Long, EventDetails> producer = new Producer<Long, EventDetails>(config); 
     ProducerData<Long, EventDetails> data = new ProducerData<Long, EventDetails>("test3", 1, Arrays.asList(new EventDetails()); 
     try { 
      producer.send(data); 
     } finally { 
      producer.close(); 
     } 

Và người tiêu dùng:

 Properties props = new Properties(); 
     props.put("zk.connect", "localhost:2181"); 
     props.put("zk.connectiontimeout.ms", "1000000"); 
     props.put("groupid", "test_group"); 

     // Create the connection to the cluster 
     ConsumerConfig consumerConfig = new ConsumerConfig(props); 
     ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); 

     // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume 
     Map<String, List<KafkaMessageStream<EventDetails>>> topicMessageStreams = 
       consumerConnector.createMessageStreams(ImmutableMap.of("test3", 4), new EventsDataSerializer()); 
     List<KafkaMessageStream<EventDetails>> streams = topicMessageStreams.get("test3"); 

     // create list of 4 threads to consume from each of the partitions 
     ExecutorService executor = Executors.newFixedThreadPool(4); 

     // consume the messages in the threads 
     for (final KafkaMessageStream<EventDetails> stream: streams) { 
      executor.submit(new Runnable() { 
       public void run() { 
        for(EventDetails event: stream) { 
         System.err.println("********** Got message" + event.toString());   
        } 
       } 
      }); 
     } 

và Serializer tôi:

public class EventsDataSerializer implements Encoder<EventDetails>, Decoder<EventDetails> { 
    public Message toMessage(EventDetails eventDetails) { 
     try { 
      ObjectMapper mapper = new ObjectMapper(new SmileFactory()); 
      byte[] serialized = mapper.writeValueAsBytes(eventDetails); 
      return new Message(serialized); 
} catch (IOException e) { 
      e.printStackTrace(); 
      return null; // TODO 
     } 
} 
    public EventDetails toEvent(Message message) { 
     EventDetails event = new EventDetails(); 

     ObjectMapper mapper = new ObjectMapper(new SmileFactory()); 
     try { 
      //TODO handle error 
      return mapper.readValue(message.payload().array(), EventDetails.class); 
     } catch (IOException e) { 
      e.printStackTrace(); 
      return null; 
     } 

    } 
} 

Và đây là lỗi tôi nhận được:

org.codehaus.jackson.JsonParseException: Input does not start with Smile format header (first byte = 0x0) and parser has REQUIRE_HEADER enabled: can not parse 
at [Source: N/A; line: -1, column: -1] 

Khi tôi làm việc với MessagePack và bằng văn bản thuần túy cho một số ObjectOutputStream Tôi gặp vấn đề tiêu đề tương tự. Tôi cũng đã cố gắng thêm CRC32 tải trọng vào tin nhắn, nhưng điều đó cũng không giúp ích gì.

Tôi đang làm gì sai ở đây?

Trả lời

1

Bytebuffers .array() phương pháp không phải là rất đáng tin cậy. Nó phụ thuộc vào việc thực hiện cụ thể. Bạn có thể muốn thử

ByteBuffer bb = message.payload() 

byte[] b = new byte[bb.remaining()] 
bb.get(b, 0, b.length); 
return mapper.readValue(b, EventDetails.class) 
+0

Cảm ơn, điều này đã giải quyết được một vấn đề rất giống với tôi! – Jarmex

3

Hm, tôi đã không chạy vào các vấn đề tiêu đề tương tự mà bạn đang gặp phải nhưng dự án của tôi đã không được biên dịch một cách chính xác khi tôi đã không cung cấp một constructor VerifiableProperties trong bộ mã hóa của tôi/giải mã . Có vẻ kỳ lạ là người xây dựng bị mất tích sẽ làm hỏng việc deserialization của Jackson.

Có thể thử chia bộ mã hóa và bộ giải mã của bạn và bao gồm hàm tạo VerifiableProperties trong cả hai; bạn không cần phải triển khai Decoder[T] để tuần tự hóa. Tôi đã có thể triển khai thành công json de/serialization bằng cách sử dụng ObjectMapper theo định dạng trong this post.

Chúc may mắn!

Các vấn đề liên quan