Tôi bắt đầu chơi với Kafka. Tôi đã thiết lập một cấu hình sở thú và tôi đã quản lý để gửi và tiêu thụ các tin nhắn String. Bây giờ tôi đang cố gắng để vượt qua một đối tượng (trong java), nhưng từ một số lý do, khi phân tích cú pháp tin nhắn trong người tiêu dùng tôi có vấn đề tiêu đề. Tôi đã thử một số tùy chọn serialization (bằng cách sử dụng Decoder/Encoder), và tất cả trở lại cùng một vấn đề tiêu đề.Nối tiếp Kafka của một đối tượng
Đây là mã của tôi Nhà sản xuất:
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("serializer.class", "com.inneractive.reporter.kafka.EventsDataSerializer");
ProducerConfig config = new ProducerConfig(props);
Producer<Long, EventDetails> producer = new Producer<Long, EventDetails>(config);
ProducerData<Long, EventDetails> data = new ProducerData<Long, EventDetails>("test3", 1, Arrays.asList(new EventDetails());
try {
producer.send(data);
} finally {
producer.close();
}
Và người tiêu dùng:
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaMessageStream<EventDetails>>> topicMessageStreams =
consumerConnector.createMessageStreams(ImmutableMap.of("test3", 4), new EventsDataSerializer());
List<KafkaMessageStream<EventDetails>> streams = topicMessageStreams.get("test3");
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(4);
// consume the messages in the threads
for (final KafkaMessageStream<EventDetails> stream: streams) {
executor.submit(new Runnable() {
public void run() {
for(EventDetails event: stream) {
System.err.println("********** Got message" + event.toString());
}
}
});
}
và Serializer tôi:
public class EventsDataSerializer implements Encoder<EventDetails>, Decoder<EventDetails> {
public Message toMessage(EventDetails eventDetails) {
try {
ObjectMapper mapper = new ObjectMapper(new SmileFactory());
byte[] serialized = mapper.writeValueAsBytes(eventDetails);
return new Message(serialized);
} catch (IOException e) {
e.printStackTrace();
return null; // TODO
}
}
public EventDetails toEvent(Message message) {
EventDetails event = new EventDetails();
ObjectMapper mapper = new ObjectMapper(new SmileFactory());
try {
//TODO handle error
return mapper.readValue(message.payload().array(), EventDetails.class);
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
Và đây là lỗi tôi nhận được:
org.codehaus.jackson.JsonParseException: Input does not start with Smile format header (first byte = 0x0) and parser has REQUIRE_HEADER enabled: can not parse
at [Source: N/A; line: -1, column: -1]
Khi tôi làm việc với MessagePack
và bằng văn bản thuần túy cho một số ObjectOutputStream
Tôi gặp vấn đề tiêu đề tương tự. Tôi cũng đã cố gắng thêm CRC32 tải trọng vào tin nhắn, nhưng điều đó cũng không giúp ích gì.
Tôi đang làm gì sai ở đây?
Cảm ơn, điều này đã giải quyết được một vấn đề rất giống với tôi! – Jarmex