2017-02-28 25 views
7

Tôi đã cố tái tạo ví dụ từ [Databricks] [1] và áp dụng nó cho trình kết nối mới với kafka và phát trực tuyến có cấu trúc tia lửa tuy nhiên tôi không thể phân tích cú pháp đúng cách bằng cách sử dụng các phương pháp out-of-the-box.Làm thế nào để sử dụng from_json với kafka kết nối 0.10 và Spark Structured Streaming?

lưu ý: chủ đề được viết thành kafka theo định dạng json.

val ds1 = spark 
      .readStream 
      .format("kafka") 
      .option("kafka.bootstrap.servers", IP + ":9092") 
      .option("zookeeper.connect", IP + ":2181") 
      .option("subscribe", TOPIC) 
      .option("startingOffsets", "earliest") 
      .option("max.poll.records", 10) 
      .option("failOnDataLoss", false) 
      .load() 

Đoạn mã dưới đây sẽ không làm việc, tôi tin rằng đó là bởi vì các json cột là một chuỗi và không khớp với chữ ký from_json phương pháp ...

val df = ds1.select($"value" cast "string" as "json") 
       .select(from_json("json") as "data") 
       .select("data.*") 

Bất cứ lời khuyên?

[CẬP NHẬT] Ví dụ làm việc: https://github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/Main.scala

Trả lời

9

Trước tiên, bạn cần phải xác định các lược đồ cho thông điệp JSON của bạn. Ví dụ:

val schema = new StructType() 
    .add($"id".string) 
    .add($"name".string) 

Bây giờ bạn có thể sử dụng lược đồ này theo phương pháp from_json như dưới đây.

val df = ds1.select($"value" cast "string" as "json") 
      .select(from_json($"json", schema) as "data") 
      .select("data.*") 
+0

nếu bạn có cảnh báo trình biên dịch "value $ không phải là thành viên ..." Xin đừng quên nhập khẩu spark.implicits._ Tôi mất thêm 5-10 phút để tìm ra – user1459144

+0

cho tôi câu hỏi thư viện nào đang cung cấp hàm gọi là "from_json"? Tôi không thể có vẻ như vậy !!! Xin hãy giúp .. – Gyan

+0

@Raghav -> nhập org.apache.spark.sql.functions._ kiểm tra ví dụ tại đây: https://github.com/katsou55/kafka-spark-structured-streaming-example/blob/ master/src/main/scala-2.11/Main.scala –

Các vấn đề liên quan