2015-06-23 23 views
6

Tôi khám phá Spark cho xử lý hàng loạt. Tôi đang chạy các tia lửa trên máy địa phương của tôi bằng cách sử dụng chế độ độc lập.Viết RDD như textfile sử dụng Apache Spark

Tôi đang cố gắng chuyển đổi RDD Spark thành tệp đơn [đầu ra cuối cùng] bằng phương thức saveTextFile(), nhưng nó không hoạt động.

Ví dụ nếu tôi có nhiều hơn một phân vùng như thế nào chúng ta có thể có được một tập tin duy nhất là đầu ra cuối cùng.

Cập nhật:

Tôi đã thử các cách tiếp cận dưới đây, nhưng tôi đang nhận được vô trỏ ngoại lệ.

person.coalesce(1).toJavaRDD().saveAsTextFile("C://Java_All//output"); 
person.repartition(1).toJavaRDD().saveAsTextFile("C://Java_All//output"); 

Trường hợp ngoại lệ là:

15/06/23 18:25:27 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 
15/06/23 18:25:27 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir 
15/06/23 18:25:27 INFO deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class 
15/06/23 18:25:27 INFO deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class 
15/06/23 18:25:27 INFO deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir 
15/06/23 18:25:27 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) 
java.lang.NullPointerException 
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404) 
    at org.apache.hadoop.util.Shell.run(Shell.java:379) 
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) 
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678) 
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661) 
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639) 
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798) 
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) 
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
15/06/23 18:25:27 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException 
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404) 
    at org.apache.hadoop.util.Shell.run(Shell.java:379) 
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) 
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678) 
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661) 
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639) 
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798) 
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) 
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

15/06/23 18:25:27 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job 
15/06/23 18:25:27 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/06/23 18:25:27 INFO TaskSchedulerImpl: Cancelling stage 1 
15/06/23 18:25:27 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at TestSpark.java:40) failed in 0.249 s 
15/06/23 18:25:28 INFO DAGScheduler: Job 0 failed: saveAsTextFile at TestSpark.java:40, took 0.952286 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException 
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404) 
    at org.apache.hadoop.util.Shell.run(Shell.java:379) 
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) 
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678) 
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661) 
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639) 
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456) 
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798) 
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) 
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
15/06/23 18:25:28 INFO SparkContext: Invoking stop() from shutdown hook 
15/06/23 18:25:28 INFO SparkUI: Stopped Spark web UI at http://10.37.145.179:4040 
15/06/23 18:25:28 INFO DAGScheduler: Stopping DAGScheduler 
15/06/23 18:25:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/06/23 18:25:28 INFO Utils: path = C:\Users\crh537\AppData\Local\Temp\spark-a52371d8-ae6a-4567-b759-0a6c66c1908c\blockmgr-4d17a5b4-c8f8-4408-af07-0e88239794e8, already present as root for deletion. 
15/06/23 18:25:28 INFO MemoryStore: MemoryStore cleared 
15/06/23 18:25:28 INFO BlockManager: BlockManager stopped 
15/06/23 18:25:28 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/06/23 18:25:28 INFO SparkContext: Successfully stopped SparkContext 
15/06/23 18:25:28 INFO Utils: Shutdown hook called 

Kính trọng, Shankar

+0

cũng RDD bạn là nhận được sản phẩm nào ở đâu đó. chúng tôi không thể giúp bạn tìm thấy lỗi với phần mã bạn đã cung cấp cho chúng tôi .. tôi khuyên bạn nên thử ít nhất để đếm kiểm tra rdd của bạn nếu nó trống và làm từng cái một! – eliasah

+0

Bạn có thể kiểm tra các quyền FileSystem hoặc HDFS cho thư mục cụ thể đó không. Ngoài ra, bạn có thể nối thêm giao thức trước Đường dẫn Hệ thống Tập tin. Cũng như đã đề cập trước đó, bạn có thể cần phải thiết lập WinUtils trong đường dẫn hệ thống của bạn Nếu bạn muốn chạy những thứ liên quan đến Hadoop trên Local của bạn. –

Trả lời

5

Bạn có thể sử dụng phương pháp coalesce để lưu vào một tệp. Bằng cách này, mã của bạn sẽ trông như thế này:

val myFile = sc.textFile("file.txt") 
val finalRdd = doStuff(myFile) 
finalRdd.coalesce(1).saveAsTextFile("newfile") 

Ngoài ra còn có một phương pháp repartition để làm điều tương tự, tuy nhiên nó sẽ gây ra một xáo trộn đó là có thể rất tốn kém, trong khi liên hiệp sẽ cố gắng tránh một shuffle.

+0

tôi đang sử dụng Java để thực hiện Spark, nhưng tôi nhận được ngoại lệ, tôi đã cập nhật các câu hỏi với các chi tiết ngoại lệ. – Shankar

+2

Có vẻ như nó đang cố gắng ghi tệp và không thành công. Bạn có thể kiểm tra xem bạn có quyền ghi vào thư mục không? Ngoài ra, vì Spark là lười biếng, nó có thể là vấn đề là trong người rdd. Bạn có thể chạy 'person.coalesce (1) .toJavaRDD(). Count()' để đảm bảo rằng nó tạo ra một số dòng và không ném ngoại lệ? – Maksud

+0

khi tôi sử dụng saveAsTextFile (""), nơi nó sẽ lưu các tập tin, tôi có nghĩa là nút (công nhân hoặc lái xe). Ngoài ra chúng tôi có thể cung cấp cho bất kỳ tên tập tin cụ thể như tập tin đầu ra? – Shankar

-1

Bạn có thể sử dụng phương pháp phân vùng lại trong RDD. Nó thực sự tạo ra nhiều phân vùng khi bạn truyền số nguyên cho nó. Trong trường hợp của bạn, nó sẽ là:

rdd.repartition(1).saveAsTextFile("path to save rdd") 
+0

Tôi đang sử dụng Java để triển khai Spark, nhưng tôi nhận được ngoại lệ, tôi đã cập nhật câu hỏi với các chi tiết ngoại lệ. – Shankar

-1
  1. Tải winutils.exe
  2. Nơi winutils.exe dưới thư mục bin của bất kỳ ổ đĩa nào (D:/Winutils/bin /)
  3. Đặt đường dẫn trong mã của bạn như bên dưới

    System.setProperty ("hadoop.home.dir", "D: \\ Winutils \\");

Bây giờ hãy chạy mã của bạn, mã phải hoạt động.

Các vấn đề liên quan