2016-01-25 17 views
5

Tôi đang làm việc với dữ liệu được trích xuất từ ​​SFDC sử dụng gói bán hàng đơn giản. Tôi đang sử dụng Python3 cho kịch bản và Spark 1.5.2.Tạo DataFrame từ danh sách các bộ dữ liệu bằng cách sử dụng pyspark

Tôi tạo ra một RDD chứa các dữ liệu sau:

[('Id', 'a0w1a0000003xB1A'), ('PackSize', 1.0), ('Name', 'A')] 
[('Id', 'a0w1a0000003xAAI'), ('PackSize', 1.0), ('Name', 'B')] 
[('Id', 'a0w1a00000xB3AAI'), ('PackSize', 30.0), ('Name', 'C')] 
... 

Những thông tin này là trong RDD gọi v_rdd

schema của tôi trông như thế này:

StructType(List(StructField(Id,StringType,true),StructField(PackSize,StringType,true),StructField(Name,StringType,true))) 

tôi đang cố gắng để tạo ra DataFrame trong số RDD này:

sqlDataFrame = sqlContext.createDataFrame(v_rdd, schema) 

Tôi in DataFrame tôi:

sqlDataFrame.printSchema() 

Và nhận được như sau:

+--------------------+--------------------+--------------------+ 
|     Id| PackSize|       Name| 
+--------------------+--------------------+--------------------+ 
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...| 
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...| 
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...| 

Tôi đang mong đợi để xem dữ liệu thực tế, như thế này:

+------------------+------------------+--------------------+ 
|    Id|PackSize|       Name| 
+------------------+------------------+--------------------+ 
|a0w1a0000003xB1A |    1.0|  A   | 
|a0w1a0000003xAAI |    1.0|  B   | 
|a0w1a00000xB3AAI |    30.0|  C   | 

bạn có thể vui lòng giúp tôi xác định những gì tôi đang làm sai ở đây.

Tập lệnh Python của tôi dài, tôi không chắc chắn sẽ thuận tiện cho mọi người sàng lọc thông qua nó, vì vậy tôi chỉ đăng các phần tôi đang gặp sự cố.

Cảm ơn một tấn trước!

Trả lời

12

Xin chào, lần sau bạn có thể cung cấp ví dụ làm việc hay không. Điều đó sẽ dễ dàng hơn.

Cách trình bày RDD của bạn về cơ bản là lạ khi tạo DataFrame. Đây là cách bạn tạo DF theo Tài liệu Spark.

>>> l = [('Alice', 1)] 
>>> sqlContext.createDataFrame(l).collect() 
[Row(_1=u'Alice', _2=1)] 
>>> sqlContext.createDataFrame(l, ['name', 'age']).collect() 
[Row(name=u'Alice', age=1)] 

Vì vậy, liên quan đến ví dụ của bạn, bạn có thể tạo ra kết quả mong muốn của bạn như thế này:

# Your data at the moment 
data = sc.parallelize([ 
[('Id', 'a0w1a0000003xB1A'), ('PackSize', 1.0), ('Name', 'A')], 
[('Id', 'a0w1a0000003xAAI'), ('PackSize', 1.0), ('Name', 'B')], 
[('Id', 'a0w1a00000xB3AAI'), ('PackSize', 30.0), ('Name', 'C')] 
    ]) 
# Convert to tuple 
data_converted = data.map(lambda x: (x[0][1], x[1][1], x[2][1])) 

# Define schema 
schema = StructType([ 
    StructField("Id", StringType(), True), 
    StructField("Packsize", StringType(), True), 
    StructField("Name", StringType(), True) 
]) 

# Create dataframe 
DF = sqlContext.createDataFrame(data_converted, schema) 

# Output 
DF.show() 
+----------------+--------+----+ 
|    Id|Packsize|Name| 
+----------------+--------+----+ 
|a0w1a0000003xB1A|  1.0| A| 
|a0w1a0000003xAAI|  1.0| B| 
|a0w1a00000xB3AAI| 30.0| C| 
+----------------+--------+----+ 

Hope this helps

Các vấn đề liên quan