Tôi có RDD 'inRDD'
của biểu mẫu RDD[(Vector[(Int, Byte)], Vector[(Int, Byte)])]
là PairRDD(key,value)
trong đó khóa là Vector[(Int, Byte)]
và giá trị là Vector[(Int, Byte)]
.quá nhiều phím bản đồ gây ra ngoại lệ bộ nhớ trong spark
Đối với mỗi phần tử (Int, Byte)
trong vectơ trường khóa và mỗi phần tử (Int, Byte)
trong vectơ trường giá trị, tôi muốn nhận cặp (khóa, giá trị) mới trong đầu ra RDD là (Int, Int), (Byte, Byte)
.
Điều đó sẽ cho tôi RDD của biểu mẫu RDD[((Int, Int), (Byte, Byte))]
.
Ví dụ, inRDD
nội dung có thể được như thế,
(Vector((3,2)),Vector((4,2))), (Vector((2,3), (3,3)),Vector((3,1))), (Vector((1,3)),Vector((2,1))), (Vector((1,2)),Vector((2,2), (1,2)))
đó sẽ trở thành
((3,4),(2,2)), ((2,3),(3,1)), ((3,3),(3,1)), ((1,2),(3,1)), ((1,2),(2,2)), ((1,1),(2,2))
Tôi có đoạn code sau cho điều đó.
val outRDD = inRDD.flatMap {
case (left, right) =>
for ((ll, li) <- left; (rl, ri) <- right) yield {
(ll,rl) -> (li,ri)
}
}
Nó hoạt động khi các vectơ có kích thước nhỏ trong inRDD
. Nhưng khi có nhiều yếu tố trong vectơ, tôi nhận được out of memory exception
. Tăng bộ nhớ khả dụng để kích hoạt chỉ có thể giải quyết cho các đầu vào nhỏ hơn và lỗi xuất hiện trở lại cho các đầu vào lớn hơn. Có vẻ như tôi đang cố gắng lắp ráp một cấu trúc khổng lồ trong bộ nhớ. Tôi không thể viết lại mã này theo bất kỳ cách nào khác.
Tôi đã triển khai một logic tương tự với java in hadoop
như sau.
for (String fromValue : fromAssetVals) {
fromEntity = fromValue.split(":")[0];
fromAttr = fromValue.split(":")[1];
for (String toValue : toAssetVals) {
toEntity = toValue.split(":")[0];
toAttr = toValue.split(":")[1];
oKey = new Text(fromEntity.trim() + ":" + toEntity.trim());
oValue = new Text(fromAttr + ":" + toAttr);
outputCollector.collect(oKey, oValue);
}
}
Nhưng khi tôi thử điều gì đó tương tự trong tia lửa, tôi nhận được ngoại lệ rested lồng nhau.
Làm cách nào để thực hiện điều này một cách hiệu quả với spark using scala
?
Bạn đã thử tăng số lượng phân vùng? – BlackBear
@BlackBear Có. Nhưng điều đó không giúp được gì. – CRM