首页 理论教育 Executor运行Task并返回结果

Executor运行Task并返回结果

时间:2023-06-20 理论教育 版权反馈
【摘要】:在Executor的launchTask方法中,首先会初始化一个TaskRunner来封装Task,TaskRunner管理Task运行时的所有细节。2)Task的运行是调用ask的run方法实现的,在task.run的方法内部会继续调用Task的runTask方法,在这里,由于Task本身是个抽象类,具体的runTask方法是由它的两个子类ShuffleMapTask和RedultTask来实现的。4)对于ResultTask的runTask方法而言,它最终返回的是func函数的计算结果。对于Executor的计算结果,最终是要返回给Driver的,这时,会根据结果的大小有不同的策略:1)如果结果大于1GB,那么直接丢弃这个结果。

Executor运行Task并返回结果

(1)在CoarseGrainedExecutorBackend的receiveWithLogging方法中,会收到CoarseG-rainedSchedulerBackend内部的DriverActor发送过来的LaunchTask消息,并调用Executor的launchTask方法进行处理。

(2)在Executor的launchTask方法中,首先会初始化一个TaskRunner来封装Task,TaskRunner管理Task运行时的所有细节。然后把TaskRunner对象放入到Java的threadPool(线程池)中去执行。

(3)打开TaskRunner类,我们可以发现它实现了Runnable接口。对于它所有的运行状况,我们可以在它的run()方法里查看。

1)在run()方法里,首先会进行Driver端发送过来的Task本身以及它所依赖的Jar等文件的反序列。

2)Task的运行是调用ask的run方法实现的,在task.run(taskId.toInt)的方法内部会继续调用Task的runTask方法,在这里,由于Task本身是个抽象类,具体的runTask方法是由它的两个子类ShuffleMapTask和RedultTask来实现的。

3)对于ShuffleMapTask而言,它的计算结果会写到BlockManager之中,最终返回给DAGScheduler的是一个MapStatus对象,该对象中管理了ShuffleMapTask的运算结果存储到BlockManager里的相关存储信息,而不是计算结果本身,这些存储信息将会成为下一阶段的Task需要获得的输入数据时的依据。(www.xing528.com)

4)对于ResultTask的runTask方法而言,它最终返回的是func函数的计算结果。

(4)对于Executor的计算结果,最终是要返回给Driver的,这时,会根据结果的大小有不同的策略:

1)如果结果大于1GB,那么直接丢弃这个结果(这个是Spark1.2中新加的策略)。可以通过spark.driver.maxResultSize来进行设置。

2)这里的回传是直接通过Akka的消息传递机制。因此这个结果的大小首先不能超过这个机制设置的消息的最大值。这个最大值是通过spark.akka.frameSize设置的,默认值是10MB。除此之外,还有200KB的预留空间。对于“较大”的结果(resultSize>=akkaF-rameSize-akkaUtils.reservedSizeBytes),将其以taskId为key存入org.apache.spark.storage. BlockManager。

3)其他的直接通过Akka回传到Driver。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈