2015-07-16 22 views
5

Tôi có các vectơ được gắn nhãn (LabeledPoint-s) được gắn thẻ bởi một số nhóm. Đối với mỗi nhóm tôi cần phải tạo một phân loại Logistic Regression riêng biệt:Spark MLlib: các bộ phân loại xây dựng cho mỗi nhóm dữ liệu

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS 
import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.mllib.regression.LabeledPoint 
import org.apache.spark.mllib.linalg.{Vector, Vectors} 

object Scratch { 

    val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))), 
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))), 
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))), 
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))), 
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))), 
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0))))) 
) 

    def main(args: Array[String]) { 
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN) 
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) 
    // set up environment 
    val conf = new SparkConf() 
     .setMaster("local[5]") 
     .setAppName("Scratch") 
     .set("spark.executor.memory", "2g") 
    val sc = new SparkContext(conf) 

    val trainRDD = sc.parallelize(train) 
    val modelByGroup = trainRDD.groupByKey().map({case (group, iter) => 
          (group, new LogisticRegressionWithLBFGS().run(iter))}) 
    } 

} 

LogisticRegressionWithLBFGS().run(iter) không biên dịch vì run làm việc với RDD và không phải với iterator rằng groupBy lợi nhuận. Vui lòng tư vấn cách tạo nhiều phân loại như có các nhóm (thẻ) trong dữ liệu đầu vào.

Cập nhật - chứng tỏ rằng lồng nhau RDD lặp không hoạt động:

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS 
import org.apache.spark.rdd.RDD 
import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.mllib.regression.LabeledPoint 
import org.apache.spark.mllib.linalg.{Vector, Vectors} 

object Scratch { 

    val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))), 
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))), 
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))), 
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))), 
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))), 
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0))))) 
) 

    def main(args: Array[String]) { 
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN) 
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) 
    // set up environment 
    val conf = new SparkConf() 
     .setMaster("local[5]") 
     .setAppName("Scratch") 
     .set("spark.executor.memory", "2g") 
    val sc = new SparkContext(conf) 

    val trainRDD = sc.parallelize(train) 
    val keys : RDD[Int] = trainRDD.map({case (key,_) => key}).distinct 
    for (key <- keys) { 
    // key is Int here! 
     // Get train data for the current group (key): 
     val groupTrain = trainRDD.filter({case (x, _) => x == key }).cache() 

     /** 
     * Which results in org.apache.spark.SparkException: 
     * RDD transformations and actions can only be invoked by the driver, 
     * not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid 
     * because the values transformation and count action cannot be performed inside of the rdd1.map transformation. 
     * For more information, see SPARK-5063. at org.apache.spark.rdd.RDD.sc(RDD.scala:87) 
     */ 
    } 
    } 
} 

Hình như không có cách nào để sử dụng biến đổi bên trong biến đổi khác, có đúng không?

Trả lời

3

Nếu trình phân loại sử dụng của bạn trên mỗi nhóm bạn không cần mllib. Mllib được thiết kế để sử dụng với bộ phân phối (bộ của bạn không phải là bạn có butch bộ địa phương trên mỗi công nhân). Bạn chỉ có thể sử dụng một số thư viện học tập máy địa phương như weka trên mỗi nhóm trong chức năng bản đồ.

EDIT:

val keys = wholeRDD.map(_._1).distinct.collect 

var models = List() 
for (key <- keys) { 
    val valuesForKey = wholeRDD.filter(_._1 == key) 
    // train model 
    ... 
    models = model::models 
} 
+0

Trong mỗi nhóm I có dữ liệu khổng lồ bộ để đào tạo phân loại cho nhóm Đó là lý do tại sao tôi không thể sử dụng Weka. Bất kỳ ý tưởng nào khác ?. – zork

+0

@zork Nếu không có quá nhiều nhóm, bạn có thể tạo một rdd cho mỗi nhóm. Chỉ cần 'oneKeyRDD = wholeTrainSetRDD.filter (_._ 1 == key)'. – abalcerek

+0

Vâng, tôi cũng nghĩ về điều này. Thật không may tôi có 30 nhóm! – zork

Các vấn đề liên quan