2013-05-30 35 views
11

Tôi đang phát với Spark. Nó là mặc định, được xây dựng trước phân phối (0.7.0) từ trang web, với cấu hình mặc định, chế độ cluster, một công nhân (localhost của tôi). Tôi đọc các tài liệu về cài đặt và mọi thứ có vẻ ổn.Cụm tia lửa không thành công trên đầu vào lớn hơn, hoạt động tốt cho nhỏ

Tôi có tệp CSV (các kích thước khác nhau, 1000 - 1million hàng). Nếu tôi chạy ứng dụng của mình với tệp đầu vào nhỏ (ví dụ 1000 hàng), mọi thứ đều ổn, chương trình được thực hiện trong vài giây và tạo ra kết quả mong đợi. Nhưng khi tôi cung cấp một tệp lớn hơn (100.000 hàng hoặc 1 triệu), việc thực hiện không thành công. Tôi đã cố gắng để khai thác trong các bản ghi, nhưng không giúp đỡ nhiều (nó lặp đi lặp lại toàn bộ quá trình khoảng 9-10 lần và exitst với thất bại sau đó. Ngoài ra, có một số lỗi liên quan đến lấy từ một số nguồn null không thành công).

Kết quả Iterable trả về bởi JavaRDD đầu tiên là đáng ngờ đối với tôi. Nếu tôi trả về một danh sách singleton cứng (như res.add ("something"); return res;), mọi thứ đều ổn, ngay cả với một triệu hàng. Nhưng nếu tôi thêm tất cả các phím của tôi tôi muốn (28 chuỗi của chiều dài 6-20 ký tự), quá trình không thành công chỉ với đầu vào lớn. Vấn đề là, tôi cần tất cả các khóa này, đây là logic kinh doanh thực tế.

Tôi đang sử dụng Linux amd64, lõi tứ, ram 8GB. Mới nhất Oracle Java7 JDK. Spark cấu hình:

SPARK_WORKER_MEMORY=4g 
SPARK_MEM=3g 
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar 

tôi phải đề cập rằng khi tôi khởi động chương trình, nó nói:

13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1) 
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address 

Đây là chương trình của tôi. Nó dựa trên ví dụ JavaWordCount, được sửa đổi tối thiểu.

public final class JavaWordCount 
{ 
    public static void main(final String[] args) throws Exception 
    { 
     final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount", 
      System.getenv("SPARK_HOME"), new String[] {"....jar" }); 

     final JavaRDD<String> words = ctx.textFile(args[1], 1).flatMap(new FlatMapFunction<String, String>() { 

      @Override 
      public Iterable<String> call(final String s) 
      { 
       // parsing "s" as the line, computation, building res (it's a List<String>) 
       return res; 
      } 
     }); 

     final JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() { 

      @Override 
      public Tuple2<String, Integer> call(final String s) 
      { 
       return new Tuple2<String, Integer>(s, 1); 
      } 
     }); 
     final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() { 

      @Override 
      public Integer call(final Integer i1, final Integer i2) 
      { 
       return i1 + i2; 
      } 
     }); 

     counts.collect(); 

     for (Tuple2<?, ?> tuple : counts.collect()) { 
      System.out.println(tuple._1 + ": " + tuple._2); 
     } 
    } 
} 
+0

Trước khi thay đổi thuộc tính hệ thống Spark, công việc của bạn bị lỗi/ngoại lệ nào? –

+0

Trên nhóm người sử dụng tia lửa tôi nhận được câu trả lời rằng .collect() sẽ kích hoạt bộ sưu tập của mỗi và mọi (tạm thời) RDD. Đó là vấn đề thực sự. Chủ đề với giải pháp ở đây: http://stackoverflow.com/questions/16832429/spark-cluster-fails-on-bigger-input-works-well-for-small?noredirect=1#comment24468201_16832429 – gyorgyabraham

+1

Tôi googled cho các lứa tuổi và độ tuổi cố gắng để tìm giải pháp cho vấn đề của tôi, câu trả lời cho câu hỏi này giải quyết được vấn đề của tôi, vì vậy, vui lòng chỉnh sửa câu hỏi của bạn để bao gồm "org.apache.spark.SparkException: Lỗi liên lạc với MapOutputTracker" trong câu hỏi của bạn để giúp mọi người dễ dàng hơn trong tương lai . – samthebest

Trả lời

13

Tôi cố gắng sửa chữa nó bằng cách thiết lập thuộc tính spark.mesos.coarse là true. Thông tin thêm here.

Cập nhật: Tôi đã chơi với Spark trong một vài giờ. Các cài đặt này đã giúp tôi một chút, nhưng có vẻ như gần như không thể xử lý ~ 10 triệu dòng văn bản trên một máy.

System.setProperty("spark.serializer", "spark.KryoSerializer"); // kryo is much faster 
System.setProperty("spark.kryoserializer.buffer.mb", "256"); // I serialize bigger objects 
System.setProperty("spark.mesos.coarse", "true"); // link provided 
System.setProperty("spark.akka.frameSize", "500"); // workers should be able to send bigger messages 
System.setProperty("spark.akka.askTimeout", "30"); // high CPU/IO load 

Note: Tăng kích thước khung hình dường như đặc biệt hữu ích trong việc ngăn ngừa: org.apache.spark.SparkException: Error communicating with MapOutputTracker

+1

'spark.akka.frameSize' cũng đã giải quyết' org.apache.spark.SparkException: Lỗi của tôi khi kết nối với vấn đề MapOutputTracker'. – samthebest

+0

Hệ thống không.setProperty() cũng hoạt động ở chế độ spark-shell? Tôi không thể lấy bộ khung hình –

Các vấn đề liên quan