2016-11-02 33 views
5

Giả sử tôi có hai PySpark DataFrames df1df2.Pyspark Dataframe Áp dụng chức năng cho hai cột

df1= 'a' 
     1  
     2  
     5  

df2= 'b' 
     3 
     6 

Và tôi muốn tìm ra df2['b'] giá trị gần nhất cho mỗi df1['a'], và thêm các giá trị gần như là một cột mới trong df1.

Nói cách khác, đối với mỗi giá trị x trong df1['a'], tôi muốn tìm một y rằng đạt min(abx(x-y)) cho tất cả y in df2['b'] (lưu ý: có thể giả định rằng chỉ có một y rằng có thể đạt được khoảng cách tối thiểu), và kết quả sẽ được

'a' 'b' 
1  3 
2  3 
5  6 

tôi đã thử các mã sau đây để tạo ra một ma trận khoảng cách đầu tiên (trước khi tìm các giá trị đạt khoảng cách tối thiểu):

from pyspark.sql.types import IntegerType 
from pyspark.sql.functions import udf 

def dict(x,y): 
    return abs(x-y) 
udf_dict = udf(dict, IntegerType()) 

sql_sc = SQLContext(sc) 
udf_dict(df1.a, df2.b) 

mang đến cho

Column<PythonUDF#dist(a,b)> 

Sau đó, tôi đã cố gắng

sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b)) 

mà chạy mãi mãi mà không đưa ra lỗi/đầu ra.

Câu hỏi của tôi là:

  1. Như tôi mới vào Spark, là cách của tôi để xây dựng các DataFrame đầu ra hiệu quả? (Cách của tôi sẽ tạo ma trận khoảng cách cho tất cả các giá trị ab trước, sau đó tìm số min)
  2. Có gì sai với dòng cuối cùng của mã của tôi và cách khắc phục?

Trả lời

5

Bắt đầu với câu hỏi thứ hai của bạn - bạn có thể áp dụng udf chỉ để dataframe hiện có, tôi nghĩ rằng bạn đang nghĩ về một cái gì đó như thế này:

>>> df1.join(df2).withColumn('distance', udf_dict(df1.a, df2.b)).show() 
+---+---+--------+ 
| a| b|distance| 
+---+---+--------+ 
| 1| 3|  2| 
| 1| 6|  5| 
| 2| 3|  1| 
| 2| 6|  4| 
| 5| 3|  2| 
| 5| 6|  1| 
+---+---+--------+ 

Nhưng có một cách hiệu quả hơn để áp dụng khoảng cách này, bằng cách sử dụng nội abs:

>>> from pyspark.sql.functions import abs 
>>> df1.join(df2).withColumn('distance', abs(df1.a -df2.b)) 

Sau đó, bạn có thể tìm thấy phù hợp với số bằng cách tính toán:

>>> distances = df1.join(df2).withColumn('distance', abs(df1.a -df2.b)) 
>>> min_distances = distances.groupBy('a').agg(min('distance').alias('distance')) 
>>> distances.join(min_distances, ['a', 'distance']).select('a', 'b').show() 
+---+---+                  
| a| b| 
+---+---+ 
| 5| 6| 
| 1| 3| 
| 2| 3| 
+---+---+ 
Các vấn đề liên quan