2015-07-31 16 views
12

Tôi đang cố gắng gửi một JAR với công việc Spark vào cụm YARN từ mã Java. Tôi đang sử dụng SparkLauncher nộp dụ SparkPi:Trình khởi chạy Spark đang chờ hoàn thành công việc vô hạn

Process spark = new SparkLauncher() 
    .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar") 
    .setMainClass("org.apache.spark.examples.SparkPi") 
    .setMaster("yarn-cluster") 
    .launch(); 
System.out.println("Waiting for finish..."); 
int exitCode = spark.waitFor(); 
System.out.println("Finished! Exit code:" + exitCode); 

Có hai vấn đề:

  1. Trong khi nộp trong "sợi cụm" chế độ, ứng dụng được thành công trình SỢI và thực hiện thành công (nó là hiển thị trong giao diện người dùng YARN, được báo cáo là SUCCESS và pi được in ở đầu ra). Tuy nhiên, ứng dụng gửi không bao giờ được thông báo rằng quá trình xử lý đã hoàn tất - nó treo vô hạn sau khi in "Đang đợi hoàn tất ..." Nhật ký của vùng chứa có thể được tìm thấy here
  2. Khi gửi ở chế độ "sợi-khách hàng", ứng dụng không xuất hiện trong giao diện người dùng và ứng dụng gửi bị treo ở "Đang đợi hoàn tất ..." Khi mã treo bị giết, ứng dụng hiển thị trong giao diện người dùng và nó được báo cáo là THÀNH CÔNG, nhưng đầu ra trống (pi không được in ngoài). Nhật ký của container có thể được tìm thấy here

Tôi cố gắng để thực hiện các ứng dụng trên Facebook Chia cả với Oracle Java 7 và 8.

Trả lời

14

tôi đã giúp đỡ trong danh sách gửi thư Spark. Điều quan trọng là đọc/xóa getInputStream và getErrorStream() trong tiến trình. Quá trình con có thể lấp đầy bộ đệm và gây ra bế tắc - xem Oracle docs regarding Process. Các dòng cần được đọc đồng chủ đề riêng biệt:

Process spark = new SparkLauncher() 
    .setSparkHome("C:\\spark-1.4.1-bin-hadoop2.6") 
    .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar") 
    .setMainClass("org.apache.spark.examples.SparkPi").setMaster("yarn-cluster").launch(); 

InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(spark.getInputStream(), "input"); 
Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input"); 
inputThread.start(); 

InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(spark.getErrorStream(), "error"); 
Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error"); 
errorThread.start(); 

System.out.println("Waiting for finish..."); 
int exitCode = spark.waitFor(); 
System.out.println("Finished! Exit code:" + exitCode); 

nơi lớp InputStreamReaderRunnable là:

public class InputStreamReaderRunnable implements Runnable { 

    private BufferedReader reader; 

    private String name; 

    public InputStreamReaderRunnable(InputStream is, String name) { 
     this.reader = new BufferedReader(new InputStreamReader(is)); 
     this.name = name; 
    } 

    public void run() { 
     System.out.println("InputStream " + name + ":"); 
     try { 
      String line = reader.readLine(); 
      while (line != null) { 
       System.out.println(line); 
       line = reader.readLine(); 
      } 
      reader.close(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 
} 
+0

Trong trường hợp của tôi, tôi gặp phải sự cố về classpath, vì vậy tia lửa đã thoát ngay lập tức. Vì vậy, nếu có vẻ như một người nào khác mà nó chỉ đơn giản là không gọi đến ứng dụng tia lửa của bạn, câu trả lời này hoạt động như là tốt. – jmmut

7

Vì đây là một bài cũ, tôi muốn thêm một bản cập nhật có thể giúp ai từng đọc bài này sau. Trong spark 1.6.0 có một số chức năng được thêm vào trong lớp SparkLauncher. Đó là:

def startApplication(listeners: <repeated...>[Listener]): SparkAppHandle 

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.launcher.SparkLauncher

Bạn có thể chạy các ứng dụng với ra sự cần thiết của đề bổ sung cho stdout và stderr xử lý sang trọng có một báo cáo tình trạng tốt đẹp của các ứng dụng đang chạy. Sử dụng mã này:

val env = Map(
     "HADOOP_CONF_DIR" -> hadoopConfDir, 
     "YARN_CONF_DIR" -> yarnConfDir 
    ) 
    val handler = new SparkLauncher(env.asJava) 
     .setSparkHome(sparkHome) 
     .setAppResource("Jar/location/.jar") 
     .setMainClass("path.to.the.main.class") 
     .setMaster("yarn-client") 
     .setConf("spark.app.id", "AppID if you have one") 
     .setConf("spark.driver.memory", "8g") 
     .setConf("spark.akka.frameSize", "200") 
     .setConf("spark.executor.memory", "2g") 
     .setConf("spark.executor.instances", "32") 
     .setConf("spark.executor.cores", "32") 
     .setConf("spark.default.parallelism", "100") 
     .setConf("spark.driver.allowMultipleContexts","true") 
     .setVerbose(true) 
     .startApplication() 
println(handle.getAppId) 
println(handle.getState) 

Bạn có thể giữ enquering nhà nước nếu ứng dụng tia lửa cho đến khi nó cung cấp cho thành công. Để biết thông tin về cách máy chủ Spark Launcher hoạt động trong 1.6.0. xem liên kết này: https://github.com/apache/spark/blob/v1.6.0/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java

+3

Tôi muốn nhấn mạnh điều này chỉ hoạt động ở chế độ khách hàng. – msemelman

+0

@msemelman Cảm ơn bạn rất nhiều vì đã làm rõ điều này, đã bị mắc kẹt về vấn đề này. Làm sao bạn biết được điều này? –

+0

Hoạt động ở chế độ cụm. Tôi đang sử dụng Spark-1.6.1 – Tariq

3

Tôi đã triển khai sử dụng CountDownLatch và hoạt động như mong đợi. Đây là phiên bản SparkLauncher 2.0.1 và nó cũng hoạt động ở chế độ Cụm sợi.

... 
final CountDownLatch countDownLatch = new CountDownLatch(1); 
SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch); 
SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener); 
Thread sparkAppListenerThread = new Thread(sparkAppListener); 
sparkAppListenerThread.start(); 
long timeout = 120; 
countDownLatch.await(timeout, TimeUnit.SECONDS);  
    ... 

private static class SparkAppListener implements SparkAppHandle.Listener, Runnable { 
    private static final Log log = LogFactory.getLog(SparkAppListener.class); 
    private final CountDownLatch countDownLatch; 
    public SparkAppListener(CountDownLatch countDownLatch) { 
     this.countDownLatch = countDownLatch; 
    } 
    @Override 
    public void stateChanged(SparkAppHandle handle) { 
     String sparkAppId = handle.getAppId(); 
     State appState = handle.getState(); 
     if (sparkAppId != null) { 
      log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - " 
        + SPARK_STATE_MSG.get(appState)); 
     } else { 
      log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState)); 
     } 
     if (appState != null && appState.isFinal()) { 
      countDownLatch.countDown(); 
     } 
    } 
    @Override 
    public void infoChanged(SparkAppHandle handle) {} 
    @Override 
    public void run() {} 
} 
+0

Điều này thực sự là một bình luận, không phải là một câu trả lời. Khi bạn đạt đến 50 [danh tiếng] (// stackoverflow.com/help/whats-reputation), bạn sẽ có thể [nhận xét về tất cả các bài đăng] (// stackoverflow.com/privileges/comment). – dorukayhan

Các vấn đề liên quan