2015-05-25 25 views
6

Tôi khá mới để châm ngòi và ngôn ngữ scala và muốn union all RDDs trong một danh sách như sau (List<RDD> to RDD):Spark: Làm thế nào để kết hợp một danh sách <RDD> để RDD

val data = for (item <- paths) yield { 
     val ad_data_path = item._1 
     val ad_data = SparkCommon.sc.textFile(ad_data_path).map { 
      line => { 
       val ad_data = new AdData(line) 
       (ad_data.ad_id, ad_data) 
      } 
     }.distinct() 
    } 
val ret = SparkCommon.sc.parallelize(data).reduce(_ ++ _) 

tôi chạy mã trong IntelliJ trong khi luôn gặp lỗi như:

ava.lang.NullPointerException 
at org.apache.spark.rdd.RDD.<init>(RDD.scala:125) 
at org.apache.spark.rdd.UnionRDD.<init>(UnionRDD.scala:59) 
at org.apache.spark.rdd.RDD.union(RDD.scala:438) 
at org.apache.spark.rdd.RDD.$plus$plus(RDD.scala:444) 
at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99) 
at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99) 
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:177) 
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:172) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) 
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) 
at org.apache.spark.InterruptibleIterator.reduceLeft(InterruptibleIterator.scala:28) 
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:847) 
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:845) 
at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157) 
at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
at org.apache.spark.scheduler.Task.run(Task.scala:54) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

Bất kỳ ai có ý tưởng gì về lỗi này? Cảm ơn trước :)

Trả lời

17

Đây có thể là nguyên nhân gây ra,

val listA = 1 to 10 
for(i <- listA; if i%2 == 0)yield {i} 

sẽ trở lại Vector (2,4,6,8,10), trong khi

for(i <- listA; if i%2 == 0)yield {val c = i} 

sẽ trở lại Vector ((),(),(),(),())

Đó chính xác là những gì đang xảy ra trong trường hợp của bạn. Bạn đang khởi tạo ad_data nhưng không trả lại lợi nhuận.

Theo như câu hỏi của bạn có liên quan, ví dụ: Danh sách [RDD] để RDD

đây là giải pháp:

val listA = sc.parallelize(1 to 10) 
val listB = sc.parallelize(10 to 1 by -1) 

tạo danh sách 2 RDDS

val listC = List(listA,listB) 

chuyển đổi Liệt kê [RDD] tới RDD

val listD = listC.reduce(_ union _) 

Hy vọng, điều này sẽ trả lời câu hỏi của bạn.

+0

Thanks a lot, các vấn đề được giải quyết với giải pháp của bạn. – juffun

+0

@ juffun, bạn có thể chấp nhận câu trả lời, nếu giải pháp có hiệu quả cho bạn không :) – Akash

+0

chắc chắn, đã được chấp nhận. – juffun

0

Một phương pháp đơn giản khác để chuyển đổi từ Danh sách RDD sang RDD. SparkContext có hai phương pháp kết hợp quá tải, một chấp nhận hai RDDs và khác chấp nhận Danh sách RDDs

đoàn (lần đầu tiên, phần còn lại) đoàn (rdds: Seq [RDD [T]]))

Các vấn đề liên quan