2016-08-02 18 views
5

Tôi đang xử lý một cột số trong một DataFrame lớn, và tôi muốn tạo một cột mới lưu trữ một danh sách tổng hợp các số duy nhất xuất hiện trong cột đó.Có cách nào để truyền tham số giới hạn cho hàm.collect_set trong Spark không?

Về cơ bản chính xác những gì functions.collect_set thực hiện. Tuy nhiên, tôi chỉ cần tối đa 1000 phần tử trong danh sách tổng hợp. Có cách nào để vượt qua tham số đó bằng cách nào đó để functions.collect_set(), hoặc bất kỳ cách nào khác để có được chỉ lên đến 1000 yếu tố trong danh sách tổng hợp, mà không sử dụng một UDAF?

Vì cột quá lớn nên tôi muốn tránh thu thập tất cả các phần tử và cắt bớt danh sách sau đó.

Cảm ơn!

Trả lời

1

sử dụng mất

val firstThousand = rdd.take(1000) 

Sẽ trở lại là người đầu tiên 1000. Thu thập cũng có một chức năng bộ lọc có thể được cung cấp. Điều đó sẽ cho phép bạn cụ thể hơn với những gì được trả về.

+0

Cảm ơn câu trả lời. Tuy nhiên, 1) Tôi chỉ muốn danh sách các giá trị _distinct_. Tôi thấy có một rdd.distinct(), nhưng điều đó dường như không có một tham số giới hạn 2) Không chắc chắn làm thế nào để sử dụng một chức năng lọc trong thu thập. Làm cách nào để sử dụng bộ lọc để chỉ nhận được một số lượng giá trị nhất định? – user1500142

+0

Ngoài ra, lý tưởng tôi muốn tránh sử dụng các rdd. Tôi hiện đang giống như df.groupBy(). Agg ( user1500142

1
scala> df.show 
    +---+-----+----+--------+ 
    | C0| C1| C2|  C3| 
    +---+-----+----+--------+ 
    | 10| Name|2016| Country| 
    | 11|Name1|2016|country1| 
    | 10| Name|2016| Country| 
    | 10| Name|2016| Country| 
    | 12|Name2|2017|Country2| 
    +---+-----+----+--------+ 

scala> df.groupBy("C1").agg(sum("C0")) 
res36: org.apache.spark.sql.DataFrame = [C1: string, sum(C0): bigint] 

scala> res36.show 
+-----+-------+ 
| C1|sum(C0)| 
+-----+-------+ 
|Name1|  11| 
|Name2|  12| 
| Name|  30| 
+-----+-------+ 

scala> df.limit(2).groupBy("C1").agg(sum("C0")) 
    res33: org.apache.spark.sql.DataFrame = [C1: string, sum(C0): bigint] 

    scala> res33.show 
    +-----+-------+ 
    | C1|sum(C0)| 
    +-----+-------+ 
    | Name|  10| 
    |Name1|  11| 
    +-----+-------+ 



    scala> df.groupBy("C1").agg(sum("C0")).limit(2) 
res2: org.apache.spark.sql.DataFrame = [C1: string, sum(C0): bigint] 

scala> res2.show 
+-----+-------+ 
| C1|sum(C0)| 
+-----+-------+ 
|Name1|  11| 
|Name2|  12| 
+-----+-------+ 

scala> df.distinct 
res8: org.apache.spark.sql.DataFrame = [C0: int, C1: string, C2: int, C3: string] 

scala> res8.show 
+---+-----+----+--------+ 
| C0| C1| C2|  C3| 
+---+-----+----+--------+ 
| 11|Name1|2016|country1| 
| 10| Name|2016| Country| 
| 12|Name2|2017|Country2| 
+---+-----+----+--------+ 

scala> df.dropDuplicates(Array("c1")) 
res11: org.apache.spark.sql.DataFrame = [C0: int, C1: string, C2: int, C3: string] 

scala> res11.show 
+---+-----+----+--------+              
| C0| C1| C2|  C3| 
+---+-----+----+--------+ 
| 11|Name1|2016|country1| 
| 12|Name2|2017|Country2| 
| 10| Name|2016| Country| 
+---+-----+----+--------+ 
+0

Cảm ơn câu trả lời, nhưng điều này không hoàn toàn làm những gì tôi muốn. Nếu tôi muốn lên đến 1000 giá trị khác biệt từ một cột, "df.limit (1000)" sẽ đặt một giới hạn trên cứng trên số lượng các giá trị trả lại, nhưng tôi có thể mất các giá trị khác biệt mà tôi nên thêm vào khác. – user1500142

+0

bạn có hai phương thức riêng biệt và dropDuplicates mà bạn có thể thực thi trước các phương thức giới hạn, nhóm và tăng cường. Khác biệt sẽ xem xét tất cả các cột và droDuplicates cho phép bạn kiểm soát các cột để so sánh để xác định các bản sao. @ user1500142 – mark

2

Tôi đang sử dụng bản sao sửa đổi của các hàm collect_set và collect_list; vì phạm vi mã, các bản sao đã sửa đổi phải nằm trong cùng một đường dẫn gói như bản gốc. Mã được liên kết hoạt động cho Spark 2.1.0; nếu bạn đang sử dụng phiên bản trước, chữ ký của phương thức có thể khác nhau.

Ném tập tin này (https://gist.github.com/lokkju/06323e88746c85b2ce4de3ea9cdef9bc) vào dự án của bạn như src/main/org/apache/tia lửa/sql/chất xúc tác/biểu/collect_limit.scala

sử dụng nó như:

import org.apache.spark.sql.catalyst.expression.collect_limit._ 
df.groupBy('set_col).agg(collect_set_limit('set_col,1000) 
3

Giải pháp của tôi rất giống với Loki's answer with collect_set_limit.


Tôi muốn sử dụng một UDF mà có thể làm những gì bạn muốn sau collect_set (hoặc collect_list) hoặc một UDAF khó khăn hơn nhiều.

Với nhiều kinh nghiệm hơn với UDF, tôi sẽ làm điều đó trước tiên. Mặc dù UDF không được tối ưu hóa, cho trường hợp sử dụng này thì tốt.

val limitUDF = udf { (nums: Seq[Long], limit: Int) => nums.take(limit) } 
val sample = spark.range(50).withColumn("key", $"id" % 5) 

scala> sample.groupBy("key").agg(collect_set("id") as "all").show(false) 
+---+--------------------------------------+ 
|key|all         | 
+---+--------------------------------------+ 
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]| 
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]| 
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]| 
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]| 
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]| 
+---+--------------------------------------+ 

scala> sample. 
    groupBy("key"). 
    agg(collect_set("id") as "all"). 
    withColumn("limit(3)", limitUDF($"all", lit(3))). 
    show(false) 
+---+--------------------------------------+------------+ 
|key|all         |limit(3) | 
+---+--------------------------------------+------------+ 
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|[0, 15, 30] | 
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|[1, 16, 31] | 
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|[33, 48, 13]| 
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|[12, 27, 37]| 
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|[9, 19, 34] | 
+---+--------------------------------------+------------+ 

Xem functions đối tượng (đối với udf tài liệu của hàm).

Các vấn đề liên quan