Spark中的各个组件是通过脚本来启动部署的,为了解密Worker的部署,以启动Worker的脚本为切入点开始分析。
部署Worker组件时,最简单的方式是通过配置Spark部署目录下的conf/slaves文件,然后以批量的方式来启动集群中在该文件中列出的全部结点上的Worker实例。启动组件的命令如下。
另一种方式是动态地在某个新增结点上(注意:是新增结点,如果之前已经部署过的话,可以参考后面对启动多个实例的进一步分析)启动一个Worker实例,此时可以在该新增的结点上执行如下启动命令。
其中,参数MasterURL表示当前集群中Master的监听地址,启动后Worker会通过该地址动态注册到Master组件,从而实现为集群动态添加Worker结点的目的。
1.Worker部署脚本的解析
部署脚本根据单个结点及多个结点的Worker部署,对应有两个脚本:start-slave.sh和start-slaves.sh。其中start-slave.sh负责在脚本执行结点启动一个Worker组件,start-slaves.sh脚本则会读取配置的conf/slaves文件,逐个启动集群中各个Slave结点上的Worker组件。
(1)首先分析脚本start-slaves.sh
脚本start-slaves.sh提供了批量启动集群中各个Slave结点上的Worker组件的方法。即,可以在配置好Slave结点(即配置好conf/slaves文件)之后,通过该脚本一次性全部启动集群中的Worker组件。
脚本的代码如下。
其中,脚本slaves.sh通过SSH协议在指定的各个Slave结点上执行各种命令,代码比较简单,建议大家自行查看。
在ssh启动的start-slave.sh命令中,可以看到它的参数是"spark://$SPARK_MASTER IP:$SPARK_MASTER_PORT",这实际上就是Master URL的值的拼接代码。
(2)继续分析脚本start-slave.sh
从前面start-slaves.sh脚本的分析可以看到,最终是在各个Slave结点上执行start-slave.sh脚本来部署Worker组件的,相应的,就可以通过该脚本动态地为集群添加新的Worker组件。
脚本的代码如下。
在手动启动Worker实例时,如果需要在一个结点上部署多个Worker组件,则需要配置SPARK WORKER INSTANCES环境变量,否则多次启动脚本部署Worker组件时会报错,其原因在于spark-daemon.sh脚本的执行控制,这里给出关键代码的简单分析。
首先脚本中有实例是否已经运行的判断,代码如下。
其中,记录对应实例的PID的文件相关代码如下。
从上面的分析可以看出,如果不是通过设置SPARK_WORKER_INSTANCES,然后一次性启动多个Worker实例,而是手动一个个地启动的话,对应的,在脚本中每次启动时的实例编号都是1,在后台守护进程的spark-daemon.sh脚本中生成的pid就是同一个文件,因此第二次启动时,pid文件已经存在,此时就会报错(对应停止时也是通过读取pid文件获取进程ID的,因此自动停止多个实例的话,也需要设置SPARK_WORKER_INSTANCES)。(www.xing528.com)
2.Worker的源代码解析
首先查看Worker伴生对象中的main方法,代码如下。
可以看到,Worker伴生对象中的main方法的格式和Master基本一致。通过参数的类型WorkerArguments来解析命令行参数,具体的代码解析可以参考Master结点部署时的Master-Arguments的代码解析。
另外,MasterArguments中的printUsageAndExit方法对应的就是命令行中的帮助信息。
在解析完Worker的参数之后,调用startRpcEnvAndEndpoint方法启动RPC通信环境及Worker的RPC通信终端。该方法的代码解析可以参考Master结点部署时使用的同名方法的代码解析。
最终会实例化一个Worker对象。Worker也是继承ThreadSafeRpcEndpoint,对应的也是一个RPC的通信终端,实例化该对象后会调用onStart方法,该方法的代码如下。
其中,createWorkDir()方法对应构建了该Worker结点上的工作目录,后续在该结点上执行的Application相关信息都会存放在该目录下。代码如下。
可以看到如果没有设置workDirPath,默认使用的是sparkHome目录下的work子目录。对应的workDirPath在Worker实例化时传入,反推代码可以查到该变量在WorkerArguments中设置。相关代码有两处,一处在WorkerArguments的主构造体中,代码如下。
即workDirPath由环境变量"SPARK_WORKER_DIR"设置。
另外一处在命令行选项解析时设置,代码如下。
即workDirPath由启动Worker实例时传入的可选项"--work-dir"设置。
属性配置:通常由命令可选项来动态设置启动时的配置属性,此时配置的优先级高于默认的属性文件及环境变量中设置的属性。
启动Worker后的一个关键步骤就是注册到Master,对应的方法registerWithMaster()的代码如下。
继续查看tryRegisterAllMasters()方法,代码如下。
其中registerWithMaster(masterEndpoint)向特定Master的RPC通信终端发送消息,并且在接收到反馈消息后,进一步调用handleRegisterResponse方法进行处理。对应的处理代码如下。
分析到这一步,已经明确了注册及对注册的反馈信息的处理细节,下面来进一步分析注册重试定时器的相关处理,注册重试定时器会定期向Worker本身发送ReregisterWithMaster消息,因此可以在receive方法中查看该消息的处理,具体代码如下。
最终会调用registerWithMaster方法。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。