Spark job fails when calling first() in PySpark
I just built Spark on a Windows 7 machine (using sbt) and am working through the quick start. The Spark job fails when I call first().
I'm new to Java and don't have a clear idea of what the error stacktrace is telling me, although, given the message, it seems to be related to java.net.SocketException. Note that I'm not using a Hadoop installation. Also note that when I run this example in Scala, there is no error.
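For readers unfamiliar with the Java side: the innermost frames of the trace below (`DataInputStream.readInt` called from `PythonRDD$$anon$1.read`) show the JVM reading a 4-byte integer from the socket connected to the Python worker, and `Connection reset` means the other end of that socket went away in the middle of that read. The following is a minimal sketch of big-endian, length-prefixed framing of that general kind, assuming a simplified format (a 4-byte length followed by a payload). The helper names `write_frame`, `read_int` and `read_frame` are hypothetical, and this is not PySpark's actual wire protocol; it only illustrates how a reader fails when the writer stops early.

```python
import io
import struct


def write_frame(stream, payload: bytes) -> None:
    """Write one length-prefixed frame: a 4-byte big-endian int, then the bytes."""
    # "!i" is network (big-endian) byte order, the same order Java's
    # DataInputStream.readInt() uses when decoding a 4-byte int.
    stream.write(struct.pack("!i", len(payload)))
    stream.write(payload)


def read_int(stream) -> int:
    """Read a 4-byte big-endian int, failing if the writer stopped early."""
    data = stream.read(4)
    if len(data) < 4:
        # In-memory analogue of the peer disappearing mid-read; a real
        # socket may instead surface an error such as "Connection reset".
        raise EOFError("stream ended while reading a 4-byte length")
    return struct.unpack("!i", data)[0]


def read_frame(stream) -> bytes:
    """Read one frame written by write_frame."""
    length = read_int(stream)
    payload = stream.read(length)
    if len(payload) < length:
        raise EOFError("stream ended in the middle of a payload")
    return payload


if __name__ == "__main__":
    buf = io.BytesIO()
    write_frame(buf, b"hello")
    write_frame(buf, b"spark")
    buf.seek(0)
    print(read_frame(buf), read_frame(buf))  # b'hello' b'spark'

    # A writer that stops after sending only half of the length prefix.
    truncated = io.BytesIO(struct.pack("!i", 10)[:2])
    try:
        read_frame(truncated)
    except EOFError as exc:
        print("reader failed:", exc)
```

An in-memory stream only reports a clean end of data; over a real TCP socket the reader may instead see a reset, which is what the JVM reports here.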
Environment:
Windows 7
Spark 1.2.1
Anaconda Python 2.7.8
Scala 2.10.4
SBT 0.13.7
jdk 1.7.0.75
In [2]: path = u'C:\\Users\\striji\\Documents\\Personal\\python\\pyspark-flights\\2001.csv.bz2'
In [3]: textFile = sc.textFile(path)
In [4]: textFile
Out[4]: C:\Users\striji\Documents\Personal\python\pyspark-flights\2001.csv.bz2 MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
In [5]: textFile.count()
...
Out[5]: 5967781
In [6]: textFile.first()
15/02/19 08:52:01 INFO SparkContext: Starting job: runJob at PythonRDD.scala:344
15/02/19 08:52:01 INFO DAGScheduler: Got job 1 (runJob at PythonRDD.scala:344) with 1 output partitions (allowLocal=true)
15/02/19 08:52:01 INFO DAGScheduler: Final stage: Stage 1(runJob at PythonRDD.scala:344)
15/02/19 08:52:01 INFO DAGScheduler: Parents of final stage: List()
15/02/19 08:52:01 INFO DAGScheduler: Missing parents: List()
15/02/19 08:52:01 INFO DAGScheduler: Submitting Stage 1 (PythonRDD[3] at RDD at PythonRDD.scala:43), which has no missing parents
15/02/19 08:52:01 INFO MemoryStore: ensureFreeSpace(4560) called with curMem=46832, maxMem=278302556
15/02/19 08:52:01 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.5 KB, free 265.4 MB)
15/02/19 08:52:01 INFO MemoryStore: ensureFreeSpace(3417) called with curMem=51392, maxMem=278302556
15/02/19 08:52:01 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.3 KB, free 265.4 MB)
15/02/19 08:52:01 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:51106 (size: 3.3 KB, free: 265.4 MB)
15/02/19 08:52:01 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/02/19 08:52:01 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838
15/02/19 08:52:01 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (PythonRDD[3] at RDD at PythonRDD.scala:43)
15/02/19 08:52:01 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
15/02/19 08:52:01 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1341 bytes)
15/02/19 08:52:01 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
15/02/19 08:52:04 INFO HadoopRDD: Input split: file:/C:/Users/striji/Documents/Personal/python/pyspark-flights/2001.csv.bz2:0+83478700
15/02/19 08:52:04 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/02/19 08:52:05 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/02/19 08:52:05 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
15/02/19 08:52:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/02/19 08:52:05 INFO TaskSchedulerImpl: Cancelling stage 1
15/02/19 08:52:05 INFO DAGScheduler: Job 1 failed: runJob at PythonRDD.scala:344, took 3.796728 s
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-6-674a86098a8f> in <module>()
----> 1 textFile.first()
c:\spark-1.2.1\python\pyspark\rdd.pyc in first(self)
1137 ValueError: RDD is empty
1138 """
-> 1139 rs = self.take(1)
1140 if rs:
1141 return rs[0]
c:\spark-1.2.1\python\pyspark\rdd.pyc in take(self, num)
1119
1120 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1121 res = self.context.runJob(self, takeUpToNumLeft, p, True)
1122
1123 items += res
c:\spark-1.2.1\python\pyspark\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
825 # SparkContext#runJob.
826 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 827 it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
828 return list(mappedRDD._collect_iterator_through_file(it))
829
c:\spark-1.2.1\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
c:\spark-1.2.1\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
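The Python part of the traceback shows that `first()` is just `take(1)`, and that `take` submits jobs over a range of partitions (`p = range(partsScanned, ...)`) with `allowLocal=True`, applying `takeUpToNumLeft` so that each partition is read only until enough items are found. The sketch below is a simplified, local model of that loop, with plain Python lists standing in for partitions. The names `take_from_partitions` and `first_from_partitions` are hypothetical, and the partition-widening heuristic is modeled on PySpark's `take()` from memory, so the exact numbers may differ between versions. It illustrates why `first()` runs a different, single-partition job than `count()`, which reads every partition to the end.

```python
from itertools import islice


def take_from_partitions(partitions, num):
    """Simplified local model of the take(num) loop seen in the traceback.

    `partitions` is a list of lists standing in for an RDD's partitions.
    Returns the items found and, for illustration, the partition indices
    each simulated job touched.
    """
    items = []
    total_parts = len(partitions)
    parts_scanned = 0
    jobs = []

    while len(items) < num and parts_scanned < total_parts:
        # Start with one partition; widen the scan if earlier ones came up short.
        num_parts_to_try = 1
        if parts_scanned > 0:
            if not items:
                num_parts_to_try = parts_scanned * 4
            else:
                num_parts_to_try = int(1.5 * num * parts_scanned / len(items))

        left = num - len(items)
        p = range(parts_scanned, min(parts_scanned + num_parts_to_try, total_parts))
        jobs.append(list(p))
        for i in p:
            # Like takeUpToNumLeft: stop reading a partition after `left` items
            # instead of consuming it to the end, as count() does.
            items.extend(islice(partitions[i], left))
        parts_scanned += num_parts_to_try

    return items[:num], jobs


def first_from_partitions(partitions):
    """first() is take(1) plus an empty check, as shown in the traceback."""
    rs, _ = take_from_partitions(partitions, 1)
    if rs:
        return rs[0]
    raise ValueError("RDD is empty")


if __name__ == "__main__":
    print(take_from_partitions([["x", "y"], ["z"]], 1))  # (['x'], [[0]])
    print(first_from_partitions([[], ["a", "b"], ["c"]]))  # a
```

With a non-empty first partition, `first()` touches only partition 0 and stops after one record, which matches the log's "1 output partitions (allowLocal=true)". This early stop versus full scan is one difference between the failing and the working call; the post itself does not establish that it causes the error.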
I still get the same error when using your code. – Pete