Tôi đang xây dựng một ứng dụng cần tải các tập dữ liệu từ S3. Chức năng hoạt động chính xác, nhưng hiệu suất lại chậm đáng ngạc nhiên.Việc đọc tệp CSV từ S3 vào một khung dữ liệu Spark dự kiến sẽ rất chậm?
Bộ dữ liệu ở định dạng CSV. Có khoảng 7 hồ sơ (dòng) trong mỗi tệp và mỗi tệp là 600-700MB.
val spark = SparkSession
.builder()
.appName("MyApp")
.getOrCreate()
val df = spark
.read
.option("header", "true")
.option("inferSchema", "true")
.csv(inFileName:_*)
// inFileName is a list that current contains 2 file names
// eg. s3://mybucket/myfile1.csv
val r = df.rdd.flatMap{ row =>
/*
* Discard poorly formated input records
*/
try {
totalRecords.add(1)
// this extracts several columns from the dataset
// each tuple of indexColProc specifies the index of the column to
// select from the input row, and a function to convert
// the value to an Int
val coords = indexColProc.map{ case (idx, func) => func(row.get(idx).toString) }
List((coords(0), coords))
}
catch {
case e: Exception => {
badRecords.add(1)
List()
}
}
}
println("Done, row count " + r.count)
Tôi chạy nó trên một cụm AWS gồm 5 máy, mỗi máy có m3.xlớn. Tham số maximumResourceAllocation được đặt thành true và đây là ứng dụng duy nhất chạy trên cụm.
Tôi đã chạy ứng dụng hai lần. Lần đầu tiên với 'inFileName' trỏ vào các tệp trên S3 và lần thứ hai trỏ vào một bản sao cục bộ của các tệp trong hệ thống tệp hadoop.
Khi tôi nhìn vào máy chủ lịch sử Spark và đi sâu vào công việc tương ứng với hành động r.count cuối cùng, tôi thấy rằng phải mất 2,5 phút khi truy cập các tệp trên s3 và 18 khi truy cập tệp cục bộ hdfs. Tôi "đã nhận được kết quả tương ứng tương tự khi tôi chạy thử nghiệm tương tự trên một cụm nhỏ hơn hoặc bằng thạc sĩ = cấu hình địa phương.
Khi tôi sao chép các tập tin s3 vào cluster sử dụng
aws s3 cp <file>
Nó chỉ mất 6.5s để di chuyển một tập tin 600-700MB Do đó, có vẻ như I/O thô của cá thể máy đang góp phần làm chậm quá trình làm việc. có thể ai đó vui lòng chỉ ra nơi tôi đang đi sai.Nếu nó được mong đợi, là những cách khác để làm điều này mà sẽ có hiệu suất tốt hơn? Hay tôi cần phải phát triển một cái gì đó để chỉ cần sao chép các tập tin từ s3 đến hdfs trước khi ứng dụng chạy?
có thể thử 'df.cache()' trước khi bạn chạy 'flatMap'/ – maxymoo
Tôi đã thử điều đó, nhưng nó không có hiệu lực hoặc thực sự khiến đường dẫn bị treo (tôi đã ở trên một máy khác khi tôi thử thử nghiệm đó). –