2016-05-30 25 views
51

Giả sử sau đây chỉ có một công việc Spark đang chạy tại mọi thời điểm.Các giai đoạn được chia thành các nhiệm vụ trong Spark như thế nào?

Những gì tôi có được cho đến nay

Đây là những gì tôi hiểu những gì xảy ra trong Spark:

  1. Khi một SparkContext được tạo ra, mỗi nút công nhân bắt đầu một chấp hành viên. Executors là các quá trình riêng biệt (JVM), kết nối trở lại chương trình điều khiển. Mỗi người thi hành có bình của chương trình điều khiển. Bỏ lái xe, đóng cửa các nhà điều hành. Mỗi người thi hành có thể chứa một số phân vùng.
  2. Khi công việc được thực hiện, kế hoạch thực hiện được tạo theo biểu đồ đường truyền.
  3. Công việc thực hiện được chia thành các giai đoạn, trong đó các giai đoạn chứa nhiều chuyển đổi và hành động lân cận (trong biểu đồ đường truyền) và hành động, nhưng không có xáo trộn. Vì vậy, các giai đoạn được phân cách bởi các xáo trộn.

image 1

Tôi hiểu rằng

  • Một tác vụ là một lệnh được gửi từ người lái xe đến một chấp hành viên của serializing đối tượng Function.
  • Người thực thi deserializes (với các trình điều khiển jar) lệnh (nhiệm vụ) và thực hiện nó trên một phân vùng.

nhưng

Câu hỏi (s)

Làm thế nào để chia giai đoạn thành những nhiệm vụ?

Cụ thể:

  1. Are nhiệm vụ xác định bởi biến đổi và hành động hoặc có thể có nhiều biến đổi/hành động được trong một nhiệm vụ?
  2. Các tác vụ được xác định bởi phân vùng (ví dụ: một nhiệm vụ cho mỗi giai đoạn trên mỗi phân vùng).
  3. Các tác vụ được xác định bởi các nút (ví dụ: một nhiệm vụ trên mỗi giai đoạn trên mỗi nút)?

Những gì tôi nghĩ (chỉ trả lời một phần, ngay cả khi phải)

Trong https://0x0fff.com/spark-architecture-shuffle, shuffle được giải thích với hình ảnh

enter image description here

và tôi có ấn tượng rằng sự cai trị là

mỗi giai đoạn được chia thành các tác vụ # số phân vùng, không liên quan đến số lượng các nút

Đối với hình ảnh đầu tiên của tôi, tôi muốn có 3 nhiệm vụ bản đồ và 3 tác vụ giảm.

Đối với hình ảnh từ 0x0fff, tôi muốn nói có 8 tác vụ bản đồ và 3 tác vụ giảm (giả sử chỉ có ba tệp màu cam và ba tệp màu lục đậm).

Câu hỏi mở trong mọi trường hợp

Có đúng không? Nhưng ngay cả khi điều đó là chính xác, các câu hỏi trên của tôi không được trả lời, vì câu hỏi vẫn mở, cho dù nhiều hoạt động (ví dụ: nhiều bản đồ) nằm trong một tác vụ hoặc được tách thành một nhiệm vụ cho mỗi thao tác.

gì người khác nói

What is a task in Spark? How does the Spark worker execute the jar file?How does the Apache Spark scheduler split files into tasks? tương tự, nhưng tôi không cảm thấy rằng câu hỏi của tôi đã được trả lời rõ ràng ở đó.

Trả lời

14

Bạn có một phác thảo khá đẹp ở đây. Để trả lời câu hỏi của bạn

  • Một riêng taskkhông cần phải được đưa ra cho mỗi phân vùng dữ liệu cho mỗi stage. Hãy xem xét rằng mỗi phân vùng có thể sẽ nằm trên các vị trí thực tế riêng biệt - ví dụ: chặn trong HDFS hoặc thư mục/tập cho hệ thống tệp cục bộ.

Lưu ý rằng việc gửi Stage s được điều khiển bởi DAG Scheduler. Điều này có nghĩa là các giai đoạn không phụ thuộc lẫn nhau có thể được gửi tới cụm để thực hiện song song: điều này tối đa hóa khả năng song song trên cụm. Vì vậy, nếu hoạt động trong dataflow của chúng tôi có thể xảy ra đồng thời, chúng tôi sẽ mong đợi để xem nhiều giai đoạn đưa ra.

Chúng ta có thể thấy rằng trong hành động trong ví dụ đồ chơi sau đây mà chúng ta làm các loại sau đây của các hoạt động:

  • tải hai datasources
  • thực hiện một số hoạt động bản đồ trên cả hai trong những nguồn dữ liệu riêng
  • tham gia cùng họ
  • thực hiện một số bản đồ và bộ lọc hoạt động trên kết quả
  • lưu kết quả

Vậy thì chúng tôi sẽ kết thúc bao nhiêu giai đoạn?

  • 1 sân khấu cho mỗi tải hai datasources song song = 2 giai đoạn
  • Một giai đoạn thứ ba đại diện cho join đó là phụ thuộc trên hai giai đoạn khác
  • Lưu ý: tất cả các hoạt động tiếp theo trên làm việc trên dữ liệu đã tham gia có thể được thực hiện trong cùng một giai đoạn bởi vì chúng phải xảy ra tuần tự. Không có lợi ích khi tung ra các giai đoạn bổ sung bởi vì họ không thể bắt đầu làm việc cho đến khi hoàn thành hoạt động trước đó.

Dưới đây là chương trình đồ chơi

val sfi = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) } 
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }} 
val spj = sfi.join(sp) 
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }} 
val sf = sm.filter{ case (k,v) => v % 10 == 0 } 
sf.saveAsTextFile("/data/blah/out") 

Và đây là DAG của kết quả

enter image description here

Bây giờ là: bao nhiêu nhiệm vụ? Số lượng các nhiệm vụ cần được bình đẳng để

Sum của (Stage * #Partitions in the stage)

Trong trường hợp của tôi #Partitions in the stage bằng number of processors trên máy cụm tôi.

+0

Cảm ơn! Hãy xây dựng câu trả lời của bạn liên quan đến văn bản của tôi: 1) Định nghĩa của tôi về các giai đoạn không toàn diện? Có vẻ như tôi đã bỏ lỡ yêu cầu rằng một giai đoạn không thể chứa các hoạt động có thể song song. Hay là mô tả của tôi có ngụ ý nghiêm chỉnh điều đó? 2) Số nhiệm vụ phải được thực hiện cho công việc được xác định bởi số lượng phân vùng, nhưng không phải số lượng bộ xử lý hoặc nút, trong khi số lượng tác vụ có thể được thực thi cùng một lúc phụ thuộc vào số lượng bộ vi xử lý, phải không? 3) Một tác vụ có thể chứa nhiều thao tác? – Make42

+0

4) Bạn có ý gì với câu cuối cùng của bạn? Sau khi tất cả, các phân vùng số có thể thay đổi từ giai đoạn này sang giai đoạn khác. Ý bạn là đây là cách bạn cấu hình công việc của mình cho tất cả các giai đoạn? – Make42

+0

@ Make42 Tất nhiên số lượng phân vùng có thể thay đổi theo từng giai đoạn - bạn chính xác. Đó là ý định của tôi bằng cách nói 'sum (..)' để xem xét sự thay đổi đó. – javadba

7

Nếu tôi hiểu chính xác có 2 điều (liên quan) gây nhầm lẫn cho bạn:

1) Điều gì xác định nội dung của tác vụ?

2) Điều gì xác định số nhiệm vụ cần thực hiện?

động cơ "keo" Spark cùng đơn giản hoạt động trên rdds liên tiếp, ví dụ:

rdd1 = sc.textFile(...) 
rdd2 = rdd1.filter(...) 
rdd3 = rdd2.map(...) 
rdd3RowCount = rdd3.count 

nên khi rdd3 là (uể oải) tính toán, tia lửa sẽ tạo ra một nhiệm vụ cho mỗi phân vùng của rdd1 và mỗi công việc sẽ thực thi cả bộ lọc và bản đồ trên mỗi dòng để tạo ra rdd3.

Số lượng tác vụ được xác định theo số lượng phân vùng. Mỗi RDD có một số phân vùng được xác định. Đối với RDD nguồn được đọc từ HDFS (sử dụng sc.textFile (...) chẳng hạn) số lượng phân vùng là số lượng phân tách được tạo bởi định dạng đầu vào. Một số thao tác trên RDD (s) có thể dẫn đến một RDD với một số phân vùng khác:

rdd2 = rdd1.repartition(1000) will result in rdd2 having 1000 partitions (regardless of how many partitions rdd1 had). 

Một ví dụ khác là gia nhập:

rdd3 = rdd1.join(rdd2 , numPartitions = 1000) will result in rdd3 having 1000 partitions (regardless of partitions number of rdd1 and rdd2). 

(Hầu hết các) hoạt động mà thay đổi số lượng phân vùng liên quan đến một shuffle, Khi chúng ta làm ví dụ:

rdd2 = rdd1.repartition(1000) 

những gì thực sự xảy ra là nhiệm vụ trên mỗi phân vùng của rdd1 cần để tạo ra một kết thúc đầu ra có thể được đọc bởi các giai đoạn sau đây để làm rdd2 h ave chính xác 1000 phân vùng (Làm thế nào họ làm điều đó? Hash hoặc Sort). Các tác vụ ở phía bên này đôi khi được gọi là "Tác vụ bên bản đồ". Một nhiệm vụ mà sau này sẽ chạy trên rdd2 sẽ hành động trên một phân vùng (của rdd2!) Và sẽ phải tìm ra cách đọc/kết hợp các đầu ra phía bản đồ có liên quan đến phân vùng đó. Các tác vụ ở bên này đôi khi được gọi là "Tác vụ giảm (bên)".

Hai câu hỏi có liên quan: số lượng nhiệm vụ trong một giai đoạn là số phân vùng (chung với các tỷ lệ liên tiếp "dán" với nhau) và số lượng phân đoạn của rdd có thể thay đổi giữa các giai đoạn (bằng cách chỉ định số của phân vùng để một số shuffle gây ra hoạt động ví dụ).

Sau khi thực hiện một giai đoạn bắt đầu, nhiệm vụ của nó có thể chiếm các vùng nhiệm vụ.Số lượng các vị trí nhiệm vụ đồng thời là numExecutors * ExecutorCores. Nói chung, chúng có thể bị chiếm đóng bởi các tác vụ từ các giai đoạn khác nhau, không phụ thuộc.

5

Điều này có thể giúp bạn hiểu rõ hơn về mảnh khác nhau:

  • Stage: là một tập hợp các nhiệm vụ. Cùng một quá trình chạy với các tập con dữ liệu (phân vùng) khác nhau.
  • Tác vụ: đại diện cho đơn vị hoạt động trên phân vùng của tập dữ liệu được phân phối. Vì vậy, trong mỗi giai đoạn, số lượng nhiệm vụ = số phân vùng hoặc như bạn đã nói "một nhiệm vụ trên mỗi phân đoạn mỗi phân vùng"
  • Mỗi người thực hiện chạy trên một vùng chứa sợi và mỗi container nằm trên một node.
  • mỗi giai đoạn sử dụng nhiều executers, mỗi executer được phân bổ nhiều vcores.
  • mỗi vcore có thể thực hiện chính xác một nhiệm vụ tại một thời điểm
  • Vì vậy, ở bất kỳ giai đoạn, nhiều nhiệm vụ có thể được thực hiện song song. số-of -task đang chạy = số-of-vcores đang được sử dụng.
+1

Đây là một bài đọc hữu ích về kiến ​​trúc tia lửa: https://0x0fff.com/spark-architecture/ –

Các vấn đề liên quan