8

Tôi đang cố chuyển đổi một khung dữ liệu thông qua một hàm lấy mảng làm tham số. Mã của tôi trông giống như sau:Truyền mảng dưới dạng tham số UDF trong Spark SQL

def getCategory(categories:Array[String], input:String): String = { 
    categories(input.toInt) 
} 

val myArray = Array("a", "b", "c") 

val myCategories =udf(getCategory _) 

val df = sqlContext.parquetFile("myfile.parquet) 

val df1 = df.withColumn("newCategory", myCategories(lit(myArray), col("myInput")) 

Tuy nhiên, ánh sáng không thích mảng và lỗi tập lệnh này. Tôi cố gắng definining một chức năng áp dụng một phần mới và sau đó là udf sau đó:

val newFunc = getCategory(myArray, _:String) 
val myCategories = udf(newFunc) 

val df1 = df.withColumn("newCategory", myCategories(col("myInput"))) 

này không làm việc, hoặc khi tôi nhận được một ngoại lệ nullPointer và nó xuất hiện myArray không được công nhận. Bất kỳ ý tưởng về cách tôi vượt qua một mảng như một tham số cho một chức năng với một dataframe?

Trên một ghi chú riêng biệt, bất kỳ giải thích nào về lý do tại sao thực hiện điều gì đó đơn giản như sử dụng hàm trên khung dữ liệu quá phức tạp (xác định hàm, xác định lại dưới dạng UDF, v.v.)?

Trả lời

7

Nhiều khả năng không phải là giải pháp đẹp nhất nhưng bạn có thể thử một cái gì đó như thế này:

def getCategory(categories: Array[String]) = { 
    udf((input:String) => categories(input.toInt)) 
} 

df.withColumn("newCategory", getCategory(myArray)(col("myInput"))) 

Bạn cũng có thể thử một array của literals:

val getCategory = udf(
    (input:String, categories: Array[String]) => categories(input.toInt)) 

df.withColumn(
    "newCategory", getCategory($"myInput", array(myArray.map(lit(_)): _*))) 

Trên một mặt lưu ý sử dụng Map thay vì Array có lẽ là một ý tưởng tốt hơn:

def mapCategory(categories: Map[String, String], default: String) = { 
    udf((input:String) => categories.getOrElse(input, default)) 
} 

val myMap = Map[String, String]("1" -> "a", "2" -> "b", "3" -> "c") 

df.withColumn("newCategory", mapCategory(myMap, "foo")(col("myInput"))) 

Từ Spark 1.5.0 bạn cũng có thể sử dụng một array chức năng:

import org.apache.spark.sql.functions.array 

val colArray = array(myArray map(lit _): _*) 
myCategories(lit(colArray), col("myInput")) 

Xem thêm Spark UDF with varargs

Các vấn đề liên quan