2016-03-21 16 views
6

Tôi có một chương trình Java bằng cách sử dụng Apache Spark. Phần thú vị nhất của chương trình trông như thế này:Java Apache Spark: Chuỗi chuyển đổi dài dẫn đến thời gian bậc hai

long seed = System.nanoTime(); 

JavaRDD<AnnotatedDocument> annotated = documents 
    .mapPartitionsWithIndex(new InitialAnnotater(seed), true); 
annotated.cache(); 

for (int iter = 0; iter < 2000; iter++) { 
    GlobalCounts counts = annotated 
     .mapPartitions(new GlobalCounter()) 
     .reduce((a, b) -> a.sum(b)); // update overall counts (*) 

    seed = System.nanoTime(); 

    // copy overall counts which CountChanger uses to compute a stochastic thing (**) 
    annotated = annotated 
     .mapPartitionsWithIndex(new CountChanger(counts, seed), true); 
    annotated.cache(); 

    // adding these lines causes constant time complexity like i want 
    //List<AnnotatedDocument> ll = annotated.collect(); 
    //annotated = sc.parallelize(ll, 8); 
} 

Vì vậy, trong thực tế, các dòng (**) kết quả trong một RDD với hình thức

documents 
    .mapPartitionsWithIndex(initial) 
    .mapPartitionsWithIndex(nextIter) 
    .mapPartitionsWithIndex(nextIter) 
    .mapPartitionsWithIndex(nextIter) 
    ... 2000 more 

một chuỗi rất dài của bản đồ thực sự. Ngoài ra, dòng (*) buộc tính toán (không lười) tại mỗi lần lặp vì số lượng cần được cập nhật.

Tôi có vấn đề là tôi có được một độ phức tạp thời gian đó tăng tuyến tính với mỗi lần lặp, và vì vậy bậc hai tổng thể:

enter image description here

Tôi nghĩ rằng điều này là do Spark cố gắng để "nhớ" mỗi RDD trong chuỗi, và thuật toán khoan dung lỗi hoặc bất cứ điều gì gây ra điều này để phát triển. Tuy nhiên, tôi thực sự không có ý tưởng.

Điều tôi thực sự muốn làm là tại mỗi lần lặp lại cho Spark biết "thu gọn" RDD sao cho chỉ cái cuối cùng được giữ trong bộ nhớ và làm việc. Điều này sẽ dẫn đến thời gian lặp lại mỗi lần lặp lại, tôi nghĩ vậy. Điều này có thể không? Có bất kỳ giải pháp khác?

Cảm ơn!

+0

Có lý do nào để bạn lưu vào bộ đệm RDD cho mỗi lần lặp không? Thay vì bộ nhớ đệm RDD tích lũy cuối cùng ở cuối vòng lặp? –

+0

Tôi vẫn đang thử nghiệm với các hiệu ứng của bộ nhớ đệm để câu trả lời của tôi sẽ phải "không thực sự". – bombax

+0

Bạn có thực sự tái sử dụng RDD từng tính toán không? Hoặc là nó một RDD tươi mỗi khi bạn muốn tính toán các quầy? –

Trả lời

6

Thử sử dụng rdd.checkpoint. Điều này sẽ lưu RDD thành hdfs và dòng dõi rõ ràng.

Mỗi khi bạn chuyển đổi RDD, bạn phát triển dòng dõi và Spark phải theo dõi những gì có sẵn và những gì phải được tính lại. Xử lý DAG là DAG đắt tiền và lớn có xu hướng tiêu diệt hiệu suất khá nhanh. Bằng cách "kiểm tra" bạn hướng dẫn Spark tính toán và lưu RDD kết quả và loại bỏ thông tin về cách nó được tạo ra. Điều này làm cho nó tương tự như chỉ đơn giản là tiết kiệm một RDD và đọc nó trở lại mà giảm thiểu hoạt động DAG. Trên một sidenote, kể từ khi bạn nhấn vấn đề này, nó là tốt để biết rằng union cũng ảnh hưởng đến hiệu suất RDD bằng cách thêm steps và cũng có thể ném một StackOverflowError do cách thông tin dòng truyền thừa là. See this post

This link có nhiều chi tiết hơn với sơ đồ đẹp và chủ đề cũng được đề cập in this SO post.

+0

Bạn có cân nhắc cập nhật câu trả lời của mình không? –

+0

chắc chắn - quên tất cả về nó. (bài đăng SO bị chặn) –

3

Đó là một câu hỏi thực sự thú vị và có một vài điều cần cân nhắc.

Về cơ bản đây là thuật toán lặp lại, nếu bạn xem xét một số thuật toán học máy lặp lại khác nhau trong Spark, bạn có thể thấy một số phương pháp để làm việc với loại sự cố này.

Điều đầu tiên mà hầu hết trong số họ không lưu vào bộ nhớ cache trên mỗi lần lặp - thay vào đó chúng có khoảng thời gian lưu bộ nhớ đệm có thể định cấu hình. Tôi có thể bắt đầu bằng cách lưu vào bộ nhớ đệm sau mỗi lần lặp lại 10 lần và xem cách nó diễn ra như thế nào.

Sự cố khác sẽ trở thành biểu đồ đường truyền, mỗi mapPartitions bạn thực hiện đang tăng biểu đồ nhiều hơn một chút. Tại một số điểm theo dõi dữ liệu đó sẽ bắt đầu trở nên ngày càng đắt đỏ hơn. checkpoint cho phép bạn có Spark ghi RDD hiện tại để lưu trữ lâu dài và loại bỏ thông tin dòng. Bạn có thể thử làm điều này tại một số khoảng thời gian như mỗi 20 lần lặp lại và nhìn thấy cách này đi.

Số 10 và 20 chỉ là các điểm bắt đầu cơ bản, chúng phụ thuộc vào tốc độ tính toán dữ liệu cho từng lần lặp riêng lẻ và bạn có thể chơi với chúng để tìm điều chỉnh phù hợp cho công việc của mình.

1
  • cố gắng thực hiện rdd của bạn trước khi lưu vào bộ nhớ cache với chú thích.count() cứ vài lần (cần điều chỉnh) lặp lại.
  • tốt hơn là kiểm soát vị trí lưu trong bộ nhớ cache (...) thay vì bộ đệm() đặt bộ nhớ trong bộ nhớ, cho phép bạn chọn vị trí của nó (tùy thuộc vào khả năng bộ nhớ của bạn)
  • để "lưu" bộ đệm được lưu trong bộ nhớ cache/kéo dài và sau đó unpersist nó sau khi bộ nhớ đệm/kéo dài chu kỳ tiếp theo. Spark tự làm, nhưng nếu bạn kiểm soát nó, sẽ không cần phải chọn rdd nào để ném từ bộ nhớ cache
Các vấn đề liên quan