2015-03-27 20 views
6

Đây là một ví dụ mã làm việc:org.apache.spark.SparkException: Nhiệm vụ không serializable

JavaPairDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zkQuorum, group, topicMap); 
messages.print(); 
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
    @Override 
    public String call(Tuple2<String, String> tuple2) { 
     return tuple2._2(); 
    } 
}); 

tôi nhận được báo lỗi dưới đây:

ERROR: 
org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435) 
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:140) 
    at org.apache.spark.streaming.api.java.JavaPairDStream.map(JavaPairDStream.scala:46) 
+1

Vâng, nếu nó làm việc thật tuyệt :). Nếu không, bạn có thể bật gỡ lỗi tuần tự hóa Java bằng '-Dsun.io.serialization.extendedDebugInfo = true'. –

+1

Cảm ơn bạn, không ổn, tôi đã thử. JavaDStream lines = messages.map (Chức năng mới , String>() {@Override public String gọi (Tuple2 tuple2) { trở tuple2._2(); } }); Dòng sự cố mã này. –

+0

Chắc chắn mã này là Java chứ không phải Scala (nghĩa là thẻ) – SparkleGoat

Trả lời

14

Vì bạn đang xác định chức năng bản đồ của bạn sử dụng một lớp bên trong vô danh, lớp có chứa cũng phải được Serializable. Xác định chức năng bản đồ của bạn như là một lớp riêng biệt hoặc làm cho nó trở thành một lớp bên trong tĩnh. Từ các tài liệu Java (http://docs.oracle.com/javase/8/docs/platform/serialization/spec/serial-arch.html):

Note - Serialization of inner classes (i.e., nested classes that are not static member classes), including local and anonymous classes, is strongly discouraged for several reasons. Because inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, serializing such an inner class instance will result in serialization of its associated outer class instance as well.

+0

Cảm ơn bạn rất nhiều! –

+0

Rất vui được giúp! Vui lòng chấp nhận câu trả lời nếu nó đã làm – InPursuit

+0

Spark đang cố gắng sắp xếp lại đối tượng được chuyển tới bản đồ nhưng không thể tuần tự hóa đối tượng đó vì không triển khai Serializable? Tại sao Spark đang làm serialization? Và nếu chúng ta định nghĩa hàm map như là một lớp riêng biệt, chúng ta có cần phải làm cho nó có thể Serializable không? – Johan

2

chỉ cung cấp các mẫu mã:

JavaDStream<String> lines = messages.map(mapFunc); 

khai báo lớp bên trong như là một biến tĩnh:

static Function<Tuple2<String, String>, String> mapFunc=new Function<Tuple2<String, String>, String>() { 
    @Override 
    public String call(Tuple2<String, String> tuple2) { 
     return tuple2._2(); 
    } 
} 
Các vấn đề liên quan