Trong tài liệu spark chính thức, có một ví dụ cho một ắc được sử dụng trong một cuộc gọi foreach
mà là trực tiếp trên một RDD:Accumulator không thành công trên cụm, làm việc tại địa phương
scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
tôi thực hiện cho ác của riêng tôi:
val myCounter = sc.accumulator(0)
val myRDD = sc.textFile(inputpath) // :spark.RDD[String]
myRDD.flatMap(line => foo(line)) // line 69
def foo(line: String) = {
myCounter += 1 // line 82 throwing NullPointerException
// compute something on the input
}
println(myCounter.value)
Trong cài đặt cục bộ, tính năng này chỉ hoạt động tốt. Tuy nhiên, nếu tôi chạy công việc này trên một cụm tia lửa độc lập với một số máy móc, công nhân ném một
13/07/22 21:56:09 ERROR executor.Executor: Exception in task ID 247
java.lang.NullPointerException
at MyClass$.foo(MyClass.scala:82)
at MyClass$$anonfun$2.apply(MyClass.scala:67)
at MyClass$$anonfun$2.apply(MyClass.scala:67)
at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
at spark.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:630)
at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640)
at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640)
at spark.scheduler.ResultTask.run(ResultTask.scala:77)
at spark.executor.Executor$TaskRunner.run(Executor.scala:98)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
tại dòng có nghĩa là tăng ắc myCounter
.
Câu hỏi của tôi là: Chỉ có thể sử dụng bộ tích lũy trong các chức năng ẩn danh "cấp cao nhất" được áp dụng trực tiếp cho RDD và không được sử dụng trong các hàm lồng nhau? Nếu có, tại sao cuộc gọi của tôi thành công cục bộ và không thành công trên một cụm?
chỉnh sửa: tăng độ dài ngoại lệ.
Bạn có thể đăng thêm lần truy nguyên không? –
Bạn đã thử 'sc.broadcast (myCounter)' chưa? – Noah
Không phát 'trả lại giá trị chỉ đọc? Từ [tài liệu API chính thức] (http://spark-project.org/docs/latest/api/core/index.html#spark.SparkContext): "Phát một biến chỉ đọc tới cụm, trả về một đối tượng Broadcast để đọc nó trong các hàm phân tán. Biến này sẽ được gửi tới từng cụm chỉ một lần. " – ptikobj