Hai câu hỏi, câu trả lời cho câu hỏi chung sẽ hướng dẫn tôi về mức tối thiểu tôi có thể tạo MVCE như thế nào.Trình tự nối tiếp Kryo gây ra ngoại lệ trên lớp Scala bên dưới WrappedArray
1) Làm thế nào tôi có thể biết để đăng ký WrappedArray lên phía trước, (và mọi lớp khác trong Scala tôi có thể sử dụng)? Việc đăng ký lớp học từ thư viện với Kryo có bình thường không?
và cụ thể:
2) Làm cách nào để khắc phục sự cố này? (Sẵn sàng thừa nhận tôi có thể có cái gì khác screwy xảy ra rằng nếu phản ánh một lỗi sai ở đây, vì vậy đừng giết mình cố gắng để tái sản xuất này)
CHI TIẾT
kiểm tra ra một chương trình Spark trong Java sử dụng của khách hàng của chúng tôi các lớp học liên quan đến di truyền và thống kê, trên Spark 1.4.1, Scala 2.11.5 với các cài đặt sau trên SparkConf:
// for kyro serializer it wants to register all classes that need to be serialized
Class[] kryoClassArray = new Class[]{DropResult.class, DropEvaluation.class, PrintHetSharing.class};
SparkConf sparkConf = new SparkConf().setAppName("PipeLinkageData")
<SNIP other settings to declare master>
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//require registration of all classes with Kryo
.set("spark.kryo.registrationRequired", "true")
.registerKryoClasses(kryoClassArray);
Bắt lỗi này (lặp đi lặp lại ở cuối danh sách lỗi dài):
Caused by: java.lang.IllegalArgumentException: Class is not
registered: scala.collection.mutable.WrappedArray$ofRef Note: To
register this class use:
kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
Nhưng tôi không bao giờ gọi lớp đó từ mã của tôi. Tôi có thể thêm scala.collection.mutable.WrappedArray
vào kryoClassArray nhưng không khắc phục được sự cố. Nếu tôi thêm scala.collection.mutable.WrappedArray $ ofRef.class (như được đề xuất trong lỗi), đó là lỗi cú pháp, tôi thấy tôi không thể khai báo một hàm ẩn danh ở đây?
MVCE: Tôi đã bắt đầu một MVCE nhưng vấn đề là, để làm một với các lớp học của chúng tôi yêu cầu thư viện bên ngoài và các tập tin văn bản/dữ liệu. Một khi tôi loại bỏ các lớp học của chúng tôi, tôi không có vấn đề gì. Nếu ai đó có thể trả lời câu hỏi chung, nó có thể giúp hướng dẫn tôi về số lượng MVCE mà tôi có thể đưa ra.
Khi tôi viết câu hỏi này, tôi đã tiếp tục cập nhật lên 1.5.2, sẽ xem liệu có bất kỳ thay đổi nào và cập nhật câu hỏi nếu có.
ngắn của một MVCE đây là tờ khai lớp học của tôi:
public class MVCEPipeLinkageInterface extends LinkageInterface implements Serializable {
class PrintHetSharing implements VoidFunction<DropResult> {
class SparkDoDrop implements Function<Integer, Integer> {
lỗi đầy đủ:
16/01/08 10:54:54 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/01/08 10:54:55 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:55646/user/Executor#214759698]) with ID 0
16/01/08 10:54:55 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168)
at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.makeOffers(CoarseGrainedSchedulerBackend.scala:167)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:143)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
1. Biết những gì các lớp học sẽ yêu cầu serialization đòi hỏi bạn kiểm tra mã của bạn và hiểu những gì nó đang làm (bạn chỉ dán mẫu conf, không phải việc sử dụng). 2. Giống như 1, không thể trả lời mà không có mẫu mã. –
Chắc chắn, @DanielL. Tôi sẽ chỉnh sửa trong một số mã. Nhưng có vẻ như bạn đang nói rằng tôi cần biết mọi lớp cơ bản? Như một nguyên tắc chung?Tôi đang viết Java nên tôi không mong đợi cần phải có một nhận thức về các lớp Scala cơ bản để làm cho Kryo hoạt động. Cảm ơn – JimLohse
@DanielL. Tôi đánh giá cao các yêu cầu MVCE, vấn đề tôi chạy vào là, để làm một với các lớp học của chúng tôi yêu cầu các thư viện bên ngoài và các tập tin văn bản/dữ liệu. Một khi tôi loại bỏ các lớp học của chúng tôi và sự cần thiết cho các tập tin của chúng tôi, tôi không có vấn đề. Nếu ai đó có thể trả lời câu hỏi chung, nó có thể giúp hướng dẫn tôi về số lượng MVCE mà tôi có thể đưa ra. Tôi thực hiện Serializable trong tất cả các lớp, hoặc rõ ràng hoặc bằng cách thực hiện các hàm từ Spark như nhập org.apache.spark.api.java.function.Function và org.apache.spark.api.java.function.VoidFunction – JimLohse