2016-04-06 26 views
13

Tôi đang cố gắng sử dụng API Spark Dataset nhưng tôi đang gặp phải một số sự cố khi tham gia đơn giản.API tập dữ liệu Spark - tham gia

Hãy nói rằng tôi có hai bộ dữ liệu với các lĩnh vực: date | value, sau đó trong trường hợp DataFrame my tham gia sẽ như thế nào:

val dfA : DataFrame 
val dfB : DataFrame 

dfA.join(dfB, dfB("date") === dfA("date")) 

Tuy nhiên đối với Dataset có phương pháp .joinWith, nhưng phương pháp tương tự không hoạt động :

val dfA : Dataset 
val dfB : Dataset 

dfA.joinWith(dfB, ?) 

Đối số được yêu cầu bởi .joinWith là gì?

Trả lời

19

Để sử dụng joinWith, trước tiên bạn phải tạo DataSet và rất có thể là hai trong số đó. Để tạo một DataSet, bạn cần phải tạo một lớp chữ thường khớp với giản đồ của bạn và gọi DataFrame.as[T] trong đó T là lớp chữ hoa của bạn. Vì vậy:

case class KeyValue(key: Int, value: String) 
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value") 
val ds = df.as[KeyValue] 
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string] 

Bạn cũng có thể bỏ qua lớp trường hợp và sử dụng một tuple:

val tupDs = df.as[(Int,String)] 
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string] 

Sau đó, nếu bạn đã có một trường hợp lớp/DF, như thế này nói:

case class Nums(key: Int, num1: Double, num2: Long) 
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2") 
val ds2 = df2.as[Nums] 
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint] 

Sau đó, , trong khi cú pháp của joinjoinWith tương tự nhau, kết quả khác nhau:

df.join(df2, df.col("key") === df2.col("key")).show 
// +---+-----+---+----+----+ 
// |key|value|key|num1|num2| 
// +---+-----+---+----+----+ 
// | 1| asdf| 1| 7.7| 101| 
// | 2|34234| 2| 1.2| 10| 
// +---+-----+---+----+----+ 

ds.joinWith(ds2, df.col("key") === df2.col("key")).show 
// +---------+-----------+ 
// |  _1|   _2| 
// +---------+-----------+ 
// | [1,asdf]|[1,7.7,101]| 
// |[2,34234]| [2,1.2,10]| 
// +---------+-----------+ 

Như bạn có thể thấy, joinWith để nguyên các đối tượng còn nguyên vẹn như một phần của bộ túp, trong khi join làm phẳng các cột thành một không gian tên duy nhất. (. Mà sẽ gây ra vấn đề trong trường hợp trên, vì tên cột "chìa khóa" được lặp lại)

Tò mò đủ, tôi phải sử dụng df.col("key")df2.col("key") để tạo điều kiện cho tham gia dsds2 - nếu bạn sử dụng chỉ col("key") ở hai bên nó không hoạt động, và ds.col(...) không tồn tại. Tuy nhiên, sử dụng thủ thuật gốc df.col("key").

+3

chi tiết lời giải thích. Chỉ một sự nhầm lẫn. Có cách nào tốt hơn để viết điều kiện tham gia đã nhập không. ví dụ: df.col ("key") chúng ta có thể có thứ gì đó an toàn hơn có thể giải quyết tính chính xác của "key" tại thời gian biên dịch. –

+5

Tôi hoàn toàn đồng ý, dựa trên cú pháp này không có sử dụng trong việc tạo Tập dữ liệu, vậy lợi ích ở đâu? Tôi không thể vượt qua thực tế là không có thay thế được đánh máy .. Thật đáng tiếc! – Sparky

2

Trong ví dụ ở trên, bạn có thể thử dưới đây tùy chọn -

  • Định nghĩa một lớp hợp cho đầu ra của bạn

    case class JoinOutput(key:Int, value:String, num1:Double, num2:Long)

  • Tham gia hai Bộ dữ liệu với "Seq (" khóa ")", điều này sẽ giúp bạn tránh hai cột khóa trùng lặp trong đầu ra.Mà sẽ giúp đỡ để áp dụng các lớp trường hợp hoặc lấy dữ liệu ở bước tiếp theo

    ds.join(ds2, Seq("key")).as[JoinOutput] res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]

    scala> ds.join(ds2, Seq("key")).as[JoinOutput].show +---+-----+----+----+ |key|value|num1|num2| +---+-----+----+----+ | 1| asdf| 7.7| 101| | 2|34234| 1.2| 10| +---+-----+----+----+

+0

bạn không trả lời cụ thể câu hỏi, nhưng mẹo ("chìa khóa") của Seq đã giúp tôi – ImDarrenG

Các vấn đề liên quan