2015-04-30 22 views
18

Như mọi người đều biết các trình phân vùng trong Spark có tác động hiệu suất rất lớn đối với bất kỳ hoạt động "rộng" nào, vì vậy nó thường được tùy chỉnh trong các hoạt động. Tôi đã thử nghiệm với đoạn mã sau:Trong Apache Spark, tại sao RDD.union không bảo vệ trình phân vùng?

val rdd1 = 
    sc.parallelize(1 to 50).keyBy(_ % 10) 
    .partitionBy(new HashPartitioner(10)) 
val rdd2 = 
    sc.parallelize(200 to 230).keyBy(_ % 13) 

val cogrouped = rdd1.cogroup(rdd2) 
println("cogrouped: " + cogrouped.partitioner) 

val unioned = rdd1.union(rdd2) 
println("union: " + unioned.partitioner) 

Tôi thấy rằng theo mặc định cogroup() luôn mang lại một RDD với phân vùng tùy chỉnh, nhưng union() không, nó sẽ luôn luôn quay trở lại mặc định. Điều này phản trực giác vì chúng ta thường giả định rằng một PairRDD nên sử dụng phần tử đầu tiên của nó làm khóa phân vùng. Có cách nào để "buộc" Spark để hợp nhất 2 PairRDDs để sử dụng cùng một khóa phân vùng?

Trả lời

33

union là một hoạt động rất hiệu quả vì nó không di chuyển bất kỳ dữ liệu nào xung quanh. Nếu rdd1 có 10 phân vùng và rdd2 có 20 phân vùng thì rdd1.union(rdd2) sẽ có 30 phân vùng: các phân vùng của hai RDD đặt sau nhau. Đây chỉ là một thay đổi sổ sách kế toán, không có shuffle.

Nhưng nhất thiết phải loại bỏ trình phân vùng. Một phân vùng được xây dựng cho một số phân vùng nhất định. RDD kết quả có một số phân vùng khác với cả hai số rdd1rdd2.

Sau khi kết hợp, bạn có thể chạy repartition để trộn dữ liệu và sắp xếp nó theo khóa.


Có một ngoại lệ ở trên. Nếu rdd1rdd2 có cùng một trình phân hoạch (với cùng số phân vùng), union hoạt động khác nhau. Nó sẽ nối các phân vùng của hai RDD theo cặp, cho cùng một số phân vùng như mỗi đầu vào có. Điều này có thể liên quan đến việc di chuyển dữ liệu xung quanh (nếu các phân vùng không được đặt cùng vị trí) nhưng sẽ không liên quan đến việc phát ngẫu nhiên. Trong trường hợp này, trình phân vùng được giữ lại. (Mã cho điều này là trong PartitionerAwareUnionRDD.scala.)

+4

Có thực sự là RDD công nhận phân vùng mà tôi nghĩ là được cho là được sử dụng tự động trong trường hợp phân vùng có thể được bảo toàn; không chắc tại sao nó không được áp dụng ở đây. Xem https://github.com/apache/spark/blob/e0628f2fae7f99d096f9dd625876a60d11020d9b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala#L123 và https://github.com/apache/spark /blob/master/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala –

+0

Thật tuyệt vời! Không bao giờ biết về điều đó. Có vẻ như nó chỉ được sử dụng khi cả hai RDD có cùng một trình phân vùng. Tôi sẽ thêm nó vào câu trả lời, cảm ơn! –

+0

Cảm ơn rất nhiều! Đây là một tối ưu hóa rất quan trọng. BTW nếu điều này không phải là tối ưu cho tất cả các trường hợp tôi luôn có thể viết một zip + trong phân công đoàn anyway – tribbloid

Các vấn đề liên quan