2015-05-01 25 views
25

Tôi đang phân tích một số dữ liệu với dataframes pyspark, giả sử tôi có một dataframe df mà tôi đang tập hợp:cột đổi tên cho dataframes pyspark tập hợp

df.groupBy("group")\ 
    .agg({"money":"sum"})\ 
    .show(100) 

này sẽ cho tôi:

group    SUM(money#2L) 
A     137461285853 
B     172185566943 
C     271179590646 

Việc tổng hợp hoạt động tốt nhưng tôi không thích tên cột mới "SUM (money # 2L)". Có cách nào gọn gàng để đổi tên cột này thành nội dung nào đó có thể đọc được từ phương pháp .agg không? Có lẽ một cái gì đó tương tự như những gì người ta sẽ làm gì trong dplyr:

df %>% group_by(group) %>% summarise(sum_money = sum(money)) 

Trả lời

43

Mặc dù tôi vẫn thích dplyr cú pháp, đoạn mã này sẽ làm:

import pyspark.sql.functions as sf 

df.groupBy("group")\ 
    .agg(sf.sum('money').alias('money'))\ 
    .show(100) 

Nó được tiết.

25

withColumnRenamed nên làm các trick. Đây là liên kết đến pyspark.sql API.

df.groupBy("group")\ 
    .agg({"money":"sum"})\ 
    .withColumnRenamed("SUM(money)", "money") 
    .show(100) 
3

Tôi đã thực hiện một chức năng trợ giúp nhỏ cho việc này có thể giúp một số người.

import re 

from functools import partial 

def rename_cols(agg_df, ignore_first_n=1): 
    """changes the default spark aggregate names `avg(colname)` 
    to something a bit more useful. Pass an aggregated dataframe 
    and the number of aggregation columns to ignore. 
    """ 
    delimiters = "(", ")" 
    split_pattern = '|'.join(map(re.escape, delimiters)) 
    splitter = partial(re.split, split_pattern) 
    split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n] 
    renamed = map(split_agg, agg_df.columns[ignore_first_n:]) 
    renamed = zip(agg_df.columns[ignore_first_n:], renamed) 
    for old, new in renamed: 
     agg_df = agg_df.withColumnRenamed(old, new) 
    return agg_df 

Một ví dụ:

gb = (df.selectExpr("id", "rank", "rate", "price", "clicks") 
.groupby("id") 
.agg({"rank": "mean", 
     "*": "count", 
     "rate": "mean", 
     "price": "mean", 
     "clicks": "mean", 
     }) 
) 

>>> gb.columns 
['id', 
'avg(rate)', 
'count(1)', 
'avg(price)', 
'avg(rank)', 
'avg(clicks)'] 

>>> rename_cols(gb).columns 
['id', 
'avg_rate', 
'count_1', 
'avg_price', 
'avg_rank', 
'avg_clicks'] 

Làm ít nhất một chút để cứu người từ gõ rất nhiều.

+0

Rất hữu ích và kịp thời. Tôi vừa định hỏi cùng một câu hỏi. Nó sẽ là tốt đẹp nếu bạn có thể chỉ định một tên cột mới trong 'agg' dict (trong Spark tôi có nghĩa là). –

+0

@EvanZamir cảm ơn! Tôi có thể thử và làm một PR đơn giản để châm ngòi cho điều đó. –

Các vấn đề liên quan