2016-10-07 17 views
5

Khi trạng thái câu hỏi, tôi muốn sử dụng một hàm một phần, được tạo thành bằng orElse, như một udf trong tia lửa. Dưới đây là một ví dụ có thể được chạy trong vỏ spark:Cách sử dụng hàm một phần được tạo bằng orElse làm udf trong tia lửa

val df = sc.parallelize(1 to 15).toDF("num") 
df.show 

//Testing out a normal udf - this works 
val gt5: (Int => String) = num => (num > 5).toString 
val gt5Udf = udf(gt5) 
df.withColumn("gt5", gt5Udf(col("num"))).show 

//Now create a udf of a partial function composed with orElse 
val baseline: PartialFunction[Int, String] = { case _ => "baseline" } 
val ge3: PartialFunction[Int, String] = { case x if x >= 3 => ">=3" } 
val ge7: PartialFunction[Int, String] = { case x if x >= 7 => ">=7" } 
val ge12: PartialFunction[Int, String] = { case x if x >= 12 => ">=12" } 

val composed: PartialFunction[Int, String] = ge12 orElse ge7 orElse ge3 orElse baseline 
val composedUdf = udf(composed) 

//This fails (but this is what I'd like to do) 
df.withColumn("pf", composedUdf(col("num"))).show 

//Use a partial function not composed with orElse - this works 
val baselineUdf = udf(baseline) 
df.withColumn("pf", baselineUdf(col("num"))).show 

Tôi hiện đang chạy này trên một cụm độc lập ba nút với cấu hình sau:

  • spark: 1.6.0
  • HDFS: 2.4.1
  • scala: 2.10.5

tôi tìm thấy những gì tôi nghĩ là một đầu mối trong câu trả lời này: Why Scala can serialize Function but not PartialFunction?

vì vậy tôi cố gắng:

scala> composed.isInstanceOf[Serializable] 
res: Boolean = false 

scala> composedUdf.isInstanceOf[Serializable] 
res: Boolean = true 

scala> baseline.isInstanceOf[Serializable] 
res: Boolean = true 

scala> baselineUdf.isInstanceOf[Serializable] 
res: Boolean = true 

Tôi nhận được mờ ở đây, nhưng có vẻ như sáng tác một chức năng một phần với OrElse loại bỏ các serialization?

Tôi nghĩ rằng lỗi nhiều thông tin nhất là:

org.apache.spark.SparkException: Task not serializable 
... 
Caused by: java.io.NotSerializableException: scala.PartialFunction$OrElse 
... 

Làm thế nào để khắc phục điều đó? Hay tôi ra khỏi căn cứ?

Cảm ơn bạn đã giúp đỡ!

+2

Vấn đề thú vị mà tôi ước tôi có câu trả lời! Tôi không. Điều có thể hữu ích với bạn là UDFs trong Spark là các hộp đen cho Catalyst Optimizer và nên được sử dụng cẩn thận vì chúng không được tối ưu hóa. Thông thường bạn có thể nhận được kết quả tốt hơn với các chức năng hiện có của Spark SQL. Ngoài ra '(1 đến 15) .toDF (" num ")' cũng hoạt động. –

+1

Ugh. Trong khi 'PartialFunction' chỉ là một đặc tính,' orElse' phải trả về 'PartialFunction' cụ thể - vì vậy nó gợi lên một điểm ngay tại chỗ, ngoại trừ việc lớp đó không được nối tiếp (ngay cả khi không có lý do chính đáng) được). Có vấn đề của bạn. – Alec

+1

Nó sẽ hoạt động nếu bạn nhấc nó lên và bọc nó vào một chức năng khác, giống như 'val bao gồm: Int => Option [String] = x => (ge12 orElse ge7 orElse ge3 orElse baseline) .lift.apply (x)', nếu đó là điều bạn có thể sống cùng. – lpiepiora

Trả lời

3

Nó sẽ hoạt động nếu bạn nhấc nó lên và bọc nó vào một chức năng khác.

1

Mặc dù điều này không trực tiếp giải quyết vấn đề của bạn Tôi muốn đề xuất và giải pháp thay thế bằng cách sử dụng các hàm SQL.

Trước tiên, bạn sẽ phải nhập khẩu chức năng yêu cầu:

import org.apache.spark.sql.functions.{when, lit} 

và một số implicits cho ngắn gọn:

import sqlContext.implicits._ 

Tiếp theo, bạn có thể bày tỏ những điều kiện tương tự như trong mã của bạn:

val baseline = lit("baseline") 
val ge3 = when($"num" >= 3, ">=3") 
val ge7 = when($"num" >= 7, ">=7") 
val ge12 = when($"num" >= 12, ">=12") 

val composed = ge12 otherwise (ge7 otherwise (ge3 otherwise baseline)) 

Trong biểu mẫu này, việc này ít thanh lịch hơn một chút nhưng bạn có thể không cần nỗ lực nào để soạn thảo expr ession như thế này bằng cách sử dụng API thu thập tiêu chuẩn (foldLeft/foldRight) và, unlike UDFs, kết quả có thể được tối ưu hóa bởi Trình tối ưu hóa chất xúc tác.

Các vấn đề liên quan