2015-04-27 17 views
30

Tôi đang cố gắng hiểu cách bộ nhớ cache của Spark hoạt động.Tìm hiểu về bộ nhớ đệm của Spark

Đây là sự hiểu biết ngây thơ của tôi, xin vui lòng cho tôi biết nếu tôi là thiếu cái gì:

val rdd1 = sc.textFile("some data") 
rdd1.cache() //marks rdd1 as cached 
val rdd2 = rdd1.filter(...) 
val rdd3 = rdd1.map(...) 
rdd2.saveAsTextFile("...") 
rdd3.saveAsTextFile("...") 

Ở phía trên, rdd1 sẽ được nạp từ đĩa (ví dụ HDFS) chỉ một lần. (khi rdd2 được lưu tôi giả định) và sau đó từ bộ nhớ cache (giả sử có đủ RAM) khi rdd3 được lưu)

Bây giờ đây là câu hỏi của tôi. Giả sử tôi muốn cache rdd2 và rdd3 vì chúng sẽ được sử dụng sau này, nhưng tôi không cần rdd1 sau khi tạo chúng.

Về cơ bản có sự trùng lặp, phải không? Kể từ khi rdd2 và rdd3 được tính toán, tôi không cần rdd1 nữa, tôi có lẽ nên unpersist nó, phải không? câu hỏi là khi nào?

Điều này có hiệu quả không? (Tùy chọn A)

val rdd1 = sc.textFile("some data") 
rdd1.cache() // marks rdd as cached 
val rdd2 = rdd1.filter(...) 
val rdd3 = rdd1.map(...) 
rdd2.cache() 
rdd3.cache() 
rdd1.unpersist() 

Tia lửa có thêm lời gọi không đáng tin cậy vào DAG không? hoặc là nó được thực hiện ngay lập tức? nếu nó được thực hiện ngay lập tức, sau đó về cơ bản rdd1 sẽ không được lưu trữ khi tôi đọc từ rdd2 và rdd3, phải không?

Tôi có nên thực hiện theo cách này thay thế (Tùy chọn B) không?

val rdd1 = sc.textFile("some data") 
rdd1.cache() // marks rdd as cached 
val rdd2 = rdd1.filter(...) 
val rdd3 = rdd1.map(...) 

rdd2.cache() 
rdd3.cache() 

rdd2.saveAsTextFile("...") 
rdd3.saveAsTextFile("...") 

rdd1.unpersist() 

Vậy câu hỏi là thế này: là lựa chọn A đủ tốt? tức là rdd1 vẫn chỉ tải tệp một lần? Hoặc tôi có cần phải đi với Tùy chọn B không?

Trả lời

19

Dường như tùy chọn B là bắt buộc. Lý do có liên quan đến cách persist/cache và unpersist được thực hiện bởi Spark. Vì các phép biến đổi RDD chỉ xây dựng các mô tả DAG mà không thực hiện, trong Tùy chọn A vào thời điểm bạn gọi là unpersist, bạn vẫn chỉ có các mô tả công việc chứ không phải là thực thi đang chạy.

Điều này có liên quan bởi vì cuộc gọi cache hoặc persist chỉ thêm RDD vào Bản đồ RDD tự đánh dấu là vẫn tồn tại trong quá trình thực hiện công việc. Tuy nhiên, unpersist trực tiếp cho blockManager gỡ bỏ RDD khỏi bộ nhớ và loại bỏ tham chiếu trong Bản đồ các RDD liên tục.

persist function

unpersist function

Vì vậy, bạn sẽ cần phải gọi unpersist sau Spark thực sự thực hiện và lưu trữ các RDD với người quản lý khối.

Các ý kiến ​​cho gợi ý RDD.persist phương pháp hướng này: rdd.persist

+1

Yep, dường như bạn đang ở trên nó. Đây là một chút không may, tôi muốn "cache" có thể đã được chuyển đổi thành một hoạt động DAG và không chỉ thêm RDD ID vào bản đồ ... có rất nhiều trường hợp bạn muốn cache một cái gì đó trung gian, tạo RDD mới, sau đó thả cái cũ. Có lẽ có lý do lý thuyết tốt về lý do tại sao đây không phải là một ý tưởng tốt mặc dù ... trong mọi trường hợp, LRU (tôi giả định) thứ tự của bộ nhớ đệm có nghĩa là cũ chưa sử dụng rdd1 sẽ bị trục xuất nếu rdd2 và rdd3 cần không gian đó cho bộ nhớ đệm ... –

+0

Vì vậy, tôi hầu như chỉ xem xét những gì tồn tại/cache và unpersist đang làm, nhưng vẫn còn chỗ để xem xét Spark đang làm gì khi bạn lấy RDD từ một cái khác và cách nó có thể tối ưu hóa. Tôi không chắc chắn rằng 'rdd1' thậm chí cần phải được lưu trữ, nó có thể được kiểm tra bởi 'rdd2' và' rdd3' khi chúng được lưu trữ hoặc khi DAG được pipelined. Đây là một khu vực màu xám cho tôi mặc dù. – Rich

+2

Đã tiến hành điều tra thêm một chút và truy tìm thông qua trình gỡ lỗi. 'rdd2' và' rdd3' sẽ tham chiếu 'rdd1' làm phụ thuộc. 'rdd1' sẽ tải dữ liệu của nó vào các phân vùng một lần trên hành động đầu tiên được thực hiện. Bây giờ, tại thời điểm này, 'rdd2' và' rdd3' đều áp dụng các biến đổi của chúng cho dữ liệu đã được nạp bởi 'rdd1' trong phân vùng. Tôi tin rằng bộ nhớ đệm cung cấp giá trị nếu bạn chạy nhiều hành động trên cùng một RDD chính xác, nhưng trong trường hợp này các RDD phân nhánh mới, tôi không nghĩ rằng bạn gặp phải vấn đề tương tự vì tôi tin rằng Spark biết rằng 'rdd1' vẫn là phụ thuộc cho 'rdd3' sau lần lưu đầu tiên. – Rich

2

Trong tùy chọn A, bạn đã không được hiển thị khi bạn đang gọi hành động (gọi điện để tiết kiệm)

val rdd1 = sc.textFile("some data") 
rdd.cache() //marks rdd as cached 
val rdd2 = rdd1.filter(...) 
val rdd3 = rdd1.map(...) 
rdd2.cache() 
rdd3.cache() 
rdd1.unpersist() 
rdd2.saveAsTextFile("...") 
rdd3.saveAsTextFile("...") 

Nếu dãy là như trên, Tùy chọn A nên sử dụng phiên bản được lưu trong bộ nhớ cache của rdd1 để tính cả rdd2 và rdd 3

+0

Tôi nên đồng ý, nhưng phải không? Tôi nghĩ rằng nó sẽ không, như khi bạn gọi rdd2.saveAsTestFile vv, rdd1 đã được đánh dấu là không tồn tại. sự tồn tại/unpersist không phải là trên DAG –

+0

cho đến khi bạn gọi saveAsFile, không có gì ** thực sự ** xảy ra ..... vì vậy quan điểm của tôi là thứ tự của cuộc gọi rdd1.unpersist không quan trọng nếu rdd2 đã được lưu trữ –

0

Tùy chọn B là phương pháp tối ưu với tinh chỉnh nhỏ. Sử dụng các phương pháp hành động ít tốn kém hơn. Trong cách tiếp cận được đề cập bởi mã của bạn, saveAsTextFile là một hoạt động tốn kém, thay thế nó bằng phương pháp đếm.

Idea ở đây là để loại bỏ các rdd1 lớn từ DAG, nếu nó không có liên quan để tính toán tiếp theo (sau rdd2 và rdd3 được tạo ra)

phương pháp cập nhật từ mã

val rdd1 = sc.textFile("some data").cache() 
val rdd2 = rdd1.filter(...).cache() 
val rdd3 = rdd1.map(...).cache() 

rdd2.count 
rdd3.count 

rdd1.unpersist() 
Các vấn đề liên quan