2017-01-18 33 views
6

Làm cách nào để thêm hoặc thay thế trường vào cấu trúc trên bất kỳ cấp độ lồng nhau nào?Thêm cột lồng vào Spark DataFrame

đầu vào này:

val rdd = sc.parallelize(Seq(
    """{"a": {"xX": 1,"XX": 2},"b": {"z": 0}}""", 
    """{"a": {"xX": 3},"b": {"z": 0}}""", 
    """{"a": {"XX": 3},"b": {"z": 0}}""", 
    """{"a": {"xx": 4},"b": {"z": 0}}""")) 
var df = sqlContext.read.json(rdd) 

sản lượng schema sau:

root 
|-- a: struct (nullable = true) 
| |-- XX: long (nullable = true) 
| |-- xX: long (nullable = true) 
| |-- xx: long (nullable = true) 
|-- b: struct (nullable = true) 
| |-- z: long (nullable = true) 

Sau đó, tôi có thể làm điều này:

import org.apache.spark.sql.functions._ 
val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX")) 
df = df 
    .withColumn("a_xx", 
    coalesce(overlappingNames:_*)) 
    .dropNestedColumn("a.xX") 
    .dropNestedColumn("a.XX") 
    .dropNestedColumn("a.xx") 

(dropNestedColumn được vay mượn từ câu trả lời này: https://stackoverflow.com/a/39943812/1068385. Tôi về cơ bản đang tìm kiếm operati ngược trên đó)

Và schema trở thành:.

root 
|-- a: struct (nullable = false) 
|-- b: struct (nullable = true) 
| |-- z: long (nullable = true) 
|-- a_xx: long (nullable = true) 

Rõ ràng nó không thay thế (hoặc thêm) a.xx nhưng thay vào đó nó sẽ thêm lĩnh vực mới a_xx trên mức root.

Tôi muốn để có thể làm điều này thay vì:

val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX")) 
df = df 
    .withNestedColumn("a.xx", 
    coalesce(overlappingNames:_*)) 
    .dropNestedColumn("a.xX") 
    .dropNestedColumn("a.XX") 

Vì vậy mà nó sẽ cho kết quả trong sơ đồ này:

root 
|-- a: struct (nullable = false) 
| |-- xx: long (nullable = true) 
|-- b: struct (nullable = true) 
| |-- z: long (nullable = true) 

Làm thế nào tôi có thể đạt được điều đó?

Mục tiêu thực tế ở đây là phân biệt chữ hoa chữ thường với tên cột trong JSON đầu vào. Bước cuối cùng sẽ đơn giản: thu thập tất cả các tên cột trùng nhau và áp dụng kết hợp trên mỗi cột.

+0

bạn đã có được giải pháp? –

+0

@ShankarKoirala: không phải với Spark. Trong Hive nó là tầm thường để sử dụng COALESCE để đạt được những gì tôi muốn. – Arvidaa

Trả lời

1

Nó có thể không được như tao nhã hoặc hiệu quả như nó có thể được nhưng đây là những gì tôi đã đưa ra:

object DataFrameUtils { 
    private def nullableCol(parentCol: Column, c: Column): Column = { 
    when(parentCol.isNotNull, c) 
    } 

    private def nullableCol(c: Column): Column = { 
    nullableCol(c, c) 
    } 

    private def createNestedStructs(splitted: Seq[String], newCol: Column): Column = { 
    splitted 
     .foldRight(newCol) { 
     case (colName, nestedStruct) => nullableCol(struct(nestedStruct as colName)) 
     } 
    } 

    private def recursiveAddNestedColumn(splitted: Seq[String], col: Column, colType: DataType, nullable: Boolean, newCol: Column): Column = { 
    colType match { 
     case colType: StructType if splitted.nonEmpty => { 
     var modifiedFields: Seq[(String, Column)] = colType.fields 
      .map(f => { 
      var curCol = col.getField(f.name) 
      if (f.name == splitted.head) { 
       curCol = recursiveAddNestedColumn(splitted.tail, curCol, f.dataType, f.nullable, newCol) 
      } 
      (f.name, curCol as f.name) 
      }) 

     if (!modifiedFields.exists(_._1 == splitted.head)) { 
      modifiedFields :+= (splitted.head, nullableCol(col, createNestedStructs(splitted.tail, newCol)) as splitted.head) 
     } 

     var modifiedStruct: Column = struct(modifiedFields.map(_._2): _*) 
     if (nullable) { 
      modifiedStruct = nullableCol(col, modifiedStruct) 
     } 
     modifiedStruct 
     } 
     case _ => createNestedStructs(splitted, newCol) 
    } 
    } 

    private def addNestedColumn(df: DataFrame, newColName: String, newCol: Column): DataFrame = { 
    if (newColName.contains('.')) { 
     var splitted = newColName.split('.') 

     val modifiedOrAdded: (String, Column) = df.schema.fields 
     .find(_.name == splitted.head) 
     .map(f => (f.name, recursiveAddNestedColumn(splitted.tail, col(f.name), f.dataType, f.nullable, newCol))) 
     .getOrElse { 
      (splitted.head, createNestedStructs(splitted.tail, newCol) as splitted.head) 
     } 

     df.withColumn(modifiedOrAdded._1, modifiedOrAdded._2) 

    } else { 
     // Top level addition, use spark method as-is 
     df.withColumn(newColName, newCol) 
    } 
    } 

    implicit class ExtendedDataFrame(df: DataFrame) extends Serializable { 
    /** 
     * Add nested field to DataFrame 
     * 
     * @param newColName Dot-separated nested field name 
     * @param newCol New column value 
     */ 
    def withNestedColumn(newColName: String, newCol: Column): DataFrame = { 
     DataFrameUtils.addNestedColumn(df, newColName, newCol) 
    } 
    } 
} 

Hãy thoải mái để cải thiện nó.

val data = spark.sparkContext.parallelize(List("""{ "a1": 1, "a3": { "b1": 3, "b2": { "c1": 5, "c2": 6 } } }""")) 
val df: DataFrame = spark.read.json(data) 

val df2 = df.withNestedColumn("a3.b2.c3.d1", $"a3.b2") 

nên sản xuất:

assertResult("struct<a1:bigint,a3:struct<b1:bigint,b2:struct<c1:bigint,c2:bigint,c3:struct<d1:struct<c1:bigint,c2:bigint>>>>>")(df2.shema.simpleString) 
+0

Cảm ơn. Tôi sẽ xác minh nó vào tuần tới và nếu nó hoạt động đánh dấu là câu trả lời được chấp nhận. – Arvidaa

+0

@Michel Lemay Nó hoạt động tốt cho trường hợp trong câu hỏi. Cảm ơn. Tôi đang cố gắng áp dụng nó vào một mảng cấu trúc lồng nhau và nó không thành công, nó hơi quá xa đối với kiến ​​thức tia lửa thực sự của tôi ... bạn có thể giúp tôi không? – Gab

+0

Thật vậy, nó không phải là một tính năng chúng tôi cần vì vậy tôi để lại nó cho những cải tiến trong tương lai. Để hỗ trợ điều này bằng cách sử dụng mã hiện tại, người ta sẽ phải sửa đổi 'trường hợp _' và mảng hỗ trợ của các cấu trúc lồng nhau. Các kiểu đơn giản lồng nhau sẽ cần phải được nâng cấp lên cấu trúc. Ngoài ra, chúng tôi sẽ cần hỗ trợ mảng trong newCol và xử lý với số lượng phần tử có thể khác nhau trong mảng đích. –

Các vấn đề liên quan