2016-12-07 37 views
11

Tôi có một khung dữ liệu có một hàng và một vài cột. Một số cột là các giá trị đơn và một số khác là danh sách. Tất cả các cột trong danh sách đều có cùng độ dài. Tôi muốn chia từng cột danh sách thành một hàng riêng biệt, trong khi vẫn giữ bất kỳ cột nào không phải danh sách.Pyspark: Chia nhiều cột mảng thành hàng

mẫu DF:

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')]) 
# +---+---------+---------+---+ 
# | a|  b|  c| d| 
# +---+---------+---------+---+ 
# | 1|[1, 2, 3]|[7, 8, 9]|foo| 
# +---+---------+---------+---+ 

Những gì tôi muốn:

+---+---+----+------+ 
| a| b| c | d | 
+---+---+----+------+ 
| 1| 1| 7 | foo | 
| 1| 2| 8 | foo | 
| 1| 3| 9 | foo | 
+---+---+----+------+ 

Nếu tôi chỉ có một cột danh sách, điều này sẽ được dễ dàng bởi chỉ cần làm một explode:

df_exploded = df.withColumn('b', explode('b')) 
# >>> df_exploded.show() 
# +---+---+---------+---+ 
# | a| b|  c| d| 
# +---+---+---------+---+ 
# | 1| 1|[7, 8, 9]|foo| 
# | 1| 2|[7, 8, 9]|foo| 
# | 1| 3|[7, 8, 9]|foo| 
# +---+---+---------+---+ 

Tuy nhiên, nếu tôi cố gắng cũng explode cột c, tôi kết thúc bằng một thẻ dữ liệu Rame với chiều dài bình phương của những gì tôi muốn:

df_exploded_again = df_exploded.withColumn('c', explode('c')) 
# >>> df_exploded_again.show() 
# +---+---+---+---+ 
# | a| b| c| d| 
# +---+---+---+---+ 
# | 1| 1| 7|foo| 
# | 1| 1| 8|foo| 
# | 1| 1| 9|foo| 
# | 1| 2| 7|foo| 
# | 1| 2| 8|foo| 
# | 1| 2| 9|foo| 
# | 1| 3| 7|foo| 
# | 1| 3| 8|foo| 
# | 1| 3| 9|foo| 
# +---+---+---+---+ 

Những gì tôi muốn là - cho mỗi cột, lấy yếu tố thứ n của mảng trong cột đó và thêm rằng để một hàng mới. Tôi đã cố gắng lập bản đồ một nổ accross tất cả các cột trong dataframe, nhưng điều đó dường như không làm việc, hoặc:

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF() 

Trả lời

13

Với DataFrames và UDF:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType 
from pyspark.sql.functions import col, udf 

zip_ = udf(
    lambda x, y: list(zip(x, y)), 
    ArrayType(StructType([ 
     # Adjust types to reflect data types 
     StructField("first", IntegerType()), 
     StructField("second", IntegerType()) 
    ])) 
) 

(df 
    .withColumn("tmp", zip_("b", "c")) 
    # UDF output cannot be directly passed to explode 
    .withColumn("tmp", explode("tmp")) 
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d")) 

Với RDDs:

(df 
    .rdd 
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)]) 
    .toDF(["a", "b", "c", "d"])) 

Cả hai giải pháp đều không hiệu quả do phí giao tiếp của Python. Nếu kích thước dữ liệu là cố định bạn có thể làm một cái gì đó như thế này:

from functools import reduce 
from pyspark.sql import DataFrame 

# Length of array 
n = 3 

# For legacy Python you'll need a separate function 
# in place of method accessor 
reduce(
    DataFrame.unionAll, 
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d") 
     for i in range(n)) 
).toDF("a", "b", "c", "d") 

hoặc thậm chí:

from pyspark.sql.functions import array, struct 

# SQL level zip of arrays of known size 
# followed by explode 
tmp = explode(array(*[ 
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c")) 
    for i in range(n) 
])) 

(df 
    .withColumn("tmp", tmp) 
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d")) 

này nên được nhanh hơn đáng kể so với UDF hoặc RDD. Khái quát hóa để hỗ trợ một số tùy ý các cột:

# This uses keyword only arguments 
# If you use legacy Python you'll have to change signature 
# Body of the function can stay the same 
def zip_and_explode(*colnames, n): 
    return explode(array(*[ 
     struct(*[col(c).getItem(i).alias(c) for c in colnames]) 
     for i in range(n) 
    ])) 

df.withColumn("tmp", zip_and_explode("b", "c", n=3)) 
4

Bạn sẽ cần phải sử dụng flatMap, không map như bạn muốn chắc nhiều hàng sản xuất ra của mỗi hàng đầu vào.

from pyspark.sql import Row 
def dualExplode(r): 
    rowDict = r.asDict() 
    bList = rowDict.pop('b') 
    cList = rowDict.pop('c') 
    for b,c in zip(bList, cList): 
     newDict = dict(rowDict) 
     newDict['b'] = b 
     newDict['c'] = c 
     yield Row(**newDict) 

df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode)) 
Các vấn đề liên quan