2014-05-11 18 views
7

Tôi viết Kafka Consumer cho ứng dụng phân tán tốc độ cao. Tôi chỉ có một chủ đề nhưng tỷ lệ tin nhắn gửi đến là rất cao. Có nhiều phân vùng phục vụ nhiều người tiêu dùng hơn sẽ thích hợp cho trường hợp sử dụng này. Cách tốt nhất để tiêu thụ là có nhiều trình đọc luồng. Theo tài liệu hoặc các mẫu có sẵn, số lượng KafkaStreams mà ConsumerConnector đưa ra dựa trên số lượng chủ đề. Tự hỏi làm thế nào để có được nhiều hơn một độc giả KafkaStream [dựa trên phân vùng], để tôi có thể mở rộng một luồng trên mỗi luồng hoặc Đọc từ cùng một KafkaStream trong nhiều luồng sẽ thực hiện đọc đồng thời từ nhiều phân vùng?Apache Kafka - KafkaStream về chủ đề/phân vùng

Mọi thông tin chi tiết đều được đánh giá cao.

+0

Sử dụng SimpleConsumer không phải là một lựa chọn? –

Trả lời

14

có muốn chia sẻ những gì tôi tìm thấy từ mailing list:

Số điện thoại mà bạn vượt qua trong các điều khiển bản đồ chủ đề có bao nhiêu con suối một chủ đề được chia thành. Trong trường hợp của bạn, nếu bạn vượt qua 1, tất cả 10 phân vùng dữ liệu sẽ được đưa vào 1 dòng. Nếu bạn vượt qua 2, mỗi luồng trong 2 luồng sẽ nhận dữ liệu từ 5 phân vùng. Nếu bạn vượt qua trong 11, 10 người trong số họ sẽ nhận được dữ liệu từ 1 phân vùng và 1 luồng sẽ không nhận được gì.

Thông thường, bạn cần phải lặp lại từng luồng theo chuỗi riêng của mình. Điều này là do mỗi luồng có thể chặn vĩnh viễn nếu không có sự kiện mới.

đoạn

mẫu:

topicCount.put(msgTopic, new Integer(partitionCount)); 
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = connector.createMessageStreams(topicCount); 
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(msgTopic); 

for (final KafkaStream stream : streams) { 
    ReadTask task = new ReadTask(stream, msgTopic); 
    task.addObserver(this.msgObserver); 
    tasks.add(task); executor.submit(task); 
} 

tham khảo: http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%[email protected].com%3E

+0

Đoạn mã mẫu topicCount.put (msgTopic, new Integer (partitionCount)); Bản đồ >> consumerStreams = connector.createMessageStreams (topicCount); Danh sách > luồng = consumerStreams.get (msgTopic); cho (luồng KafkaStream cuối cùng: luồng) { Tác vụ ReadTask = ReadTask mới (luồng, msgTopic); task.addObserver (this.msgObserver); tasks.add (nhiệm vụ); executor.submit (nhiệm vụ); } –

3

Cách đề nghị để làm điều này là phải có một hồ bơi thread để Java có thể xử lý tổ chức cho bạn và cho mỗi dòng phương pháp createMessageStreamsByFilter mang đến cho bạn tiêu thụ nó trong một Runnable. Ví dụ:

int NUMBER_OF_PARTITIONS = 6; 
Properties consumerConfig = new Properties(); 
consumerConfig.put("zk.connect", "zookeeper.mydomain.com:2181"); 
consumerConfig.put("backoff.increment.ms", "100"); 
consumerConfig.put("autooffset.reset", "largest"); 
consumerConfig.put("groupid", "java-consumer-example"); 
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig)); 

TopicFilter sourceTopicFilter = new Whitelist("mytopic|myothertopic"); 
List<KafkaStream<Message>> streams = consumer.createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_PARTITIONS); 

ExecutorService executor = Executors.newFixedThreadPool(streams.size()); 
for(final KafkaStream<Message> stream: streams){ 
    executor.submit(new Runnable() { 
     public void run() { 
      for (MessageAndMetadata<Message> msgAndMetadata: stream) { 
       ByteBuffer buffer = msgAndMetadata.message().payload(); 
       byte [] bytes = new byte[buffer.remaining()]; 
       buffer.get(bytes); 
       //Do something with the bytes you just got off Kafka. 
      } 
     } 
    }); 
} 

Trong ví dụ này, tôi hỏi 6 chủ đề cho mỗi chủ đề và tôi liệt kê hai chủ đề trong danh sách trắng của mình. Một khi chúng ta có các chốt của các luồng đến, chúng ta có thể lặp qua nội dung của chúng, đó là các đối tượng MessageAndMetadata. Siêu dữ liệu thực sự chỉ là tên chủ đề và bù đắp. Khi bạn phát hiện ra bạn có thể làm điều đó trong một chuỗi nếu bạn yêu cầu 1 luồng thay vì, trong ví dụ 6 của tôi, nhưng nếu bạn yêu cầu xử lý song song, cách tốt nhất là khởi chạy trình thực thi với một luồng cho mỗi luồng được trả về.

+0

Điều gì sẽ xảy ra nếu tôi làm điều này? kafkaConsumerConfig = new ConsumerConfig (...); consumerConnector = Consumer.createJavaConsumerConnector (kafkaConsumerConfig); topicCountMap.put ("mytopic", 1); consumerMap.get ("mytopic") nhận được (0); Kiểm tra có được (0) trên danh sách các dòng kafka vì vậy tôi nhận được chỉ 1 dòng. Điều gì sẽ xảy ra nếu tôi gọi người tiêu dùng.createJavaConsumerConnector 10 lần? – stewenson

+0

Họ sẽ chia sẻ cùng một cấu hình và mỗi người sẽ đọc tất cả các phân vùng để tôi đoán bạn sẽ nhận được 10 người tiêu dùng, tất cả đều cố gắng lưu trạng thái của họ trong cùng một nút ZK để bạn kết thúc với người tiêu dùng 1 ví dụ đọc tin nhắn 1K đầu tiên, sau đó người tiêu dùng 2 đọc cùng một thông điệp 1K nhưng có khả năng người tiêu dùng 1 sẽ đọc xong cập nhật hàng loạt ZK, đọc một giây, sau đó viết vị trí của nó cho ZK, sau đó thứ hai của chúng tôi cho một số lý do chậm hơn đi vào và viết vị trí của nó quay lại ZK khiến người tiêu dùng đầu tiên xử lý lại lô thứ hai. Về cơ bản xung đột galore. – feldoh

0
/** 
* @param source : source kStream to sink output-topic 
*/ 
private static void pipe(KStream<String, String> source) { 
    source.to(Serdes.String(), Serdes.String(), new StreamPartitioner<String, String>() { 

     @Override 
     public Integer partition(String arg0, String arg1, int arg2) { 
      return 0; 
     } 
    }, "output-topic"); 
} 

trên mã sẽ viết kỷ lục tại partition 1 của tên chủ đề "đầu ra-chủ đề"

Các vấn đề liên quan