2016-01-12 34 views
5

Hai câu hỏi, câu trả lời cho câu hỏi chung sẽ hướng dẫn tôi về mức tối thiểu tôi có thể tạo MVCE như thế nào.Trình tự nối tiếp Kryo gây ra ngoại lệ trên lớp Scala bên dưới WrappedArray

1) Làm thế nào tôi có thể biết để đăng ký WrappedArray lên phía trước, (và mọi lớp khác trong Scala tôi có thể sử dụng)? Việc đăng ký lớp học từ thư viện với Kryo có bình thường không?

và cụ thể:

2) Làm cách nào để khắc phục sự cố này? (Sẵn sàng thừa nhận tôi có thể có cái gì khác screwy xảy ra rằng nếu phản ánh một lỗi sai ở đây, vì vậy đừng giết mình cố gắng để tái sản xuất này)

CHI TIẾT

kiểm tra ra một chương trình Spark trong Java sử dụng của khách hàng của chúng tôi các lớp học liên quan đến di truyền và thống kê, trên Spark 1.4.1, Scala 2.11.5 với các cài đặt sau trên SparkConf:

// for kyro serializer it wants to register all classes that need to be serialized 
Class[] kryoClassArray = new Class[]{DropResult.class, DropEvaluation.class, PrintHetSharing.class}; 

SparkConf sparkConf = new SparkConf().setAppName("PipeLinkageData") 
       <SNIP other settings to declare master> 
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
       //require registration of all classes with Kryo 
       .set("spark.kryo.registrationRequired", "true") 
       .registerKryoClasses(kryoClassArray); 

Bắt lỗi này (lặp đi lặp lại ở cuối danh sách lỗi dài):

Caused by: java.lang.IllegalArgumentException: Class is not 
registered: scala.collection.mutable.WrappedArray$ofRef Note: To 
register this class use: 
kryo.register(scala.collection.mutable.WrappedArray$ofRef.class); 

Nhưng tôi không bao giờ gọi lớp đó từ mã của tôi. Tôi có thể thêm scala.collection.mutable.WrappedArray vào kryoClassArray nhưng không khắc phục được sự cố. Nếu tôi thêm scala.collection.mutable.WrappedArray $ ofRef.class (như được đề xuất trong lỗi), đó là lỗi cú pháp, tôi thấy tôi không thể khai báo một hàm ẩn danh ở đây?

MVCE: Tôi đã bắt đầu một MVCE nhưng vấn đề là, để làm một với các lớp học của chúng tôi yêu cầu thư viện bên ngoài và các tập tin văn bản/dữ liệu. Một khi tôi loại bỏ các lớp học của chúng tôi, tôi không có vấn đề gì. Nếu ai đó có thể trả lời câu hỏi chung, nó có thể giúp hướng dẫn tôi về số lượng MVCE mà tôi có thể đưa ra.

Khi tôi viết câu hỏi này, tôi đã tiếp tục cập nhật lên 1.5.2, sẽ xem liệu có bất kỳ thay đổi nào và cập nhật câu hỏi nếu có.

ngắn của một MVCE đây là tờ khai lớp học của tôi:

public class MVCEPipeLinkageInterface extends LinkageInterface implements Serializable { 

class PrintHetSharing implements VoidFunction<DropResult> { 

class SparkDoDrop implements Function<Integer, Integer> { 

lỗi đầy đủ:

16/01/08 10:54:54 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 
16/01/08 10:54:55 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:55646/user/Executor#214759698]) with ID 0 
16/01/08 10:54:55 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it. 
java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef 
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class); 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242) 
    at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) 
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168) 
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231) 
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293) 
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.makeOffers(CoarseGrainedSchedulerBackend.scala:167) 
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:143) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) 
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59) 
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) 
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) 
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef 
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class); 
+0

1. Biết những gì các lớp học sẽ yêu cầu serialization đòi hỏi bạn kiểm tra mã của bạn và hiểu những gì nó đang làm (bạn chỉ dán mẫu conf, không phải việc sử dụng). 2. Giống như 1, không thể trả lời mà không có mẫu mã. –

+0

Chắc chắn, @DanielL. Tôi sẽ chỉnh sửa trong một số mã. Nhưng có vẻ như bạn đang nói rằng tôi cần biết mọi lớp cơ bản? Như một nguyên tắc chung?Tôi đang viết Java nên tôi không mong đợi cần phải có một nhận thức về các lớp Scala cơ bản để làm cho Kryo hoạt động. Cảm ơn – JimLohse

+0

@DanielL. Tôi đánh giá cao các yêu cầu MVCE, vấn đề tôi chạy vào là, để làm một với các lớp học của chúng tôi yêu cầu các thư viện bên ngoài và các tập tin văn bản/dữ liệu. Một khi tôi loại bỏ các lớp học của chúng tôi và sự cần thiết cho các tập tin của chúng tôi, tôi không có vấn đề. Nếu ai đó có thể trả lời câu hỏi chung, nó có thể giúp hướng dẫn tôi về số lượng MVCE mà tôi có thể đưa ra. Tôi thực hiện Serializable trong tất cả các lớp, hoặc rõ ràng hoặc bằng cách thực hiện các hàm từ Spark như nhập org.apache.spark.api.java.function.Function và org.apache.spark.api.java.function.VoidFunction – JimLohse

Trả lời

6

Trong Scala bạn nên khắc phục vấn đề này thêm 'scala.collection.mutable.WrappedArray.ofRef [_]' như lớp đăng ký như trong đoạn mã sau:

conf.registerKryoClasses(
    Array(
    ... 
    classOf[Person], 
    classOf[Array[Person]], 
    ... 
    classOf[scala.collection.mutable.WrappedArray.ofRef[_]] 
) 
) 
+0

Đầu tiên câu trả lời của bạn trông giống như Scala nhưng tôi trong Java nhưng tôi nhận được điểm :) Tôi đánh giá cao câu trả lời nhưng câu hỏi cơ bản là chưa được trả lời, tại sao tôi phải khai báo lớp này khi tôi don không sử dụng nó? Tôi không phải khai báo mọi lớp trong Spark, tại sao lại là cái này? Tôi đã không cố gắng sử dụng Kryo một chút, tôi nên thực hiện lại nó ngay bây giờ rằng giải pháp của chúng tôi là xa hơn nhiều so với con đường và Spark là một vài phiên bản mới hơn. 1 mặc dù, cảm ơn! – JimLohse

+0

không chắc chắn lý do tại sao thẻ java không có trong câu hỏi, xấu, xin lỗi, nó nằm trong câu hỏi chứ không phải thẻ, oops – JimLohse

+2

chấp nhận câu trả lời này ngay bây giờ khi tôi xem lại câu hỏi này, nó không cung cấp câu trả lời hoàn chỉnh mã scala này không hoạt động trong Java. Tất cả cùng nó gần hơn với một câu trả lời cụ thể. Tôi có thể thề trên một câu hỏi khác mà một người nào đó đã đăng cách thêm lớp Scala này vào một mảng Java, sử dụng ví dụ .ofRef [] hoặc $ ofRef Java-esque đều không hoạt động. Bây giờ tôi đã thư giãn cài đặt "bắt buộc" trên Kryo. – JimLohse

2

Bạn không cần phải thực hiện tất cả mọi thứ serializable, độc lập với nó là một phần của thư viện khách hàng hoặc không phải. Nhưng bạn cần phải thực hiện bất kỳ lambda sẽ có hiệu lực trên các nhà thực thi serializable. Những người không chạy trên nút chính, vì vậy không có cách nào để ngăn chặn tuần tự hóa (cũng không làm bạn muốn, vì toàn bộ mục đích của Spark là tính toán phân tán).

Để biết các ví dụ và như vậy (và nếu bạn chưa hiểu rõ khái niệm), hãy kiểm tra the official docs about this.

+0

Cảm ơn, bằng cách xóa câu hỏi chung cho phép tôi biết nơi tập trung nỗ lực của mình, rất hữu ích! Tôi vẫn còn một chút hoang mang lý do tại sao lớp scala WrappedArray đang được báo cáo là một lớp không thể được tuần tự hóa. Tôi sẽ loại bỏ mã của tôi và đặt nó lại với nhau. Tôi hiểu các chức năng ẩn danh và sử dụng chúng khi sử dụng các lớp dựng sẵn - khi tôi đang sử dụng các lớp của chúng tôi, tôi khai báo chúng một cách riêng biệt. Tôi sẽ vẫn làm việc trên một MVCE cảm ơn một lần nữa – JimLohse

Các vấn đề liên quan