2016-05-24 14 views
5

Tôi đã xây dựng một môi trường thử nghiệm nhỏ trên máy tính Windows của mình và viết mã sau để kiểm tra kafka (sử dụng kafka_2.10: 0.9.0.1 từ org.apache.kafka).KafkaNhà sản xuất không gửi thành công thư vào hàng đợi

package iii.functiontesting; 

import java.text.ParseException; 
import java.util.Properties; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 


/** 
* Hello world! 
* 
*/ 
public class test4 
{ 
    public static void main(String[] args) throws ParseException 
    { 
     Properties producerProps=new Properties(); 
     producerProps.put("bootstrap.servers", "localhost:9092"); 
     producerProps.put("serializer.class",org.apache.kafka.common.serialization.StringSerializer.class.getName()); 
     producerProps.put("key.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName()); 
     producerProps.put("value.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName()); 
     producerProps.put("request.required.acks","1"); 
     KafkaProducer<String,String> kafkawriter= new KafkaProducer<String,String>(producerProps); 
     ProducerRecord<String,String> msg=new ProducerRecord<>("TEST3","ImKey","teststring1"); 
     kafkawriter.send(msg); 
    } 
} 

tôi sử dụng lệnh sau để kiểm tra xem các thông điệp được một cách chính xác bằng văn bản vào hàng đợi

D:. \ Làm việc \ kafkaenv \ kafka_2.10-0.9.0.1 \ bin \ windows> \ Kafka-console-consumer.bat --zookeeper localhost: 2181 --topic Test3 --from-bắt đầu

Tuy nhiên, tôi phát hiện ra rằng Kafka-console-người tiêu dùng cho thấy không có gì.

Tôi nghi ngờ rằng máy chủ kafka của tôi không chạy đúng cách, vì vậy tôi sử dụng bàn điều khiển-nhà sản xuất để kiểm tra.

D: \ làm việc \ kafkaenv \ kafka_2.10-0.9.0.1 \ bin \ windows> \ Kafka-console-producer.bat --broker-list localhost:. 9092 --topic Test3

aaaaa

Lần này tôi có thể thấy hiển thị rõ ràng trong bảng điều khiển dành cho người tiêu dùng. Tôi không thể hiểu được điều gì xảy ra. Có ai giúp tôi không?

Trả lời

7

Bạn phải gọi phương thức KafkaProducer#flush [hoặc] KafkaProducer#close trước khi chấm dứt chương trình.

Trên thực tế, nhà sản xuất đệm các bản ghi trước khi gửi cho người môi giới. Xem buffer.memorybatch.size trong Kafka Producer configuration

kafkawriter.send(msg); 
kafkawriter.close(); 
+0

Cảm ơn bạn. Thư đã được gửi đúng sau khi thêm tuôn ra(). Tôi nghĩ rằng tôi cần phải nghiên cứu sâu hơn về cấu hình kafka. Cảm ơn câu trả lời của bạn. – greencomet

Các vấn đề liên quan