2016-06-16 25 views
12

Cho hai Spark Datasets, A và B tôi có thể làm một tham gia vào cột đơn như sau:Cách tham gia Tập dữ liệu trên nhiều cột?

a.joinWith(b, $"a.col" === $"b.col", "left") 

Câu hỏi của tôi là liệu bạn có thể làm một tham gia sử dụng nhiều cột. Về cơ bản tương đương với DataFrames đang api sau:

a.join(b, a("col") === b("col") && a("col2") === b("col2"), "left") 

Trả lời

12

Bạn có thể làm điều đó một cách chính xác theo cùng một cách như với Dataframe:

val xs = Seq(("a", "foo", 2.0), ("x", "bar", -1.0)).toDS 
val ys = Seq(("a", "foo", 2.0), ("y", "bar", 1.0)).toDS 

xs.joinWith(ys, xs("_1") === ys("_1") && xs("_2") === ys("_2"), "left").show 
// +------------+-----------+ 
// |   _1|   _2| 
// +------------+-----------+ 
// | [a,foo,2.0]|[a,foo,2.0]| 
// |[x,bar,-1.0]|  null| 
// +------------+-----------+ 

Trong Spark < 2.0.0 bạn có thể sử dụng một cái gì đó như thế này :

xs.as("xs").joinWith(
    ys.as("ys"), ($"xs._1" === $"ys._1") && ($"xs._2" === $"ys._2"), "left") 
7

Có một cách khác để nối bằng cách chuỗi where cái khác. Trước tiên, bạn chỉ định một tham gia (và tùy chọn loại của nó) tiếp theo là where điều hành (s), tức là

scala> case class A(id: Long, name: String) 
defined class A 

scala> case class B(id: Long, name: String) 
defined class B 

scala> val as = Seq(A(0, "zero"), A(1, "one")).toDS 
as: org.apache.spark.sql.Dataset[A] = [id: bigint, name: string] 

scala> val bs = Seq(B(0, "zero"), B(1, "jeden")).toDS 
bs: org.apache.spark.sql.Dataset[B] = [id: bigint, name: string] 

scala> as.join(bs).where(as("id") === bs("id")).show 
+---+----+---+-----+ 
| id|name| id| name| 
+---+----+---+-----+ 
| 0|zero| 0| zero| 
| 1| one| 1|jeden| 
+---+----+---+-----+ 


scala> as.join(bs).where(as("id") === bs("id")).where(as("name") === bs("name")).show 
+---+----+---+----+ 
| id|name| id|name| 
+---+----+---+----+ 
| 0|zero| 0|zero| 
+---+----+---+----+ 

Lý do cho một goodie như vậy là tôi ưu hoa Spark sẽ tham gia (không có ý định chơi chữ) liên tiếp where s thành một với join. Sử dụng toán tử explain để xem các kế hoạch logic và vật lý cơ bản.

scala> as.join(bs).where(as("id") === bs("id")).where(as("name") === bs("name")).explain(extended = true) 
== Parsed Logical Plan == 
Filter (name#31 = name#36) 
+- Filter (id#30L = id#35L) 
    +- Join Inner 
     :- LocalRelation [id#30L, name#31] 
     +- LocalRelation [id#35L, name#36] 

== Analyzed Logical Plan == 
id: bigint, name: string, id: bigint, name: string 
Filter (name#31 = name#36) 
+- Filter (id#30L = id#35L) 
    +- Join Inner 
     :- LocalRelation [id#30L, name#31] 
     +- LocalRelation [id#35L, name#36] 

== Optimized Logical Plan == 
Join Inner, ((name#31 = name#36) && (id#30L = id#35L)) 
:- Filter isnotnull(name#31) 
: +- LocalRelation [id#30L, name#31] 
+- Filter isnotnull(name#36) 
    +- LocalRelation [id#35L, name#36] 

== Physical Plan == 
*BroadcastHashJoin [name#31, id#30L], [name#36, id#35L], Inner, BuildRight 
:- *Filter isnotnull(name#31) 
: +- LocalTableScan [id#30L, name#31] 
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false], input[0, bigint, false])) 
    +- *Filter isnotnull(name#36) 
     +- LocalTableScan [id#35L, name#36] 
Các vấn đề liên quan