2015-02-04 27 views
28

Tôi có thể truy vấn RDD bằng các loại phức tạp như bản đồ/mảng như thế nào? ví dụ, khi tôi đang viết mã kiểm tra này:Truy vấn Spark SQL DataFrame với các loại phức tạp

case class Test(name: String, map: Map[String, String]) 
val map = Map("hello" -> "world", "hey" -> "there") 
val map2 = Map("hello" -> "people", "hey" -> "you") 
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2))) 

tôi mặc dù cú pháp sẽ là một cái gì đó như:

sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world") 

hoặc

sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world") 

nhưng tôi nhận

Can't access nested field in type MapType(StringType,StringType,true)

a nd

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes

tương ứng.

+1

Làm thế nào để chấp nhận * tome * của câu trả lời từ @ zero323? – javadba

Trả lời

79

Nó phụ thuộc vào một loại cột.Hãy bắt đầu với một số dữ liệu giả:

import org.apache.spark.sql.functions.{udf, lit} 
import scala.util.Try 

case class SubRecord(x: Int) 
case class ArrayElement(foo: String, bar: Int, vals: Array[Double]) 
case class Record(
    an_array: Array[Int], a_map: Map[String, String], 
    a_struct: SubRecord, an_array_of_structs: Array[ArrayElement]) 


val df = sc.parallelize(Seq(
    Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1), 
     Array(
      ArrayElement("foo", 1, Array(1.0, 2.0)), 
      ArrayElement("bar", 2, Array(3.0, 4.0)))), 
    Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2), 
     Array(ArrayElement("foz", 3, Array(5.0, 6.0)), 
       ArrayElement("baz", 4, Array(7.0, 8.0)))) 
)).toDF 
df.registerTempTable("df") 
df.printSchema 

// root 
// |-- an_array: array (nullable = true) 
// | |-- element: integer (containsNull = false) 
// |-- a_map: map (nullable = true) 
// | |-- key: string 
// | |-- value: string (valueContainsNull = true) 
// |-- a_struct: struct (nullable = true) 
// | |-- x: integer (nullable = false) 
// |-- an_array_of_structs: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- foo: string (nullable = true) 
// | | |-- bar: integer (nullable = false) 
// | | |-- vals: array (nullable = true) 
// | | | |-- element: double (containsNull = false) 
  • cột mảng:

    • Column.getItem phương pháp

      df.select($"an_array".getItem(1)).show 
      
      // +-----------+ 
      // |an_array[1]| 
      // +-----------+ 
      // |   2| 
      // |   5| 
      // +-----------+ 
      
    • Hive Brac kets cú pháp:

      sqlContext.sql("SELECT an_array[1] FROM df").show 
      
      // +---+ 
      // |_c0| 
      // +---+ 
      // | 2| 
      // | 5| 
      // +---+ 
      
    • một UDF

      val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption) 
      
      df.select(get_ith($"an_array", lit(1))).show 
      
      // +---------------+ 
      // |UDF(an_array,1)| 
      // +---------------+ 
      // |    2| 
      // |    5| 
      // +---------------+ 
      
  • cột đồ

    • sử dụng Column.getField phương pháp:

      df.select($"a_map".getField("foo")).show 
      
      // +----------+ 
      // |a_map[foo]| 
      // +----------+ 
      // |  bar| 
      // |  null| 
      // +----------+ 
      
    • sử dụng Hive ngoặc cú pháp:

      sqlContext.sql("SELECT a_map['foz'] FROM df").show 
      
      // +----+ 
      // | _c0| 
      // +----+ 
      // |null| 
      // | baz| 
      // +----+ 
      
    • sử dụng một đường dẫn đầy đủ với dấu chấm cú pháp:

      df.select($"a_map.foo").show 
      
      // +----+ 
      // | foo| 
      // +----+ 
      // | bar| 
      // |null| 
      // +----+ 
      
    • sử dụng một UDF

      val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k)) 
      
      df.select(get_field($"a_map", lit("foo"))).show 
      
      // +--------------+ 
      // |UDF(a_map,foo)| 
      // +--------------+ 
      // |   bar| 
      // |   null| 
      // +--------------+ 
      
  • cột struct sử dụng đầy đủ cú pháp đường dẫn có dấu chấm:

    • với DataFrame API

      df.select($"a_struct.x").show 
      
      // +---+ 
      // | x| 
      // +---+ 
      // | 1| 
      // | 2| 
      // +---+ 
      
    • với SQL nguyên

      sqlContext.sql("SELECT a_struct.x FROM df").show 
      
      // +---+ 
      // | x| 
      // +---+ 
      // | 1| 
      // | 2| 
      // +---+ 
      
  • lĩnh vực bên trong mảng của structs có thể được truy cập bằng dot-cú pháp, tên và tiêu chuẩn Column phương pháp :

    df.select($"an_array_of_structs.foo").show 
    
    // +----------+ 
    // |  foo| 
    // +----------+ 
    // |[foo, bar]| 
    // |[foz, baz]| 
    // +----------+ 
    
    sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show 
    
    // +---+ 
    // |_c0| 
    // +---+ 
    // |foo| 
    // |foz| 
    // +---+ 
    
    df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show 
    
    // +------------------------------+ 
    // |an_array_of_structs.vals[1][1]| 
    // +------------------------------+ 
    // |       4.0| 
    // |       8.0| 
    // +------------------------------+ 
    
  • trường loại người dùng được xác định (UDT) có thể được truy cập bằng cách sử dụng UDF. Xem SparkSQL referencing attributes of UDT để biết chi tiết.

Ghi chú:

  • tùy thuộc vào một phiên bản Spark một số trong những phương pháp có thể được cung cấp chỉ với HiveContext. UDF nên hoạt động độc lập với phiên bản với cả hai tiêu chuẩn SQLContextHiveContext.
  • nói chung giá trị lồng nhau là công dân hạng hai. Không phải tất cả các hoạt động điển hình đều được hỗ trợ trên các trường lồng nhau. Tùy thuộc vào một bối cảnh nó có thể là tốt hơn để san bằng giản đồ và/hoặc nổ tung bộ sưu tập

    df.select(explode($"an_array_of_structs")).show 
    
    // +--------------------+ 
    // |     col| 
    // +--------------------+ 
    // |[foo,1,WrappedArr...| 
    // |[bar,2,WrappedArr...| 
    // |[foz,3,WrappedArr...| 
    // |[baz,4,WrappedArr...| 
    // +--------------------+ 
    
  • Dot cú pháp có thể được kết hợp với ký tự đại diện (*) để chọn (có thể nhiều) lĩnh vực mà không chỉ định tên rõ ràng:

    df.select($"a_struct.*").show 
    // +---+ 
    // | x| 
    // +---+ 
    // | 1| 
    // | 2| 
    // +---+ 
    
+5

bạn có thể cung cấp một số chi tiết không? lol –

+0

Có thể lấy tất cả các phần tử trong một mảng cấu trúc không? Là một cái gì đó như thế này có thể .. sqlContext.sql ("SELECT an_array_of_structs [0] .foo FROM df"). Hiển thị – user1384205

+1

Đây phải là câu trả lời được chấp nhận. –

2

Khi Bạn chuyển nó sang DF, u chỉ có thể lấy dữ liệu như

val rddRow= rdd.map(kv=>{ 
    val k = kv._1 
    val v = kv._2 
    Row(k, v) 
    }) 

val myFld1 = StructField("name", org.apache.spark.sql.types.StringType, true) 
val myFld2 = StructField("map", org.apache.spark.sql.types.MapType(StringType, StringType), true) 
val arr = Array(myFld1, myFld2) 
val schema = StructType(arr) 
val rowrddDF = sqc.createDataFrame(rddRow, schema) 
rowrddDF.registerTempTable("rowtbl") 
val rowrddDFFinal = rowrddDF.select(rowrddDF("map.one")) 
or 
val rowrddDFFinal = rowrddDF.select("map.one") 
+0

khi tôi thử điều này, tôi nhận được 'lỗi: giá trị _1 không phải là thành viên của org.apache.spark.sql.Row' – Paul

0

đây là những gì tôi đã làm và nó làm việc

case class Test(name: String, m: Map[String, String]) 
val map = Map("hello" -> "world", "hey" -> "there") 
val map2 = Map("hello" -> "people", "hey" -> "you") 
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2))) 
val rdddf = rdd.toDF 
rdddf.registerTempTable("mytable") 
sqlContext.sql("select m.hello from mytable").show 

Kết quả

+------+ 
| hello| 
+------+ 
| world| 
|people| 
+------+ 
Các vấn đề liên quan