Tôi cũng gặp sự cố này và đây là giải pháp mà tôi đã đưa ra ngay bây giờ. Nó không hoàn hảo, nhưng hy vọng nó sẽ hữu ích. Để tham khảo, tôi đang sử dụng Java 1.7 và AWS Java SDK phiên bản 1.9.13.
Lưu ý rằng mã này giả định rằng bạn đang đợi cụm chấm dứt, không phải là các bước nói đúng; nếu cụm của bạn chấm dứt khi tất cả các bước của bạn được thực hiện điều này là ổn, nhưng nếu bạn đang sử dụng các cụm mà vẫn còn sống sau khi hoàn thành bước này sẽ không giúp bạn quá nhiều.
Ngoài ra, hãy lưu ý rằng mã này theo dõi và ghi nhật ký các thay đổi trạng thái cụm, và ngoài ra chẩn đoán xem cụm đã chấm dứt lỗi và ném ngoại lệ nếu có.
private void yourMainMethod() {
RunJobFlowRequest request = ...;
try {
RunJobFlowResult submission = emr.runJobFlow(request);
String jobFlowId = submission.getJobFlowId();
log.info("Submitted EMR job as job flow id {}", jobFlowId);
DescribeClusterResult result =
waitForCompletion(emr, jobFlowId, 90, TimeUnit.SECONDS);
diagnoseClusterResult(result, jobFlowId);
} finally {
emr.shutdown();
}
}
private DescribeClusterResult waitForCompletion(
AmazonElasticMapReduceClient emr, String jobFlowId,
long sleepTime, TimeUnit timeUnit)
throws InterruptedException {
String state = "STARTING";
while (true) {
DescribeClusterResult result = emr.describeCluster(
new DescribeClusterRequest().withClusterId(jobFlowId)
);
ClusterStatus status = result.getCluster().getStatus();
String newState = status.getState();
if (!state.equals(newState)) {
log.info("Cluster id {} switched from {} to {}. Reason: {}.",
jobFlowId, state, newState, status.getStateChangeReason());
state = newState;
}
switch (state) {
case "TERMINATED":
case "TERMINATED_WITH_ERRORS":
case "WAITING":
return result;
}
timeUnit.sleep(sleepTime);
}
}
private void diagnoseClusterResult(DescribeClusterResult result, String jobFlowId) {
ClusterStatus status = result.getCluster().getStatus();
ClusterStateChangeReason reason = status.getStateChangeReason();
ClusterStateChangeReasonCode code =
ClusterStateChangeReasonCode.fromValue(reason.getCode());
switch (code) {
case ALL_STEPS_COMPLETED:
log.info("Completed EMR job {}", jobFlowId);
break;
default:
failEMR(jobFlowId, status);
}
}
private static void failEMR(String jobFlowId, ClusterStatus status) {
String msg = "EMR cluster run %s terminated with errors. ClusterStatus = %s";
throw new RuntimeException(String.format(msg, jobFlowId, status));
}
Nguồn
2014-12-20 00:46:40
Nộp giải pháp của riêng bạn để vấn đề của bạn ngay lập tức được khá hoan nghênh ở đây, tuy nhiên, cách tiếp cận mong muốn là để chia này thành một câu hỏi và một câu trả lời vẫn còn, xem [Đó là OK để hỏi và trả lời câu hỏi riêng của bạn] (http : //blog.stackoverflow.com/2011/07/its-ok-to-ask-and-answer-your-own-questions/) - điều này giúp sắp xếp/phân loại mọi thứ một cách thích hợp, tức là tạo chỗ cho những câu hỏi thực sự chưa được trả lời áp dụng, cảm ơn! –
Cảm ơn, tôi sẽ lưu ý nó như là một tài liệu tham khảo trong tương lai. – siditom
Bạn cũng nên bao gồm các trạng thái đã hoàn thành khác. Một số người đọc này có thể lặp lại mãi mãi nếu họ khởi tạo 'jobAttributes' như đã cho. 'DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowRequest(). WithJobFlowStates (" COMPLETED "," TERMINATED "," FAILED ");' –