2017-08-11 28 views
5

1) Chúng tôi đang tiêu thụ từ kafka sử dụng luồng có cấu trúc và ghi dữ liệu đã xử lý được đặt thành s3. Chúng tôi cũng muốn ghi dữ liệu đã xử lý vào kafka di chuyển về phía trước, có thể thực hiện điều đó từ cùng một truy vấn truyền trực tuyến không? Trong bản ghi, tôi thấy đầu ra tiến trình truy vấn trực tuyến và tôi có một khoảng thời gian mẫu JSON từ nhật ký, một số có thể cung cấp rõ ràng hơn về sự khác biệt giữa addBatchgetBatch?Spark Cấu trúc phát trực tuyến: nhiều bồn

3) TriggerExecution - đã đến lúc xử lý dữ liệu được tìm nạp và ghi vào bồn rửa chưa?

"durationMs" : { 
    "addBatch" : 2263426, 
    "getBatch" : 12, 
    "getOffset" : 273, 
    "queryPlanning" : 13, 
    "triggerExecution" : 2264288, 
    "walCommit" : 552 
    }, 

liên quan aravias

Trả lời

6

1) Có.

Trong Spark 2.1.1, bạn có thể sử dụng writeStream.foreach để ghi dữ liệu của mình vào Kafka. Có một ví dụ trong blog này: https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html

Hoặc bạn có thể sử dụng Spark 2.2.0 bổ sung Kafka để hỗ trợ viết thư cho Kafka chính thức.

2) getBatch đo thời gian tạo DataFrame từ nguồn. Điều này thường khá nhanh. addBatch đo thời gian để chạy DataFrame trong bồn rửa chén.

3) triggerExecution đo lường thời gian chạy lệnh kích hoạt, thường gần giống như getOffset + getBatch + addBatch.

+0

Cảm ơn phản hồi, bạn có thể làm rõ điều sau đây - khi viết Tập dữ liệu được tạo từ chủ đề nguồn đến cả dấu kiểm S3 và KAFKA phải được chỉ định riêng cho từng bồn, hy vọng rằng dữ liệu sẽ được đọc hai lần từ chủ đề nguồn ngay cả khi sử dụng cùng một Số liệu được tạo từ nguồn đó để ghi vào hai bộ khuếch tán khác nhau này? – user2221654

+0

Nếu bạn có hai bồn rửa, điều đó có nghĩa là bạn có hai truy vấn. Mỗi truy vấn có người tiêu dùng Kafka riêng và lấy dữ liệu từ Kafka một cách độc lập. – zsxwing

0

Có câu hỏi liên quan đến tình huống tương tự, tôi đang cố ghi dữ liệu vào hai bồn rửa kafka. Tôi đang nhận classCastException như dưới đây. Mã trông giống như thế này

final Dataset<String> eventDataset = feedMessageDataset 
      .map(toEvent(nodeCodeToAliasBroadcast), OBSERVED_EVENT_ENCODER) 
      .map(SparkFeedReader::serializeToJson, STRING()); 
    final StreamingQuery eventQuery = kafkaStreamWriterForEvents(eventDataset, configuration, feedReaderEngineName).start(); 

    final Dataset<String> splunkEventDataset = feedMessageDataset 
      .map(toSplunkEvent(), SPLUNK_OBSERVED_EVENT_ENCODER) 
      .filter(event -> !event.getIndicatorCode().equals(HEARBEAT_INDICATOR_CODE)) 
      .map(SparkFeedReader::serializeToJson, STRING()); 

    final StreamingQuery splunkEventQuery = kafkaStreamWriterForSplunkEvents(splunkEventDataset, configuration, feedReaderEngineName).start(); 

Nếu tôi nhận xét một trong các bồn, nó hoạt động tốt. Điều này xảy ra trong tia lửa 2.2.0.

java.lang.ClassCastException: x.SplunkObservedEvent cannot be cast to x.ObservedEvent 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) 
    at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:47) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:108) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
    at java.lang.Thread.run(Thread.java:748) 
Các vấn đề liên quan