2017-04-18 15 views
6

Tôi có hai dataframes A và B. A là lớn (100 G) và B là tương đối nhỏ (100 M). Số phân vùng của A là 8 và Số phân vùng của B là 1.Làm thế nào để cải thiện phát sóng Tham gia tốc độ trong Spark

A.join(broadcast(B), $"cur" >= $"low" && $"cur" <= $"high", "left_outer") 

Tốc độ khá chậm (> 10 giờ).

Nhưng nếu tôi thay đổi điều kiện tham gia vào:

A.join(broadcast(B), $"cur" === $"low" , "left_outer") 

Nó trở nên vô cùng nhanh (< 30 phút). Nhưng điều kiện không thể thay đổi.

Vậy có cách nào để cải thiện hơn nữa tốc độ kết nối trên điều kiện tham gia ban đầu của tôi không?

+0

Chúng là loại "Dài" – derek

Trả lời

7

Bí quyết là viết lại join điều kiện để nó chứa thành phần = có thể được sử dụng để tối ưu hóa truy vấn và thu hẹp các kết quả phù hợp có thể. Đối với các giá trị số, bạn sẽ nhóm dữ liệu của mình và sử dụng nhóm cho điều kiện tham gia.

Hãy nói rằng dữ liệu của bạn trông như thế này:

val a = spark.range(100000) 
    .withColumn("cur", (rand(1) * 1000).cast("bigint")) 

val b = spark.range(100) 
    .withColumn("low", (rand(42) * 1000).cast("bigint")) 
    .withColumn("high", ($"low" + rand(-42) * 10).cast("bigint")) 

Đầu tiên chọn một kích thước thùng thích hợp cho dữ liệu của bạn. Trong trường hợp này chúng ta có thể sử dụng 50:

val bucketSize = 50L 

xô Gán cho mỗi hàng từ a:

val aBucketed = a.withColumn(
    "bucket", ($"cur"/bucketSize).cast("bigint") * bucketSize 
) 

Tạo UDF mà sẽ phát ra xô cho một phạm vi:

def get_buckets(bucketSize: Long) = 
    udf((low: Long, high: Long) => { 
    val min = (low/bucketSize) * bucketSize 
    val max = (high/bucketSize) * bucketSize 
    (min to max by bucketSize).toSeq 
    }) 

và xô b :

val bBucketed = b.withColumn(
    "bucket", explode(get_buckets(bucketSize)($"low", $"high")) 
) 

xô sử dụng trong join điều kiện:

aBucketed.join(
    broadcast(bBucketed), 
    aBucketed("bucket") === bBucketed("bucket") && 
    $"cur" >= $"low" && 
    $"cur" <= $"high", 
    "leftouter" 
) 

cách Spark này sẽ sử dụng BroadcastHashJoin:

*BroadcastHashJoin [bucket#184L], [bucket#178L], LeftOuter, BuildRight, ((cur#98L >= low#105L) && (cur#98L <= high#109L)) 
:- *Project [id#95L, cur#98L, (cast((cast(cur#98L as double)/50.0) as bigint) * 50) AS bucket#184L] 
: +- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L] 
:  +- *Range (0, 100000, step=1, splits=Some(8)) 
+- BroadcastExchange HashedRelationBroadcastMode(List(input[3, bigint, false])) 
    +- Generate explode(if ((isnull(low#105L) || isnull(high#109L))) null else UDF(low#105L, high#109L)), true, false, [bucket#178L] 
     +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L] 
     +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L] 
      +- *Range (0, 100, step=1, splits=Some(8)) 

thay vì BroadcastNestedLoopJoin:

== Physical Plan == 
BroadcastNestedLoopJoin BuildRight, LeftOuter, ((cur#98L >= low#105L) && (cur#98L <= high#109L)) 
:- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L] 
: +- *Range (0, 100000, step=1, splits=Some(8)) 
+- BroadcastExchange IdentityBroadcastMode 
    +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L] 
     +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L] 
     +- *Range (0, 100, step=1, splits=Some(8)) 

Bạn có thể kích thước điều chỉnh xô để cân bằng giữa độ chính xác và kích thước dữ liệu.

Nếu bạn không quan tâm đến giải pháp cấp thấp hơn thì broadcast một chuỗi được sắp xếp có quyền truy cập mục không đổi (như Array hoặc Vector) và sử dụng udf.

Bạn cũng nên xem số lượng phân vùng. 8 phân vùng cho 100GB có vẻ khá thấp.

Các vấn đề liên quan