2016-09-29 16 views
6

Tôi đang xây dựng một ứng dụng cần tải các tập dữ liệu từ S3. Chức năng hoạt động chính xác, nhưng hiệu suất lại chậm đáng ngạc nhiên.Việc đọc tệp CSV từ S3 vào một khung dữ liệu Spark dự kiến ​​sẽ rất chậm?

Bộ dữ liệu ở định dạng CSV. Có khoảng 7 hồ sơ (dòng) trong mỗi tệp và mỗi tệp là 600-700MB.

val spark = SparkSession 
     .builder() 
     .appName("MyApp") 
     .getOrCreate() 

val df = spark 
    .read 
    .option("header", "true") 
    .option("inferSchema", "true") 
    .csv(inFileName:_*) 
    // inFileName is a list that current contains 2 file names 
    // eg. s3://mybucket/myfile1.csv 

val r = df.rdd.flatMap{ row => 
     /* 
     * Discard poorly formated input records 
     */ 
     try { 
     totalRecords.add(1) 

     // this extracts several columns from the dataset 
     // each tuple of indexColProc specifies the index of the column to 
     // select from the input row, and a function to convert 
     // the value to an Int 
     val coords = indexColProc.map{ case (idx, func) => func(row.get(idx).toString) } 

     List((coords(0), coords)) 
     } 
     catch { 
     case e: Exception => {  
      badRecords.add(1) 
      List() 
     } 
     } 
    } 

println("Done, row count " + r.count) 

Tôi chạy nó trên một cụm AWS gồm 5 máy, mỗi máy có m3.xlớn. Tham số maximumResourceAllocation được đặt thành true và đây là ứng dụng duy nhất chạy trên cụm.

Tôi đã chạy ứng dụng hai lần. Lần đầu tiên với 'inFileName' trỏ vào các tệp trên S3 và lần thứ hai trỏ vào một bản sao cục bộ của các tệp trong hệ thống tệp hadoop.

Khi tôi nhìn vào máy chủ lịch sử Spark và đi sâu vào công việc tương ứng với hành động r.count cuối cùng, tôi thấy rằng phải mất 2,5 phút khi truy cập các tệp trên s3 và 18 khi truy cập tệp cục bộ hdfs. Tôi "đã nhận được kết quả tương ứng tương tự khi tôi chạy thử nghiệm tương tự trên một cụm nhỏ hơn hoặc bằng thạc sĩ = cấu hình địa phương.

Khi tôi sao chép các tập tin s3 vào cluster sử dụng

aws s3 cp <file> 

Nó chỉ mất 6.5s để di chuyển một tập tin 600-700MB Do đó, có vẻ như I/O thô của cá thể máy đang góp phần làm chậm quá trình làm việc. có thể ai đó vui lòng chỉ ra nơi tôi đang đi sai.Nếu nó được mong đợi, là những cách khác để làm điều này mà sẽ có hiệu suất tốt hơn? Hay tôi cần phải phát triển một cái gì đó để chỉ cần sao chép các tập tin từ s3 đến hdfs trước khi ứng dụng chạy?

+0

có thể thử 'df.cache()' trước khi bạn chạy 'flatMap'/ – maxymoo

+0

Tôi đã thử điều đó, nhưng nó không có hiệu lực hoặc thực sự khiến đường dẫn bị treo (tôi đã ở trên một máy khác khi tôi thử thử nghiệm đó). –

Trả lời

6

Sau khi tìm hiểu thêm, tôi phát hiện ra rằng việc sử dụng S3 gốc tạo nên sự khác biệt lớn. Tôi chỉ thay đổi tiền tố URI thành s3n: // và hiệu suất cho công việc được đề cập đã giảm từ 2,5 phút xuống còn 21 giây. Vì vậy, chỉ có một hình phạt 3s để truy cập s3 vs hdfs, điều này khá hợp lý.

Khi tìm kiếm chủ đề này, có nhiều bài đăng đề cập đến s3n có giới hạn kích thước tệp tối đa là 5 GB. Tuy nhiên, tôi đã xem qua số this cho biết giới hạn kích thước tệp tối đa đã được tăng lên 5TB trong Hadoop 2.4.0.

"Không nên sử dụng hệ thống tệp khối S3 nữa."

0

Bạn đã thử gói spark-csv chưa? Có rất nhiều tối ưu hóa để đọc csv và bạn có thể sử dụng chế độ = MALFORMED để thả các dòng xấu mà bạn đang cố gắng lọc. Bạn có thể đọc từ S3 trực tiếp như thế này:

csv_rdf<- read.df(sqlContext,"s3n://xxxxx:[email protected]/file1.csv",source="com.databricks.spark.csv") 

Thông tin chi tiết có thể được tìm thấy ở đây https://github.com/databricks/spark-csv

+0

Tôi đã thử sử dụng spark-csv trong quá khứ, nhưng không có sự khác biệt về hiệu suất. Nhưng cảm ơn con trỏ ở chế độ = MALFORMED. Tôi không biết về chế độ đó và có thể giúp đơn giản hóa mã của tôi. cảm ơn –

0

Chúng tôi phải đối mặt với vấn đề chính xác như vậy về một vài tháng trước đây, ngoại trừ việc dữ liệu của chúng tôi là 1TB vì vậy vấn đề là rõ ràng hơn.

Chúng tôi đào sâu vào nó và cuối cùng đã đi đến kết luận sau: Vì chúng tôi có 5 trường hợp với 30 trình thực thi mỗi lần mỗi giai đoạn được lên lịch (và điều đầu tiên tác vụ sẽ lấy dữ liệu từ S3) những nhiệm vụ này sẽ được đóng chai trên dải băng tần mạng, sau đó tất cả đều di chuyển để tính toán một phần nhiệm vụ và có thể tranh chấp cho CPU đồng thời.

Vì vậy, về cơ bản bởi vì các tác vụ đều đang làm cùng một điều cùng một lúc, chúng luôn cạnh tranh cho cùng một tài nguyên.

Chúng tôi đã chỉ ra rằng chỉ cho phép k số lượng tác vụ tại bất kỳ thời điểm nào sẽ cho phép họ hoàn tất tải xuống nhanh chóng và chuyển sang phần tính toán và sau đó là các tác vụ k sau đó bắt đầu tải xuống. Bằng cách này, bây giờ các nhiệm vụ k (trái ngược với tất cả) đang nhận được băng thông đầy đủ và một số tác vụ đồng thời làm một cái gì đó hữu ích trên CPU hoặc I/O mà không cần chờ đợi nhau trên một số tài nguyên chung.

Hy vọng điều này sẽ hữu ích.

+0

Âm thanh đáng để thử. Bạn có thể cung cấp một số gợi ý về cách triển khai loại điều khiển chi tiết đó của việc thực hiện tác vụ không? Trong trường hợp của tôi chỉ có 8 người thực thi, mỗi người có khoảng 4 nhân. Hiện tại, tôi đang chạy thử nghiệm trên m3.xlarge, nhưng nếu tôi có thể vượt qua nút cổ chai I/O này, bạn có thể chuyển sang một cá thể có nhiều lõi vật lý hơn. –

+0

Tôi không có mã công khai, nhưng những gì chúng tôi đã làm là - trong mỗi nhiệm vụ trước khi vào cuộc gọi chuyên sâu mạng, chúng tôi sẽ gọi một dịch vụ trung tâm, người sẽ theo dõi bao nhiêu công việc khác trong cùng một khối và nếu nó ít hơn một số k nó sẽ cho phép người khác yêu cầu nhiệm vụ này đi cho một spin bận rộn trong một giây và thử lại. Và sau đó khi một nhiệm vụ được thực hiện với cuộc gọi mạng, nó sẽ cập nhật các dịch vụ trung tâm như vậy. Phải thừa nhận rằng đây không phải là điều thanh lịch nhất, nhưng chúng tôi có thể giảm thời gian làm việc từ 90 phút xuống dưới 50 phút. –

+0

Sau một số lần đào bới, tôi phát hiện ra rằng việc sử dụng bản địa S3 tạo ra sự khác biệt lớn. Tôi chỉ thay đổi tiền tố URI thành s3n: // và hiệu suất cho công việc được đề cập đã giảm từ 2,5 phút xuống còn 21 giây. Vì vậy, chỉ có một hình phạt 3s để truy cập s3 vs hdfs, điều này khá hợp lý. Không chắc chắn nếu điều này sẽ áp dụng cho vấn đề của bạn mặc dù như các tài liệu tôi đã nhìn thấy cho đến nay cho thấy có một giới hạn kích thước tập tin của 5GB khi sử dụng s3 bản địa. –

Các vấn đề liên quan