2015-02-05 14 views
6

Tôi hiện đang cố gắng đào tạo một bộ các Vector từ vựng trên UMBC Webbase Corpus (khoảng 30GB văn bản trong 400 tệp).Cách hiệu quả của bộ nhớ để kết hợp một chuỗi các RDD từ các tệp trong Apache Spark

Tôi thường gặp phải tình trạng thiếu bộ nhớ ngay cả trên 100 GB cộng với Máy. Tôi chạy Spark trong ứng dụng. Tôi đã cố gắng tinh chỉnh một chút, nhưng tôi không thể thực hiện thao tác này trên hơn 10 GB dữ liệu văn bản. Sự tắc nghẽn rõ ràng của việc thực hiện của tôi là sự kết hợp của các RDD được tính toán trước đó, mà ở đó không có ngoại lệ bộ nhớ.

Có lẽ một trong những bạn có kinh nghiệm để đưa ra một bộ nhớ thực hiện hiệu quả hơn này:

object SparkJobs { 
    val conf = new SparkConf() 
    .setAppName("TestApp") 
    .setMaster("local[*]") 
    .set("spark.executor.memory", "100g") 
    .set("spark.rdd.compress", "true") 

    val sc = new SparkContext(conf) 


    def trainBasedOnWebBaseFiles(path: String): Unit = { 
    val folder: File = new File(path) 

    val files: ParSeq[File] = folder.listFiles(new TxtFileFilter).toIndexedSeq.par 


    var i = 0; 
    val props = new Properties(); 
    props.setProperty("annotators", "tokenize, ssplit"); 
    props.setProperty("nthreads","2") 
    val pipeline = new StanfordCoreNLP(props); 

    //preprocess files parallel 
    val training_data_raw: ParSeq[RDD[Seq[String]]] = files.map(file => { 
     //preprocess line of file 
     println(file.getName() +"-" + file.getTotalSpace()) 
     val rdd_lines: Iterator[Option[Seq[String]]] = for (line <- Source.fromFile(file,"utf-8").getLines) yield { 
      //performs some preprocessing like tokenization, stop word filtering etc. 
      processWebBaseLine(pipeline, line)  
     } 
     val filtered_rdd_lines = rdd_lines.filter(line => line.isDefined).map(line => line.get).toList 
     println(s"File $i done") 
     i = i + 1 
     sc.parallelize(filtered_rdd_lines).persist(StorageLevel.MEMORY_ONLY_SER) 

    }) 

    val rdd_file = sc.union(training_data_raw.seq) 

    val starttime = System.currentTimeMillis() 
    println("Start Training") 
    val word2vec = new Word2Vec() 

    word2vec.setVectorSize(100) 
    val model: Word2VecModel = word2vec.fit(rdd_file) 

    println("Training time: " + (System.currentTimeMillis() - starttime)) 
    ModelUtil.storeWord2VecModel(model, Config.WORD2VEC_MODEL_PATH) 
    }} 
} 
+1

Dữ liệu 30 GB trong tệp ... chắc chắn sẽ dẫn đến hơn 100 GB đối tượng Java ... Làm như vậy chỉ có một tệp trong bộ nhớ cùng một lúc ... xử lý ... sau đó tải tiếp một. –

+1

Ngoài ra ... không làm điều này -> 'StorageLevel.MEMORY_ONLY_SER' –

+0

Tôi cần xử lý chúng cùng một lúc vì trong bước phù hợp với mô hình, tất cả dữ liệu cần phải có mặt – dice89

Trả lời

1

Giống như Sarvesh chỉ ra trong các ý kiến, nó có lẽ là quá nhiều dữ liệu cho một máy duy nhất. Sử dụng nhiều máy hơn. Chúng tôi thường thấy cần30 GB bộ nhớ để làm việc với tệp có dung lượng 1 GB. Bằng cách này (cực kỳ thô) ước tính bạn cần 600 – 800 GB bộ nhớ cho đầu vào 30 GB. (Bạn có thể nhận được ước tính chính xác hơn bằng cách tải một phần dữ liệu.)

Nhận xét chung hơn, tôi khuyên bạn nên tránh sử dụng rdd.unionsc.parallelize. Sử dụng thay vì sc.textFile bằng ký tự đại diện để tải tất cả các tệp vào một RDD duy nhất.

0

Bạn đã thử nhận vectơ word2vec từ một kho văn bản nhỏ hơn chưa? Tôi nói với bạn điều này vì tôi đã chạy việc thực hiện spark word2vec trên một nhỏ hơn nhiều và tôi đã nhận được các vấn đề với nó gây ra vấn đề này: http://mail-archives.apache.org/mod_mbox/spark-issues/201412.mbox/%[email protected]%3E

Vì vậy, đối với trường hợp sử dụng của tôi, vấn đề này thực hiện một chút vô ích. Vì vậy, tôi đã sử dụng tia lửa để xoa bóp corpus của tôi nhưng không phải để thực sự nhận được vectơ.

  • Như đề xuất khác tránh xa việc gọi rdd.union.
  • Ngoài ra tôi nghĩ rằng .toList có thể sẽ thu thập mọi dòng từ RDD và thu thập nó trong Máy điều khiển của bạn (cái được sử dụng để gửi tác vụ) có lẽ đây là lý do tại sao bạn đang nhận ra bộ nhớ. Bạn hoàn toàn nên tránh biến RDD thành một danh sách!
Các vấn đề liên quan