Tôi đang cố gắng viết một công việc Spark xử lý luồng đơn giản sẽ lấy danh sách thư (định dạng JSON), mỗi thư thuộc về người dùng, đếm thông điệp của từng người dùng và in mười người dùng.NotSerializableException khi sắp xếp trong Spark
Tuy nhiên, khi tôi xác định Trình so sánh> để sắp xếp số lượng đã giảm, toàn bộ sự cố không thành công với một cú pháp bị bỏ qua là java.io.NotSerializableException.
phụ thuộc maven của tôi cho Spark:
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.9.3</artifactId>
<version>0.8.0-incubating</version>
Các mã Java Tôi đang sử dụng:
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext("local", "spark");
JavaRDD<String> lines = sc.textFile("stream.sample.txt").cache();
JavaPairRDD<String, Long> words = lines
.map(new Function<String, JsonElement>() {
// parse line into JSON
@Override
public JsonElement call(String t) throws Exception {
return (new JsonParser()).parse(t);
}
}).map(new Function<JsonElement, String>() {
// read User ID from JSON
@Override
public String call(JsonElement json) throws Exception {
return json.getAsJsonObject().get("userId").toString();
}
}).map(new PairFunction<String, String, Long>() {
// count each line
@Override
public Tuple2<String, Long> call(String arg0) throws Exception {
return new Tuple2(arg0, 1L);
}
}).reduceByKey(new Function2<Long, Long, Long>() {
// count messages for every user
@Override
public Long call(Long arg0, Long arg1) throws Exception {
return arg0 + arg1;
}
});
// sort result in a descending order and take 10 users with highest message count
// This causes the exception
List<Tuple2<String, Long>> sorted = words.takeOrdered(10, new Comparator<Tuple2<String, Long>>(){
@Override
public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
return -1 * o1._2().compareTo(o2._2());
}
});
// print result
for (Tuple2<String, Long> tuple : sorted) {
System.out.println(tuple._1() + ": " + tuple._2());
}
}
Kết quả là stack trace:
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)
at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: net.imagini.spark.test.App$5
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:670)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:668)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:668)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:376)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
tôi đã đi qua API Spark tài liệu nhưng không thể tìm thấy bất cứ điều gì mà sẽ chỉ cho tôi hướng đi đúng. Tôi có làm điều gì sai hay đây có phải là lỗi trong Spark không? Bất kỳ trợ giúp nào cũng sẽ được đánh giá cao.
UPDATE: Rõ ràng, tất cả boils xuống đối tượng Comparator đang được chuyển làm đối số thứ hai cho * takeOrdered() *. Khi giao diện Comparator không mở rộng Serializable để làm cho công việc này, bạn cần phải tạo ra một 'serializable' so sánh: 'public interface SerializableComparator mở rộng sánh , Serializable {} ' Sau đó, đi qua một đối tượng mà thực hiện giao diện này khi bộ so sánh ngăn cản ngoại lệ ban đầu. Cấp, điều này có lẽ không phải là giải pháp thanh lịch nhất cho vấn đề này và tôi chắc chắn sẽ chào đón một số gợi ý :) –