2014-07-08 15 views
11

Cần trợ giúp để thực hiện phương pháp hay nhất. Môi trường hoạt động như sau:Apache Spark on YARN: Số lượng lớn các tệp dữ liệu đầu vào (kết hợp nhiều tệp đầu vào trong tia lửa)

  • Tệp dữ liệu nhật ký đến không đều.
  • Kích thước của tệp dữ liệu nhật ký là từ 3.9KB đến 8.5MB. Trung bình khoảng 1MB.
  • Số lượng hồ sơ của tệp dữ liệu là từ 13 dòng đến 22000 dòng. Trung bình là khoảng 2700 dòng.
  • Tệp dữ liệu phải được xử lý trước khi tổng hợp.
  • Thuật toán xử lý sau có thể thay đổi.
  • Tệp được xử lý sau được quản lý riêng với tệp dữ liệu gốc vì thuật toán sau xử lý có thể bị thay đổi.
  • Kết hợp hàng ngày được thực hiện. Tất cả các tệp dữ liệu sau xử lý phải được lọc theo từng bản ghi và tổng hợp (trung bình, tối đa ...) được tính toán.
  • Vì tổng hợp là chi tiết, số lượng bản ghi sau khi tổng hợp không quá nhỏ. Nó có thể bằng một nửa số lượng bản ghi gốc.
  • Tại một thời điểm, số lượng tệp được xử lý sau có thể là khoảng 200.000.
  • Tệp dữ liệu sẽ có thể bị xóa riêng lẻ.

Trong thử nghiệm, tôi đã cố gắng xử lý 160.000 tệp được xử lý sau bởi Spark bắt đầu bằng sc.textFile() với đường dẫn glob, không thành công với ngoại lệ OutOfMemory trên quy trình trình điều khiển.

Phương pháp hay nhất để xử lý loại dữ liệu này là gì? Tôi có nên sử dụng HBase thay vì các tệp đơn giản để lưu dữ liệu đã xử lý không?

Trả lời

8

Chúng tôi đã viết bộ tải riêng. Nó giải quyết vấn đề của chúng tôi với các tệp nhỏ trong HDFS. Nó sử dụng Hadoop CombineFileInputFormat. Trong trường hợp của chúng tôi, nó giảm số lượng người lập bản đồ từ 100000 xuống còn khoảng 3000 và thực hiện công việc nhanh hơn đáng kể.

https://github.com/RetailRocket/SparkMultiTool

Ví dụ:

import ru.retailrocket.spark.multitool.Loaders 
val sessions = Loaders.combineTextFile(sc, "file:///test/*") 
// or val sessions = Loaders.combineTextFile(sc, conf.weblogs(), size = 256, delim = "\n") 
// where size is split size in Megabytes, delim - line break character 
println(sessions.count()) 
+0

Cảm ơn bạn đã chia sẻ điều này. Tôi nghĩ rằng đối số kích thước đặc biệt có giá trị, vì nó không thể được xác định trên coalesce(). – zeodtr

+0

Giải pháp này là tốt hơn so với coalesce bởi vì nó hoạt động ở giai đoạn bản đồ, nhưng sau đó kết hợp lại. –

+1

Kể từ bây giờ, Hadoop hỗ trợ CombineTextInputFormat (ít nhất là từ 2.2), kết hợp các tệp đầu vào nhỏ có thể được thực hiện với sc.newAPIHadoopFile(), mà không cần triển khai một lớp tùy chỉnh. – zeodtr

3

Tôi khá chắc chắn lý do bạn nhận OOM là do xử lý quá nhiều tệp nhỏ. Những gì bạn muốn là kết hợp các tập tin đầu vào để bạn không nhận được quá nhiều phân vùng. Tôi cố gắng giới hạn công việc của mình thành khoảng 10k phân vùng.

Sau textFile, bạn có thể sử dụng .coalesce(10000, false) ... không chắc chắn 100% sẽ hoạt động mặc dù đã lâu rồi tôi mới làm điều đó, vui lòng cho tôi biết. Vì vậy, hãy thử

sc.textFile(path).coalesce(10000, false) 
+0

Cảm ơn! Tôi sẽ thử xem. – zeodtr

+0

Đã hoạt động! Trên thực tế tôi đã sử dụng yếu tố kết hợp 1227, đó là số phân vùng khi Spark xử lý tệp đơn lớn chứa toàn bộ bản ghi. Nhưng công việc chạy chậm hơn (như mong đợi), và vẫn có vẻ như thông tin của tất cả các tập tin vẫn được chuyển giao cho quá trình điều khiển, có thể gây ra OOM khi quá nhiều tập tin có liên quan. Nhưng 1,68 GB cho quá trình lái xe cho 168016 tập tin không phải là quá xấu. – zeodtr

+0

Vâng, chúng tôi có một công việc đơn giản riêng biệt đặc biệt để giảm số lượng tệp xuống vì đó là một điều quan trọng như vậy. Một khi tôi đã phải chạy nó trong 5 đi trên 5 tập con – samthebest

0

Bạn có thể sử dụng này

tiên bạn có thể nhận được một Buffer/Danh sách S3 Paths/Tương tự cho HDFS hoặc đường dẫn Local

Nếu bạn đang thử lại với Amazon S3 rồi:

import scala.collection.JavaConverters._ 
import java.util.ArrayList 
import com.amazonaws.services.s3.AmazonS3Client 
import com.amazonaws.services.s3.model.ObjectListing 
import com.amazonaws.services.s3.model.S3ObjectSummary 
import com.amazonaws.services.s3.model.ListObjectsRequest 

def listFiles(s3_bucket:String, base_prefix : String) = { 
    var files = new ArrayList[String] 

    //S3 Client and List Object Request 
    var s3Client = new AmazonS3Client(); 
    var objectListing: ObjectListing = null; 
    var listObjectsRequest = new ListObjectsRequest(); 

    //Your S3 Bucket 
    listObjectsRequest.setBucketName(s3_bucket) 

    //Your Folder path or Prefix 
    listObjectsRequest.setPrefix(base_prefix) 

    //Adding s3:// to the paths and adding to a list 
    do { 
     objectListing = s3Client.listObjects(listObjectsRequest); 
     for (objectSummary <- objectListing.getObjectSummaries().asScala) { 
     files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); 
     } 
     listObjectsRequest.setMarker(objectListing.getNextMarker()); 
    } while (objectListing.isTruncated()); 

    //Removing Base Directory Name 
    files.remove(0) 

    //Creating a Scala List for same 
    files.asScala 
    } 

Bây giờ Vượt qua Danh sách đối tượng này đến đoạn mã sau đây, lưu ý: sc là một đối tượng của SQLContext

var df: DataFrame = null; 
    for (file <- files) { 
    val fileDf= sc.textFile(file) 
    if (df!= null) { 
     df= df.unionAll(fileDf) 
    } else { 
     df= fileDf 
    } 
    } 

Bây giờ bạn có một thức Unified RDD tức df

Không bắt buộc, Và Bạn cũng có thể phân vùng lại nó trong một BigRDD đơn

val files = sc.textFile(filename, 1).repartition(1) 

repartitioning luôn luôn làm việc: D

Các vấn đề liên quan