2013-07-22 20 views
10

Trong tài liệu spark chính thức, có một ví dụ cho một ắc được sử dụng trong một cuộc gọi foreach mà là trực tiếp trên một RDD:Accumulator không thành công trên cụm, làm việc tại địa phương

scala> val accum = sc.accumulator(0) 
accum: spark.Accumulator[Int] = 0 

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) 
... 
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s 

scala> accum.value 
res2: Int = 10 

tôi thực hiện cho ác của riêng tôi:

val myCounter = sc.accumulator(0) 

val myRDD = sc.textFile(inputpath) // :spark.RDD[String] 

myRDD.flatMap(line => foo(line)) // line 69 

def foo(line: String) = { 
    myCounter += 1 // line 82 throwing NullPointerException 
    // compute something on the input 
} 
println(myCounter.value) 

Trong cài đặt cục bộ, tính năng này chỉ hoạt động tốt. Tuy nhiên, nếu tôi chạy công việc này trên một cụm tia lửa độc lập với một số máy móc, công nhân ném một

13/07/22 21:56:09 ERROR executor.Executor: Exception in task ID 247 
java.lang.NullPointerException 
    at MyClass$.foo(MyClass.scala:82) 
    at MyClass$$anonfun$2.apply(MyClass.scala:67) 
    at MyClass$$anonfun$2.apply(MyClass.scala:67) 
    at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440) 
    at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400) 
    at spark.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:630) 
    at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640) 
    at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640) 
    at spark.scheduler.ResultTask.run(ResultTask.scala:77) 
    at spark.executor.Executor$TaskRunner.run(Executor.scala:98) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:722) 

tại dòng có nghĩa là tăng ắc myCounter.

Câu hỏi của tôi là: Chỉ có thể sử dụng bộ tích lũy trong các chức năng ẩn danh "cấp cao nhất" được áp dụng trực tiếp cho RDD và không được sử dụng trong các hàm lồng nhau? Nếu có, tại sao cuộc gọi của tôi thành công cục bộ và không thành công trên một cụm?

chỉnh sửa: tăng độ dài ngoại lệ.

+0

Bạn có thể đăng thêm lần truy nguyên không? –

+0

Bạn đã thử 'sc.broadcast (myCounter)' chưa? – Noah

+0

Không phát 'trả lại giá trị chỉ đọc? Từ [tài liệu API chính thức] (http://spark-project.org/docs/latest/api/core/index.html#spark.SparkContext): "Phát một biến chỉ đọc tới cụm, trả về một đối tượng Broadcast để đọc nó trong các hàm phân tán. Biến này sẽ được gửi tới từng cụm chỉ một lần. " – ptikobj

Trả lời

1

gì nếu bạn xác định các chức năng như thế này:

def foo(line: String, myc: org.apache.spark.Accumulator[Int]) = { 
    myc += 1 
} 

Và sau đó gọi nó như thế này:

foo(line, myCounter) 

?

+0

Điều này có vẻ đúng, Bạn có thể vượt qua bộ tích lũy mà bạn đã tạo cho phương thức – pulasthi

-1

Nếu bạn sử dụng "flatMap" thì "myCounter" sẽ không cập nhật vì "flatMap" là chức năng lazzy. Bạn có thể sử dụng mã này:

myRDD.foreach(line => foo(line)) 
def foo(line: String) = {myCounter +=1} 
println(myCounter.value) 
2

Trong trường hợp của tôi cũng vậy, ắc là null trong việc đóng cửa khi tôi sử dụng 'mở rộng ứng dụng' để tạo một ứng dụng tia lửa như hình dưới đây

object AccTest extends App { 


    val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client") 
    val sc = new SparkContext(conf) 
    sc.setLogLevel("ERROR") 

    val accum = sc.accumulator(0, "My Accumulator") 
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) 

    println("count:" + accum.value) 

    sc.stop 
    } 
} 

tôi thay thế mở rộng ứng dụng với phương pháp main() và nó hoạt động trong cụm sỢI trong HDP 2.4 đối tượng AccTest { def chính (args: Array [Chuỗi]): Đơn vị = {

val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client") 
val sc = new SparkContext(conf) 
sc.setLogLevel("ERROR") 

val accum = sc.accumulator(0, "My Accumulator") 
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) 

println("count:" + accum.value) 

sc.stop 

} }

Các vấn đề liên quan