7

Tôi đã cố gắng tạo và lưu Pipeline với các giai đoạn tùy chỉnh. Tôi cần thêm column vào số DataFrame bằng cách sử dụng UDF. Vì vậy, tôi đã tự hỏi nếu nó có thể chuyển đổi một UDF hoặc một hành động tương tự thành một Transformer?Làm thế nào để tạo một biến tùy chỉnh từ một UDF?

Tùy chỉnh của tôi UDF trông giống như thế này và tôi muốn tìm hiểu cách thực hiện bằng cách sử dụng UDF làm tùy chỉnh Transformer.

def getFeatures(n: String) = { 
    val NUMBER_FEATURES = 4 
    val name = n.split(" +")(0).toLowerCase 
    ((1 to NUMBER_FEATURES) 
     .filter(size => size <= name.length) 
     .map(size => name.substring(name.length - size))) 
} 

val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name)) 

Trả lời

12

Nó không phải là một giải pháp đầy đủ tính năng nhưng bạn có thể bắt đầu với một cái gì đó như thế này:

import org.apache.spark.ml.{UnaryTransformer} 
import org.apache.spark.ml.util.Identifiable 
import org.apache.spark.sql.types.{ArrayType, DataType, StringType} 

class NGramTokenizer(override val uid: String) 
    extends UnaryTransformer[String, Seq[String], NGramTokenizer] { 

    def this() = this(Identifiable.randomUID("ngramtokenizer")) 

    override protected def createTransformFunc: String => Seq[String] = { 
    getFeatures _ 
    } 

    override protected def validateInputType(inputType: DataType): Unit = { 
    require(inputType == StringType) 
    } 

    override protected def outputDataType: DataType = { 
    new ArrayType(StringType, true) 
    } 
} 

Kiểm tra nhanh:

val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v") 
val transformer = new NGramTokenizer().setInputCol("v").setOutputCol("vs") 

transformer.transform(df).show 
// +---+------+------------------+ 
// | k|  v|    vs| 
// +---+------+------------------+ 
// | 1|abcdef|[f, ef, def, cdef]| 
// | 2|foobar|[r, ar, bar, obar]| 
// +---+------+------------------+ 

Bạn thậm chí có thể cố gắng khái quát nó để cái gì đó như này:

import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor 
import scala.reflect.runtime.universe._ 

class UnaryUDFTransformer[T : TypeTag, U : TypeTag](
    override val uid: String, 
    f: T => U 
) extends UnaryTransformer[T, U, UnaryUDFTransformer[T, U]] { 

    override protected def createTransformFunc: T => U = f 

    override protected def validateInputType(inputType: DataType): Unit = 
    require(inputType == schemaFor[T].dataType) 

    override protected def outputDataType: DataType = schemaFor[U].dataType 
} 

val transformer = new UnaryUDFTransformer("featurize", getFeatures) 
    .setInputCol("v") 
    .setOutputCol("vs") 

Nếu bạn muốn sử dụng UDF không phải là chức năng bọc, bạn sẽ phải mở rộng Transformer trực tiếp và ghi đè phương thức transform. Thật không may phần lớn các lớp hữu ích là riêng tư nên nó có thể khá khó khăn.

Hoặc bạn có thể đăng ký UDF:

spark.udf.register("getFeatures", getFeatures _) 

và sử dụng SQLTransformer

import org.apache.spark.ml.feature.SQLTransformer 

val transformer = new SQLTransformer() 
    .setStatement("SELECT *, getFeatures(v) AS vs FROM __THIS__") 

transformer.transform(df).show 
// +---+------+------------------+ 
// | k|  v|    vs| 
// +---+------+------------------+ 
// | 1|abcdef|[f, ef, def, cdef]| 
// | 2|foobar|[r, ar, bar, obar]| 
// +---+------+------------------+ 
+0

Tôi đã cố gắng lưu mô hình của mình nhưng có nội dung 'Thông báo: Viết đường ống sẽ không thành công trên Đường ống này vì nó chứa một giai đoạn không thực thi được Ghi. Giai đoạn không ghi được: ngramtokenizer_f784079e2124 của lớp loại' tôi có phải thực hiện giao diện Ghi được không? –

+1

Đây là phần xấu mà tôi đã đề cập trước đây. Theo như tôi biết cách tiếp cận tốt nhất là triển khai 'DefaultParamsWritable' và' DefaultParamsReadable' nhưng nó sẽ không thể thực hiện được nếu không đặt ít nhất một phần mã của bạn trong gói ML. Bạn có thể thử với 'MLWritable' /' MLReadable'. – zero323

1

ban đầu tôi đã cố gắng để kéo dài tuổi TransformerUnaryTransformer tóm tắt nhưng gặp rắc rối với ứng dụng của tôi không có khả năng đạt DefaultParamsWriteable .Như một ví dụ có thể liên quan đến vấn đề của bạn, tôi đã tạo một trình bình thường hóa đơn giản như một UDF theo sau từ this example. Mục tiêu của tôi là đối sánh các cụm từ với các mẫu và tập hợp để thay thế chúng bằng các thuật ngữ chung chung. Ví dụ:

"\b[A-Z0-9._%+-][email protected][A-Z0-9.-]+\.[A-Z]{2,}\b".r -> "emailaddr" 

Đây là lớp

import scala.util.matching.Regex 

class TermNormalizer(normMap: Map[Any, String]) { 
    val normalizationMap = normMap 

    def normalizeTerms(terms: Seq[String]): Seq[String] = { 
    var termsUpdated = terms 
    for ((term, idx) <- termsUpdated.view.zipWithIndex) { 
     for (normalizer <- normalizationMap.keys: Iterable[Any]) { 
     normalizer match { 
      case (regex: Regex) => 
      if (!regex.findFirstIn(term).isEmpty) termsUpdated = 
       termsUpdated.updated(idx, normalizationMap(regex)) 
      case (set: Set[String]) => 
      if (set.contains(term)) termsUpdated = 
       termsUpdated.updated(idx, normalizationMap(set)) 
     } 
     } 
    } 
    termsUpdated 
    } 
} 

tôi sử dụng nó như thế này:

val testMap: Map[Any, String] = Map("hadoop".r -> "elephant", 
    "spark".r -> "sparky", "cool".r -> "neat", 
    Set("123", "456") -> "set1", 
    Set("789", "10") -> "set2") 

val testTermNormalizer = new TermNormalizer(testMap) 
val termNormalizerUdf = udf(testTermNormalizer.normalizeTerms(_: Seq[String])) 

val trainingTest = sqlContext.createDataFrame(Seq(
    (0L, "spark is cool 123", 1.0), 
    (1L, "adsjkfadfk akjdsfhad 456", 0.0), 
    (2L, "spark rocks my socks 789 10", 1.0), 
    (3L, "hadoop is cool 10", 0.0) 
)).toDF("id", "text", "label") 

val testTokenizer = new Tokenizer() 
    .setInputCol("text") 
    .setOutputCol("words") 

val tokenizedTrainingTest = testTokenizer.transform(trainingTest) 
println(tokenizedTrainingTest 
    .select($"id", $"text", $"words", termNormalizerUdf($"words"), $"label").show(false)) 

Bây giờ tôi đọc những câu hỏi một chút gần gũi hơn, có vẻ như bạn hỏi làm thế nào để tránh làm nó theo cách này lol. Dù sao, tôi vẫn sẽ đăng nó trong trường hợp một người nào đó trong tương lai đang tìm kiếm một cách dễ dàng để áp dụng một máy biến áp giống như chức năng

0

Nếu bạn muốn biến máy biến này thành công, thì bạn có thể thực hiện lại các đặc điểm như HasInputCol trong thư viện sharedParams trong một gói công khai mà bạn chọn và sau đó sử dụng chúng với đặc tính DefaultParamsWritable để làm cho máy biến áp có thể tiếp tục tồn tại.

Bằng cách này bạn cũng có thể tránh phải đặt một phần mã của mình bên trong các gói ml lõi lửa nhưng bạn loại duy trì một tập hợp các tham số song song trong gói của riêng bạn. Điều này không thực sự là một vấn đề cho họ hầu như không bao giờ thay đổi.

Nhưng hãy theo dõi lỗi trong bảng JIRA here yêu cầu một số chia sẻ chungParams được đặt ở chế độ công khai thay vì riêng tư thành ml để mọi người có thể trực tiếp sử dụng những người đó từ các lớp bên ngoài.

Các vấn đề liên quan