2013-10-17 17 views
5

Tôi đang cố gắng viết một công việc Spark xử lý luồng đơn giản sẽ lấy danh sách thư (định dạng JSON), mỗi thư thuộc về người dùng, đếm thông điệp của từng người dùng và in mười người dùng.NotSerializableException khi sắp xếp trong Spark

Tuy nhiên, khi tôi xác định Trình so sánh> để sắp xếp số lượng đã giảm, toàn bộ sự cố không thành công với một cú pháp bị bỏ qua là java.io.NotSerializableException.

phụ thuộc maven của tôi cho Spark:

<groupId>org.apache.spark</groupId> 
<artifactId>spark-core_2.9.3</artifactId> 
<version>0.8.0-incubating</version> 

Các mã Java Tôi đang sử dụng:

public static void main(String[] args) { 

    JavaSparkContext sc = new JavaSparkContext("local", "spark"); 

    JavaRDD<String> lines = sc.textFile("stream.sample.txt").cache(); 

    JavaPairRDD<String, Long> words = lines 
     .map(new Function<String, JsonElement>() { 
      // parse line into JSON 
      @Override 
      public JsonElement call(String t) throws Exception { 
       return (new JsonParser()).parse(t); 
      } 

     }).map(new Function<JsonElement, String>() { 
      // read User ID from JSON 
      @Override 
      public String call(JsonElement json) throws Exception { 
       return json.getAsJsonObject().get("userId").toString(); 
      } 

     }).map(new PairFunction<String, String, Long>() { 
      // count each line 
      @Override 
      public Tuple2<String, Long> call(String arg0) throws Exception { 
       return new Tuple2(arg0, 1L); 
      } 

     }).reduceByKey(new Function2<Long, Long, Long>() { 
      // count messages for every user 
      @Override 
      public Long call(Long arg0, Long arg1) throws Exception { 
       return arg0 + arg1; 
      } 

     }); 

    // sort result in a descending order and take 10 users with highest message count 
    // This causes the exception 
    List<Tuple2<String, Long>> sorted = words.takeOrdered(10, new Comparator<Tuple2<String, Long>>(){ 

     @Override 
     public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) { 
      return -1 * o1._2().compareTo(o2._2()); 
     } 

    }); 

    // print result 
    for (Tuple2<String, Long> tuple : sorted) { 
     System.out.println(tuple._1() + ": " + tuple._2()); 
    } 

} 

Kết quả là stack trace:

java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:601) 
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297) 
    at java.lang.Thread.run(Thread.java:722) 
Caused by: org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: net.imagini.spark.test.App$5 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:670) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:668) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:668) 
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:376) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441) 
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149) 

tôi đã đi qua API Spark tài liệu nhưng không thể tìm thấy bất cứ điều gì mà sẽ chỉ cho tôi hướng đi đúng. Tôi có làm điều gì sai hay đây có phải là lỗi trong Spark không? Bất kỳ trợ giúp nào cũng sẽ được đánh giá cao.

+0

UPDATE: Rõ ràng, tất cả boils xuống đối tượng Comparator đang được chuyển làm đối số thứ hai cho * takeOrdered() *. Khi giao diện Comparator không mở rộng Serializable để làm cho công việc này, bạn cần phải tạo ra một 'serializable' so sánh: 'public interface SerializableComparator mở rộng sánh , Serializable {} ' Sau đó, đi qua một đối tượng mà thực hiện giao diện này khi bộ so sánh ngăn cản ngoại lệ ban đầu. Cấp, điều này có lẽ không phải là giải pháp thanh lịch nhất cho vấn đề này và tôi chắc chắn sẽ chào đón một số gợi ý :) –

Trả lời

2

Như @ vanco.anton ám chỉ, bạn có thể làm một cái gì đó giống như sử dụng sau Java 8 giao diện chức năng:

public interface SerializableComparator<T> extends Comparator<T>, Serializable { 

    static <T> SerializableComparator<T> serialize(SerializableComparator<T> comparator) { 
    return comparator; 
    } 

} 

Và sau đó trong mã của bạn:

import static SerializableComparator.serialize; 
... 
rdd.top(10, serialize((a, b) -> -a._2.compareTo(b._2))); 
Các vấn đề liên quan