2016-12-30 31 views
7

Theo tài liệu Spark chỉ các hành động RDD mới có thể kích hoạt công việc Spark và các phép biến đổi được đánh giá một cách lười biếng khi một hành động được gọi trên đó.Tại sao chuyển đổi sortBy kích hoạt công việc Spark?

Tôi thấy hàm chuyển đổi sortBy được áp dụng ngay lập tức và được hiển thị dưới dạng trình kích hoạt công việc trong SparkUI. Tại sao?

Trả lời

3

sortBy được triển khai sử dụng sortByKey phụ thuộc vào RangePartitioner (JVM) hoặc chức năng phân vùng (Python). Khi bạn gọi sortBy/sortByKey phân vùng (chức năng phân vùng) được khởi tạo háo hức và lấy mẫu RDD đầu vào để tính toán ranh giới phân vùng. Công việc bạn thấy tương ứng với quy trình này.

Việc phân loại thực tế chỉ được thực hiện nếu bạn thực hiện một hành động trên RDD hoặc hậu duệ của nó.

1

Theo tài liệu Spark chỉ hành động kích hoạt công việc trong Spark, các phép biến đổi được đánh giá một cách lười biếng khi một hành động được gọi trên đó.

Nói chung bạn nói đúng, nhưng như bạn vừa trải qua, có vài trường hợp ngoại lệ và sortBy là một trong số họ (với zipWithIndex).

Thực tế, nó đã được báo cáo trong JIRA của Spark và đóng với độ phân giải Không khắc phục. Xem SPARK-1021 sortByKey() launches a cluster job when it shouldn't.

Bạn có thể thấy việc chạy với DAGScheduler logging được kích hoạt (và sau này trong giao diện web):

scala> sc.parallelize(0 to 8).sortBy(identity) 
INFO DAGScheduler: Got job 1 (sortBy at <console>:25) with 8 output partitions 
INFO DAGScheduler: Final stage: ResultStage 1 (sortBy at <console>:25) 
INFO DAGScheduler: Parents of final stage: List() 
INFO DAGScheduler: Missing parents: List() 
DEBUG DAGScheduler: submitStage(ResultStage 1) 
DEBUG DAGScheduler: missing: List() 
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25), which has no missing parents 
DEBUG DAGScheduler: submitMissingTasks(ResultStage 1) 
INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25) 
DEBUG DAGScheduler: New pending partitions: Set(0, 1, 5, 2, 6, 3, 7, 4) 
INFO DAGScheduler: ResultStage 1 (sortBy at <console>:25) finished in 0.013 s 
DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0 
INFO DAGScheduler: Job 1 finished: sortBy at <console>:25, took 0.019755 s 
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:25 
Các vấn đề liên quan