14

Tôi đang cố gắng sử dụng tia lửa cho một số tác vụ học máy đơn giản. Tôi đã sử dụng pyspark và spark 1.2.0 để thực hiện một vấn đề hồi quy logistic đơn giản. Tôi có 1,2 triệu hồ sơ đào tạo và tôi đã băm các tính năng của bản ghi. Khi tôi đặt số tính năng băm như 1024, chương trình hoạt động tốt, nhưng khi tôi đặt số tính năng băm như 16.384, chương trình thất bại nhiều lần với các lỗi sau:Lỗi Java Java: Kích thước vượt quá Số nguyên.MAX_VALUE

Py4JJavaError: An error occurred while calling o84.trainLogisticRegressionModelWithSGD. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE 
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) 
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) 
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) 
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) 
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) 
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) 
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) 
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) 
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) 
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) 
    at java.lang.Thread.run(Thread.java:745) 

    at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:156) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Lỗi này xảy ra khi Tôi đào tạo LogisticRegressionWithSGD sau khi chuyển dữ liệu vào LabeledPoint.

Có ai có ý tưởng về điều này không?

Mã của tôi là như sau (tôi đang sử dụng một máy tính xách tay IPython cho việc này):

from pyspark.mllib.regression import LabeledPoint 
from pyspark.mllib.classification import LogisticRegressionWithSGD 
from numpy import array 
from sklearn.feature_extraction import FeatureHasher 
from pyspark import SparkContext 
sf = SparkConf().setAppName("test").set("spark.executor.memory", "50g").set("spark.cores.max", 30) 
sc = SparkContext(conf=sf) 
training_file = sc.textFile("train_small.txt") 
def hash_feature(line): 
    values = [0, dict()] 
    for index, x in enumerate(line.strip("\n").split('\t')): 
     if index == 0: 
      values[0] = float(x) 
     else: 
      values[1][str(index)+"_"+x] = 1 
    return values 
n_feature = 2**14 
hasher = FeatureHasher(n_features=n_feature) 
training_file_hashed = training_file.map(lambda line: [hash_feature(line)[0], hasher.transform([hash_feature(line)[1]])]) 
def build_lable_points(line): 
    values = [0.0] * n_feature 
    for index, value in zip(line[1].indices, line[1].data): 
     values[index] = value 
    return LabeledPoint(line[0], values) 
parsed_training_data = training_file_hashed.map(lambda line: build_lable_points(line)) 
model = LogisticRegressionWithSGD.train(parsed_training_data) 

Lỗi xảy ra khi thực hiện dòng cuối cùng.

+1

Bạn có thể hiển thị mã của mình không? –

+0

mã được thêm vào bài đăng gốc, nhờ – peng

+0

Bạn có thể thử thêm phân vùng không? (Tôi nghĩ rằng nhiều phân vùng có nghĩa là ít dữ liệu hơn bởi phân vùng, do đó, nó nên làm các thủ thuật). –

Trả lời

1

Tại một số thời điểm, nó cố gắng lưu trữ các tính năng và 1,2M * 16384 lớn hơn Integer.MAX_INT vì vậy bạn đang cố gắng lưu trữ nhiều hơn kích thước tối đa các tính năng được Spark hỗ trợ.

Có thể bạn đang chạy vào các giới hạn của Apache Spark.

+1

Cảm ơn. bạn có thể xây dựng trên này? Tôi chưa bao giờ nghe nói về kích thước tối đa các tính năng được hỗ trợ bởi tia lửa. Tôi biết có giới hạn về kích thước khối cho tia lửa, xem https://issues.apache.org/jira/browse/SPARK-1476, tôi không chắc chắn liệu tôi có đánh trúng điều này hay không, nhưng nếu tôi nhấn vào điều này, tôi tự hỏi làm thế nào Tôi có thể tránh điều đó mà không làm giảm số lượng tính năng và số lượng hồ sơ – peng

11

Hạn chế Integer.MAX_INT là kích thước của tệp đang được lưu trữ. 1,2M hàng không phải là một điều lớn, để tôi không chắc chắn vấn đề của bạn là "các giới hạn của tia lửa". Nhiều khả năng, một phần công việc của bạn đang tạo ra thứ gì đó quá lớn để được xử lý bởi bất kỳ người thi hành nào.

Tôi không có bộ mã hóa Python, nhưng khi bạn "băm các tính năng của bản ghi", bạn có thể đang lấy một tập bản ghi rất thưa thớt cho một mẫu và tạo một mảng không thưa thớt. Điều này sẽ có nghĩa là rất nhiều bộ nhớ cho 16384 tính năng. Đặc biệt, khi bạn làm zip(line[1].indices, line[1].data). Lý do duy nhất mà không giúp bạn thoát khỏi bộ nhớ ngay lập tức có shitload của nó bạn dường như đã cấu hình (50G).

Một điều khác có thể giúp là tăng phân vùng. Vì vậy, nếu bạn không thể làm cho các hàng của bạn sử dụng ít bộ nhớ hơn, ít nhất bạn có thể thử có ít hàng hơn trên bất kỳ tác vụ cụ thể nào. Bất kỳ tệp tạm thời nào được tạo có thể phụ thuộc vào điều này, vì vậy, bạn sẽ khó có khả năng đạt đến giới hạn tệp.


Và, hoàn toàn không liên quan đến lỗi nhưng có liên quan cho những gì bạn đang cố gắng làm:

16384 thực sự là một số lượng lớn các tính năng, trong trường hợp lạc quan nơi mỗi người chỉ là một tính năng boolean, bạn có tổng cộng 2^16384 hoán vị có thể để học hỏi, đây là một số rất lớn (thử nó ở đây: https://defuse.ca/big-number-calculator.htm). Đó là VERY, rất có thể là không có thuật toán nào có thể tìm hiểu ranh giới quyết định chỉ với 1,2 triệu mẫu, có thể bạn sẽ cần ít nhất một vài nghìn tỷ nghìn ví dụ để tạo một vết lõm trên không gian tính năng như vậy. Học máy có những hạn chế của nó, vì vậy đừng ngạc nhiên nếu bạn không có được độ chính xác cao hơn ngẫu nhiên.

Tôi chắc chắn sẽ khuyên bạn nên thử một số loại giảm kích thước đầu tiên !!

+1

Cảm ơn. Vấn đề này chỉ là cố định bằng cách sử dụng phân vùng nhiều hơn khi tải dữ liệu. Chúng tôi chỉ là thử nghiệm trên tập dữ liệu nhỏ và đạt được một số ý tưởng, sau đó chúng tôi sẽ áp dụng cho tập dữ liệu lớn với nhiều máy mạnh mẽ. – peng

Các vấn đề liên quan