2015-01-23 23 views
6

Tôi đã tìm kiếm giải pháp trong một thời gian dài nhưng không nhận được bất kỳ thuật toán chính xác nào.Cách chuyển RDD [(Khóa, Giá trị)] thành Bản đồ [Khóa, RDD [Giá trị]]

Sử dụng Spark RDD trong scala, làm cách nào tôi có thể chuyển đổi RDD[(Key, Value)] thành Map[key, RDD[Value]], biết rằng tôi không thể sử dụng thu thập hoặc các phương pháp khác có thể tải dữ liệu vào bộ nhớ?

Trong thực tế, mục tiêu cuối cùng của tôi là để lặp trên Map[Key, RDD[Value]] bởi chính và lời gọi saveAsNewAPIHadoopFile cho mỗi RDD[Value]

Ví dụ, nếu tôi nhận được:

RDD[("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)] 

tôi muốn:

Map[("A" -> RDD[1, 2, 3]), ("B" -> RDD[4, 5]), ("C" -> RDD[6])] 

Tôi tự hỏi nếu nó sẽ không tốn quá nhiều tiền để làm điều đó bằng cách sử dụng filter trên mỗi khóa A, B, C của RDD[(Key, Value)], nhưng tôi không biết nếu gọi bộ lọc nhiều lần có các phím khác nhau sẽ có hiệu quả? (Off Tất nhiên là không, nhưng có lẽ sử dụng cache?)

Cảm ơn bạn

+2

"biết rằng tôi không thể sử dụng thu thập hoặc các phương pháp khác có thể tải dữ liệu vào bộ nhớ?". Điều này không có ý nghĩa. Bản đồ kết quả sẽ phải phù hợp với bộ nhớ. –

+0

Chỉ cần đâm hoang dã trong bóng tối; sẽ không groupBy (...) cung cấp cho bạn một cái gì đó bạn có thể sử dụng? Nó sẽ cung cấp cho bạn RDD [khóa, Iterable [giá trị]] – thoredge

+0

@thoredge Tôi không chắc chắn rằng một iterable nên phù hợp với bộ nhớ cho số lượng rất lớn của dữ liệu, nhưng thực sự theo khối lượng đầu vào của tôi này có thể là một giải pháp – Seb

Trả lời

2

Bạn nên sử dụng mã như thế này (Python):

rdd = sc.parallelize([("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)]).cache() 
keys = rdd.keys().distinct().collect() 
for key in keys: 
    out = rdd.filter(lambda x: x[0] == key).map(lambda (x,y): y) 
    out.saveAsNewAPIHadoopFile (...) 

Một RDD không thể là một phần của RDD khác và bạn không có tùy chọn để chỉ thu thập các khóa và chuyển đổi các giá trị liên quan của chúng thành một RDD riêng biệt. Trong ví dụ của tôi, bạn sẽ lặp qua RDD được lưu trong bộ nhớ cache, là ok và sẽ hoạt động nhanh

+0

tôi đã không chắc chắn về hiệu quả của bộ lọc, nhưng tôi nghĩ rằng đây là giải pháp tôi sẽ thực hiện. – Seb

+0

Không có sự biến đổi nào sẵn sàng cho logic của bạn, tôi sợ rằng nếu bạn muốn một cái gì đó hiệu quả hơn, bạn phải tự mình thực hiện nó – 0x0FFF

+0

Về cơ bản đây là một giải pháp tối ưu. Bạn có thể đáp ứng mục tiêu cuối cùng của mình bằng văn bản cho một tập tin riêng biệt cho mỗi khóa trong một vượt qua với một MultipleTextOutput. –

-1

Đây là mã thử nghiệm đơn giản của tôi.

val test_RDD = sc.parallelize(List(("A",1),("A",2), ("A",3),("B",4),("B",5),("C",6))) 
val groupby_RDD = test_RDD.groupByKey() 
val result_RDD = groupby_RDD.map{v => 
    var result_list:List[Int] = Nil 
    for (i <- v._2) { 
     result_list ::= i 
    } 
    (v._1, result_list) 
} 

Kết quả là dưới

result_RDD.take(3) 
>> res86: Array[(String, List[Int])] = Array((A,List(1, 3, 2)), (B,List(5, 4)), (C,List(6))) 

Hoặc bạn có thể làm điều đó như

val test_RDD = sc.parallelize(List(("A",1),("A",2), ("A",3),("B",4),("B",5),("C",6))) 
val nil_list:List[Int] = Nil 
val result2 = test_RDD.aggregateByKey(nil_list)(
    (acc, value) => value :: acc, 
    (acc1, acc2) => acc1 ::: acc2) 

kết quả này là như thế này

result2.take(3) 
>> res209: Array[(String, List[Int])] = Array((A,List(3, 2, 1)), (B,List(5, 4)), (C,List(6))) 
0

Nghe có vẻ giống như những gì bạn thực sự muốn là lưu KV RDD của bạn vào một tệp riêng biệt cho từng tệp Chìa khóa. Thay vì tạo một Map[Key, RDD[Value]], hãy xem xét sử dụng một mã số MultipleTextOutputFormatsimilar to the example here. Mã này là khá nhiều trong ví dụ này.

Lợi ích của phương pháp này là bạn được đảm bảo chỉ nhận một lần vượt qua RDD sau khi phát ngẫu nhiên và bạn nhận được kết quả tương tự như bạn muốn. Nếu bạn đã làm điều này bằng cách lọc và tạo một số ID như được đề xuất trong câu trả lời khác (trừ khi bộ lọc hỗ trợ đẩy xuống nguồn của bạn), bạn sẽ kết thúc bằng một lần vượt qua tập dữ liệu cho mỗi khóa riêng lẻ sẽ chậm hơn.

Các vấn đề liên quan