Cập nhật: Có vẻ như lỗi của tôi có thể do tôi cài đặt Spark và/hoặc Hive. Làm việc với các chức năng cửa sổ có vẻ khá đơn giản trong một sổ ghi chép Databricks (được lưu trữ). Tôi cần phải tìm ra cách để thiết lập này tại địa phương.Làm cách nào để lấy một DataFrame PySpark được tạo bằng HiveContext trong Spark 1.5.2?
Tôi có một Khung dữ liệu Spark mà tôi cần sử dụng chức năng Cửa sổ. * Tôi đã thử làm theo hướng dẫn trên here, nhưng tôi đã gặp phải một số sự cố.
Thiết lập môi trường của tôi:
import os
import sys
import datetime as dt
os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2'
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip'
sys.path.append('/usr/bin/spark-1.5.2/python')
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip')
import pyspark
sc = pyspark.SparkContext()
hiveContext = pyspark.sql.HiveContext(sc)
sqlContext = pyspark.sql.SQLContext(sc)
from pyspark.sql import Row
from pyspark.sql.functions import struct
from pyspark.sql import DataFrame
from collections import OrderedDict
Thiết lập dữ liệu của tôi:
test_ts = {'adminDistrict': None,
'city': None,
'country': {'code': 'NA', 'name': 'UNKNOWN'},
'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89},
{'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44},
{'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3},
{'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6},
{'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84},
{'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74},
{'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33},
{'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33},
{'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5},
{'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79},
{'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3},
{'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0},
{'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35},
{'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82},
{'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24},
{'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61},
{'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14},
{'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0},
{'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82},
{'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11},
{'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46},
{'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8},
{'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74},
{'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63},
{'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64},
{'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}],
'maxDate': '2015-12-28T00:00:00Z',
'minDate': '2005-08-25T00:00:00Z',
'name': 'S&P GSCI Crude Oil Spot',
'offset': 0,
'resolution': 'DAY',
'sources': ['trf'],
'subtype': 'Index',
'type': 'Commodities',
'uid': 'TRF_INDEX_Z39824_PI'}
Một chức năng để biến json đó vào một DataFrame:
def ts_to_df(ts):
data = []
for line in ts['data']:
data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), line['value']))
return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')])
Bắt một dataframe và dùng xem nội dung bên trong:
test_df = ts_to_df(test_ts)
test_df.show()
Điều đó cho thấy tôi đây:
+----------+----------------------+
| Date|SP_GSCI_Crude_Oil_Spot|
+----------+----------------------+
|2005-08-25| 369.89|
|2005-08-26| 362.44|
|2005-08-29| 368.3|
|2005-08-30| 382.6|
|2005-08-31| 377.84|
|2005-09-01| 380.74|
|2005-09-02| 370.33|
|2005-09-05| 370.33|
|2005-09-06| 361.5|
|2005-09-07| 352.79|
|2005-09-08| 354.3|
|2005-09-09| 353.0|
|2005-09-12| 349.35|
|2005-09-13| 348.82|
|2005-09-14| 360.24|
|2005-09-15| 357.61|
|2005-09-16| 347.14|
|2005-09-19| 370.0|
|2005-09-20| 362.82|
|2005-09-21| 366.11|
+----------+----------------------+
Và đây là nơi tôi không có ý tưởng những gì tôi đang làm và mọi thứ bắt đầu đi sai:
from pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy(col('Date'))
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show()
Đó mang lại cho tôi lỗi này:
Py4JJavaError: An error occurred while calling o59.select. : org.apache.spark.sql.AnalysisException: Could not resolve window function 'lead'. Note that, using window functions currently requires a HiveContext;
Vì vậy, có vẻ như tôi cần một HiveContext, phải không? Tôi có cần tạo DataFrame của mình bằng HiveContext không? Sau đó, hãy để tôi cố gắng tạo ra một DataFrame một cách rõ ràng bằng HiveContext:
def ts_to_hive_df(ts):
data = []
for line in ts['data']:
data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
ts['name'].replace('&', '').replace(' ', '_'):line['value']})
temp_rdd = sc.parallelize(data).map(lambda x: Row(**x))
return hiveContext.createDataFrame(temp_rdd)
test_df = ts_to_hive_df(test_ts)
test_df.show()
Nhưng điều đó mang lại cho tôi lỗi này:
TypeError: 'JavaPackage' object is not callable
Vì vậy, làm thế nào để sử dụng chức năng Window? Tôi có cần tạo DataFrames bằng HiveContext không? Nếu vậy, thì làm thế nào để tôi làm điều đó? Ai đó có thể cho tôi biết tôi đang làm gì sai?
* Tôi cần biết nếu có những khoảng trống trong dữ liệu của mình. Tôi có cột 'Ngày' và cho mỗi hàng, được sắp xếp theo Ngày, tôi muốn biết điều gì ở hàng kế tiếp, và nếu tôi thiếu ngày hoặc dữ liệu xấu, thì tôi muốn sử dụng dữ liệu của ngày cuối cùng trên hàng đó. Nếu bạn biết cách làm tốt hơn, hãy cho tôi biết. Nhưng tôi vẫn muốn biết làm thế nào để có được các chức năng Window làm việc.
Xin lỗi. Đã thêm mã cụ thể. Tôi hy vọng rằng sẽ dẫn chúng ta đến đâu đó. Cảm ơn đã dành một cái nhìn. – Nathaniel
Được rồi, có vẻ như có điều gì đó có thể sai lầm với cách tôi cài đặt Spark (hoặc Hive?) Cục bộ, vì tôi có thể làm việc này trong một sổ ghi chép DataBricks. DataBricks không muốn chúng tôi tạo HiveContexts hoặc SQLContexts của riêng mình. Để làm cho nó hoạt động ở đó, tôi đã bỏ qua việc tạo ra các bối cảnh của riêng mình và tôi đã sử dụng hàm ts_to_hive_df ở trên, thay thế hiveContext của tôi bằng sqlContext của chúng. Tôi sẽ phải làm việc này sau khi cài đặt. Tôi sẽ quay lại và viết một giải pháp khi tôi tìm ra nó. – Nathaniel
Dường như bạn là các tệp nhị phân Spark đã được tạo mà không cần hỗ trợ Hive. – zero323