(1)在CoarseGrainedExecutorBackend的receiveWithLogging方法中,会收到CoarseG-rainedSchedulerBackend内部的DriverActor发送过来的LaunchTask消息,并调用Executor的launchTask方法进行处理。
(2)在Executor的launchTask方法中,首先会初始化一个TaskRunner来封装Task,TaskRunner管理Task运行时的所有细节。然后把TaskRunner对象放入到Java的threadPool(线程池)中去执行。
(3)打开TaskRunner类,我们可以发现它实现了Runnable接口。对于它所有的运行状况,我们可以在它的run()方法里查看。
1)在run()方法里,首先会进行Driver端发送过来的Task本身以及它所依赖的Jar等文件的反序列。
2)Task的运行是调用ask的run方法实现的,在task.run(taskId.toInt)的方法内部会继续调用Task的runTask方法,在这里,由于Task本身是个抽象类,具体的runTask方法是由它的两个子类ShuffleMapTask和RedultTask来实现的。
3)对于ShuffleMapTask而言,它的计算结果会写到BlockManager之中,最终返回给DAGScheduler的是一个MapStatus对象,该对象中管理了ShuffleMapTask的运算结果存储到BlockManager里的相关存储信息,而不是计算结果本身,这些存储信息将会成为下一阶段的Task需要获得的输入数据时的依据。(www.xing528.com)
4)对于ResultTask的runTask方法而言,它最终返回的是func函数的计算结果。
(4)对于Executor的计算结果,最终是要返回给Driver的,这时,会根据结果的大小有不同的策略:
1)如果结果大于1GB,那么直接丢弃这个结果(这个是Spark1.2中新加的策略)。可以通过spark.driver.maxResultSize来进行设置。
2)这里的回传是直接通过Akka的消息传递机制。因此这个结果的大小首先不能超过这个机制设置的消息的最大值。这个最大值是通过spark.akka.frameSize设置的,默认值是10MB。除此之外,还有200KB的预留空间。对于“较大”的结果(resultSize>=akkaF-rameSize-akkaUtils.reservedSizeBytes),将其以taskId为key存入org.apache.spark.storage. BlockManager。
3)其他的直接通过Akka回传到Driver。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。