Khi trạng thái câu hỏi, tôi muốn sử dụng một hàm một phần, được tạo thành bằng orElse, như một udf trong tia lửa. Dưới đây là một ví dụ có thể được chạy trong vỏ spark:Cách sử dụng hàm một phần được tạo bằng orElse làm udf trong tia lửa
val df = sc.parallelize(1 to 15).toDF("num")
df.show
//Testing out a normal udf - this works
val gt5: (Int => String) = num => (num > 5).toString
val gt5Udf = udf(gt5)
df.withColumn("gt5", gt5Udf(col("num"))).show
//Now create a udf of a partial function composed with orElse
val baseline: PartialFunction[Int, String] = { case _ => "baseline" }
val ge3: PartialFunction[Int, String] = { case x if x >= 3 => ">=3" }
val ge7: PartialFunction[Int, String] = { case x if x >= 7 => ">=7" }
val ge12: PartialFunction[Int, String] = { case x if x >= 12 => ">=12" }
val composed: PartialFunction[Int, String] = ge12 orElse ge7 orElse ge3 orElse baseline
val composedUdf = udf(composed)
//This fails (but this is what I'd like to do)
df.withColumn("pf", composedUdf(col("num"))).show
//Use a partial function not composed with orElse - this works
val baselineUdf = udf(baseline)
df.withColumn("pf", baselineUdf(col("num"))).show
Tôi hiện đang chạy này trên một cụm độc lập ba nút với cấu hình sau:
- spark: 1.6.0
- HDFS: 2.4.1
- scala: 2.10.5
tôi tìm thấy những gì tôi nghĩ là một đầu mối trong câu trả lời này: Why Scala can serialize Function but not PartialFunction?
vì vậy tôi cố gắng:
scala> composed.isInstanceOf[Serializable]
res: Boolean = false
scala> composedUdf.isInstanceOf[Serializable]
res: Boolean = true
scala> baseline.isInstanceOf[Serializable]
res: Boolean = true
scala> baselineUdf.isInstanceOf[Serializable]
res: Boolean = true
Tôi nhận được mờ ở đây, nhưng có vẻ như sáng tác một chức năng một phần với OrElse loại bỏ các serialization?
Tôi nghĩ rằng lỗi nhiều thông tin nhất là:
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: scala.PartialFunction$OrElse
...
Làm thế nào để khắc phục điều đó? Hay tôi ra khỏi căn cứ?
Cảm ơn bạn đã giúp đỡ!
Vấn đề thú vị mà tôi ước tôi có câu trả lời! Tôi không. Điều có thể hữu ích với bạn là UDFs trong Spark là các hộp đen cho Catalyst Optimizer và nên được sử dụng cẩn thận vì chúng không được tối ưu hóa. Thông thường bạn có thể nhận được kết quả tốt hơn với các chức năng hiện có của Spark SQL. Ngoài ra '(1 đến 15) .toDF (" num ")' cũng hoạt động. –
Ugh. Trong khi 'PartialFunction' chỉ là một đặc tính,' orElse' phải trả về 'PartialFunction' cụ thể - vì vậy nó gợi lên một điểm ngay tại chỗ, ngoại trừ việc lớp đó không được nối tiếp (ngay cả khi không có lý do chính đáng) được). Có vấn đề của bạn. – Alec
Nó sẽ hoạt động nếu bạn nhấc nó lên và bọc nó vào một chức năng khác, giống như 'val bao gồm: Int => Option [String] = x => (ge12 orElse ge7 orElse ge3 orElse baseline) .lift.apply (x)', nếu đó là điều bạn có thể sống cùng. – lpiepiora