2016-05-28 19 views
6

Tôi đang cố gắng kết nối với nhà môi giới của mình trên aws với auto.create.topics.enable = true trong tệp server.properties của tôi. Nhưng khi tôi đang cố gắng kết nối với nhà môi giới bằng cách sử dụng nhà sản xuất Java khách hàng, tôi nhận được error sau đây.Trường đọc lỗi 'topic_metadata' trong Kafka

1197 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.producer.internals.Sender - Lỗi không bắt buộc trong nhà sản xuất kafka I/O chủ đề: org.apache.kafka.common.protocol.types.SchemaException: Lỗi đọc trường 'topic_metadata': Lỗi đọc mảng có kích thước 619631, chỉ 37 byte có sẵn tại org.apache.kafka.common.protocol.types.Schema.read (Schema.java:73) tại org.apache.kafka.clients. NetworkClient.parseResponse (NetworkClient.java:380) tại org.apache.kafka.clients.NetworkClient.handleCompletedReceives (NetworkClient.java:449) tại org.apache.kafka.clients.NetworkClient.poll (NetworkClient.java: 269) tại org.apache.kafka.clients.producer.internals.Sender.run (Sender.java:229) tại org.apache.kafka.clients.producer.internals.Sender.run (Sender.java:134) at java.lang.Thread.run (Unknown Source)

Sau đây là mã của nhà sản xuất khách hàng của tôi.

public static void main(String[] argv){ 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092"); 
     props.put("acks", "all"); 
     props.put("retries", 0); 
     props.put("batch.size", 16384); 
     props.put("linger.ms", 0); 
     props.put("buffer.memory", 33554432); 
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("block.on.buffer.full",true); 
     Producer<String, String> producer = new KafkaProducer<String, String>(props); 
     try{ for(int i = 0; i < 10; i++) 
     { producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i))); 
      System.out.println("Tried sending:"+i);} 
     } 
     catch (Exception e){ 
      e.printStackTrace(); 
     } 
     producer.close(); 
} 

Ai đó có thể giúp tôi giải quyết vấn đề này không?

Trả lời

2

Tôi đã gặp phải sự cố tương tự. Vấn đề ở đây là, khi có sự không khớp giữa phiên bản máy khách kafka trong tệp pom và máy chủ kafka là khác nhau. Tôi đã sử dụng máy khách kafka 0.10.0.0_1 nhưng máy chủ kafka vẫn ở 0.9.0.0. Vì vậy, tôi nâng cấp phiên bản máy chủ kafka lên 10 vấn đề đã được giải quyết.

<dependency> 
      <groupId>org.apache.servicemix.bundles</groupId> 
      <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId> 
      <version>0.10.0.0_1</version> 
     </dependency>    
1

Có vẻ như tôi đã đặt thuộc tính sai ở phía máy khách cũng như tệp tin server.properties của tôi có các thuộc tính không có nghĩa cho ứng dụng khách mà tôi đã sử dụng. Tôi quyết định thay đổi ứng dụng khách java thành phiên bản 0.9.0 bằng cách sử dụng maven.

<dependency> 
<groupId>org.apache.kafka</groupId> 
<artifactId>kafka_2.11</artifactId> 
<version>0.9.0.0</version> 
</dependency> 

tệp server.properties của tôi như sau.

broker.id=0 
port=9092 
num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 
log.dirs=/tmp/kafka-logs 
num.partitions=1 
num.recovery.threads.per.data.dir=1 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=300000 
log.cleaner.enable=false 
zookeeper.connect=localhost:2181 
zookeeper.connection.timeout.ms=9000 
delete.topic.enable=true 
advertised.host.name=<aws public Ip> 
advertised.port=9092 

đang sản xuất của tôi trông giống như

import java.util.Properties; 
    import java.util.concurrent.ExecutionException; 

    import org.apache.kafka.clients.producer.KafkaProducer; 
    import org.apache.kafka.clients.producer.ProducerConfig; 
    import org.apache.kafka.clients.producer.ProducerRecord; 
    import org.apache.kafka.common.serialization.StringSerializer; 
    public class HelloKafkaProducer 
    { 


     public static void main(String args[]) throws InterruptedException,  ExecutionException { 
     Properties props = new Properties(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:9092"); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); 

     KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props); 

     boolean sync = false; 
     String topic="loader1"; 
     String key = "mykey"; 
     for(int i=0;i<1000;i++) 
     { 
     String value = "myvaluehasbeensent"+i+i; 
     ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>(topic, key, value); 
     if (sync) { 
      producer.send(producerRecord).get(); 
     } else { 
      producer.send(producerRecord); 
     } 
     } 
     producer.close(); 
    } 
} 
-1

tôi giải quyết vấn đề này bằng cách chỉnh sửa

/etc/hosts file 

Kiểm tra file host của bạn rằng nếu Zookeeper hay ip môi giới của người khác không phải là trong tập tin này.

1

Đảm bảo bạn sử dụng đúng phiên bản. Cho phép nói rằng bạn sử dụng như sau maven dependecy:

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-kafka-0.8_2.10</artifactId> 
    <version>${flink.version}</version> 
</dependency> 

Vì vậy, các vật bằng: flink-connector-Kafka-0.8_2.10

Bây giờ kiểm tra xem bạn sử dụng phiên bản Kafka đúng:

cd /KAFKA_HOME/libs 

Bây giờ tìm kafka_YOUR-VERSION-sources.jar.

Trong trường hợp của tôi, tôi có kafka_2.10-0.8.2.1-sources.jar. Vì vậy, nó hoạt động tốt! :) Nếu bạn sử dụng các phiên bản khác nhau, chỉ cần thay đổi phụ thuộc của maven HOẶC tải xuống phiên bản kafka chính xác.

Các vấn đề liên quan