2016-10-18 24 views
5

Tôi có một dataframe như thế này:nổ mảng của array- (Dataframe) pySpark

+-----+--------------------+ 
|index|    merged| 
+-----+--------------------+ 
| 0|[[2.5, 2.4], [3.5...| 
| 1|[[-1.0, -1.0], [-...| 
| 2|[[-1.0, -1.0], [-...| 
| 3|[[0.0, 0.0], [0.5...| 
| 4|[[0.5, 0.5], [1.0...| 
| 5|[[0.5, 0.5], [1.0...| 
| 6|[[-1.0, -1.0], [0...| 
| 7|[[0.0, 0.0], [0.5...| 
| 8|[[0.5, 0.5], [1.0...| 
+-----+--------------------+ 

Và tôi muốn nổ tung cột sáp nhập vào

+-----+-------+-------+ 
|index|Column1|Column2| 
+-----+-------+-------+ 
| 0| 2.5| 2.4 | 
| 1| 3.5| 0.5| 
| 2| -1.0| -1.0| 
| 3| -1.0| -1.0| 
| 4| 0.0 | 0.0 | 
| 5| 0.5| 0.74| 
+-----+-------+-------+ 

Mỗi tuple [[2.5, 2.4] , [3,5,0,5]] repensente hai cột, biết rằng 2,5 và 3,5 sẽ được lưu trữ trong cột 1 và (2,4,0,5) sẽ được lưu trữ trong cột thứ hai

Vì vậy, tôi đã cố gắng này

df= df.withColumn("merged", df["merged"].cast("array<array<float>>")) 
df= df.withColumn("merged",explode('merged')) 

sau đó tôi sẽ áp dụng một udf để tạo một DF

nhưng tôi không thể đúc các dữ liệu hoặc áp dụng phát nổ, và tôi nhận được lỗi

pyspark.sql.utils.AnalysisException: u"cannot resolve 'cast(merged as array<array<float>)' due to data type mismatch: cannot cast StringType to ArrayType(StringType,true) 

Tôi cũng đã cố gắng

df= df.withColumn("merged", df["merged"].cast("array<string>")) 

nhưng không có gì hoạt động và nếu tôi áp dụng phát nổ mà không truyền, tôi nhận được

pyspark.sql.utils.AnalysisException: u"cannot resolve 'explode(merged)' due to data type mismatch: input to function explode should be array or map type, not StringType; 
+0

bạn có thể cung cấp cho các giản đồ của df? có vẻ như sáp nhập thực sự là một chuỗi, không phải là những gì bạn có trong đối số. Bạn có thể sử dụng 'split' để tách một chuỗi bằng dấu tách. Ngoài ra, nó có vẻ như có lỗi chính tả trong câu hỏi của bạn: không phải là chỉ mục giống nhau cho các giá trị phát nổ trong ví dụ của bạn của kết quả mong đợi? Hoặc là những gì bạn đã cho những gì bạn thực sự muốn? – Wilmerton

+0

Thx, tôi đọc mã của tôi, và tôi thấy rằng tôi quên thêm kiểu trả về ArrayType (ArrayType (FloatType())) trong hàm lambda của tôi (những người hợp nhất các cột của tôi) – MrGildarts

+0

do đó ... đã giải quyết được vấn đề? – Wilmerton

Trả lời

0

Bạn có thể thử đoạn mã sau:

from pyspark import SparkConf, SparkContext       
from pyspark.sql import SparkSession        

from pyspark.sql.types import FloatType, StringType, IntegerType 
from pyspark.sql.functions import udf, col       


def col1_calc(merged):            
    return merged[0][0]            

def col2_calc(merged):            
    return merged[0][1]            

if __name__ == '__main__':           
    spark = SparkSession \           
     .builder \             
     .appName("Python Spark SQL Hive integration example") \  
     .getOrCreate()            

    df = spark.createDataFrame([         
     (0, [[2.5,2.4],[3.5]]),          
     (1, [[-1.0,-1.0],[3.5]]),         
     (2, [[-1.0,-1.0],[3.5]]),         
    ], ["index", "merged"])           

    df.show()              

    column1_calc = udf(col1_calc, FloatType())      
    df = df.withColumn('Column1', column1_calc(df['merged']))  
    column2_calc = udf(col2_calc, FloatType())      
    df = df.withColumn('Column2', column2_calc(df['merged']))  

    df = df.select(['Column1', 'Column2', 'index'])     
    df.show()   

Output:

+-------+-------+-----+ 
|Column1|Column2|index| 
+-------+-------+-----+ 
| 2.5| 2.4| 0| 
| -1.0| -1.0| 1| 
| -1.0| -1.0| 2| 
+-------+-------+-----+ 
Các vấn đề liên quan