2017-08-21 21 views
5

Tôi có tập dữ liệu bao gồm cột dấu thời gian và cột đô la. Tôi muốn tìm số đô la trung bình mỗi tuần kết thúc tại dấu thời gian của mỗi hàng. Ban đầu tôi đã nhìn vào hàm pyspark.sql.functions.window, nhưng nó sẽ thu thập dữ liệu theo tuần.pyspark: cán trung bình sử dụng dữ liệu thời gian chờ

Dưới đây là một ví dụ:

%pyspark 
import datetime 
from pyspark.sql import functions as F 

df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"]) 
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp')) 

w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg')) 
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect() 

Điều này dẫn đến hai kỷ lục:

|  start  |   end   | avg | 
|---------------------|----------------------|-----| 
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0| 
|---------------------|----------------------|-----| 
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0| 
|---------------------|----------------------|-----| 

Chức năng cửa sổ binned dữ liệu chuỗi thời gian chứ không phải là thực hiện một trung bình cán.

Có cách nào để thực hiện mức trung bình khi tôi sẽ lấy lại mức trung bình hàng tuần cho mỗi hàng có khoảng thời gian kết thúc tại dấu thời gianGMT của hàng không?

EDIT:

Zhang của câu trả lời dưới đây là gần với những gì tôi muốn, nhưng không chính xác những gì tôi muốn xem.

Dưới đây là một ví dụ tốt hơn để hiển thị những gì tôi đang cố gắng để có được tại địa chỉ:

%pyspark 
from pyspark.sql import functions as F 
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"), 
         (13, "2017-03-15T12:27:18+00:00"), 
         (25, "2017-03-18T11:27:18+00:00")], 
         ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days")))) 

Điều này dẫn đến dataframe sau:

dollars timestampGMT   rolling_average 
25  2017-03-18 11:27:18.0 25 
17  2017-03-10 15:27:18.0 15 
13  2017-03-15 12:27:18.0 15 

Tôi muốn tỷ lệ trung bình là qua tuần tiến hành ngày trong cột timestampGMT, điều này sẽ dẫn đến điều này:

dollars timestampGMT   rolling_average 
17  2017-03-10 15:27:18.0 17 
13  2017-03-15 12:27:18.0 15 
25  2017-03-18 11:27:18.0 19 

Ở trên lts, rolling_average cho 2017-03-10 là 17, vì không có hồ sơ trước. Điểm số trung bình cho giai đoạn 2017-03-15 là 15 vì nó là trung bình 13 từ 2017-03-15 và 17 từ 2017-03-10 rơi cùng với cửa sổ 7 ngày trước đó. Trung bình luân phiên cho 2017-03-18 là 19 vì nó trung bình 25 từ 2017-03-18 và 13 từ 2017-03-10 rơi với cửa sổ 7 ngày trước đó, và nó không bao gồm 17 từ 2017 -03-10 bởi vì nó không rơi với cửa sổ 7 ngày trước đó.

Có cách nào để thực hiện điều này thay vì cửa sổ binning nơi cửa sổ hàng tuần không trùng lặp không?

Trả lời

4

tôi đã tìm ra cách chính xác để tính toán một chuyển động/trung bình cán sử dụng stackoverflow này:

Spark Window Functions - rangeBetween dates

Ý tưởng cơ bản là chuyển đổi cột dấu thời gian của bạn thành secon ds, và sau đó bạn có thể sử dụng hàm rangeBetween trong lớp pyspark.sql.Window để bao gồm các hàng chính xác trong cửa sổ của bạn.

Dưới đây là ví dụ giải quyết:

%pyspark 
from pyspark.sql import functions as F 

#function to calculate number of seconds from number of days 
days = lambda i: i * 86400 

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"), 
         (13, "2017-03-15T12:27:18+00:00"), 
         (25, "2017-03-18T11:27:18+00:00")], 
         ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 

#create window by casting timestamp to long (number of seconds) 
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0)) 

df = df.withColumn('rolling_average', F.avg("dollars").over(w)) 

Điều này dẫn đến cột chính xác của cán trung bình mà tôi đang tìm kiếm:

dollars timestampGMT   rolling_average 
17  2017-03-10 15:27:18.0 17.0 
13  2017-03-15 12:27:18.0 15.0 
25  2017-03-18 11:27:18.0 19.0 
1

Bạn có nghĩa này:

df = spark.createDataFrame([(17, "2017-03-11T15:27:18+00:00"), 
          (13, "2017-03-11T12:27:18+00:00"), 
          (21, "2017-03-17T11:27:18+00:00")], 
          ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 
df = df.withColumn('rolling_average', f.avg("dollars").over(Window.partitionBy(f.window("timestampGMT", "7 days")))) 

Output:

+-------+-------------------+---------------+         
|dollars|timestampGMT  |rolling_average| 
+-------+-------------------+---------------+ 
|21  |2017-03-17 19:27:18|21.0   | 
|17  |2017-03-11 23:27:18|15.0   | 
|13  |2017-03-11 20:27:18|15.0   | 
+-------+-------------------+---------------+ 
+0

Cảm ơn Zhang, đó là gần gũi hơn với những gì tôi muốn, nhưng không phải chính xác những gì tôi muốn. Mã của bạn vẫn đang tính toán câu trả lời qua ngày binning. Tôi muốn mỗi trung bình hàng tuần kết thúc vào ngày trong hàng. Đó là lỗi của tôi vì không tạo ra một ví dụ tuyệt vời. Tôi sẽ chỉnh sửa bài đăng của mình bằng ví dụ được cập nhật hiển thị những gì tôi muốn. –

Các vấn đề liên quan