Theo tài liệu Spark chỉ hành động kích hoạt công việc trong Spark, các phép biến đổi được đánh giá một cách lười biếng khi một hành động được gọi trên đó.
Nói chung bạn nói đúng, nhưng như bạn vừa trải qua, có vài trường hợp ngoại lệ và sortBy
là một trong số họ (với zipWithIndex
).
Thực tế, nó đã được báo cáo trong JIRA của Spark và đóng với độ phân giải Không khắc phục. Xem SPARK-1021 sortByKey() launches a cluster job when it shouldn't.
Bạn có thể thấy việc chạy với DAGScheduler
logging được kích hoạt (và sau này trong giao diện web):
scala> sc.parallelize(0 to 8).sortBy(identity)
INFO DAGScheduler: Got job 1 (sortBy at <console>:25) with 8 output partitions
INFO DAGScheduler: Final stage: ResultStage 1 (sortBy at <console>:25)
INFO DAGScheduler: Parents of final stage: List()
INFO DAGScheduler: Missing parents: List()
DEBUG DAGScheduler: submitStage(ResultStage 1)
DEBUG DAGScheduler: missing: List()
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25), which has no missing parents
DEBUG DAGScheduler: submitMissingTasks(ResultStage 1)
INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25)
DEBUG DAGScheduler: New pending partitions: Set(0, 1, 5, 2, 6, 3, 7, 4)
INFO DAGScheduler: ResultStage 1 (sortBy at <console>:25) finished in 0.013 s
DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0
INFO DAGScheduler: Job 1 finished: sortBy at <console>:25, took 0.019755 s
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:25
Nguồn
2016-12-31 10:51:44