2014-12-20 20 views
7

Tôi gặp một vấn đề rất lạ về Spark về việc tuần tự hóa. Mã này là như sau:Ngoại lệ tuần tự hóa trên tia lửa

class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable 
{ 
    def infer(document: RDD[Document]): RDD[DocumentParameter] = { 
     val docs = documents.map(doc => DocumentParameter(doc, numOfTopics)) 
     docs 
    } 
} 

nơi tài liệu được định nghĩa là:

class Document(val tokens: SparseVector[Int]) extends Serializable 

và DocumentParameter là:

class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable 

object DocumentParameter extends Serializable 
{ 
    def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document, 
    Array.ofDim[Float](numOfTopics)) 
} 

SparseVectoris một lớp serializable trong breeze.linalg.SparseVector.

Đây là một thủ tục bản đồ đơn giản, và tất cả các lớp học là serializable, nhưng tôi nhận được ngoại lệ này:

org.apache.spark.SparkException: Task not serializable 

Nhưng khi tôi loại bỏ các tham số numOfTopics, đó là:

object DocumentParameter extends Serializable 
{ 
    def apply(document: Document) = new DocumentParameter(document, 
    Array.ofDim[Float](10)) 
} 

và gọi nó như thế này:

val docs = documents.map(DocumentParameter.apply) 

và có vẻ OK.

Loại Int không thể tuần tự hóa? Nhưng tôi thấy rằng một số mã được viết như thế.

Tôi không chắc chắn cách khắc phục lỗi này.

# CẬP NHẬT #:

Cảm ơn bạn @samthebest. Tôi sẽ bổ sung thêm chi tiết về nó.

stack trace: 
org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) 
    at org.apache.spark.rdd.RDD.map(RDD.scala:270) 
    at com.topicmodel.PLSA.infer(PLSA.scala:13) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30) 
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35) 
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37) 
    at $iwC$$iwC$$iwC.<init>(<console>:39) 
    at $iwC$$iwC.<init>(<console>:41) 
    at $iwC.<init>(<console>:43) 
    at <init>(<console>:45) 
    at .<init>(<console>:49) 
    at .<clinit>(<console>) 
    at .<init>(<console>:7) 
    at .<clinit>(<console>) 
    at $print(<console>) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) 
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) 
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) 
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814) 
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859) 
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771) 
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616) 
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624) 
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997) 
    at org.apache.spark.repl.Main$.main(Main.scala:31) 
    at org.apache.spark.repl.Main.main(Main.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 
    ... 46 more 

Khi theo dõi ngăn xếp cung cấp thông tin chung về ngoại lệ, tôi đã xóa nó.

Tôi chạy mã trong vỏ tia lửa.

// suppose I have get RDD[Document] for docs 
val numOfTopics = 100 
val plsa = new PLSA(sc, numOfTopics) 
val docPara = plsa.infer(docs) 

Bạn có thể cho tôi một số hướng dẫn hoặc mẹo về tuần tự không?

Trả lời

10

Chức năng ẩn danh tuần tự hóa lớp chứa của chúng. Khi bạn map {doc => DocumentParameter(doc, numOfTopics)}, cách duy nhất để có thể cấp quyền truy cập đó cho numOfTopics là để tuần tự hóa lớp PLSA. Và lớp đó không thể được tuần tự hóa, bởi vì (như bạn có thể thấy từ stacktrace) nó chứa SparkContext mà không phải là serializable (Những điều xấu sẽ xảy ra nếu các nút cluster riêng lẻ có quyền truy cập vào ngữ cảnh và có thể tạo các công việc mới từ trong một người lập bản đồ).

Nói chung, hãy cố gắng tránh lưu trữ SparkContext trong lớp học của bạn (chỉnh sửa: hoặc ít nhất, hãy đảm bảo rõ ràng loại lớp nào chứa SparkContext và loại nào không); tốt hơn là chuyển nó thành tham số (có thể là implicit) cho các phương thức riêng lẻ cần. Ngoài ra, hãy di chuyển hàm {doc => DocumentParameter(doc, numOfTopics)} vào một lớp khác từ PLSA, một lớp thực sự có thể được tuần tự hóa.

(Như nhiều người đã đề xuất, có thể giữ SparkContext trong lớp nhưng được đánh dấu là @transient để nó không được sắp xếp theo thứ tự. Tôi không khuyến nghị phương pháp này, có nghĩa là lớp học sẽ "thay đổi" một cách kỳ diệu trạng thái khi được tuần tự hóa (mất SparkContext), và vì vậy bạn có thể kết thúc với NPE khi bạn cố gắng truy cập vào SparkContext từ bên trong một công việc được tuần tự hóa. và có thể sử dụng SparkContext) và các lớp được sắp xếp để chạy trên cụm (không được có SparkContext)).

+2

Cảm ơn bạn. Nó hoạt động như bạn đề nghị. Ngoài ra, tôi tìm một cách khác để giải quyết vấn đề này: thêm '@ transient' trước' val sc: SparkContext', sau đó 'SparkContext' sẽ không được đăng. – superhan

+0

Tôi không đồng ý rằng bạn nên tránh lưu trữ 'SparkContext' trong các lớp của bạn hoàn toàn (nhưng vẫn được upvoted). Nếu bạn không lưu trữ chúng trong phạm vi thì bạn có thể kết thúc nhận được thông số bloat (đó là xấu xí ngay cả khi sử dụng params ngầm). Cách thay thế duy nhất là dính vào nó một số singleton toàn cầu ở đâu đó mà tạo ra vấn đề của riêng nó (con trỏ null đáng sợ). – samthebest

0

Đây thực sự là một điều kỳ lạ, nhưng tôi nghĩ tôi có thể đoán được vấn đề. Nhưng trước tiên, bạn đã không cung cấp số tiền tối thiểu để giải quyết vấn đề (tôi có thể đoán, bởi vì tôi đã thấy 100 trong số này trước đây). Dưới đây là một số vấn đề với câu hỏi của bạn:

def infer(document: RDD[Document], numOfTopics: Int): RDD[DocumentParameter] = { 
    val docs = documents.map(doc => DocumentParameter(doc, numOfTopics)) 
} 

Phương pháp này không trả lại RDD[DocumentParameter] nó trả Unit. Bạn phải sao chép và dán mã không chính xác.

Thứ hai, bạn chưa cung cấp toàn bộ dấu vết ngăn xếp? Tại sao? Không có lý do gì để KHÔNG cung cấp dấu vết ngăn xếp đầy đủ, và dấu vết ngăn xếp đầy đủ với thông báo là cần thiết để hiểu được lỗi - người ta cần toàn bộ lỗi để hiểu lỗi là gì. Thông thường, một ngoại lệ không thể tuần tự cho bạn biết những gì không thể tuần tự hóa được.

Thứ ba bạn chưa cho chúng tôi biết phương thức infer là bạn đang làm điều này trong trình bao? Đối tượng chứa/lớp/đặc điểm, v.v. của infer là gì?

Dù sao, tôi sẽ đoán rằng bằng cách chuyển vào số Int của bạn gây ra một chuỗi các thứ để được sắp xếp mà bạn không mong đợi, tôi không thể cung cấp thêm thông tin cho đến khi bạn cung cấp tối thiểu để chúng tôi có thể hiểu rõ vấn đề của bạn.

Các vấn đề liên quan