2015-11-11 22 views
5

Tôi đang sử dụng Apache Spark trong ứng dụng Java của mình. Tôi có hai số DataFrame s: df1df2. df1 chứa s với email, firstNamelastName. df2 chứa s với email.Cách triển khai NOT IN cho hai DataFrames với cấu trúc khác nhau trong Apache Spark

Tôi muốn tạo một DataFrame: df3 có chứa tất cả các hàng trong df1, email nào không có trong df2.

Có cách nào để thực hiện việc này với Apache Spark không? Tôi cố gắng để tạo ra JavaRDD<String> từ df1df2 bởi đúc chúng toJavaRDD() và lọc df1 để chứa tất cả các email và sau đó sử dụng subtract, nhưng tôi không biết làm thế nào để lập bản đồ mới JavaRDD để ds1 và có được một DataFrame.

Về cơ bản tôi cần tất cả các hàng nằm trong số df1 có email không nằm trong số df2.

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer "); 

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " + 
          "WHERE product_id = '" + productId + "'"); 

JavaRDD<String> customersBoughtEmail = customersWhoOrderedTheProduct.toJavaRDD().map(row -> row.getString(0)); 

List<String> notBoughtEmails = customers.javaRDD() 
         .map(row -> row.getString(0)) 
         .subtract(customersBoughtEmail).collect(); 

Trả lời

4

Spark 2.0.0+

Bạn có thể sử dụng NOT IN trực tiếp.

Spark < 2.0.0

Nó có thể được thể hiện bằng phép nối ngoài và bộ lọc.

val customers = sc.parallelize(Seq(
    ("[email protected]", "John", "Doe"), 
    ("[email protected]", "Jane", "Doe") 
)).toDF("email", "first_name", "last_name") 

val customersWhoOrderedTheProduct = sc.parallelize(Seq(
    Tuple1("[email protected]") 
)).toDF("email") 

val customersWhoHaventOrderedTheProduct = customers.join(
    customersWhoOrderedTheProduct.select($"email".alias("email_")), 
    $"email" === $"email_", "leftouter") 
.where($"email_".isNull).drop("email_") 

customersWhoHaventOrderedTheProduct.show 

// +----------------+----------+---------+ 
// |   email|first_name|last_name| 
// +----------------+----------+---------+ 
// |[email protected]|  John|  Doe| 
// +----------------+----------+---------+ 

liệu SQL tương đương:

customers.registerTempTable("customers") 
customersWhoOrderedTheProduct.registerTempTable(
    "customersWhoOrderedTheProduct") 

val query = """SELECT c.* FROM customers c LEFT OUTER JOIN 
       customersWhoOrderedTheProduct o 
       ON c.email = o.email 
       WHERE o.email IS NULL""" 

sqlContext.sql(query).show 

// +----------------+----------+---------+ 
// |   email|first_name|last_name| 
// +----------------+----------+---------+ 
// |[email protected]|  John|  Doe| 
// +----------------+----------+---------+ 
+2

Cảm ơn bạn. Ví dụ đầu tiên làm việc cho tôi. Đây là phiên bản Java 'DataFrame customersWhoHaventOrderedTheProduct = customers .join (customersWhoOrderedTheProduct.select (customersWhoOrderedTheProduct.col (" email ")), customers.col (" email ") .đối với (customersWhoOrderedTheProduct.col (" email "))," leftouter ") . where (customersWhoOrderedTheProduct.col (" email "). isNull()). thả (customersWhoOrderedTheProduct.col (" email "));' Tôi đã thử tương đương SQL nhưng điều này xảy ra 'scala.MatchError: UUIDType (của lớp org.apache.spark.sql.cassandra.types.UUIDType $) ' –

+0

Tôi vui vì tôi có thể giúp đỡ. – zero323

+0

Tôi đang sử dụng 'Cassandra' và tôi có một' UUID' làm khóa chính. Có lẽ Scala không thể phù hợp với loại. –

2

tôi đã làm nó trong python, bên cạnh tôi sẽ đề nghị bạn sử dụng số nguyên như phím không dây.

from pyspark.sql.types import * 

samples = sc.parallelize([ 
    ("[email protected]", "Alberto", "Bonsanto"), ("[email protected]", "Miguel", "Bonsanto"), 
    ("[email protected]", "Stranger", "Weirdo"), ("[email protected]", "Dakota", "Bonsanto") 
]) 

keys = sc.parallelize(
    [("[email protected]",), ("[email protected]",), ("[email protected]",)] 
) 

complex_schema = StructType([ 
    StructField("email", StringType(), True), 
    StructField("first_name", StringType(), True), 
    StructField("last_name", StringType(), True) 
]) 

simple_schema = StructType([ 
    StructField("email", StringType(), True) 
]) 

df1 = sqlContext.createDataFrame(samples, complex_schema) 
df2 = sqlContext.createDataFrame(keys, simple_schema) 

df1.show() 
df2.show() 

df3 = df1.join(df2, df1.email == df2.email, "left_outer").where(df2.email.isNull()).show() 
+0

Cảm ơn bạn. Tôi đang sử dụng 'Cassandra' vì vậy rất nhiều khóa chính của tôi chứa một' UUID'. –

Các vấn đề liên quan