2017-09-26 20 views
7

Tôi đang chạy lệnh spark trong EMR với YARN làm trình quản lý tài nguyên và trên 2 nút. Tôi cần phải cố tình thất bại bước nếu tình trạng của tôi không được đáp ứng, do đó, bước tiếp theo không thực hiện theo cấu hình. Để đạt được điều này, tôi đang ném một ngoại lệ tùy chỉnh, sau khi chèn một thông điệp tường trình vào dynamoDB.Spark, Hành vi sai khi ném SparkException trong EMR

Chạy tốt nhưng bản ghi trong Dynamo được chèn hai lần.

Dưới đây là mã của tôi.

if(<condition>) { 
    <method call to insert in dynamo> 
    throw new SparkException(<msg>); 
    return; 
} 

Nếu tôi loại bỏ dòng để ném ngoại lệ, nó hoạt động tốt nhưng bước được hoàn thành.

Làm cách nào tôi có thể thực hiện bước không thành công mà không nhận được thông báo tường trình hai lần.

Cảm ơn sự giúp đỡ.

Kính trọng, Sorabh

Trả lời

2

Có lẽ lý do nhắn dynamo của bạn đã chèn hai lần là vì tình trạng lỗi của bạn bị trúng đạn và xử lý bởi hai Chấp hành khác nhau. Spark đang phân chia công việc được thực hiện trong số đó là công nhân và những người lao động đó không chia sẻ bất kỳ kiến ​​thức nào.

Tôi không chắc điều gì đang thúc đẩy yêu cầu của bạn để có bước Spark FAIL, nhưng tôi khuyên bạn nên theo dõi trường hợp lỗi đó trong mã ứng dụng của bạn thay vì cố gắng kích hoạt tia lửa trực tiếp. Nói cách khác, viết mã phát hiện lỗi và chuyển lại cho trình điều khiển tia lửa của bạn, sau đó thực hiện hành động đó khi thích hợp.

Một cách để thực hiện việc này là sử dụng bộ tích lũy để đếm bất kỳ lỗi nào xảy ra khi bạn xử lý dữ liệu của mình. Nó sẽ giống gần như thế này (tôi giả sử scala và DataFrames, nhưng bạn có thể thích ứng với của RDD và/hoặc python khi cần thiết):

val accum = sc.longAccumulator("Error Counter") 
def doProcessing(a: String, b: String): String = { 
    if(condition) { 
    accum.add(1) 
    null 
    } 
    else { 
    doComputation(a, b) 
    } 
} 
val doProcessingUdf = udf(doProcessing _) 

df = df.withColumn("result", doProcessing($"a", $"b")) 

df.write.format(..).save(..) // Accumulator value not computed until an action occurs! 

if(accum.value > 0) { 
    // An error detected during computation! Do whatever needs to be done. 
    <insert dynamo message here> 
} 

Một điều tốt đẹp về phương pháp này là nếu bạn đang tìm kiếm thông tin phản hồi trong giao diện người dùng Spark, bạn sẽ có thể thấy các giá trị tích lũy ở đó trong khi nó đang chạy. Để tham khảo, dưới đây là tài liệu về bộ tích lũy: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators

Các vấn đề liên quan