Giả sử tôi có hai PySpark DataFrames df1
và df2
.Pyspark Dataframe Áp dụng chức năng cho hai cột
df1= 'a'
1
2
5
df2= 'b'
3
6
Và tôi muốn tìm ra df2['b']
giá trị gần nhất cho mỗi df1['a']
, và thêm các giá trị gần như là một cột mới trong df1
.
Nói cách khác, đối với mỗi giá trị x
trong df1['a']
, tôi muốn tìm một y
rằng đạt min(abx(x-y))
cho tất cả y in df2['b']
(lưu ý: có thể giả định rằng chỉ có một y
rằng có thể đạt được khoảng cách tối thiểu), và kết quả sẽ được
'a' 'b'
1 3
2 3
5 6
tôi đã thử các mã sau đây để tạo ra một ma trận khoảng cách đầu tiên (trước khi tìm các giá trị đạt khoảng cách tối thiểu):
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
def dict(x,y):
return abs(x-y)
udf_dict = udf(dict, IntegerType())
sql_sc = SQLContext(sc)
udf_dict(df1.a, df2.b)
mang đến cho
Column<PythonUDF#dist(a,b)>
Sau đó, tôi đã cố gắng
sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b))
mà chạy mãi mãi mà không đưa ra lỗi/đầu ra.
Câu hỏi của tôi là:
- Như tôi mới vào Spark, là cách của tôi để xây dựng các DataFrame đầu ra hiệu quả? (Cách của tôi sẽ tạo ma trận khoảng cách cho tất cả các giá trị
a
vàb
trước, sau đó tìm sốmin
) - Có gì sai với dòng cuối cùng của mã của tôi và cách khắc phục?