Storage模块主要分为两层:
通信层:Storage模块采用的是master-slave结构来实现通信层,master和slave之间传输控制信息、状态信息,这些都是通过通信层来实现的。
存储层:Storage模块需要把数据存储到disk或是memory上面,有可能还需replicate到远端,这都是由存储层来实现和提供相应接口。
而其他模块若要和Storage模块进行交互,Storage模块提供了统一的操作类BlockManag-er,外部类与Storage模块打交道都需要通过调用BlockManager相应接口来实现。
1.Storage模块通信层
(1)首先我们看一下在Master-Slave架构下的各个Storage模块之间的通信交互图(如图5-4所示)。
图5-4 Storage模块之间的通信交互图
对于master结点和slave结点而言,BlockManager的创建有所不同:
Master(client driver)结点的BlockManagerMaster拥有BlockManagerMasterActor的actor和所有BlockManagerSlaveActor的ref。
Slave(executor)结点的BlockManagerMaster则拥有BlockManagerMasterActor的ref和自身BlockManagerSlaveActor的actor。
BlockManagerMasterActor在ref和actor之间进行通信;BlockManagerSlaveActor在ref和actor之间通信。BlockManager wrap(封装)了BlockManagerMaster,通过BlockManagerMaster进行通信。Spark会在client driver和executor端创建各自的BlockManager,通过BlockManager对storage模块进行操作。
这里再解释一下actor和ref的含义:actor和ref是Akka中的两个不同的actor reference(角色引用),分别由actorOf和actorFor所创建。actor类似于网络服务中的server端,它保存所有的状态信息,接收client端的请求执行并返回给客户端;ref类似于网络服务中的cli-ent端,通过向server端发起请求获取结果。
(2)BlockManager对象在SparkEnv类的create方法中被创建,创建的过程如下所示。
可以看到对于client driver和executor,Spark分别创建了BlockManagerMasterActor的ac-tor和ref,并被wrap到BlockManager中。
(3)通信层传递的消息。
1)BlockManagerMasterActor在ref和actor之间的通信。
(www.xing528.com)
2)BlockManagerSlaveActor在ref和actor之间通信。
(4)前面已经介绍了BlockManager对象是如何被创建出来的,当BlockManager被创建出来以后需要向client driver注册自己,下面我们来看一下这个流程。
1)首先BlockManager会调用initialize()方法初始化自己。在initialized()方法中首先调用BlockManagerMaster向client driver注册自身,可以看到在注册自身的时候向client driver传递了自身的slaveActor,client driver收到slaveActor以后会将其与之对应的BlockManagerInfo存储到hash map中,以便后续通过slaveActor向executor发送命令。
2)BlockManagerMaster会将注册请求包装成RegisterBlockManager报文发送给client driv-er的BlockManagerMasterActor。
3)Client Driver端的BlockManagerMasterActor调用register()方法注册BlockManager。
需要注意的是在Client Driver端也会执行上述过程,只是在最后注册的时候如果判断是"<driver>"就不进行任何操作。在上述代码的倒数第5行可以看到对应的BlockMana-gerInfo对象被创建并保存在hash map(哈希表)中。
2.Storage模块存储层
在RDD层面上我们了解到RDD是由不同的partition组成的,我们所进行的transforma-tion操作和action操作是在partition上面进行的;而在Storage模块内部,RDD又被视为由不同的block组成,对于RDD的存取是以block为单位进行的,本质上partition和block是等价的,只是看待的角度不同。在Spark storage模块中存取数据的最小单位是block,所有的操作都是以block为单位进行的。
(1)BlockManager对象被创建的时候会创建出MemoryStore和DiskStore对象用以存取block块。
(2)同时在BlockManager的initialize()方法中启动BlockManagerWorker对象的Actor用以监听远程的block存取请求来进行相应处理。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。