(1)TaskRunner将Task的执行状态(StatusUpdate信息)汇报给Driver(负责接受并处理消息的是DriverActor的receiveWithLogging)后,Driver会转给org.apache.spark.scheduler. TaskSchedulerImpl的statusUpdate方法。
(2)进入TaskScheduulerImpl的statusUpdate方法中,这里不同的状态有不同的处理:
1)如果类型是TaskState.FINISHED,那么调用TaskResultGetter的enqueueSuccessfulTask方法进行处理。
2)如果类型是TaskState.FAILED、TaskState.KILLED或者TaskState.LOST,调用TaskResultGetter的enqueueFailedTask进行处理,对于TaskState.LOST,还需要将其所在的Executor标记为failed,并且根据更新后的Executor重新调度。
3)enqueueSuccessfulTask方法的逻辑也比较简单,就是如果是IndirectTaskResult,那么需要通过blockid来获取结果:sparkEnv.blockManager.getRemoteBytes(blockId);如果是Di-rectTaskResult,那么结果就无需远程获取了。(www.xing528.com)
4)然后调用连续调用TaskSchedulerImpl#handleSuccessfulTask->。
5)如果task是ShuffleMapTask,那么它需要将结果通过某种机制告诉下游的Stage,以便于其可以作为下游Stage的输入。这个机制是怎么实现的?实际上,对于ShuffleMapTask来说,其结果实际上是org.apache.spark.scheduler.MapStatus;其序列化后存入了Direct-TaskResult或者IndirectTaskResult中。而DAGScheduler#handleTaskCompletion通过下面的方式来获取这个结果:
6)通过将这个status注册到org.apache.spark.MapOutputTrackerMaster,就完成了结果处理的全过程:
7)而registerMapOutputs的处理也很简单,以Shuffle ID为key将MapStatus的列表存入带有时间戳的HashMap:TimeStampedHashMap[Int,Array[MapStatus]]()。如果设置了cleanup的函数,那么这个HashMap会将超过一定时间(TTL,Time to Live)的数据清理掉。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。