5

Tôi đang sử dụng Spark 2.2 và tôi đang cố gắng để đọc các thông điệp JSON từ Kafka, biến đổi họ DataFrame và có chúng như là một :jsontostructs để Row trong spark cấu trúc trực tuyến

spark 
    .readStream() 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "localhost:9092") 
    .option("subscribe", "topic") 
    .load() 
    .select(col("value").cast(StringType).as("col")) 
    .writeStream() 
    .format("console") 
    .start(); 

với I này có thể đạt được:

+--------------------+ 
|     col| 
+--------------------+ 
|{"myField":"somet...| 
+--------------------+ 

tôi muốn một cái gì đó như thế này:

+--------------------+ 
|    myField| 
+--------------------+ 
|"something"   | 
+--------------------+ 

Tôi cố gắng để sử dụng from_json chức năng sử dụng struct:

DataTypes.createStructType(
    new StructField[] { 
      DataTypes.createStructField("myField", DataTypes.StringType) 
    } 
) 

nhưng tôi chỉ nhận được:

+--------------------+ 
| jsontostructs(col)| 
+--------------------+ 
|[something]   | 
+--------------------+ 

sau đó tôi cố gắng sử dụng explode nhưng tôi chỉ có ngoại lệ nói:

cannot resolve 'explode(`col`)' due to data type mismatch: 
input to function explode should be array or map type, not 
StructType(StructField(... 

Bất kỳ ý tưởng làm thế nào để thực hiện công việc này?

Trả lời

4

Bạn sắp hoàn tất, chỉ cần chọn đúng thứ. from_json trả lại cột struct khớp với giản đồ. Nếu schema (JSON đại diện) trông như thế này:

{"type":"struct","fields":[{"name":"myField","type":"string","nullable":false,"metadata":{}}]} 

bạn sẽ nhận được lồng đối tượng tương đương với:

root 
|-- jsontostructs(col): struct (nullable = true) 
| |-- myField: string (nullable = false) 

Bạn có thể sử dụng getField (hoặc getItem) phương pháp để lựa chọn lĩnh vực cụ thể

df.select(from_json(col("col"), schema).getField("myField").alias("myField")); 

hoặc .* để chọn tất cả các trường cấp cao nhất trong struct:

df.select(from_json(col("col"), schema).alias("tmp")).select("tmp.*"); 

mặc dù đối với string cột duy nhất, get_json_object nên được nhiều hơn đủ:

df.select(get_json_object(col("col"), "$.myField")); 
Các vấn đề liên quan