Để có thể sử dụng chức năng cửa sổ, trước tiên bạn phải tạo cửa sổ. Định nghĩa khá giống với SQL bình thường, nghĩa là bạn có thể định nghĩa thứ tự, phân vùng hoặc cả hai. Đầu tiên cho phép tạo ra một số dữ liệu giả:
import numpy as np
np.random.seed(1)
keys = ["foo"] * 10 + ["bar"] * 10
values = np.hstack([np.random.normal(0, 1, 10), np.random.normal(10, 1, 100)])
df = sqlContext.createDataFrame([
{"k": k, "v": round(float(v), 3)} for k, v in zip(keys, values)])
Hãy chắc chắn rằng bạn đang sử dụng HiveContext
(Spark < 2,0 chỉ):
from pyspark.sql import HiveContext
assert isinstance(sqlContext, HiveContext)
Tạo một cửa sổ:
from pyspark.sql.window import Window
w = Window.partitionBy(df.k).orderBy(df.v)
tương đương với
(PARTITION BY k ORDER BY v)
trong SQL.
Như một quy tắc của định nghĩa cửa sổ ngón tay cái nên luôn luôn chứa mệnh đề PARTITION BY
nếu không thì Spark sẽ chuyển tất cả dữ liệu sang một phân vùng duy nhất. ORDER BY
là bắt buộc đối với một số chức năng, trong khi trong các trường hợp khác nhau (thường là tổng hợp) có thể là tùy chọn.
Ngoài ra còn có hai tùy chọn có thể được sử dụng để xác định khoảng thời gian cửa sổ - ROWS BETWEEN
và RANGE BETWEEN
. Những điều này sẽ không hữu ích cho chúng ta trong trường hợp cụ thể này.
Cuối cùng chúng ta có thể sử dụng nó cho một truy vấn:
from pyspark.sql.functions import percentRank, ntile
df.select(
"k", "v",
percentRank().over(w).alias("percent_rank"),
ntile(3).over(w).alias("ntile3")
)
Lưu ý rằng ntile
không liên quan trong bất kỳ cách nào để các quantiles.