2015-09-18 18 views
6

Tôi đang cố gắng để lọc một dataframe chống lại khác:Làm thế nào để lọc một tia lửa dataframe chống dataframe khác

scala> val df1 = sc.parallelize((1 to 100).map(a=>(s"user $a", a*0.123, a))).toDF("name", "score", "user_id") 
scala> val df2 = sc.parallelize(List(2,3,4,5,6)).toDF("valid_id") 

Bây giờ tôi muốn lọc df1 và lấy lại một dataframe có chứa tất cả các hàng trong df1 nơi user_id nằm trong df2 ("valid_id"). Nói cách khác, tôi muốn tất cả các hàng trong df1 nơi user_id là một trong hai 2,3,4,5 hoặc 6

scala> df1.select("user_id").filter($"user_id" in df2("valid_id")) 
warning: there were 1 deprecation warning(s); re-run with -deprecation for details 
org.apache.spark.sql.AnalysisException: resolved attribute(s) valid_id#20 missing from user_id#18 in operator !Filter user_id#18 IN (valid_id#20); 

Mặt khác khi tôi cố gắng làm một bộ lọc chống lại một chức năng, tất cả mọi thứ sẽ rất tốt :

scala> df1.select("user_id").filter(($"user_id" % 2) === 0) 
res1: org.apache.spark.sql.DataFrame = [user_id: int] 

Tại sao tôi gặp phải lỗi này? Có gì sai với cú pháp của tôi không?

comment sau tôi đã cố gắng để làm một trái bên ngoài tham gia:

scala> df1.show 
+-------+------------------+-------+ 
| name|    score|user_id| 
+-------+------------------+-------+ 
| user 1|    0.123|  1| 
| user 2|    0.246|  2| 
| user 3|    0.369|  3| 
| user 4|    0.492|  4| 
| user 5|    0.615|  5| 
| user 6|    0.738|  6| 
| user 7|    0.861|  7| 
| user 8|    0.984|  8| 
| user 9|    1.107|  9| 
|user 10|    1.23|  10| 
|user 11|    1.353|  11| 
|user 12|    1.476|  12| 
|user 13|    1.599|  13| 
|user 14|    1.722|  14| 
|user 15|    1.845|  15| 
|user 16|    1.968|  16| 
|user 17|    2.091|  17| 
|user 18|    2.214|  18| 
|user 19|2.3369999999999997|  19| 
|user 20|    2.46|  20| 
+-------+------------------+-------+ 
only showing top 20 rows 

scala> df2.show 
+--------+ 
|valid_id| 
+--------+ 
|  2| 
|  3| 
|  4| 
|  5| 
|  6| 
+--------+ 

scala> df1.join(df2, df1("user_id") === df2("valid_id")) 
res6: org.apache.spark.sql.DataFrame = [name: string, score: double, user_id: int, valid_id: int] 
scala> res6.collect 
res7: Array[org.apache.spark.sql.Row] = Array() 

scala> df1.join(df2, df1("user_id") === df2("valid_id"), "left_outer") 
res8: org.apache.spark.sql.DataFrame = [name: string, score: double, user_id: int, valid_id: int] 
scala> res8.count 
res9: Long = 0 

Tôi đang chạy spark 1.5.0 với scala 2.10.5

+0

Bạn muốn lọc hoặc thực hiện một kết hợp trên hai khung dữ liệu? – eliasah

+0

@eliasah Tôi muốn lấy một khung dữ liệu với một tập con của các hàng từ df1. đối với mỗi hàng r trong df1, nếu giá trị của r ("user_id") nằm trong df2 ("valid_id"), thì r hàng sẽ được bao gồm trong khung dữ liệu kết quả. – polo

+0

Sau đó, bạn sẽ phải thực hiện một tham gia bên ngoài bên trái từ df1 đến df2 trên userId == validId – eliasah

Trả lời

11

Bạn muốn có một (thường xuyên) bên tham gia, không một bên ngoài tham gia :)

df1.join(df2, df1("user_id") === df2("valid_id")) 
+0

Chắc chắn! Xin lỗi, tôi xấu! Bây giờ tôi biết rằng nó không phải là một ý tưởng tốt để đi trên SO với chứng mất ngủ :) – eliasah

+0

@ glennie-helles-sindholt: Cảm ơn bạn đã trả lời. Điều này có ý nghĩa, nhưng trả về một dataframe trống. Xem các chỉnh sửa của tôi với ví dụ về mã trong câu hỏi. – polo

+0

@polo Tôi phải nói rằng mọi thứ có vẻ đúng, theo như tôi thấy. Tôi vừa sao chép các lệnh của bạn vào trình bao của riêng tôi (cũng chạy Spark 1.5.0) và mọi thứ hoạt động hoàn hảo. Bạn không thay đổi có một số rõ ràng 'val sc = new SparkContext (conf)' một nơi nào đó trong vỏ của bạn, phải không? Gần đây tôi tình cờ gặp một người khác nhìn thấy hành vi kỳ lạ bởi vì anh ta đã tuyên bố biến số của riêng mình. Nếu không, tôi nghĩ rằng tôi mới ra khỏi ý tưởng như tôi chỉ đơn giản là không thể tái tạo vấn đề. Tôi cho rằng bạn đã cố gắng khởi chạy lại trình bao của mình? –

Các vấn đề liên quan