Tôi có các vectơ được gắn nhãn (LabeledPoint-s) được gắn thẻ bởi một số nhóm. Đối với mỗi nhóm tôi cần phải tạo một phân loại Logistic Regression riêng biệt:Spark MLlib: các bộ phân loại xây dựng cho mỗi nhóm dữ liệu
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
object Scratch {
val train = Seq(
(1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
(1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
(1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
(8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
(8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
(8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
)
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// set up environment
val conf = new SparkConf()
.setMaster("local[5]")
.setAppName("Scratch")
.set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
val trainRDD = sc.parallelize(train)
val modelByGroup = trainRDD.groupByKey().map({case (group, iter) =>
(group, new LogisticRegressionWithLBFGS().run(iter))})
}
}
LogisticRegressionWithLBFGS().run(iter)
không biên dịch vì run
làm việc với RDD
và không phải với iterator rằng groupBy
lợi nhuận. Vui lòng tư vấn cách tạo nhiều phân loại như có các nhóm (thẻ) trong dữ liệu đầu vào.
Cập nhật - chứng tỏ rằng lồng nhau RDD lặp không hoạt động:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
object Scratch {
val train = Seq(
(1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
(1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
(1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
(8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
(8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
(8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
)
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// set up environment
val conf = new SparkConf()
.setMaster("local[5]")
.setAppName("Scratch")
.set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
val trainRDD = sc.parallelize(train)
val keys : RDD[Int] = trainRDD.map({case (key,_) => key}).distinct
for (key <- keys) {
// key is Int here!
// Get train data for the current group (key):
val groupTrain = trainRDD.filter({case (x, _) => x == key }).cache()
/**
* Which results in org.apache.spark.SparkException:
* RDD transformations and actions can only be invoked by the driver,
* not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid
* because the values transformation and count action cannot be performed inside of the rdd1.map transformation.
* For more information, see SPARK-5063. at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
*/
}
}
}
Hình như không có cách nào để sử dụng biến đổi bên trong biến đổi khác, có đúng không?
Trong mỗi nhóm I có dữ liệu khổng lồ bộ để đào tạo phân loại cho nhóm Đó là lý do tại sao tôi không thể sử dụng Weka. Bất kỳ ý tưởng nào khác ?. – zork
@zork Nếu không có quá nhiều nhóm, bạn có thể tạo một rdd cho mỗi nhóm. Chỉ cần 'oneKeyRDD = wholeTrainSetRDD.filter (_._ 1 == key)'. – abalcerek
Vâng, tôi cũng nghĩ về điều này. Thật không may tôi có 30 nhóm! – zork