2017-03-06 17 views
7

Với DataFrame sau:Cách tối đa hóa giá trị và giữ tất cả các cột (cho các bản ghi tối đa cho mỗi nhóm)?

+----+-----+---+-----+ 
| uid| k| v|count| 
+----+-----+---+-----+ 
| a|pref1| b| 168| 
| a|pref3| h| 168| 
| a|pref3| t| 63| 
| a|pref3| k| 84| 
| a|pref1| e| 84| 
| a|pref2| z| 105| 
+----+-----+---+-----+ 

Làm thế nào tôi có thể nhận được giá trị tối đa từ uid, k nhưng bao gồm v?

+----+-----+---+----------+ 
| uid| k| v|max(count)| 
+----+-----+---+----------+ 
| a|pref1| b|  168| 
| a|pref3| h|  168| 
| a|pref2| z|  105| 
+----+-----+---+----------+ 

tôi có thể làm một cái gì đó như thế này nhưng nó sẽ thả các cột "v":

df.groupBy("uid", "k").max("count") 

Trả lời

6

Đó là hoàn hảo ví dụ cho các nhà khai thác cửa sổ (sử dụng chức năng over) hoặc join.

Vì bạn đã biết cách sử dụng cửa sổ, tôi chỉ tập trung vào join.

scala> val inventory = Seq(
    | ("a", "pref1", "b", 168), 
    | ("a", "pref3", "h", 168), 
    | ("a", "pref3", "t", 63)).toDF("uid", "k", "v", "count") 
inventory: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 2 more fields] 

scala> val maxCount = inventory.groupBy("uid", "k").max("count") 
maxCount: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 1 more field] 

scala> maxCount.show 
+---+-----+----------+ 
|uid| k|max(count)| 
+---+-----+----------+ 
| a|pref3|  168| 
| a|pref1|  168| 
+---+-----+----------+ 

scala> val maxCount = inventory.groupBy("uid", "k").agg(max("count") as "max") 
maxCount: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 1 more field] 

scala> maxCount.show 
+---+-----+---+ 
|uid| k|max| 
+---+-----+---+ 
| a|pref3|168| 
| a|pref1|168| 
+---+-----+---+ 

scala> maxCount.join(inventory, Seq("uid", "k")).where($"max" === $"count").show 
+---+-----+---+---+-----+ 
|uid| k|max| v|count| 
+---+-----+---+---+-----+ 
| a|pref3|168| h| 168| 
| a|pref1|168| b| 168| 
+---+-----+---+---+-----+ 
4

Bạn có thể sử dụng chức năng cửa sổ:

from pyspark.sql.functions import max as max_ 
from pyspark.sql.window import Window 

w = Window.partitionBy("uid", "k") 

df.withColumn("max_count", max_("count").over(w)) 
+0

gần như, nó thêm cột có giá trị tối đa nhưng nó giữ tất cả các hàng. – jfgosselin

2

Dưới đây là giải pháp tốt nhất mà tôi đã đưa ra cho đến thời điểm này:

val w = Window.partitionBy("uid","k").orderBy(col("count").desc) 

df.withColumn("rank", dense_rank().over(w)).select("uid", "k","v","count").where("rank == 1").show 
Các vấn đề liên quan