2016-05-27 54 views
7

Tôi có một dataframe đơn giản như thế này:Pivot Chuỗi cột trên Pyspark Dataframe

rdd = sc.parallelize(
    [ 
     (0, "A", 223,"201603", "PORT"), 
     (0, "A", 22,"201602", "PORT"), 
     (0, "A", 422,"201601", "DOCK"), 
     (1,"B", 3213,"201602", "DOCK"), 
     (1,"B", 3213,"201601", "PORT"), 
     (2,"C", 2321,"201601", "DOCK") 
    ] 
) 
df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"]) 

df_data.show() 
+---+----+----+------+----+ 
| id|type|cost| date|ship| 
+---+----+----+------+----+ 
| 0| A| 223|201603|PORT| 
| 0| A| 22|201602|PORT| 
| 0| A| 422|201601|DOCK| 
| 1| B|3213|201602|DOCK| 
| 1| B|3213|201601|PORT| 
| 2| C|2321|201601|DOCK| 
+---+----+----+------+----+ 

và tôi cần phải xoay nó theo ngày:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("cost").show() 

+---+----+------+------+------+ 
| id|type|201601|201602|201603| 
+---+----+------+------+------+ 
| 2| C|2321.0| null| null| 
| 0| A| 422.0| 22.0| 223.0| 
| 1| B|3213.0|3213.0| null| 
+---+----+------+------+------+ 

Tất cả mọi thứ hoạt động như mong đợi. Nhưng bây giờ tôi cần phải xoay nó và có được một cột không phải số:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("ship").show() 

và tất nhiên tôi sẽ nhận được một hợp ngoại lệ:

AnalysisException: u'"ship" is not a numeric column. Aggregation function can only be applied on a numeric column.;' 

Tôi muốn tạo ra một cái gì đó trên dòng

+---+----+------+------+------+ 
| id|type|201601|201602|201603| 
+---+----+------+------+------+ 
| 2| C|DOCK | null| null| 
| 0| A| DOCK | PORT| DOCK| 
| 1| B|DOCK |PORT | null| 
+---+----+------+------+------+ 

Có thể với pivot không?

Trả lời

10

Giả sử rằng (id |type | date) kết hợp là duy nhất và mục tiêu duy nhất của bạn là xoay vòng và không kết hợp, bạn có thể sử dụng first (hoặc bất kỳ chức năng khác không bị giới hạn giá trị số):

from pyspark.sql.functions import first 

(df_data 
    .groupby(df_data.id, df_data.type) 
    .pivot("date") 
    .agg(first("ship")) 
    .show()) 

## +---+----+------+------+------+ 
## | id|type|201601|201602|201603| 
## +---+----+------+------+------+ 
## | 2| C| DOCK| null| null| 
## | 0| A| DOCK| PORT| PORT| 
## | 1| B| PORT| DOCK| null| 
## +---+----+------+------+------+ 

Nếu các giả định này là không đúng bạn' sẽ phải tổng hợp trước dữ liệu của bạn. Ví dụ: giá trị phổ biến nhất ship:

from pyspark.sql.functions import max, struct 

(df_data 
    .groupby("id", "type", "date", "ship") 
    .count() 
    .groupby("id", "type") 
    .pivot("date") 
    .agg(max(struct("count", "ship"))) 
    .show()) 

## +---+----+--------+--------+--------+ 
## | id|type| 201601| 201602| 201603| 
## +---+----+--------+--------+--------+ 
## | 2| C|[1,DOCK]| null| null| 
## | 0| A|[1,DOCK]|[1,PORT]|[1,PORT]| 
## | 1| B|[1,PORT]|[1,DOCK]| null| 
## +---+----+--------+--------+--------+ 
Các vấn đề liên quan