2012-05-25 32 views
11

Gần đây tôi đã làm việc với Amazon Web Services (AWS) và tôi nhận thấy không có nhiều tài liệu về chủ đề này, vì vậy tôi đã thêm giải pháp của mình.Làm cách nào để đợi hoàn thành luồng công việc Elastic MapReduce trong ứng dụng Java?

Tôi đã viết một ứng dụng bằng cách sử dụng Amazon Elastic MapReduce (Amazon EMR). Sau khi tính toán kết thúc, tôi cần thực hiện một số công việc trên các tệp do chúng tạo ra, vì vậy tôi cần biết khi nào công việc hoàn thành công việc của nó.

Đây là cách bạn có thể kiểm tra nếu dòng chảy công việc của bạn đã hoàn thành:

AmazonElasticMapReduce mapReduce = new AmazonElasticMapReduceClient(credentials); 

DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowsRequest() 
    .withJobFlowStates("COMPLETED"); 

List<JobFlowDetail> jobs = mapReduce.describeJobFlows(jobAttributes).getJobFlows(); 
JobFlowDetail detail = jobs.get(0); 

detail.getJobFlowId(); //the id of one of the completed jobs 

Bạn cũng có thể tìm kiếm một công việc cụ thể id trong DescribeJobFlowsRequest và sau đó để kiểm tra xem công việc đã hoàn thành của thất bại.

Tôi hy vọng nó sẽ giúp người khác.

+5

Nộp giải pháp của riêng bạn để vấn đề của bạn ngay lập tức được khá hoan nghênh ở đây, tuy nhiên, cách tiếp cận mong muốn là để chia này thành một câu hỏi và một câu trả lời vẫn còn, xem [Đó là OK để hỏi và trả lời câu hỏi riêng của bạn] (http : //blog.stackoverflow.com/2011/07/its-ok-to-ask-and-answer-your-own-questions/) - điều này giúp sắp xếp/phân loại mọi thứ một cách thích hợp, tức là tạo chỗ cho những câu hỏi thực sự chưa được trả lời áp dụng, cảm ơn! –

+0

Cảm ơn, tôi sẽ lưu ý nó như là một tài liệu tham khảo trong tương lai. – siditom

+0

Bạn cũng nên bao gồm các trạng thái đã hoàn thành khác. Một số người đọc này có thể lặp lại mãi mãi nếu họ khởi tạo 'jobAttributes' như đã cho. 'DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowRequest(). WithJobFlowStates (" COMPLETED "," TERMINATED "," FAILED ");' –

Trả lời

1

Khi luồng công việc đã hoàn thành, cụm dừng và phân vùng HDFS bị mất. để tránh mất dữ liệu, định cấu hình bước cuối cùng của luồng công việc để lưu trữ kết quả trong Amazon S3.

Nếu tham số JobFlowInstancesDetail: KeepJobFlowAliveWhenNoSteps được đặt thành TRUE, luồng công việc sẽ chuyển sang trạng thái WAITING thay vì tắt sau khi các bước đã hoàn tất.

Cho phép tối đa 256 bước trong mỗi luồng công việc.

Nếu công việc của bạn tốn thời gian, tôi khuyên bạn nên lưu trữ kết quả định kỳ.

Tóm tắt câu chuyện dài: không có cách nào để biết khi nào nó được thực hiện. Thay vào đó, bạn cần phải lưu dữ liệu của mình như một phần của công việc.

1

Sử dụng tùy chọn --wait-for-steps khi tạo luồng công việc.

./elastic-mapreduce --create \ 
... 
--wait-for-steps \ 
... 
3

Tôi cũng gặp sự cố này và đây là giải pháp mà tôi đã đưa ra ngay bây giờ. Nó không hoàn hảo, nhưng hy vọng nó sẽ hữu ích. Để tham khảo, tôi đang sử dụng Java 1.7 và AWS Java SDK phiên bản 1.9.13.

Lưu ý rằng mã này giả định rằng bạn đang đợi cụm chấm dứt, không phải là các bước nói đúng; nếu cụm của bạn chấm dứt khi tất cả các bước của bạn được thực hiện điều này là ổn, nhưng nếu bạn đang sử dụng các cụm mà vẫn còn sống sau khi hoàn thành bước này sẽ không giúp bạn quá nhiều.

Ngoài ra, hãy lưu ý rằng mã này theo dõi và ghi nhật ký các thay đổi trạng thái cụm, và ngoài ra chẩn đoán xem cụm đã chấm dứt lỗi và ném ngoại lệ nếu có.

private void yourMainMethod() { 
    RunJobFlowRequest request = ...; 

    try { 
     RunJobFlowResult submission = emr.runJobFlow(request); 
     String jobFlowId = submission.getJobFlowId(); 
     log.info("Submitted EMR job as job flow id {}", jobFlowId); 

     DescribeClusterResult result = 
      waitForCompletion(emr, jobFlowId, 90, TimeUnit.SECONDS); 
     diagnoseClusterResult(result, jobFlowId); 
    } finally { 
     emr.shutdown(); 
    } 
} 

private DescribeClusterResult waitForCompletion(
      AmazonElasticMapReduceClient emr, String jobFlowId, 
      long sleepTime, TimeUnit timeUnit) 
     throws InterruptedException { 
    String state = "STARTING"; 
    while (true) { 
     DescribeClusterResult result = emr.describeCluster(
       new DescribeClusterRequest().withClusterId(jobFlowId) 
     ); 
     ClusterStatus status = result.getCluster().getStatus(); 
     String newState = status.getState(); 
     if (!state.equals(newState)) { 
      log.info("Cluster id {} switched from {} to {}. Reason: {}.", 
        jobFlowId, state, newState, status.getStateChangeReason()); 
      state = newState; 
     } 

     switch (state) { 
      case "TERMINATED": 
      case "TERMINATED_WITH_ERRORS": 
      case "WAITING": 
       return result; 
     } 

     timeUnit.sleep(sleepTime); 
    } 
} 

private void diagnoseClusterResult(DescribeClusterResult result, String jobFlowId) { 
    ClusterStatus status = result.getCluster().getStatus(); 
    ClusterStateChangeReason reason = status.getStateChangeReason(); 
    ClusterStateChangeReasonCode code = 
     ClusterStateChangeReasonCode.fromValue(reason.getCode()); 
    switch (code) { 
    case ALL_STEPS_COMPLETED: 
     log.info("Completed EMR job {}", jobFlowId); 
     break; 
    default: 
     failEMR(jobFlowId, status); 
    } 
} 

private static void failEMR(String jobFlowId, ClusterStatus status) { 
    String msg = "EMR cluster run %s terminated with errors. ClusterStatus = %s"; 
    throw new RuntimeException(String.format(msg, jobFlowId, status)); 
} 
Các vấn đề liên quan