首页 理论教育 深入解析Akka驱动下的start-all.sh源码

深入解析Akka驱动下的start-all.sh源码

时间:2023-06-20 理论教育 版权反馈
【摘要】:在Spark中各个模块之间的消息传递采用的就是Akka框架,很多组件封装为Actor,进行控制和状态通信。下面我们以Spark集群启动的“start-all.sh”命令为例来解析一下Ak-ka驱动下Master结点和Worker结点之间的通信。我们首先在Spark的sbin目录下用vim打开“start-all.sh”脚本。我们先用vim打开“start-master.sh”脚本查看里面的内容。打开start-slaves.sh脚本,会看到它会继续调用start-slave.sh脚本来启动Worker结点。对于Akka,Spark并没有充分挖掘它强大的并行处理能力,只是将其作为分布式系统中的RPC框架。

深入解析Akka驱动下的start-all.sh源码

在Spark中各个模块之间的消息传递采用的就是Akka框架,很多组件封装为Actor,进行控制和状态通信。下面我们以Spark集群启动的“start-all.sh”命令为例来解析一下Ak-ka驱动下Master结点和Worker结点之间的通信。

(1)我们首先在Spark的sbin目录下用vim打开“start-all.sh”脚本。可以看到在文件里会继续调用“start-master.sh”脚本和“start-slaves.sh”脚本。我们先用vim打开“start-master.sh”脚本查看里面的内容。

(2)可以看到Start-master.sh文件里会用“spark-daemon.sh”脚本开启一个org. apache.spark.deploy.master.Master进程。

(3)打开start-slaves.sh脚本,会看到它会继续调用start-slave.sh脚本来启动Worker结点。

(4)打开start-slave.sh文件,我们会看到spark-daemon.sh脚本会开启Worker进程这里的Worker对应的都是在$SPARK_HOME/conf/slaves文件中配置的Worker结点。

(5)到这里,我们可以进入Spark的源码来分析Master结点和Worker结点之间的通信。打开Master类的伴生对象,在该伴生对象的main()方法里,会先根据传进来的参数初始化MasterArguments对象。接下来调用startSystemAndActor()方法来开启一个actor。

在startSystemAndActor()方法中会调用AkkaUtil工具类创建一个ActorSystem,通过ac-torSystem.actorOf()方法开启Master,并返回Master的ref(Mater这个actor的引用)。

(6)当Master进程开启,也即Master对象实例化后,首先要执行preStart()这个生命周期方法进行一系列的Master配置。在preStart方法中,我们这里重点关注context.system.schedu-ler.schedule()这个方法,其中的参数WORK_TIMEOUT的默认值是60 s,self指的是Master自身,CheckForWorkerTimeOut是个样例类。

我们会看到在Scheduler特质的schedule()方法里,有条发送消息的语句(即代码中的语句receiver!message中message所指代的CheckForWorkerTimeOut),其中schedule()方法的参数receiver的实参是self,参数message的实参是CheckForWorkerTimeOut。在这里意味着每隔60 s,Master都会重复向自己的邮箱发送消息,来检查已经注册成功的Worker结点是否在规定时间内Master发送心跳。

(7)下面我们打开Master负责接收并处理消息的receiverWithLogging()方法。在它的模式匹配选项中找到CheckForWorkerTimeOut。接着会调用timeOutDeadWorkers()方法。(www.xing528.com)

我们继续跟踪timeOutDeadWorkers()方法,在timeOutDeadWorkers()方法中,会过滤掉心跳发送超时的Worker。

(8)下面我们进入Worker进程,Worker进程的开启步骤和Master进程的步骤是一样的,这里不做详解,我们直接进入Worker类的preStart()方法。接着查看preStart()方法里面的registerWithMaster()方法,preStart()方法的源码实现如下:

在registerWithMaster()方法里会继续调用tryRegisterAllMaster()方法向Master注册Worker。

在tryRegisterAllMaster()方法里会向Master的引用actor发送RegisterWorker的消息来申请Worker的注册。

(9)在Master的邮箱里(即receiveWithLogging()方法)收到此消息(即一个样例类RegisterWorker消息)会进行处理,如果注册成功,Master结点会向Worker结点发送Regis-teredWorker消息。

(10)当Worker结点收到RegisteredWorker消息后,会向自己的邮箱(也就是Worker类的receiveWithLogging方法)周期性地发送WorkDirCleanup消息来清理Worker结点上的Spark应用目录下的内容。

在Worker类的receiveWithLogging中,找到case WorkDirCleanup选项的匹配后,接下来会启动一个独立线程去清理旧应用的目录和文件,具体的操作源码如下:

到这里,我们通过Master结点和Worker结点之间发送的一些消息的流程讲解了Akka在Spark中的应用,对于receiveWithLogging()方法中的模式匹配选项,还有很多没提及,读者可通过源码进行了解。对于Akka,Spark并没有充分挖掘它强大的并行处理能力,只是将其作为分布式系统中的RPC框架。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈