2015-11-18 24 views
5

Tôi biết tia lửa đánh giá lười biếng.Apache Spark: hoạt động công đoàn không được thực hiện

Nhưng hành vi này có được mong đợi không ?? Với chương trình dưới đây sản lượng là 20.

Nhưng nếu báo cáo kết quả in

System.out.println("/////////////////// After "+MainRDD.count()); 

là không chú thích, sản lượng sẽ là 40

Tôi không làm điều này như là trong ứng dụng của tôi, Nhưng chỉ để minh họa, tôi đã tạo chương trình này ..

SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JavaSparkSQL"); 
JavaSparkContext sc = new JavaSparkContext(sparkConf); 

JavaRDD<Integer> MainRDD; 
ArrayList<Integer> list = new ArrayList<>(); 
JavaRDD<Integer> tmp; 
for (int i = 0; i < 20; i++) { 
    list.add(i); 
} 

MainRDD = sc.parallelize(list);// MainRDD.union(tmp); 
System.out.println("//////////////////////First "+MainRDD.count()); 

list.clear(); 
for (int i = 20; i < 25; i++) { 
    for (int j = 1; j < 5; j++) { 
     list.add(i*j); 
    } 
    tmp = sc.parallelize(list); 

    //  System.out.println("/////////////////// Before "+MainRDD.count()); 
    MainRDD = MainRDD.union(tmp); 
//  System.out.println("/////////////////// After "+MainRDD.count()); 
    list.clear(); 
} 

System.out.println("/////////////////// last "+MainRDD.count()); 
} 
+0

@eliash Thật sao? Tôi thực sự ngạc nhiên nó hoạt động ở tất cả :) – zero323

+0

Vâng, thực sự ... Tôi có một hành vi rất lạ! Tôi nghĩ rằng đó là do sự biến đổi của cấu trúc dữ liệu ... – eliasah

Trả lời

2

Nguồn của vấn đề là cấu trúc dữ liệu có thể thay đổi mà bạn sử dụng để điền RDD. Khi bạn gọi sc.parallelize(list), nó sẽ không chụp trạng thái của ArrayList. Vì bạn gọi clear khi bạn xuất vòng lặp khi dữ liệu thực sự được đánh giá, không có dữ liệu nào cả.

Sự thật là tôi không biết tại sao hành vi này thay đổi khi bạn gọi phương thức count. Vì RDD không được lưu trữ nên tôi đoán là vấn đề của Spark hoặc JVM internals nhưng tôi thậm chí sẽ không đoán được điều gì đang thực sự xảy ra ở đó. Có thể ai đó thông minh hơn sẽ có thể biết lý do chính xác cho hành vi này.

Chỉ để minh họa những gì đang xảy ra:

val arr = Array(1, 2, 3) 

val rdd = sc.parallelize(arr) 

(0 until 3).foreach(arr(_) = 99) 
val tmp = sc.parallelize(arr) 

tmp.union(rdd).collect 
// Array[Int] = Array(99, 99, 99, 99, 99, 99) 

vs

val arr = Array(1, 2, 3) 

val rdd = sc.parallelize(arr) 
rdd.count() 
// Long = 3 

(0 until 3).foreach(arr(_) = 99) 
val tmp = sc.parallelize(arr) 

tmp.union(rdd).collect 
// Array[Int] = Array(99, 99, 99, 1, 2, 3) 

sc.getRDDStorageInfo 
// Array[org.apache.spark.storage.RDDInfo] = Array() 
+0

Nó sẽ không khôn ngoan để mở một vấn đề cho điều đó? – eliasah

+0

@eliasah, chính xác những gì tôi nghĩ .. Nhưng tôi còn quá trẻ để làm điều đó .. – Anil

+0

Từ bất cứ điều gì tôi biết, như tia lửa đánh giá lười biếng, trừ khi tôi gọi phương pháp đếm/một số hoạt động trên RDD, nó không được tính toán/sửa đổi .. Do đó tình hình này .. Đúng tôi nếu tôi sai – Anil

Các vấn đề liên quan