2015-03-30 21 views
5

Tôi đang cố gắng làm việc với API kafka trong java. Tôi đang sử dụng phụ thuộc maven sau:Làm cách nào để tạo thông báo bằng API Kafka 8.2 trong Java?

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.0</version> 
</dependency> 

Tôi gặp sự cố khi kết nối với máy chủ kafka từ xa. Tôi đã thay đổi thuộc tính cổng tệp 'server.properties' của kafka thành cổng 8080. Tôi có thể khởi động cả trình quản lý vườn thú và máy chủ kafka. Tôi cũng có thể sử dụng nhà sản xuất bảng điều khiển và ứng dụng người tiêu dùng đi kèm với tải xuống kafka. (Scala 2.10 phiên bản)

Tôi đang sử dụng mã khách hàng sau đây để tạo ra một KafkaProducer xa

Properties propsProducer = new Properties(); 

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080"); 
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); 
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); 
propsProducer.put("topic.metadata.refresh.interval.ms", "0"); 

KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer); 

Khi tôi đã tạo ra nhà sản xuất, tôi có thể chạy dòng sau và nhận hợp lệ thông tin chủ đề trở lại, cấp strTopic là một tên chủ đề hiện có.

List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic); 

Khi tôi cố gắng để gửi một tin nhắn, tôi thực hiện như sau:

ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes()); 

RecordMetadata futureData = m_kafkaProducer.send(prMessage).get(); 

Cuộc gọi để gửi() khối vô hạn định và khi tôi tự chấm dứt quá trình này, tôi thấy rằng các ổ cắm LỖI Đóng vì lỗi trên máy chủ kafka (IOException, kết nối Reset bởi Peer) lỗi.

Ngoài ra, không có gì đáng kể là các thuộc tính host.name, advertised.host.name và advertised.port đều được nhận xét trên tệp 'server.properties'. Oh, và nếu tôi thay đổi dòng:

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080"); 

để

propsProducer.put("bootstrap.servers", "127.0.0.1:8080"); 

và chạy nó trên máy chủ giống như nơi máy chủ Kafka đã được cài đặt, nó hoạt động nhưng tôi đang cố gắng để làm việc với nó từ xa.

Đánh giá cao sự trợ giúp và nếu tôi có thể làm rõ, hãy cho tôi biết.

+1

Bạn có đang sử dụng '172.xx.xx.xxx' làm địa chỉ IP lưu trữ không? –

+0

Không, đó là một IP đầy đủ của x chỉ là mặt nạ. –

+0

Kk. Có lẽ vấn đề tường lửa? Bạn có thể xác thực kết nối mạng trên cổng 8080 bằng netcat không? –

Trả lời

3

Sau nhiều lần đào, tôi quyết định triển khai ví dụ được tìm thấy tại đây: Kafka Producer Example. Tôi rút ngắn mã và không triển khai lớp phân vùng. Tôi cập nhật pom của tôi với sự phụ thuộc được liệt kê và tôi vẫn còn có cùng một vấn đề. Cuối cùng, tôi đã thực hiện một số thay đổi cấu hình và mọi thứ hoạt động.

Phần cuối cùng của câu đố là xác định máy chủ Kafka trong/etc/hosts của cả máy chủ và máy khách. Tôi đã thêm phần sau vào cả hai tệp.

172.xx.xx.xxx  serverHost1 

Một lần nữa, dấu x chỉ là mặt nạ. Sau đó, tôi đặt advertised.host.name trong tệp server.properties thành serverHost1. LƯU Ý: Tôi nhận được IP đó sau khi chạy một ifconfig trên máy chủ.

Tôi đã thay đổi dòng

propsProducer.put("metadata.broker.list", "172.xx.xx.xxx:8080"); 

để

propsProducer.put("metadata.broker.list", "serverHost1:8080"); 

Các Kafka API không thích thực tế là tôi đã xác định một địa chỉ IP như là một chuỗi.Thay vào đó, nó đang tìm kiếm IP từ trong tệp etc/hosts mặc dù tài liệu hướng dẫn cho biết:

"Tên máy chủ nhà môi giới sẽ quảng cáo cho người sản xuất và người tiêu dùng. Nếu không được đặt, nó sẽ sử dụng giá trị cho" host.name "nếu được định cấu hình Nếu không, nó sẽ sử dụng giá trị trả về từ java.net.InetAddress.getCanonicalHostName(). "

Chỉ trả về IP, ở dạng chuỗi, trước đây tôi đã sử dụng nếu không được xác định trong etc/hosts của máy khách, nếu không nó sẽ trả về tên được ghép nối với IP (serverHost1 trong trường hợp của tôi). Ngoài ra, tôi cũng không bao giờ đặt giá trị của host.name.

+0

Bootstrap.servers có thay thế cho metadata.broker.list không? –

+1

Có, tôi tin như vậy. Trong phiên bản 0.8.2.0, trường này là "metadata.broker.list" nhưng trong các bản phát hành mới hơn, nó là "boostrap.servers" –

+0

Có !! Đó là tru. với ProducerAPI mới, đây là cấu hình mới. –

Các vấn đề liên quan