TaskRunner在计算的过程中可能发生各种异常甚至错误,例如抓取Shuffle结果失败、任务被杀死、没权限向HDFS写入数据等,当TaskRunner的run方法运行时,可以通过try catch语句捕获这些异常,并通过调用CoarseGrainedExecutorBackend的statusUpdate方法向CoarseGrainedSchedulerBackend汇报。
下面是CoarseGrainedExecutorBackend的statusUpate方法源代码。
在statusUpdate方法中,通过方法的参数taskId、state和data构建一个StatusUpdate对象,并通过driverRef的send方法将该对象发送回CoarseGrainedSheduleBackend。CoarseG-rainedScheduleBackend匹配到StatusUpdate时,将根据StatusUpdate对象中的state值来对该Task的执行情况做出判断,并执行不同的处理逻辑。
TaskState是一个枚举变量,其中包括LAUNCHING、RUNNING、FINISHED、FAILED、KILLED和LOST这些枚举值。Executor根据任务执行的不同状态,通过statusUpdate方法返回特定的TaskState值,该值通过ExecutorBackend返回给SchedulerBackend,在Scheduler-Backend中根据TaskState中的值进行处理。
先来看一下TaskState.FAILED这种情况,在Executor的run方法中,如果发生Fetch-FailedExeception、CommitDeniedExeception或者其他Throwable的子类的异常,都将会返回TaskState.FAILED状态,该状态通过CoaseGainedExecutorBackend返回。在CoaseGaiend-SchedulerBackend中,匹配到StatusUpdate消息后将进行相应的处理,匹配代码如下。(www.xing528.com)
上面代码中,首先调用TaskSchedulerImpl的statusUpdate方法,该方法的调用用于更新taskId对应任务的状态。完成更新后,判断state状态是否为finished,若状态为finished,则从executorDataMap这个哈希表中取出executorId对应的ExecutorData对象,修改该对象中的freeCores。因为状态已经为finished状态,因此ExecutorData中的freeCores会增加CPUS_PER_TASK个,CPU_PER_TASK为每一个任务占用的CPU核的个数,该个数可以通过spark.task.cpus配置项进行配置。
更新完成ExecutorData上的可用CPU后,这些闲置的CPU通过makeOffers方法再次分配给其他任务使用。先来看一下makeOffers方法的源代码,如下所示。
每一个Executor上的资源发生变动时,都将调用makeOffers方法,该方法的作用是为等待执行的任务分配资源,并通过launchTasks方法将这些任务发送到这些Executor上运行。这些任务将被包装成TaskRunner对象,运行于Executor上的线程池中。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。