Bí quyết là viết lại join
điều kiện để nó chứa thành phần =
có thể được sử dụng để tối ưu hóa truy vấn và thu hẹp các kết quả phù hợp có thể. Đối với các giá trị số, bạn sẽ nhóm dữ liệu của mình và sử dụng nhóm cho điều kiện tham gia.
Hãy nói rằng dữ liệu của bạn trông như thế này:
val a = spark.range(100000)
.withColumn("cur", (rand(1) * 1000).cast("bigint"))
val b = spark.range(100)
.withColumn("low", (rand(42) * 1000).cast("bigint"))
.withColumn("high", ($"low" + rand(-42) * 10).cast("bigint"))
Đầu tiên chọn một kích thước thùng thích hợp cho dữ liệu của bạn. Trong trường hợp này chúng ta có thể sử dụng 50:
val bucketSize = 50L
xô Gán cho mỗi hàng từ a
:
val aBucketed = a.withColumn(
"bucket", ($"cur"/bucketSize).cast("bigint") * bucketSize
)
Tạo UDF mà sẽ phát ra xô cho một phạm vi:
def get_buckets(bucketSize: Long) =
udf((low: Long, high: Long) => {
val min = (low/bucketSize) * bucketSize
val max = (high/bucketSize) * bucketSize
(min to max by bucketSize).toSeq
})
và xô b
:
val bBucketed = b.withColumn(
"bucket", explode(get_buckets(bucketSize)($"low", $"high"))
)
xô sử dụng trong join
điều kiện:
aBucketed.join(
broadcast(bBucketed),
aBucketed("bucket") === bBucketed("bucket") &&
$"cur" >= $"low" &&
$"cur" <= $"high",
"leftouter"
)
cách Spark này sẽ sử dụng BroadcastHashJoin
:
*BroadcastHashJoin [bucket#184L], [bucket#178L], LeftOuter, BuildRight, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cur#98L, (cast((cast(cur#98L as double)/50.0) as bigint) * 50) AS bucket#184L]
: +- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
: +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[3, bigint, false]))
+- Generate explode(if ((isnull(low#105L) || isnull(high#109L))) null else UDF(low#105L, high#109L)), true, false, [bucket#178L]
+- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
+- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
+- *Range (0, 100, step=1, splits=Some(8))
thay vì BroadcastNestedLoopJoin
:
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftOuter, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
: +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange IdentityBroadcastMode
+- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
+- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
+- *Range (0, 100, step=1, splits=Some(8))
Bạn có thể kích thước điều chỉnh xô để cân bằng giữa độ chính xác và kích thước dữ liệu.
Nếu bạn không quan tâm đến giải pháp cấp thấp hơn thì broadcast
một chuỗi được sắp xếp có quyền truy cập mục không đổi (như Array
hoặc Vector
) và sử dụng udf
.
Bạn cũng nên xem số lượng phân vùng. 8 phân vùng cho 100GB có vẻ khá thấp.
Chúng là loại "Dài" – derek