Tôi gặp một vấn đề rất lạ về Spark về việc tuần tự hóa. Mã này là như sau:Ngoại lệ tuần tự hóa trên tia lửa
class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable
{
def infer(document: RDD[Document]): RDD[DocumentParameter] = {
val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
docs
}
}
nơi tài liệu được định nghĩa là:
class Document(val tokens: SparseVector[Int]) extends Serializable
và DocumentParameter là:
class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable
object DocumentParameter extends Serializable
{
def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document,
Array.ofDim[Float](numOfTopics))
}
SparseVectoris một lớp serializable trong breeze.linalg.SparseVector
.
Đây là một thủ tục bản đồ đơn giản, và tất cả các lớp học là serializable, nhưng tôi nhận được ngoại lệ này:
org.apache.spark.SparkException: Task not serializable
Nhưng khi tôi loại bỏ các tham số numOfTopics
, đó là:
object DocumentParameter extends Serializable
{
def apply(document: Document) = new DocumentParameter(document,
Array.ofDim[Float](10))
}
và gọi nó như thế này:
val docs = documents.map(DocumentParameter.apply)
và có vẻ OK.
Loại Int không thể tuần tự hóa? Nhưng tôi thấy rằng một số mã được viết như thế.
Tôi không chắc chắn cách khắc phục lỗi này.
# CẬP NHẬT #:
Cảm ơn bạn @samthebest. Tôi sẽ bổ sung thêm chi tiết về nó.
stack trace:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at com.topicmodel.PLSA.infer(PLSA.scala:13)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC.<init>(<console>:41)
at $iwC.<init>(<console>:43)
at <init>(<console>:45)
at .<init>(<console>:49)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 46 more
Khi theo dõi ngăn xếp cung cấp thông tin chung về ngoại lệ, tôi đã xóa nó.
Tôi chạy mã trong vỏ tia lửa.
// suppose I have get RDD[Document] for docs
val numOfTopics = 100
val plsa = new PLSA(sc, numOfTopics)
val docPara = plsa.infer(docs)
Bạn có thể cho tôi một số hướng dẫn hoặc mẹo về tuần tự không?
Cảm ơn bạn. Nó hoạt động như bạn đề nghị. Ngoài ra, tôi tìm một cách khác để giải quyết vấn đề này: thêm '@ transient' trước' val sc: SparkContext', sau đó 'SparkContext' sẽ không được đăng. – superhan
Tôi không đồng ý rằng bạn nên tránh lưu trữ 'SparkContext' trong các lớp của bạn hoàn toàn (nhưng vẫn được upvoted). Nếu bạn không lưu trữ chúng trong phạm vi thì bạn có thể kết thúc nhận được thông số bloat (đó là xấu xí ngay cả khi sử dụng params ngầm). Cách thay thế duy nhất là dính vào nó một số singleton toàn cầu ở đâu đó mà tạo ra vấn đề của riêng nó (con trỏ null đáng sợ). – samthebest