2016-02-24 26 views
7

Tôi có RDD 'inRDD' của biểu mẫu RDD[(Vector[(Int, Byte)], Vector[(Int, Byte)])]PairRDD(key,value) trong đó khóa là Vector[(Int, Byte)] và giá trị là Vector[(Int, Byte)].quá nhiều phím bản đồ gây ra ngoại lệ bộ nhớ trong spark

Đối với mỗi phần tử (Int, Byte) trong vectơ trường khóa và mỗi phần tử (Int, Byte) trong vectơ trường giá trị, tôi muốn nhận cặp (khóa, giá trị) mới trong đầu ra RDD là (Int, Int), (Byte, Byte).

Điều đó sẽ cho tôi RDD của biểu mẫu RDD[((Int, Int), (Byte, Byte))].

Ví dụ, inRDD nội dung có thể được như thế,

(Vector((3,2)),Vector((4,2))), (Vector((2,3), (3,3)),Vector((3,1))), (Vector((1,3)),Vector((2,1))), (Vector((1,2)),Vector((2,2), (1,2))) 

đó sẽ trở thành

((3,4),(2,2)), ((2,3),(3,1)), ((3,3),(3,1)), ((1,2),(3,1)), ((1,2),(2,2)), ((1,1),(2,2)) 

Tôi có đoạn code sau cho điều đó.

val outRDD = inRDD.flatMap {           
    case (left, right) => 
    for ((ll, li) <- left; (rl, ri) <- right) yield { 
     (ll,rl) -> (li,ri) 
    } 
} 

Nó hoạt động khi các vectơ có kích thước nhỏ trong inRDD. Nhưng khi có nhiều yếu tố trong vectơ, tôi nhận được out of memory exception. Tăng bộ nhớ khả dụng để kích hoạt chỉ có thể giải quyết cho các đầu vào nhỏ hơn và lỗi xuất hiện trở lại cho các đầu vào lớn hơn. Có vẻ như tôi đang cố gắng lắp ráp một cấu trúc khổng lồ trong bộ nhớ. Tôi không thể viết lại mã này theo bất kỳ cách nào khác.

Tôi đã triển khai một logic tương tự với java in hadoop như sau.

for (String fromValue : fromAssetVals) { 
    fromEntity = fromValue.split(":")[0]; 
    fromAttr = fromValue.split(":")[1]; 
    for (String toValue : toAssetVals) { 
     toEntity = toValue.split(":")[0]; 
     toAttr = toValue.split(":")[1]; 
     oKey = new Text(fromEntity.trim() + ":" + toEntity.trim()); 
     oValue = new Text(fromAttr + ":" + toAttr); 
     outputCollector.collect(oKey, oValue); 
    } 
} 

Nhưng khi tôi thử điều gì đó tương tự trong tia lửa, tôi nhận được ngoại lệ rested lồng nhau.

Làm cách nào để thực hiện điều này một cách hiệu quả với spark using scala?

+0

Bạn đã thử tăng số lượng phân vùng? – BlackBear

+0

@BlackBear Có. Nhưng điều đó không giúp được gì. – CRM

Trả lời

2

Vâng, nếu sản phẩm của Descartes là lựa chọn duy nhất mà bạn ít nhất có thể làm cho nó lười biếng hơn một chút:

inRDD.flatMap { case (xs, ys) => 
    xs.toIterator.flatMap(x => ys.toIterator.map(y => (x, y))) 
} 

Bạn cũng có thể xử lý này ở cấp Spark

import org.apache.spark.RangePartitioner 

val indexed = inRDD.zipWithUniqueId.map(_.swap) 
val partitioner = new RangePartitioner(indexed.partitions.size, indexed) 
val partitioned = indexed.partitionBy(partitioner) 

val lefts = partitioned.flatMapValues(_._1) 
val rights = partitioned.flatMapValues(_._2) 

lefts.join(rights).values 
Các vấn đề liên quan