2014-11-18 16 views
5

Trong tìm kiếm làm thế nào để tạo ra một chủ đề Kafka qua API, tôi thấy ví dụ này trong Scala:Cách tạo Kafka ZKStringSerializer trong Java?

import kafka.admin.AdminUtils 
import kafka.utils.ZKStringSerializer 
import org.I0Itec.zkclient.ZkClient 

// Create a ZooKeeper client 
val sessionTimeoutMs = 10000 
val connectionTimeoutMs = 10000 
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, 
          connectionTimeoutMs, ZKStringSerializer) 

// Create a topic with 8 partitions and a replication factor of 3 
val topicName = "myTopic" 
val numPartitions = 8 
val replicationFactor = 3 
val topicConfig = new Properties 
AdminUtils.createTopic(zkClient, topicName, 
         numPartitions, replicationFactor, topicConfig) 

Nguồn: https://stackoverflow.com/a/23360100/871012

Các arg cuối cùng ZKStringSerializer rõ ràng là một đối tượng Scala. Nó không phải là rõ ràng với tôi làm thế nào để làm cho ví dụ này làm việc trong Java.

bài này How to create a scala object in clojure hỏi cùng một câu hỏi trong Clojure và câu trả lời là:

ZKStringSerializer$/MODULE$ 

mà trong Java sẽ (tôi nghĩ) dịch để:

ZKStringSerializer$.MODULE$ 

Nhưng khi tôi cố gắng đó (hoặc bất kỳ số lượng biến thể nào khác) không có biến thể nào trong số đó được biên dịch.
Các lỗi biên dịch là:

KafkaTopicCreator.java:[16,18] cannot find symbol 
symbol: variable ZKStringSerializer$ 
location: class org.sample.KafkaTopicCreator 

Tôi đang sử dụng kafka_2.9.2-0.8.1.1 và Java 8.

Trả lời

17

Đối với java thử những điều sau đây,

nhập khẩu đầu tiên dưới đây tuyên bố

import kafka.utils.ZKStringSerializer$; 

Tạo đối tượng cho ZkClient theo cách sau,

String zkHosts = "127.0.0.1:2181"; //If more than one zookeeper then "127.0.0.1:2181,127.0.0.2:2181" 
ZkClient zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer$.MODULE$); 
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties()); 

Đoạn mã trên sẽ không làm việc cho Kafka> 0.9 kể từ khi api đã được thay đổi, Sử dụng mã dưới đây để biết Kafka> 0,9

import java.util.Properties; 
import kafka.admin.AdminUtils; 
import kafka.utils.ZKStringSerializer$; 
import kafka.utils.ZkUtils; 
import org.I0Itec.zkclient.ZkClient; 
import org.I0Itec.zkclient.ZkConnection; 

public class KafkaTopicCreationInJava 
{ 
    public static void main(String[] args) throws Exception { 
     ZkClient zkClient = null; 
     ZkUtils zkUtils = null; 
     try { 
      String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181"; 
      int sessionTimeOutInMs = 15 * 1000; // 15 secs 
      int connectionTimeOutInMs = 10 * 1000; // 10 secs 

      zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$); 
      zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); 

      String topicName = "testTopic"; 
      int noOfPartitions = 2; 
      int noOfReplication = 3; 
      Properties topicConfiguration = new Properties(); 

      AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration); 

     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } finally { 
      if (zkClient != null) { 
       zkClient.close(); 
      } 
     } 
    } 
} 
Các vấn đề liên quan