在5.1.2节中,了解了参数的传递过程和外部配置的参数,通过结点之间的消息传递,最终作用在Executor上,外部配置的参数终将会在Executor上发挥作用。在本节中,主要探讨Executor的启动过程。
再回到SparkDeploySchedulerBackend上来,在SparkDeploySchedulerBackend中会新建AppClient,AppClient启动后会向Master发送RegisterApplication消息,Master在收到Regis-terApplication请求后,将会调用schedule方法,在调度过程中会使用allocateWorkerResource-ToExecutors方法为Worker结点上的Executor分配资源。分好资源后,调用launchExecutor(worker,exec)方法,此方法将向Worker发送LaunchExecutor请求,启动Executor。下面是launchExecutor方法的源代码。
launchExecutor方法中会向Worker发送LaunchExecutor消息,先来看一看LaunchExecutor消息的几个参数,masterUrl是集群中master的Url,exec.application.id是Master为Applica-tion分配的id号,该id号在注册Application时就已经生成好了。exec.id是Executor的id号,Master在为Application分配Worker时该id号就已经指定好。exec.application.desc是ap-plication的描述,包括name、maxCores、memoryPerExecutorMB、command、appUiUrl和even-tLogDir等信息。exec.cores、exec.memory分别代表Executor运行核的个数和运行内存。
Worker收到LaunchExecutor消息后,首先启动ExecutorRunner,然后向Master发送Exec-utorStateChanged(appId,execId,manager.state,None,None)消息,Master在收到此消息后将视Executor的状态调用schedule()方法,重新调整集群资源。
ExecutorRunner启动后,将调用fetchAndRunExecutor方法创建Executor工作目录并启动CoarseGraindExecutorBancked进程,fetchAndRunExecutor方法的源代码如下所示。
在上面代码中的第4行,使用CommandUtils工具构建了一个ProcessBuilder,构建Process-Builder时传入了appDesc.command、memory、sparkHome.getAbsolutePath和substituteVariables这几个参数,分别代表命令、内存大小、SparkHome绝对路径和替代参数。appDesc.command命令对象里面包括mainClass、Arguments、Environment、classPathEntries、libraryPathEntries和javaOpts这几项内容。command中包含了启动进程的关键配置项,其中mainClass代表的就是启动进程的名称,Standalone模式下mainClass指代的是“org.apache.spark.executor.CoarseGrainedExecutor-Backend”。Command的源代码如下所示。
这些参数在构建提交环境时就已经准备好,通过ApplicationDescription传递过来。上面提到的替代参数是什么意思呢?这还得从部署谈起。在Standalone模式下,TaskScheduler启动的SchedulerBackend是SparkDeploySchedulerBackend。SparkDeploySchedulerBackend会预先构建Executor的一些参数,在构建这些参数时,这些参数的值还没法获取,因此使用了替代参数,在这些参数的值获取了之后再替代。在SparkDeloySchedulerBackend中有以下这些事先构建好的替代参数,源代码如下所示。
上面代码中,构建了一个名为args的序列。该序列中包括driver-url、executor-id、host-name、cores、app-id和worker-url这几个参数,并指定使用{{name}}作为占位符,暂时代替这些还没有具体值的参数。这些参数将会在org.apache.spark.deploy.worker.ExecutorRunner启动CoarseGrainedExecutorBackend时被替换成真实的值。替换源代码如下所示。(www.xing528.com)
再回过头来看一下前面fetchAndRunExecutor方法中的第22行代码,process=build-er.start(),这句代码调用Java的ProcessBuilder的start方法,依据传入的启动参数启动进程。在使用CommondUtils的buildProcessBuilder方法创建builder时,传入了启动进程的命令,该命令已经在SparkDeploySchedulerBackend中构建好,源代码如下所示。
从上面代码中可见,在构建Command时,传给Command的第一个参数是org.apache. spark.executor.CoarseGrainedExecutorBackend,该参数表示mainClass。因此builder.start启动的进程就是org.apache.spark.executor.CoarseGrainedExecutorBackend进程,并且在启动中指定了进程所占处理内核的个数、内存及JVM配置信息。
在CoarseGrainedExecutorBackend进程启动后,将会向Driver端发起注册请求。之所以要向Driver注册,是因为实际控制Executor计算任务的还是Driver,Master只是间接地为Driver分配了Executor,分配好了之后,使用权便交到Driver手中。Driver要知道Master都为自己分配了哪些Executor,这些Executor都位于哪些Worker中,因此Executor在Worker上启动成功后,主动去联系它的“主人”,向Driver注册,以免“主人”长时间等待Master分配的资源。RegisterExecutor消息在CoarseGrainedExecutorBackend的onStart方法中发出,源代码如下所示。
上面代码中的第3行使用rpcEnv得到Driver的一个端点引用,然后使用ask方法向Driver发出RegisterExecutor(executorId,self,hostPort,cores,extractLogUrls)请求,Driver端收到请求后完成Executor的注册,并返回RegisteredExecutor消息,CoarseGrainedExecutorBack-end的receive方法收到RegisteredExecutor消息后,会立即创建一个Executor执行器。源代码如下所示。
上面代码中,匹配到RegisterdExecutor消息,证明CoarseGraindedExecutorBackend在Driver端已经注册,并打印出注册成功的信息,然后后创建Executor。Executor将协助Coar-seGraindedExecutorBackend完成计算任务。
Executor在启动和执行计算任务过程中都有可能遇到错误或者异常,下一节将讲解在Executor中是如何进行异常处理的。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。