2015-01-24 17 views
11

tôi đã đưa ra cụm của tôi theo cách này:Spark: chiến lược phân vùng lại sau khi đọc file văn bản

/usr/lib/spark/bin/spark-submit --class MyClass --master yarn-cluster--num-executors 3 --driver-memory 10g --executor-memory 10g --executor-cores 4 /path/to/jar.jar 

Điều đầu tiên tôi làm là đọc một tập tin văn bản lớn, và đếm nó:

val file = sc.textFile("/path/to/file.txt.gz") 
println(file.count()) 

Khi làm điều này, tôi thấy rằng chỉ một trong các nút của tôi thực sự đang đọc tệp và thực thi đếm (vì tôi chỉ thấy một tác vụ). Điều đó có được mong đợi không? Tôi có nên phân vùng lại RDD của mình sau đó hay khi tôi sử dụng các chức năng giảm bản đồ, liệu Spark có làm điều đó cho tôi không?

+0

"defaultMinPartitions" của bạn là gì? Theo doc rõ ràng nói, textFile có một số tham số phân vùng tùy chọn, mặc định là mặc định. –

+0

Mặc định của tôiMinPartitions lớn hơn một. Có vẻ như tôi không thể ép buộc một số phân vùng được chỉ định, bởi vì nó chỉ là một tệp văn bản ... đang chạy .... val tệp = sc.textFile ("/ path/to/file.txt.gz", 8) println (file.partitions.length) trả về 1 – Stephane

+0

Vâng, nó phải làm việc đọc ở một nơi, bởi vì đó là vốn đã nối tiếp. Nhưng tôi không thể thấy lý do tại sao nó sẽ có param tùy chọn nếu nó không làm _something_. –

Trả lời

20

Dường như bạn đang làm việc với tệp được nén.

Trích dẫn từ my answer here:

Tôi nghĩ rằng bạn đã trúng một vấn đề khá điển hình với các tập tin đã giải nén ở chỗ chúng không thể được nạp song song. Cụ thể hơn, một tệp gzip duy nhất không thể được nạp song song bởi nhiều tác vụ, vì vậy Spark sẽ tải nó với 1 tác vụ và do đó cung cấp cho bạn RDD với 1 phân vùng.

Bạn cần phân loại lại RDD một cách rõ ràng sau khi tải nó để nhiều tác vụ hơn có thể chạy song song.

Ví dụ:

val file = sc.textFile("/path/to/file.txt.gz").repartition(sc.defaultParallelism * 3) 
println(file.count()) 

Về ý kiến ​​về câu hỏi của bạn, lý do thiết minPartitions không giúp ở đây là vì a gzipped file is not splittable, vì vậy Spark sẽ luôn luôn sử dụng 1 công việc để đọc các tập tin.

Nếu bạn đặt minPartitions khi đọc tệp văn bản thông thường hoặc tệp nén với định dạng nén có thể chia nhỏ như bzip2, bạn sẽ thấy Spark thực sự triển khai số lượng tác vụ đó một cách song song (tối đa số lõi có sẵn) trong cụm của bạn) để đọc tệp.

+0

Cảm ơn! Bạn có muốn giới thiệu bzip2 qua gzip sau đó không? Nếu tôi tải tập tin đó thường xuyên, chiến lược của tôi để tối ưu hóa mọi hoạt động là gì? – Stephane

+0

@Stephane - Nó phụ thuộc vào số lượng dữ liệu đang đến và khoảng thời gian cụm của bạn dành để phân vùng lại dữ liệu. Một tệp nén duy nhất có thể tốt. Nếu một tệp quá lớn, bạn có thể cũng có nhiều tệp được nén (tức là tách trước khi nén) vì mỗi tệp được nén có thể được nạp song song vào cùng một RDD (một nhiệm vụ cho mỗi tệp). Đó có lẽ là con đường kháng cự ít nhất. –

+0

cảm ơn rất rất thú vị! Vì vậy, .gz.001 các tệp được chia nhỏ hoặc bzip2 ... Tôi sẽ thử nghiệm với cả hai!Tôi nghĩ rằng có, nút cổ chai lớn là phân vùng đầu tiên, vì vậy nếu tôi quản lý đã chia nhỏ các tệp của tôi khi họ đến, nó có thể giúp tôi tiết kiệm một chút thời gian – Stephane

Các vấn đề liên quan