2017-12-15 95 views
6

Tôi có ứng dụng phát trực tuyến Spark được viết bằng Java và sử dụng Spark 2.1. Tôi đang sử dụng KafkaUtils.createDirectStream để đọc tin nhắn từ Kafka. Tôi đang sử dụng bộ mã hóa/giải mã kryo cho tin nhắn kafka. Tôi đã chỉ định điều này trong thuộc tính Kafka-> key.deserializer, value.deserializer, key.serializer, value.deserializer

Khi Spark kéo các tin nhắn trong một lô vi mô, các thông báo được giải mã thành công bằng bộ giải mã kryo. Tuy nhiên tôi nhận thấy rằng thực thi Spark tạo ra một thể hiện mới của bộ giải mã kryo để giải mã từng thông điệp đọc từ kafka. Tôi đã kiểm tra điều này bằng cách đặt nhật ký bên trong bộ giải mã bộ giải mã

Điều này có vẻ lạ đối với tôi. Không nên cùng một bộ giải mã được sử dụng cho mỗi tin nhắn và mỗi lô?Tại sao luồng trực tiếp của Kafka tạo bộ giải mã mới cho mọi thư?

Mã nơi tôi đang đọc từ Kafka:

JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
     jssc, 
     LocationStrategies.PreferConsistent(), 
     ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams)); 

JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> { 
    return new Tuple2<String, Class1>(consRecord.key(), consRecord.value()); 
}); 

Trả lời

3

Nếu chúng ta muốn xem cách Spark lấy về dữ liệu từ Kafka nội bộ, chúng tôi sẽ cần phải nhìn vào KafkaRDD.compute, mà là một phương pháp thực hiện cho mỗi RDD mà nói với khuôn khổ làm thế nào để, tốt, tính toán rằng RDD:

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = { 
    val part = thePart.asInstanceOf[KafkaRDDPartition] 
    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) 
    if (part.fromOffset == part.untilOffset) { 
    logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " + 
    s"skipping ${part.topic} ${part.partition}") 
    Iterator.empty 
    } else { 
    new KafkaRDDIterator(part, context) 
    } 
} 

Điều quan trọng ở đây là else khoản, mà tạo ra một KafkaRDDIterator. Đây nội bộ có:

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
    .newInstance(kc.config.props) 
    .asInstanceOf[Decoder[K]] 

val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
    .newInstance(kc.config.props) 
    .asInstanceOf[Decoder[V]] 

Mà như bạn thấy, tạo ra một thể hiện của cả hai bộ giải mã chủ chốt và các bộ giải mã giá trị thông qua phản ánh, đối với mỗi phân vùng cơ bản. Điều này có nghĩa là nó không được tạo ra cho mỗi tin nhắn nhưng mỗi phân vùng Kafka.

Tại sao nó được triển khai theo cách này? Tôi không biết. Tôi giả định bởi vì một bộ giải mã khóa và giá trị nên có một hiệu suất bỏ qua so với tất cả các phân bổ khác xảy ra bên trong Spark.

Nếu bạn đã định cấu hình ứng dụng của mình và nhận thấy đây là một đường dẫn phân bổ, bạn có thể mở một vấn đề. Nếu không, tôi sẽ không lo lắng về nó.

+0

Nghiên cứu rất tốt! #impressed –

+0

@Yuval: Tôi đang sử dụng Kafka 0.10.x. Spark sử dụng người dùng kafka đã lưu trong bộ nhớ cache (mỗi người thực thi), trong đó khóa bộ nhớ cache được xác định theo ID người tiêu dùng, ID chủ đề, ID phân vùng. Nó có ý nghĩa để có một bộ giải mã cho mỗi phân vùng kafka hoặc cách khác Spark sẽ giải mã các tin nhắn song song. Những gì tôi mong đợi là một bộ giải mã mới phải được tạo ra một lần cho mỗi phân vùng bên trong một người tiêu dùng lưu trữ và thats nó! Tôi không thấy vấn đề này dưới tải nhẹ nhưng chỉ khi tôi bơm 1000 tin nhắn mỗi giây. Có lẽ tôi đang chạy vào một chu kỳ "GC". Bạn có bất kỳ ý tưởng nào về cách bật tính năng ghi nhật ký trong lớp KafkaRDD không? – scorpio

+0

@scorpio Kafka 0.10.x không yêu cầu bộ giải mã. Nó trả về 'ConsumerRecord' cơ bản, và bạn chọn phải làm gì với nó. Bạn đang tạo một thể hiện của một bộ giải mã bên trong một 'bản đồ' có lẽ? –

Các vấn đề liên quan