2015-12-29 22 views
6

Cập nhật: Có vẻ như lỗi của tôi có thể do tôi cài đặt Spark và/hoặc Hive. Làm việc với các chức năng cửa sổ có vẻ khá đơn giản trong một sổ ghi chép Databricks (được lưu trữ). Tôi cần phải tìm ra cách để thiết lập này tại địa phương.Làm cách nào để lấy một DataFrame PySpark được tạo bằng HiveContext trong Spark 1.5.2?

Tôi có một Khung dữ liệu Spark mà tôi cần sử dụng chức năng Cửa sổ. * Tôi đã thử làm theo hướng dẫn trên here, nhưng tôi đã gặp phải một số sự cố.

Thiết lập môi trường của tôi:

import os 
import sys 
import datetime as dt 

os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2' 
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip' 
sys.path.append('/usr/bin/spark-1.5.2/python') 
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip') 

import pyspark 
sc = pyspark.SparkContext() 
hiveContext = pyspark.sql.HiveContext(sc) 
sqlContext = pyspark.sql.SQLContext(sc) 
from pyspark.sql import Row 
from pyspark.sql.functions import struct 
from pyspark.sql import DataFrame 
from collections import OrderedDict 

Thiết lập dữ liệu của tôi:

test_ts = {'adminDistrict': None, 
'city': None, 
'country': {'code': 'NA', 'name': 'UNKNOWN'}, 
'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89}, 
    {'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44}, 
    {'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3}, 
    {'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6}, 
    {'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84}, 
    {'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74}, 
    {'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33}, 
    {'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33}, 
    {'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5}, 
    {'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79}, 
    {'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3}, 
    {'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0}, 
    {'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35}, 
    {'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82}, 
    {'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24}, 
    {'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61}, 
    {'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14}, 
    {'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0}, 
    {'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82}, 
    {'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11}, 
    {'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46}, 
    {'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8}, 
    {'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74}, 
    {'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63}, 
    {'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64}, 
    {'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}], 
'maxDate': '2015-12-28T00:00:00Z', 
'minDate': '2005-08-25T00:00:00Z', 
'name': 'S&P GSCI Crude Oil Spot', 
'offset': 0, 
'resolution': 'DAY', 
'sources': ['trf'], 
'subtype': 'Index', 
'type': 'Commodities', 
'uid': 'TRF_INDEX_Z39824_PI'} 

Một chức năng để biến json đó vào một DataFrame:

def ts_to_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), line['value'])) 
    return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')]) 

Bắt một dataframe và dùng xem nội dung bên trong:

test_df = ts_to_df(test_ts) 
test_df.show() 

Điều đó cho thấy tôi đây:

+----------+----------------------+ 
|  Date|SP_GSCI_Crude_Oil_Spot| 
+----------+----------------------+ 
|2005-08-25|    369.89| 
|2005-08-26|    362.44| 
|2005-08-29|     368.3| 
|2005-08-30|     382.6| 
|2005-08-31|    377.84| 
|2005-09-01|    380.74| 
|2005-09-02|    370.33| 
|2005-09-05|    370.33| 
|2005-09-06|     361.5| 
|2005-09-07|    352.79| 
|2005-09-08|     354.3| 
|2005-09-09|     353.0| 
|2005-09-12|    349.35| 
|2005-09-13|    348.82| 
|2005-09-14|    360.24| 
|2005-09-15|    357.61| 
|2005-09-16|    347.14| 
|2005-09-19|     370.0| 
|2005-09-20|    362.82| 
|2005-09-21|    366.11| 
+----------+----------------------+ 

Và đây là nơi tôi không có ý tưởng những gì tôi đang làm và mọi thứ bắt đầu đi sai:

from pyspark.sql.functions import lag, col, lead 
from pyspark.sql.window import Window 

w = Window().partitionBy().orderBy(col('Date')) 
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show() 

Đó mang lại cho tôi lỗi này:

Py4JJavaError: An error occurred while calling o59.select. : org.apache.spark.sql.AnalysisException: Could not resolve window function 'lead'. Note that, using window functions currently requires a HiveContext;

Vì vậy, có vẻ như tôi cần một HiveContext, phải không? Tôi có cần tạo DataFrame của mình bằng HiveContext không? Sau đó, hãy để tôi cố gắng tạo ra một DataFrame một cách rõ ràng bằng HiveContext:

def ts_to_hive_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), 
       ts['name'].replace('&', '').replace(' ', '_'):line['value']}) 
    temp_rdd = sc.parallelize(data).map(lambda x: Row(**x)) 
    return hiveContext.createDataFrame(temp_rdd) 

test_df = ts_to_hive_df(test_ts) 
test_df.show() 

Nhưng điều đó mang lại cho tôi lỗi này:

TypeError: 'JavaPackage' object is not callable

Vì vậy, làm thế nào để sử dụng chức năng Window? Tôi có cần tạo DataFrames bằng HiveContext không? Nếu vậy, thì làm thế nào để tôi làm điều đó? Ai đó có thể cho tôi biết tôi đang làm gì sai?

* Tôi cần biết nếu có những khoảng trống trong dữ liệu của mình. Tôi có cột 'Ngày' và cho mỗi hàng, được sắp xếp theo Ngày, tôi muốn biết điều gì ở hàng kế tiếp, và nếu tôi thiếu ngày hoặc dữ liệu xấu, thì tôi muốn sử dụng dữ liệu của ngày cuối cùng trên hàng đó. Nếu bạn biết cách làm tốt hơn, hãy cho tôi biết. Nhưng tôi vẫn muốn biết làm thế nào để có được các chức năng Window làm việc.

+0

Xin lỗi. Đã thêm mã cụ thể. Tôi hy vọng rằng sẽ dẫn chúng ta đến đâu đó. Cảm ơn đã dành một cái nhìn. – Nathaniel

+1

Được rồi, có vẻ như có điều gì đó có thể sai lầm với cách tôi cài đặt Spark (hoặc Hive?) Cục bộ, vì tôi có thể làm việc này trong một sổ ghi chép DataBricks. DataBricks không muốn chúng tôi tạo HiveContexts hoặc SQLContexts của riêng mình. Để làm cho nó hoạt động ở đó, tôi đã bỏ qua việc tạo ra các bối cảnh của riêng mình và tôi đã sử dụng hàm ts_to_hive_df ở trên, thay thế hiveContext của tôi bằng sqlContext của chúng. Tôi sẽ phải làm việc này sau khi cài đặt. Tôi sẽ quay lại và viết một giải pháp khi tôi tìm ra nó. – Nathaniel

+1

Dường như bạn là các tệp nhị phân Spark đã được tạo mà không cần hỗ trợ Hive. – zero323

Trả lời

0

Đây là câu hỏi cũ và do đó bạn có thể chuyển sang phiên bản Spark mới. Tôi đang chạy spark 2.0 bản thân mình, vì vậy đây có thể là gian lận.

Nhưng fwiw: 2 vấn đề có thể xảy ra. Trong ví dụ đầu tiên, tôi nghĩ rằng các .toDF() có thể mặc định để SQLContext kể từ khi bạn đã có cả hai được gọi là. Trong lần thứ hai, khi bạn tái cấu trúc, có thể nào bạn đang gọi hàm hivecontext bên trong hàm?

Nếu tôi refactor hàm thứ hai ts_to_df của bạn để có hivecontext được gọi bên ngoài hàm, mọi thứ đều ổn.

def ts_to_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), 
       ts['name'].replace('&', '').replace(' ', '_'):line['value']}) 
    return data 

data = ts_to_df(test_ts) 
test_rdd = sc.parallelize(data).map(lambda x: Row(**x)) 
test_df = hiveContext.createDataFrame(test_rdd) 

from pyspark.sql.functions import lag, col, lead 
from pyspark.sql.window import Window 

w = Window().partitionBy().orderBy(col('Date')) 
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show() 

tôi nhận được đầu ra

+----------+ 
| Next_Date| 
+----------+ 
|2005-08-26| 
|2005-08-29| 
|2005-08-30| 
|2005-08-31| 
|2005-09-01| 
|2005-09-02| 
..... 
Các vấn đề liên quan