Tôi đang chạy Apache Spark 1.3.1 trên Scala 2.11.2 và khi chạy trên cụm HPC có đủ dữ liệu, tôi nhận được nhiều lỗi như lỗi ở cuối bài đăng của tôi (lặp đi lặp lại nhiều lần mỗi giây, cho đến khi công việc bị giết vì bị theo thời gian). Dựa trên các lỗi, người thực thi đang cố gắng lấy dữ liệu ngẫu nhiên từ các nút khác nhưng không thể làm như vậy. Chương trình này thực hiện tốt với (a) một lượng dữ liệu nhỏ hơn, hoặc (b) ở chế độ chỉ cục bộ, do đó nó có liên quan đến dữ liệu được gửi qua mạng (và không được kích hoạt với một lượng dữ liệu rất nhỏ).Apache Spark: lỗi mạng giữa các người thi hành
Mã đang được thực hiện trong khoảng thời gian này xảy ra là như sau:
val partitioned_data = data // data was read as sc.textFile(inputFile)
.zipWithIndex.map(x => (x._2, x._1))
.partitionBy(partitioner) // A custom partitioner
.map(_._2)
// Force previous lazy operations to be evaluated. Presumably adds some
// overhead, but hopefully the minimum possible...
// Suggested on Spark user list: http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-RDD-computation-with-something-else-than-count-td707.html
sc.runJob(partitioned_data, (iter: Iterator[_]) => {})
Đây có phải là dấu hiệu của một lỗi, hoặc là có cái gì tôi đang làm sai?
Dưới đây là một đoạn nhỏ của các bản ghi stderr của một trong những Chấp hành viên (full log là here):
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593000, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/15/shuffle_0_1_0.data, offset=26501223, length=6227612}} to /10.0.0.5:41160; closing connection
java.io.IOException: Resource temporarily unavailable
at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:415)
at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:516)
at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96)
at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89)
at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237)
at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233)
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:315)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:676)
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1059)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:669)
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:718)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741)
at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895)
at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240)
at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:147)
at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:119)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:95)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593000, chunkIndex=1}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/27/shuffle_0_5_0.data, offset=3792987, length=2862285}} to /10.0.0.5:41160; closing connection
java.nio.channels.ClosedChannelException
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593002, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/15/shuffle_0_1_0.data, offset=0, length=10993212}} to /10.0.0.6:42426; closing connection
java.io.IOException: Resource temporarily unavailable
at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:415)
at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:516)
at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96)
at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89)
at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237)
at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233)
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:315)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:676)
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1059)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:669)
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:718)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741)
at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895)
at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240)
at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:147)
at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:119)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:95)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 WARN TransportChannelHandler: Exception in connection from node5.someuniversity.edu/10.0.0.5:60089
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:233)
at sun.nio.ch.IOUtil.read(IOUtil.java:206)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
at io.netty.buffer.PooledHeapByteBuf.setBytes(PooledHeapByteBuf.java:234)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from node5.someuniversity.edu/10.0.0.5:60089 is closed
15/04/21 14:59:28 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 outstanding blocks after 5000 ms
Tôi đang sử dụng phiên bản spark 1.4.1 và chuyển sang nio đã giải quyết được sự cố. – firemonkey
Sử dụng Spark 1.3.1 với HDP 2.3, chúng tôi đã có cùng một vấn đề. Chuyển sang nio đã giải quyết được vấn đề. –
chuyển sang nio không giải quyết được vấn đề trong tia lửa 1.5.1, bất kỳ ý tưởng nào? –