Tôi đang cố gắng làm việc với API kafka trong java. Tôi đang sử dụng phụ thuộc maven sau:Làm cách nào để tạo thông báo bằng API Kafka 8.2 trong Java?
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.0</version>
</dependency>
Tôi gặp sự cố khi kết nối với máy chủ kafka từ xa. Tôi đã thay đổi thuộc tính cổng tệp 'server.properties' của kafka thành cổng 8080. Tôi có thể khởi động cả trình quản lý vườn thú và máy chủ kafka. Tôi cũng có thể sử dụng nhà sản xuất bảng điều khiển và ứng dụng người tiêu dùng đi kèm với tải xuống kafka. (Scala 2.10 phiên bản)
Tôi đang sử dụng mã khách hàng sau đây để tạo ra một KafkaProducer xa
Properties propsProducer = new Properties();
propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("topic.metadata.refresh.interval.ms", "0");
KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer);
Khi tôi đã tạo ra nhà sản xuất, tôi có thể chạy dòng sau và nhận hợp lệ thông tin chủ đề trở lại, cấp strTopic là một tên chủ đề hiện có.
List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic);
Khi tôi cố gắng để gửi một tin nhắn, tôi thực hiện như sau:
ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes());
RecordMetadata futureData = m_kafkaProducer.send(prMessage).get();
Cuộc gọi để gửi() khối vô hạn định và khi tôi tự chấm dứt quá trình này, tôi thấy rằng các ổ cắm LỖI Đóng vì lỗi trên máy chủ kafka (IOException, kết nối Reset bởi Peer) lỗi.
Ngoài ra, không có gì đáng kể là các thuộc tính host.name, advertised.host.name và advertised.port đều được nhận xét trên tệp 'server.properties'. Oh, và nếu tôi thay đổi dòng:
propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
để
propsProducer.put("bootstrap.servers", "127.0.0.1:8080");
và chạy nó trên máy chủ giống như nơi máy chủ Kafka đã được cài đặt, nó hoạt động nhưng tôi đang cố gắng để làm việc với nó từ xa.
Đánh giá cao sự trợ giúp và nếu tôi có thể làm rõ, hãy cho tôi biết.
Bạn có đang sử dụng '172.xx.xx.xxx' làm địa chỉ IP lưu trữ không? –
Không, đó là một IP đầy đủ của x chỉ là mặt nạ. –
Kk. Có lẽ vấn đề tường lửa? Bạn có thể xác thực kết nối mạng trên cổng 8080 bằng netcat không? –