2017-01-16 25 views
7

Có tương đương với chức năng Pandas Melt trong Apache Spark trong PySpark hoặc ít nhất là ở Scala không?Làm thế nào để làm tan chảy Spark DataFrame?

Tôi đã chạy tập dữ liệu mẫu cho đến bây giờ trong python và bây giờ tôi muốn sử dụng Spark cho toàn bộ tập dữ liệu.

Xin cảm ơn trước.

+0

Kiểm tra điều này: http://chappers.github.io/web%20micro%20log/2016/03/07/implementing-simple-melt-function-for-pyspark/ – MYGz

+0

Xin lỗi vì phản hồi bị chậm ... Lỗi lỗi công việc ngay cả đối với một tập dữ liệu mẫu nhỏ (rdd) được tạo với rdd = sc.parallelize ([("x", 1,4), ("y", 3,5), ("z", 2 , 6)]) –

Trả lời

12

Không có chức năng tích hợp (nếu bạn làm việc với hỗ trợ SQL và Hive, bạn có thể sử dụng stack function, nhưng không được hiển thị trong Spark và không có triển khai gốc) nhưng không đáng kể. nhập khẩu yêu cầu: thực hiện

from pyspark.sql.functions import array, col, explode, lit, struct 
from pyspark.sql import DataFrame 
from typing import Iterable 

Ví dụ:

def melt(
     df: DataFrame, 
     id_vars: Iterable[str], value_vars: Iterable[str], 
     var_name: str="variable", value_name: str="value") -> DataFrame: 
    """Convert :class:`DataFrame` from wide to long format.""" 

    # Create array<struct<variable: str, value: ...>> 
    _vars_and_vals = array(*(
     struct(lit(c).alias(var_name), col(c).alias(value_name)) 
     for c in value_vars)) 

    # Add to the DataFrame and explode 
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals)) 

    cols = id_vars + [ 
      col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]] 
    return _tmp.select(*cols) 

Và một số xét nghiệm (dựa trên Pandas doctests):

import pandas as pd 

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'}, 
        'B': {0: 1, 1: 3, 2: 5}, 
        'C': {0: 2, 1: 4, 2: 6}}) 

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C']) 
A variable value 
0 a  B  1 
1 b  B  3 
2 c  B  5 
3 a  C  2 
4 b  C  4 
5 c  C  6 
sdf = spark.createDataFrame(pdf) 
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show() 
+---+--------+-----+ 
| A|variable|value| 
+---+--------+-----+ 
| a|  B| 1| 
| a|  C| 2| 
| b|  B| 3| 
| b|  C| 4| 
| c|  B| 5| 
| c|  C| 6| 
+---+--------+-----+ 

Lưu ý: Để sử dụng với các phiên bản Python cũ, hãy xóa chú thích loại.

3

Đến với câu hỏi này trong tìm kiếm của tôi để thực hiện tan chảy trong Spark cho scala. Đăng cổng Scala của tôi trong trường hợp ai đó cũng tình cờ gặp điều này.

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.{DataFrame} 
/** Extends the [[org.apache.spark.sql.DataFrame]] class 
* 
* @param df the data frame to melt 
*/ 
implicit class DataFrameFunctions(df: DataFrame) { 

    /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format. 
    * 
    * melt is (kind of) the inverse of pivot 
    * melt is currently (02/2017) not implemented in spark 
    * 
    * @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html) 
    * @see this is a scala adaptation of http://stackoverflow.com/questions/41670103/pandas-melt-function-in-apache-spark 
    * 
    * @todo method overloading for simple calling 
    * 
    * @param id_vars the columns to preserve 
    * @param value_vars the columns to melt 
    * @param var_name the name for the column holding the melted columns names 
    * @param value_name the name for the column holding the values of the melted columns 
    * 
    */ 

    def melt(
      id_vars: Seq[String], value_vars: Seq[String], 
      var_name: String = "variable", value_name: String = "value") : DataFrame = { 

     // Create array<struct<variable: str, value: ...>> 
     val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*) 

     // Add to the DataFrame and explode 
     val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals)) 

     val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }} 

     return _tmp.select(cols: _*) 

    } 
} 

Vì tôi không phải là người tiên tiến xem xét scala, tôi chắc chắn có chỗ để cải thiện. Mọi bình luận đều được chào đón.

Các vấn đề liên quan