2016-02-18 16 views
6

Tôi muốn tạo RDD để thu thập kết quả tính toán lặp lại.Tạo RDD để thu thập kết quả của phép tính lặp lại

Làm thế nào tôi có thể sử dụng một vòng lặp (hoặc bất kỳ thay thế) để thay thế đoạn mã sau:

import org.apache.spark.mllib.random.RandomRDDs._  

val n = 10 

val step1 = normalRDD(sc, n, seed = 1) 
val step2 = normalRDD(sc, n, seed = (step1.max).toLong) 
val result1 = step1.zip(step2) 
val step3 = normalRDD(sc, n, seed = (step2.max).toLong) 
val result2 = result1.zip(step3) 

... 

val step50 = normalRDD(sc, n, seed = (step49.max).toLong) 
val result49 = result48.zip(step50) 

(tạo RDDs bước N và nén sau đó cùng lúc kết thúc cũng sẽ là ok miễn 50 RDDs được tạo ra lặp đi lặp lại để tôn trọng hạt = (bước (n-1) MAX) condition)

+0

tôi muốn sử dụng 'Stream.unfold' từ scalaz để tạo ra một luồng các bước và sau đó nén với chính nó và/hoặc scanRight .. – Reactormonk

Trả lời

6

một hàm đệ quy sẽ làm việc:

/** 
* The return type is an Option to handle the case of a user specifying 
* a non positive number of steps. 
*/ 
def createZippedNormal(sc : SparkContext, 
         numPartitions : Int, 
         numSteps : Int) : Option[RDD[Double]] = { 

    @scala.annotation.tailrec 
    def accum(sc : SparkContext, 
      numPartitions : Int, 
      numSteps : Int, 
      currRDD : RDD[Double], 
      seed : Long) : RDD[Double] = { 
    if(numSteps <= 0) currRDD 
    else { 
     val newRDD = normalRDD(sc, numPartitions, seed) 
     accum(sc, numPartitions, numSteps - 1, currRDD.zip(newRDD), newRDD.max) 
    } 
    } 

    if(numSteps <= 0) None 
    else Some(accum(sc, numPartitions, numSteps, sc.emptyRDD[Double], 1L)) 
} 
+0

Đệ quy đệ quy sẽ không bảo vệ bạn khỏi dòng RDD thổi ngăn xếp :) – zero323

+0

@ zero323 Đồng ý. Tuy nhiên, vấn đề này là cố hữu với các yêu cầu của câu hỏi. Bất kỳ câu trả lời nào cũng sẽ gặp phải một vấn đề tương tự. –

+0

Chỉ muốn chỉ ra rằng bạn đang xây dựng một cấu trúc dữ liệu đệ quy đằng sau hậu trường mà sẽ không được tối ưu hóa đuôi. Không có gì hơn :) Và thực sự bạn có thể giải quyết nó và tránh vấn đề bằng cách sử dụng các trạm kiểm soát. Nó thậm chí còn có khả năng giải quyết mà không cần một mã zip đơn lẻ :) – zero323

Các vấn đề liên quan