2014-09-08 14 views
5

Gần đây tôi đã nhận thấy rằng Camel hiện có thành phần riêng của mình cho Kafka nên tôi đã quyết định cho nó một vòng xoáy.Tích hợp Camel Kafka

Tôi quyết định thử một tập tin đơn giản thoải mái -> Kafka chủ đề như sau ...

<route> 
     <from uri="file:///tmp/input" /> 
     <setHeader headerName="kafka.PARTITION_KEY"> 
      <constant>Test</constant> 
     </setHeader> 
     <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1" /> 
</route> 

này có vẻ đơn giản, tuy nhiên, trên chạy này tôi nhận được ...

java.lang.ClassCastException: java.lang.String cannot be cast to [B 
    at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34) 
    at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:78) 

Và trên kiểm tra mã Camel, nó làm như sau ...

String msg = exchange.getIn().getBody(String.class); 
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg); 
producer.send(data); 

Rõ ràng, đây là một vấn đề serialization, tôi chỉ không chắc chắn nếu có một workaround hoặc điều này vốn đã là một lỗi với việc thực hiện hiện tại? (Hoặc hy vọng chỉ là sự hiểu lầm của tôi)

Bất kỳ đề xuất nào? Cảm ơn, J

Trả lời

10

Ah, đừng bận tâm ở đây chúng tôi đi ... Hy vọng điều này sẽ giúp người khác, bạn phải đặt serialiser trong các tùy chọn.

<route> 
      <from uri="file:///tmp/input" /> 
      <setHeader headerName="kafka.PARTITION_KEY"> 
       <constant>Test</constant> 
      </setHeader> 
      <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1&amp;serializerClass=kafka.serializer.StringEncoder" /> 
</route> 
0

Tìm thấy một ví dụ tốt đẹp cho việc cài đặt và khởi động Apache Kafka, và cấu hình một endpoint lạc đà để gửi thông điệp tới Kafka topic-

@Override 
    public void configure() throws Exception { 

     String topicName = "topic=javainuse-topic"; 
     String kafkaServer = "kafka:localhost:9092"; 
     String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181"; 
     String serializerClass = "serializerClass=kafka.serializer.StringEncoder"; 

     String toKafka = new StringBuilder().append(kafkaServer).append("?").append(topicName).append("&") 
       .append(zooKeeperHost).append("&").append(serializerClass).toString(); 

     from("file:C:/inbox?noop=true").split().tokenize("\n").to(toKafka); 
    } 

Reference- Apache Camel + Kafka Integration example