2015-04-21 18 views
18

Tôi đang chạy Apache Spark 1.3.1 trên Scala 2.11.2 và khi chạy trên cụm HPC có đủ dữ liệu, tôi nhận được nhiều lỗi như lỗi ở cuối bài đăng của tôi (lặp đi lặp lại nhiều lần mỗi giây, cho đến khi công việc bị giết vì bị theo thời gian). Dựa trên các lỗi, người thực thi đang cố gắng lấy dữ liệu ngẫu nhiên từ các nút khác nhưng không thể làm như vậy. Chương trình này thực hiện tốt với (a) một lượng dữ liệu nhỏ hơn, hoặc (b) ở chế độ chỉ cục bộ, do đó nó có liên quan đến dữ liệu được gửi qua mạng (và không được kích hoạt với một lượng dữ liệu rất nhỏ).Apache Spark: lỗi mạng giữa các người thi hành

Mã đang được thực hiện trong khoảng thời gian này xảy ra là như sau:

val partitioned_data = data // data was read as sc.textFile(inputFile) 
    .zipWithIndex.map(x => (x._2, x._1)) 
    .partitionBy(partitioner) // A custom partitioner 
    .map(_._2) 

// Force previous lazy operations to be evaluated. Presumably adds some 
// overhead, but hopefully the minimum possible... 
// Suggested on Spark user list: http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-RDD-computation-with-something-else-than-count-td707.html 
sc.runJob(partitioned_data, (iter: Iterator[_]) => {}) 

Đây có phải là dấu hiệu của một lỗi, hoặc là có cái gì tôi đang làm sai?

Dưới đây là một đoạn nhỏ của các bản ghi stderr của một trong những Chấp hành viên (full log là here):

15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593000, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/15/shuffle_0_1_0.data, offset=26501223, length=6227612}} to /10.0.0.5:41160; closing connection 
java.io.IOException: Resource temporarily unavailable 
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method) 
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:415) 
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:516) 
    at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96) 
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89) 
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237) 
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233) 
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264) 
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707) 
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:315) 
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:676) 
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1059) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688) 
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:669) 
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688) 
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:718) 
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706) 
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741) 
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895) 
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240) 
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:147) 
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:119) 
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:95) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) 
    at java.lang.Thread.run(Thread.java:619) 
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593000, chunkIndex=1}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/27/shuffle_0_5_0.data, offset=3792987, length=2862285}} to /10.0.0.5:41160; closing connection 
java.nio.channels.ClosedChannelException 
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593002, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/15/shuffle_0_1_0.data, offset=0, length=10993212}} to /10.0.0.6:42426; closing connection 
java.io.IOException: Resource temporarily unavailable 
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method) 
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:415) 
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:516) 
    at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96) 
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89) 
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237) 
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233) 
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264) 
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707) 
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:315) 
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:676) 
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1059) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688) 
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:669) 
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688) 
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:718) 
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706) 
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741) 
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895) 
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240) 
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:147) 
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:119) 
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:95) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) 
    at java.lang.Thread.run(Thread.java:619) 
15/04/21 14:59:28 WARN TransportChannelHandler: Exception in connection from node5.someuniversity.edu/10.0.0.5:60089 
java.io.IOException: Connection reset by peer 
    at sun.nio.ch.FileDispatcher.read0(Native Method) 
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21) 
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:233) 
    at sun.nio.ch.IOUtil.read(IOUtil.java:206) 
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236) 
    at io.netty.buffer.PooledHeapByteBuf.setBytes(PooledHeapByteBuf.java:234) 
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) 
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) 
    at java.lang.Thread.run(Thread.java:619) 
15/04/21 14:59:28 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from node5.someuniversity.edu/10.0.0.5:60089 is closed 
15/04/21 14:59:28 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 outstanding blocks after 5000 ms 

Trả lời

15

này dường như là một lỗi liên quan đến hệ thống Netty mạng (dịch vụ chuyển khối), thêm trong Spark 1.2. Thêm .set("spark.shuffle.blockTransferService", "nio") vào SparkConf của tôi đã sửa lỗi, vì vậy, bây giờ mọi thứ hoạt động hoàn hảo.

Tôi đã tìm thấy a post on the spark-user mailing list từ người nào đó đang gặp phải lỗi tương tự và họ đã đề xuất thử nio thay vì Netty.

SPARK-5085 tương tự, trong đó thay đổi từ Netty thành nio khắc phục sự cố của họ; tuy nhiên, họ cũng có thể khắc phục vấn đề bằng cách thay đổi một số cài đặt mạng. (Tôi đã không thử điều này, vì tôi không chắc mình có quyền truy cập phù hợp để làm như vậy trên cụm.)

+1

Tôi đang sử dụng phiên bản spark 1.4.1 và chuyển sang nio đã giải quyết được sự cố. – firemonkey

+0

Sử dụng Spark 1.3.1 với HDP 2.3, chúng tôi đã có cùng một vấn đề. Chuyển sang nio đã giải quyết được vấn đề. –

+8

chuyển sang nio không giải quyết được vấn đề trong tia lửa 1.5.1, bất kỳ ý tưởng nào? –

0

Cũng có thể cấu hình Maven khác với cài đặt máy chủ Spark của bạn.

Ví dụ chọn của bạn một pom.xml từ một bài đăng blog Tutorial

<dependencies> 
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_1.3</artifactId> 
     <version>1.3</version> 
    </dependency> 

</dependencies> 

Nhưng bạn có thể tải về phiên bản mới nhất 2.3 trên trang web Spark Apache.

Các vấn đề liên quan