2017-01-25 28 views
7

tôi cố gắng phân vùng lại một DataFrame theo một columnm các các DataFrame có N (hãy nói N=3) giá trị khác nhau trong phân vùng cột x, ví dụ:Thả phân vùng DataFrame trống trong Apache Spark

val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x") // create dummy data 

Những gì tôi muốn đạt được là repartiton myDF bởi x mà không tạo phân vùng trống. Có cách nào tốt hơn làm việc này không?

val numParts = myDF.select($"x").distinct().count.toInt 
myDF.repartition(numParts,$"x") 

(Nếu tôi không chỉ định numParts trong repartiton, hầu hết các phân vùng của tôi là rỗng (như repartition tạo ra 200 phân vùng) ...)

+1

Theo http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options, 200 phân vùng sẽ được tạo vì giá trị mặc định cho tùy chọn cấu hình 'tia lửa .sql.shuffle.partitions' – AKSW

+1

Trả lời có thể tìm thấy http://stackoverflow.com/questions/41854818/spark-dataframe-repartition-number-of-partition-not-preserved?noredirect=1#comment70893687_41854818 – FaigB

Trả lời

2

Tôi muốn nghĩ về giải pháp với iterating qua df phân vùng và tìm nạp số bản ghi trong đó để tìm các phân vùng không trống.

val nonEmptyPart = sparkContext.longAccumulator("nonEmptyPart") 

df.foreachPartition(partition => 
    if (partition.length > 0) nonEmptyPart.add(1)) 

Như chúng tôi đã nhận phân vùng không rỗng (nonEmptyPart), chúng tôi có thể làm sạch các phân vùng trống bằng cách sử dụng coalesce() (check coalesce() vs reparation()).

val finalDf = df.coalesce(nonEmptyPart.value.toInt) //coalesce() accepts only Int type 

Nó có thể hoặc có thể không phải là tốt nhất, nhưng giải pháp này sẽ tránh xáo trộn như chúng ta không sử dụng reparation()


Ví dụ để giải quyết nhận xét

val df1 = sc.parallelize(Seq(1, 1, 2, 2, 3, 3)).toDF("x").repartition($"x") 
val nonEmptyPart = sc.longAccumulator("nonEmptyPart") 

df1.foreachPartition(partition => 
    if (partition.length > 0) nonEmptyPart.add(1)) 

val finalDf = df1.coalesce(nonEmptyPart.value.toInt) 

println(s"nonEmptyPart => ${nonEmptyPart.value.toInt}") 
println(s"df.rdd.partitions.length => ${df1.rdd.partitions.length}") 
println(s"finalDf.rdd.partitions.length => ${finalDf.rdd.partitions.length}") 

Output

nonEmptyPart => 3 
df.rdd.partitions.length => 200 
finalDf.rdd.partitions.length => 3 
+0

'val df = sc.parallelize (Seq (1,1,2,2,3,3)) toDF ("x") phân chia lại (10, $ "x") ‌ .coalesce (3) '. Bây giờ nó thu hẹp số phân vùng từ 10 đến 3. – mrsrinivas

+0

và bây giờ làm 'finalDf.foreachPartition (p => println (p.size))'. Tôi nhận được '0 0 6', tức là 2 phân vùng trống, 1 chứa tất cả các hàng. Đó không phải những gì tôi muốn (tôi là Spark 1.6.3) –

+0

Nó có thể là do xáo trộn vô hiệu hóa với 'coalesce'. Hãy thử sử dụng 'repartition', nó sẽ trộn tất cả dữ liệu theo' HashPartitioner'. vì vậy sẽ có một cơ hội của mỗi phân vùng sẽ được lấp đầy với một số dữ liệu. Nếu bạn thực sự nghiêm túc về việc loại bỏ các phân vùng trống, bạn có thể cần chạy nó (** tìm các phân vùng không trống và áp dụng coalesce/repartition **) iterativly. – mrsrinivas

Các vấn đề liên quan