Chúng tôi đang sử dụng Spark Streaming 1.6.0 chạy trên AWS EMR 4.3.x, tiêu thụ dữ liệu từ luồng Kinesis. Được sử dụng để hoạt động bình thường trong Spark 1.3.1 Sau khi di chuyển, chúng tôi không thể chịu được tải lâu. Ganglia cho thấy bộ nhớ được sử dụng của cụm vẫn tiếp tục phát triển cho đến khi đạt tới một số giới hạn mà không cần GC. Sau đó có một số lô thực sự dài (về mặt hàng chục phút thay vì vài giây). Và sau đó Spark bắt đầu giết và trả lại những người thi hành (được thực hiện hết lần này đến lần khác),Phát trực tuyến 1.6.0 - Các công nhân nhảy lên
Về cơ bản cụm không sử dụng được. Sự cố này có thể lặp lại theo thời gian tải. Điều gì có thể là nguyên nhân khiến Spark không thể GC mà không giết chết người thực thi? Làm thế nào chúng ta có thể làm cho cụm chạy trong nhiều tuần (hiện tại không thể chạy theo giờ)
Bất kỳ đầu vào nào đều được chào đón.
Chúng tôi đang sử dụng định nghĩa sau đây khi xác định một công việc:
sparkConf.set("spark.shuffle.consolidateFiles", "true");
sparkConf.set("spark.storage.memoryFraction", "0.5");
sparkConf.set("spark.streaming.backpressure.enabled", "true");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
Làm một liên minh của
KinesisUtils.createStream(streamingContext, appName,
kinesisStreamName, kinesisEndpoint, awsRegionName, initialPositionInStream, checkpointInterval,
StorageLevel.MEMORY_AND_DISK_2());
Tôi đã lột ứng dụng của chúng để một bộ xương trần cho các bài kiểm tra. Giữ bản đồ từ luồng byte đến luồng Chuỗi, sau đó chuyển đổi thành đối tượng, lọc ra các sự kiện không liên quan, sau đó lưu giữ và lưu trữ vào S3.
eventStream = eventStream.persist (StorageLevel.MEMORY_AND_DISK_SER_2());
eventStream = eventStream.repartition (configuration.getSparkOutputPartitions()); eventStream.foreachRDD (new RddByPartitionSaverFunction <> (new OutputToS3Function()));
việc Spark được nộp cùng với cấu hình sau (sao chép với thay đổi kích thước bộ nhớ từ cấu hình Spark mặc định):
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p' -XX:PermSize=256M -XX:MaxPermSize=256M
spark.driver.extraJavaOptions -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p' -XX:PermSize=512M -XX:MaxPermSize=512M
ngoại lệ Thêm. 1-st cụm
16/03/06 13:54:52 WARN BlockManagerMaster: Failed to remove broadcast 1327 with removeFromMaster = true - Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Failure.recover(Try.scala:185)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643)
at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658)
at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634)
at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:241)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds
at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242)
... 7 more
16/03/06 13:54:52 ERROR ContextCleaner: Error cleaning broadcast 1327
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136)
at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67)
at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173)
at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
... 12 more
16/03/06 13:55:04 ERROR YarnClusterScheduler: Lost executor 6 on ip-***-194.ec2.internal: Container killed by YARN for exceeding memory limits. 11.3 GB of 11.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
1
16/03/06 13:55:10 ERROR YarnClusterScheduler: Lost executor 1 on ip-***-193.ec2.internal: Container killed by YARN for exceeding memory limits. 11.3 GB of 11.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
nỗ lực cụm thứ hai:
16/03/07 14:24:38 ERROR server.TransportChannelHandler: Connection to ip-***-22.ec2.internal/N.N.N.22:40791 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
16/03/07 14:24:38 ERROR client.TransportResponseHandler: Still have 12 requests outstanding when connection from ip-***-22.ec2.internal/N.N.N.22:40791 is closed
16/03/07 14:24:38 ERROR netty.NettyBlockTransferService: Error while uploading block input-47-1457357970366
java.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
16/03/07 14:24:38 ERROR netty.NettyBlockTransferService: Error while uploading block input-15-1457357969730
java.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
a.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
t io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
Cảm ơn trước ...
Bạn có tìm thấy giải pháp hoặc vấn đề không? –
Xin chào. Không, chúng tôi vẫn chưa tìm được giải pháp. Sự cố chỉ có thể tái sản xuất với số lượng lớn các mảnh Kinesis (120). BẬT sau đó các nhà điều hành bắt đầu nảy. Ý tưởng? Cảm ơn – visitor
Phản hồi của người thi hành đã dừng trong Spark 2.0.0 (EMR 5.0.0). Có một vấn đề mới ngăn chặn một thời gian dài của cùng một ứng dụng: http: // stackoverflow.com/questions/39289345/spark-streaming-2-0-0-freezes-after-vài ngày-under-load – visitor