在Storage模块中,使用RPC框架进行通信。Storage模块对外提供了一个统一的交互类BlockManager。BlockManager在每一个结点(包括Driver端和Slave端)都有创建。Slave端创建的BlockManager将在initialize方法中向Driver端的BlockManagerMasterEndpoint发送Reg-isterBlockManager(blockManagerId,maxMemSize,slaveEndpoint)消息,收到消息后,完成Slave端BlockManager在Driver端BlockManager的注册。如图6-2所示。
图6-2 Storage模块通信层
Driver端的BlockManagerMaster拥有所有结点BlockManagerSlaveEndpoint的Ref及Block-ManagerMasterEndpoint的引用。Slave端的BlockManagerMaster拥有本结点上所有BlockMan-agerSlaveEndpoint的Ref。Driver和各结点通过BlockManagerMasterEndpoint和BlockManagerS-laveEndpoint通信。
在Driver和Slaves结点上的Executor中都有BlockManager。BlockManager提供了本地和远程不同存储类型(Disk、Memory、ExternalBlockStore)存储读取数据的接口。
首先来看一下BlockManager在Driver端的创建。在SparkContext创建时会根据具体的配置创建SparkEnv对象。源代码如下所示。
在createSparkEnv方法中传入SparkConf配置对象、isLocal标志及LiveListenerBus,方法中使用SparkEnv对象的createDriverEnv方法创建SparkEnv并返回。在SparkEnv的creat-eDriverEvn方法中,将会创建BlockManager、BlockManagerMaster等对象,完成Storage在Driver端的部署。
SparkEnv中创建BlockManager和BlockManagerMaster的关键源代码如下所示。
上面代码的第2行使用new关键字实例化出BlockManagerMaster,并在代码第8行传入BlockManager的构造函数,实例化出BlockManager对象。这里的BlockManagerMaster和BlockManager属于聚合关系。BlockManager主要对外提供统一的访问接口,BlockManager-Master主要对内提供各结点之间的指令通信服务。
BlockManagerMaster在Driver端和Executors中的创建稍有差别。首先来看在Driver端创建的情形。创建BlockManagerMaster传入的isDriver参数,isDriver为true表示在Driver端创建,否则视为在Slave结点上创建。
当SparkContext中执行_env.blockManager.initialize(_applicationId)代码时,会调用Driver端BlockManager的initialize方法。initialize方法的源代码如下所示。(www.xing528.com)
如上面的源代码所示,initialize方法使用appId初始化BlockManager。主要完成以下几项任务。
1)初始化BlockTransferService。
2)初始化ShuffleClient。
3)创建BlockManagerId。
4)将BlockManager注册到BlockManagerMaster上。
5)若ShuffleService可用,注册ShuffleService。
在BlockManager的initialize方法上右击Find Usages,可以看到initialize方法在两个地方得到调用,一个是SparkContext,另一个是Executor。在启动Executor时,会调用BlockMan-ager的initialize方法。Executor中调用initialize方法的源代码如下所示。
上面代码中,调用了env.blockManager.initialize方法。在initialize方法中,完成Block-Manger向Master端BlockManagerMaster的注册。使用方法master.registerBlockManager(block-ManagerId,maxMemory,slaveEndpoint)完成注册,registerBlockManager方法中传入blockMan-agerId、maxMemory和salveEndPoint引用,分别表示Executor中的BlockManager、最大内存和BlockManger中的BlockMangarSlaveEndpoint。BlockManagerSlaveEndpoint是一个RPC端点,通过它完成与BlockManagerMaster的通信。BlockManager收到注册请求后将Executor中注册的BlockManagerInfo存入哈希表中,以便通过BlockManagerSlaveEndpoint向Executor发送控制命令。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。