Giải pháp của tôi rất giống với Loki's answer with collect_set_limit
.
Tôi muốn sử dụng một UDF mà có thể làm những gì bạn muốn sau collect_set
(hoặc collect_list
) hoặc một UDAF khó khăn hơn nhiều.
Với nhiều kinh nghiệm hơn với UDF, tôi sẽ làm điều đó trước tiên. Mặc dù UDF không được tối ưu hóa, cho trường hợp sử dụng này thì tốt.
val limitUDF = udf { (nums: Seq[Long], limit: Int) => nums.take(limit) }
val sample = spark.range(50).withColumn("key", $"id" % 5)
scala> sample.groupBy("key").agg(collect_set("id") as "all").show(false)
+---+--------------------------------------+
|key|all |
+---+--------------------------------------+
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|
+---+--------------------------------------+
scala> sample.
groupBy("key").
agg(collect_set("id") as "all").
withColumn("limit(3)", limitUDF($"all", lit(3))).
show(false)
+---+--------------------------------------+------------+
|key|all |limit(3) |
+---+--------------------------------------+------------+
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|[0, 15, 30] |
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|[1, 16, 31] |
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|[33, 48, 13]|
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|[12, 27, 37]|
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|[9, 19, 34] |
+---+--------------------------------------+------------+
Xem functions đối tượng (đối với udf
tài liệu của hàm).
Cảm ơn câu trả lời. Tuy nhiên, 1) Tôi chỉ muốn danh sách các giá trị _distinct_. Tôi thấy có một rdd.distinct(), nhưng điều đó dường như không có một tham số giới hạn 2) Không chắc chắn làm thế nào để sử dụng một chức năng lọc trong thu thập. Làm cách nào để sử dụng bộ lọc để chỉ nhận được một số lượng giá trị nhất định? – user1500142
Ngoài ra, lý tưởng tôi muốn tránh sử dụng các rdd. Tôi hiện đang giống như df.groupBy(). Agg (
user1500142