2016-10-12 19 views
9

tôi có mã python này chạy cục bộ trong một dataframe gấu trúc:Áp dụng UDFs trên GroupedData trong PySpark (với chức năng ví dụ python)

df_result = pd.DataFrame(df 
          .groupby('A') 
          .apply(lambda x: myFunction(zip(x.B, x.C), x.name)) 

Tôi muốn chạy này trong PySpark, nhưng gặp khó khăn trong đối phó với pyspark .sql.group.GroupedData đối tượng.

Tôi đã thử những điều sau đây:

sparkDF 
.groupby('A') 
.agg(myFunction(zip('B', 'C'), 'A')) 

trả về

KeyError: 'A' 

Tôi đoán vì 'A' không còn là một cột và tôi không thể tìm ra tương đương cho x.name .

Và sau đó

sparkDF 
.groupby('A') 
.map(lambda row: Row(myFunction(zip('B', 'C'), 'A'))) 
.toDF() 

nhưng nhận được lỗi sau:

AttributeError: 'GroupedData' object has no attribute 'map' 

Bất kỳ đề xuất sẽ được thực sự đánh giá cao!

+0

'myFunction' là gì và có gì trong' sparkDF'? Vui lòng làm cho mã của bạn có thể tái sản xuất bằng cách chia sẻ tập dữ liệu mẫu, đầu ra dự kiến ​​và mã cụ thể. Cho đến lúc đó, câu hỏi của bạn vẫn còn quá rộng. – mtoto

Trả lời

20

Điều bạn đang cố gắng viết UDAF (Chức năng tổng hợp do người dùng xác định) trái với UDF (Chức năng do người dùng xác định). UDAF là các hàm hoạt động trên dữ liệu được nhóm theo một khóa. Cụ thể là họ cần phải xác định cách hợp nhất nhiều giá trị trong nhóm trong một phân vùng duy nhất, và sau đó làm thế nào để hợp nhất các kết quả trên các phân vùng cho khóa. Hiện tại không có cách nào trong python để thực hiện một UDAF, chúng chỉ có thể được thực hiện trong Scala.

Nhưng, bạn có thể giải quyết nó bằng Python. Bạn có thể sử dụng tập hợp thu thập để thu thập các giá trị được nhóm của bạn và sau đó sử dụng một UDF thông thường để thực hiện những gì bạn muốn với chúng. Thông báo trước chỉ là collect_set chỉ hoạt động trên các giá trị nguyên thủy, vì vậy bạn sẽ cần phải mã hóa chúng xuống một chuỗi.

from pyspark.sql.types import StringType 
from pyspark.sql.functions import col, collect_list, concat_ws, udf 

def myFunc(data_list): 
    for val in data_list: 
     b, c = data.split(',') 
     # do something 

    return <whatever> 

myUdf = udf(myFunc, StringType()) 

df.withColumn('data', concat_ws(',', col('B'), col('C'))) \ 
    .groupBy('A').agg(collect_list('data').alias('data')) 
    .withColumn('data', myUdf('data')) 

Sử dụng collect_set nếu bạn muốn deduping. Ngoài ra, nếu bạn có nhiều giá trị cho một số khóa của mình, điều này sẽ chậm vì tất cả các giá trị cho một khóa sẽ cần được thu thập trong một phân vùng duy nhất ở đâu đó trên cụm của bạn. Nếu kết quả cuối cùng của bạn là giá trị bạn tạo bằng cách kết hợp các giá trị cho mỗi khóa theo cách nào đó (ví dụ: tổng hợp chúng), có thể nhanh hơn để thực hiện nó bằng phương pháp RDD aggregateByKey cho phép bạn tạo giá trị trung gian cho mỗi khóa trong phân vùng trước khi xáo trộn dữ liệu xung quanh.

5

Vì Spark 2.3 (hiện đang phát triển), bạn có thể sử dụng pandas_udf. Nhóm biến thể tổng hợp có một chức năng mà bản đồ từ Pandas DataFrame của cùng một hình dạng như đầu vào, đầu ra DataFrame.Ví dụ, nếu dữ liệu trông như thế này:

df = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)], 
    ("key", "value1", "value2")) 
) 

và bạn muốn để tính toán giá trị trung bình của cặp phút giữa value1value2, bạn phải xác định schema đầu ra:

from pyspark.sql.types import * 

schema = StructType([ 
    StructField("key", StringType()), 
    StructField("avg_min", DoubleType()) 
]) 

pandas_udf:

from pyspark.sql.functions import pandas_udf 
from pyspark.sql.functions import PandasUDFType 

@pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP) 
def g(df): 
    result = pd.DataFrame(df.groupby(df.key).apply(
     lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean() 
    )) 
    result.reset_index(inplace=True, drop=False) 
    return result 

và áp dụng:

df.groupby("key").apply(g).show() 
+---+-------+ 
|key|avg_min| 
+---+-------+ 
| b| -1.5| 
| a| -0.5| 
+---+-------+ 

Không bao gồm định nghĩa lược đồ và trang trí, mã Pandas hiện tại của bạn có thể được áp dụng như hiện trạng.

Các vấn đề liên quan