2015-02-19 17 views
6

Spark job fails when calling first() in PySpark

I just built Spark on a Windows 7 machine (using sbt) and am working through the quick start. The Spark job fails when I call first().

I am new to Java and don't have a clear idea of what the error stack trace is telling me, although judging by the message it appears to be related to java.net.SocketException. Note that I am not using a Hadoop installation. Also note that running the same example in Scala produces no error.

Environment:

Windows 7
Spark 1.2.1
Anaconda Python 2.7.8
Scala 2.10.4
SBT 0.13.7
jdk 1.7.0.75

In [2]: path = u'C:\\Users\\striji\\Documents\\Personal\\python\\pyspark-flights\\2001.csv.bz2' 

In [3]: textFile = sc.textFile(path) 

In [4]: textFile 
Out[4]: C:\Users\striji\Documents\Personal\python\pyspark-flights\2001.csv.bz2 MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 

In [5]: textFile.count() 
     ... 
Out[5]: 5967781 

In [6]: textFile.first() 
15/02/19 08:52:01 INFO SparkContext: Starting job: runJob at PythonRDD.scala:344 
15/02/19 08:52:01 INFO DAGScheduler: Got job 1 (runJob at PythonRDD.scala:344) with 1 output partitions (allowLocal=true) 
15/02/19 08:52:01 INFO DAGScheduler: Final stage: Stage 1(runJob at PythonRDD.scala:344) 
15/02/19 08:52:01 INFO DAGScheduler: Parents of final stage: List() 
15/02/19 08:52:01 INFO DAGScheduler: Missing parents: List() 
15/02/19 08:52:01 INFO DAGScheduler: Submitting Stage 1 (PythonRDD[3] at RDD at PythonRDD.scala:43), which has no missing parents 
15/02/19 08:52:01 INFO MemoryStore: ensureFreeSpace(4560) called with curMem=46832, maxMem=278302556 
15/02/19 08:52:01 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.5 KB, free 265.4 MB) 
15/02/19 08:52:01 INFO MemoryStore: ensureFreeSpace(3417) called with curMem=51392, maxMem=278302556 
15/02/19 08:52:01 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.3 KB, free 265.4 MB) 
15/02/19 08:52:01 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:51106 (size: 3.3 KB, free: 265.4 MB) 
15/02/19 08:52:01 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0 
15/02/19 08:52:01 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838 
15/02/19 08:52:01 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (PythonRDD[3] at RDD at PythonRDD.scala:43) 
15/02/19 08:52:01 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 
15/02/19 08:52:01 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1341 bytes) 
15/02/19 08:52:01 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 
15/02/19 08:52:04 INFO HadoopRDD: Input split: file:/C:/Users/striji/Documents/Personal/python/pyspark-flights/2001.csv.bz2:0+83478700 
15/02/19 08:52:04 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) 
java.net.SocketException: Connection reset 
     at java.net.SocketInputStream.read(SocketInputStream.java:196) 
     at java.net.SocketInputStream.read(SocketInputStream.java:122) 
     at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) 
     at java.io.BufferedInputStream.read(BufferedInputStream.java:254) 
     at java.io.DataInputStream.readInt(DataInputStream.java:387) 
     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110) 
     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174) 
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
     at org.apache.spark.scheduler.Task.run(Task.scala:56) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 
15/02/19 08:52:05 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.net.SocketException: Connection reset 
     at java.net.SocketInputStream.read(SocketInputStream.java:196) 
     at java.net.SocketInputStream.read(SocketInputStream.java:122) 
     at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) 
     at java.io.BufferedInputStream.read(BufferedInputStream.java:254) 
     at java.io.DataInputStream.readInt(DataInputStream.java:387) 
     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110) 
     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174) 
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
     at org.apache.spark.scheduler.Task.run(Task.scala:56) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

15/02/19 08:52:05 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job 
15/02/19 08:52:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/02/19 08:52:05 INFO TaskSchedulerImpl: Cancelling stage 1 
15/02/19 08:52:05 INFO DAGScheduler: Job 1 failed: runJob at PythonRDD.scala:344, took 3.796728 s 
--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-6-674a86098a8f> in <module>() 
----> 1 textFile.first() 

c:\spark-1.2.1\python\pyspark\rdd.pyc in first(self) 
    1137   ValueError: RDD is empty 
    1138   """ 
-> 1139   rs = self.take(1) 
    1140   if rs: 
    1141    return rs[0] 

c:\spark-1.2.1\python\pyspark\rdd.pyc in take(self, num) 
    1119 
    1120    p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) 
-> 1121    res = self.context.runJob(self, takeUpToNumLeft, p, True) 
    1122 
    1123    items += res 

c:\spark-1.2.1\python\pyspark\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal) 
    825   # SparkContext#runJob. 
    826   mappedRDD = rdd.mapPartitions(partitionFunc) 
--> 827   it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) 
    828   return list(mappedRDD._collect_iterator_through_file(it)) 
    829 

c:\spark-1.2.1\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in __call__(self, *args) 
    536   answer = self.gateway_client.send_command(command) 
    537   return_value = get_return_value(answer, self.gateway_client, 
--> 538     self.target_id, self.name) 
    539 
    540   for temp_arg in temp_args: 

c:\spark-1.2.1\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    298     raise Py4JJavaError(
    299      'An error occurred while calling {0}{1}{2}.\n'. 
--> 300      format(target_id, '.', name), value) 
    301    else: 
    302     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.net.SocketException: Connection reset 
     at java.net.SocketInputStream.read(SocketInputStream.java:196) 
     at java.net.SocketInputStream.read(SocketInputStream.java:122) 
     at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) 
     at java.io.BufferedInputStream.read(BufferedInputStream.java:254) 
     at java.io.DataInputStream.readInt(DataInputStream.java:387) 
     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110) 
     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174) 
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
     at org.apache.spark.scheduler.Task.run(Task.scala:56) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) 
     at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) 
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
     at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
     at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Answers

0

According to the error, this happens because your RDD is empty.

You are calling first() on content that does not exist.

Try this PySpark example:

people = ["1,Maj,123", "2,Pvt,333", "3,Col,999"] 
rdd1 = sc.parallelize(people) 
rdd1.first() 

It should output:

'1,Maj,123' 
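For reference, the rdd.pyc frame in the question's traceback shows how first() is built on take(1), and its docstring fragment mentions "ValueError: RDD is empty". Below is a minimal pure-Python sketch of that logic. FakeRDD is a hypothetical stand-in, not a PySpark class, and the final raise is an assumption inferred from the docstring fragment rather than copied from the Spark source.

```python
class FakeRDD(object):
    """Hypothetical in-memory stand-in for an RDD; not part of PySpark."""

    def __init__(self, items):
        self._items = list(items)

    def take(self, num):
        # Return at most `num` leading elements.
        return self._items[:num]

    def first(self):
        # Same shape as the lines visible in the traceback:
        # take one element, return it if present.
        rs = self.take(1)
        if rs:
            return rs[0]
        # Assumed behaviour for the empty case (from the docstring fragment).
        raise ValueError("RDD is empty")


people = FakeRDD(["1,Maj,123", "2,Pvt,333", "3,Col,999"])
print(people.first())

try:
    FakeRDD([]).first()
except ValueError as exc:
    print("empty case raised: %s" % exc)
```

If the real implementation behaves like this sketch, an empty RDD would surface as a Python ValueError, which can be compared against the error actually reported.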

+0

I still get the same error when using your code. – Pete
