2016-03-07 17 views
7

Chúng tôi đang sử dụng Spark Streaming 1.6.0 chạy trên AWS EMR 4.3.x, tiêu thụ dữ liệu từ luồng Kinesis. Được sử dụng để hoạt động bình thường trong Spark 1.3.1 Sau khi di chuyển, chúng tôi không thể chịu được tải lâu. Ganglia cho thấy bộ nhớ được sử dụng của cụm vẫn tiếp tục phát triển cho đến khi đạt tới một số giới hạn mà không cần GC. Sau đó có một số lô thực sự dài (về mặt hàng chục phút thay vì vài giây). Và sau đó Spark bắt đầu giết và trả lại những người thi hành (được thực hiện hết lần này đến lần khác),Phát trực tuyến 1.6.0 - Các công nhân nhảy lên

Về cơ bản cụm không sử dụng được. Sự cố này có thể lặp lại theo thời gian tải. Điều gì có thể là nguyên nhân khiến Spark không thể GC mà không giết chết người thực thi? Làm thế nào chúng ta có thể làm cho cụm chạy trong nhiều tuần (hiện tại không thể chạy theo giờ)

Bất kỳ đầu vào nào đều được chào đón.

Chúng tôi đang sử dụng định nghĩa sau đây khi xác định một công việc:

sparkConf.set("spark.shuffle.consolidateFiles", "true"); 
sparkConf.set("spark.storage.memoryFraction", "0.5"); 
sparkConf.set("spark.streaming.backpressure.enabled", "true"); 
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 

Làm một liên minh của

KinesisUtils.createStream(streamingContext, appName, 
         kinesisStreamName, kinesisEndpoint, awsRegionName, initialPositionInStream, checkpointInterval, 
         StorageLevel.MEMORY_AND_DISK_2()); 

Tôi đã lột ứng dụng của chúng để một bộ xương trần cho các bài kiểm tra. Giữ bản đồ từ luồng byte đến luồng Chuỗi, sau đó chuyển đổi thành đối tượng, lọc ra các sự kiện không liên quan, sau đó lưu giữ và lưu trữ vào S3.

eventStream = eventStream.persist (StorageLevel.MEMORY_AND_DISK_SER_2());

eventStream = eventStream.repartition (configuration.getSparkOutputPartitions()); eventStream.foreachRDD (new RddByPartitionSaverFunction <> (new OutputToS3Function()));

việc Spark được nộp cùng với cấu hình sau (sao chép với thay đổi kích thước bộ nhớ từ cấu hình Spark mặc định):

spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p' -XX:PermSize=256M -XX:MaxPermSize=256M 
spark.driver.extraJavaOptions -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p' -XX:PermSize=512M -XX:MaxPermSize=512M 

ngoại lệ Thêm. 1-st cụm

16/03/06 13:54:52 WARN BlockManagerMaster: Failed to remove broadcast 1327 with removeFromMaster = true - Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout 
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout 
     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
     at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185) 
     at scala.util.Try$.apply(Try.scala:161) 
     at scala.util.Failure.recover(Try.scala:185) 
     at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) 
     at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) 
     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
     at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) 
     at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133) 
     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
     at scala.concurrent.Promise$class.complete(Promise.scala:55) 
     at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) 
     at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) 
     at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) 
     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635) 
     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634) 
     at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) 
     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685) 
     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
     at scala.concurrent.Promise$class.tryFailure(Promise.scala:112) 
     at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153) 
     at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:241) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds 
     at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242) 
     ... 7 more 
16/03/06 13:54:52 ERROR ContextCleaner: Error cleaning broadcast 1327 
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout 
     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) 
     at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136) 
     at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228) 
     at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) 
     at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67) 
     at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180) 
     at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180) 
     at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173) 
     at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68) 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] 
     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
     at scala.concurrent.Await$.result(package.scala:107) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) 
     ... 12 more 

16/03/06 13:55:04 ERROR YarnClusterScheduler: Lost executor 6 on ip-***-194.ec2.internal: Container killed by YARN for exceeding memory limits. 11.3 GB of 11.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 
1 
16/03/06 13:55:10 ERROR YarnClusterScheduler: Lost executor 1 on ip-***-193.ec2.internal: Container killed by YARN for exceeding memory limits. 11.3 GB of 11.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 

nỗ lực cụm thứ hai:

16/03/07 14:24:38 ERROR server.TransportChannelHandler: Connection to ip-***-22.ec2.internal/N.N.N.22:40791 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong. 
16/03/07 14:24:38 ERROR client.TransportResponseHandler: Still have 12 requests outstanding when connection from ip-***-22.ec2.internal/N.N.N.22:40791 is closed 
16/03/07 14:24:38 ERROR netty.NettyBlockTransferService: Error while uploading block input-47-1457357970366 
java.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at java.lang.Thread.run(Thread.java:745) 
16/03/07 14:24:38 ERROR netty.NettyBlockTransferService: Error while uploading block input-15-1457357969730 
java.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     a.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at java.lang.Thread.run(Thread.java:745) 
t io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at java.lang.Thread.run(Thread.java:745) 

Cảm ơn trước ...

+0

Bạn có tìm thấy giải pháp hoặc vấn đề không? –

+0

Xin chào. Không, chúng tôi vẫn chưa tìm được giải pháp. Sự cố chỉ có thể tái sản xuất với số lượng lớn các mảnh Kinesis (120). BẬT sau đó các nhà điều hành bắt đầu nảy. Ý tưởng? Cảm ơn – visitor

+0

Phản hồi của người thi hành đã dừng trong Spark 2.0.0 (EMR 5.0.0). Có một vấn đề mới ngăn chặn một thời gian dài của cùng một ứng dụng: http: // stackoverflow.com/questions/39289345/spark-streaming-2-0-0-freezes-after-vài ngày-under-load – visitor

Trả lời

-2

tôi làm được điều gì đó trong bối cảnh đó .. Khi một lô vi mất hơn 120 giây để hoàn thành, nó được kích hoạt:

16/03/14 22:57:30 INFO SparkStreaming$: Batch size: 2500, total read: 4287800 
16/03/14 22:57:35 INFO SparkStreaming$: Batch size: 2500, total read: 4290300 
16/03/14 22:57:42 INFO SparkStreaming$: Batch size: 2500, total read: 4292800 
16/03/14 22:57:45 INFO SparkStreaming$: Batch size: 2500, total read: 4295300 
16/03/14 22:59:45 ERROR ContextCleaner: Error cleaning broadcast 11251 
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout 
     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) 
     at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136) 
     at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228) 
     at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) 
     at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67) 
     at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180) 
     at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180) 
     at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173) 
     at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68) 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] 
     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
     at scala.concurrent.Await$.result(package.scala:107) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) 
     ... 12 more 

Tôi đang chạy ở chế độ cục bộ và tiêu thụ f rom Kinesis. Tôi cũng không sử dụng bất kỳ biến phát sóng nào.

Các vấn đề liên quan