2015-09-10 20 views
6

Tôi có một số RDD[String], wordRDD. Tôi cũng có một hàm tạo ra một RDD [String] từ một chuỗi/từ. Tôi muốn tạo RDD mới cho mỗi chuỗi trong wordRDD. Dưới đây là những nỗ lực của tôi:Làm cách nào để tạo tập hợp RDD ra khỏi RDD?

1) Không vì Spark không hỗ trợ RDDs lồng nhau:?

var newRDD = wordRDD.map(word => { 
    // execute myFunction() 
    (new MyClass(word)).myFunction() 
}) 

2) Không thành công (có thể do vấn đề phạm vi):

var newRDD = sc.parallelize(new Array[String](0)) 
val wordArray = wordRDD.collect 
for (w <- wordArray){ 
    newRDD = sc.union(newRDD,(new MyClass(w)).myFunction()) 
} 

kết quả lý tưởng của tôi sẽ trông giống như:

// input RDD (wordRDD) 
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...) 

// myFunction behavior 
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl') 

// after executing myFunction() on each word in wordRDD: 
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...) 

tôi tìm thấy một câu hỏi liên quan ở đây: Spark when union a lot of RDD throws stack overflow error, nhưng nó không giải quyết của tôi vấn đề.

Trả lời

3

Sử dụng flatMap để có được RDD[String] như bạn mong muốn.

var allWords = wordRDD.flatMap { word => 
    (new MyClass(word)).myFunction().collect() 
} 
+1

Làm cách nào để chạy song song? Mọi thứ xảy ra trong 'wordRDD.map' được thực hiện trên cụm. Vì vậy, bên trong 'collect' phải kích hoạt một công việc Spark mới từ bên trong một công việc đang chạy. Tôi nghi ngờ rằng nó sẽ không chạy phân tán. –

+0

Ông cũng có thể thay đổi hàm để trả về mảng thay vì RDD, nhưng câu hỏi không xác định hàm thực tế. –

+0

Nhưng mô tả của ông nói rằng ông có một chức năng, tôi giả định đó là 'myFunction' tạo ra một' RDD [String] 'từ một chuỗi/từ. –

3

Bạn không thể tạo RDD từ trong một số khác RDD.

Tuy nhiên, có thể viết lại hàm myFunction: String => RDD[String], tạo ra tất cả các từ từ đầu vào trong đó một chữ cái bị xóa, thành hàm khác modifiedFunction: String => Seq[String] sao cho nó có thể được sử dụng từ bên trong RDD. Bằng cách đó, nó cũng sẽ được thực hiện song song trên cụm của bạn. Có số modifiedFunction bạn có thể nhận được số RDD cuối cùng với tất cả các từ chỉ bằng cách gọi wordRDD.flatMap(modifiedFunction).

Điểm quan trọng là sử dụng flatMap (để mapflatten biến đổi):

def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]") 
    val sc = new SparkContext(sparkConf) 

    val input = sc.parallelize(Seq("apple", "ananas", "banana")) 

    // RDD("pple", "aple", ..., "nanas", ..., "anana", "bnana", ...) 
    val result = input.flatMap(modifiedFunction) 
} 

def modifiedFunction(word: String): Seq[String] = { 
    word.indices map { 
    index => word.substring(0, index) + word.substring(index+1) 
    } 
} 
Các vấn đề liên quan