在上一小节中,讲解了BlockStore的实现之一ExternalBlockStore,在ExternalBlockStore中通过使用ExternalBlockManager来操作Spark内存中的数据,可以看到ExternalBlockManag-er是一个抽象类,目前该类的唯一实现是TachyonBlockManager,包路径是org.apache. spark.storage.TachyonBlockManager。在TachyonBlockManager中,提供了对Tachyon进行存取数据操作的接口。
在TachyonBlockManager的init方法中,通过传入的blockManager和executorId,找到Tachyon的master地址和存放文件的根路径。并且通过TachyonFS.get(new TachyonURI(mas-ter),new TachyonConf())的方法调用,返回一个TachyonFs文件系统句柄,通过该文件系统句柄就可以方便地存取文件了。值得注意的是,在Tachyon 0.8.2中TachyonFs类已经过时,要得到Tachyon文件系统的引用,需要使用TachyonFileSystem.get方法。
下面分别查看TachyonBlockManager中的源代码,了解Spark中是如何使用Tachyon进行数据读写的。
1.TachyonBlockManager向Tachyon中存放数据
Spark中的数据要存入Tachyon,需要通过TachyonBlockManager提供的方法,这里以putBytes方法为例,讲解Spark中是如何使用Tachyon存放数据的。putBytes方法存放BlockId对应的字节数组中的数据,putBytes方法的源代码如下。
putBytes方法接受两个参数,分别是BlockId对象和ByteBuffer对象,分别代表RDD产生的blockId和RDD中的数据。通过getFile(blockId)方法,取出blockId哈希映射对应的TachyonFile,首先得到文件上的输出流,再将byteBuffer数据写入TachyonFile文件中。也许读者会感到奇怪,这里并没有对Tachyon的操作。在putBytes方法中没有看到任何Tachyon相关API的调用,答案在调用的getFile方法中。getFile的源代码如下。
可以看到getFile方法返回TachyonFile对象。代码中首先通过fileName经哈希函数求出fileName的非负哈希值,映射找出对应的目录及子目录。使用TachyonFs查看对应的目录及子目录是否存在,如果不存在子目录,则调用TachyonFs的mkdir方法创建子目录。得到文件的完整路径后,先通过client.exist方法判断filename对应的文件是否存在,如果不存在,则调用createFile方法创建文件,否则直接通过getFile方法返回该TachyonFile。从这个方法中,清楚地看到了在Spark中是如何使用Tachyon API来保存数据到分布式内存文件系统中的,相信这对于想单独使用Tachyon来作为共享内存系统的读者来说,有非常大的借鉴意义。
在使用Tachyon时,其实可以借鉴Spark中Tachyon的使用方式来构建自己的内存数据共享应用。介绍完Spark中数据保存Tachyon的方法后,接下来看一下Spark中是如何从Tachyon中取出数据的。(www.xing528.com)
2.TachyonBlockManager从Tachyon中取出数据
Spark中取出Tachyon中的数据也很简单,这里以getBytes为例,看一看在Spark中是如何从Tachyon中读取数据的。getBytes方法有一个参数,这个参数为RDD产生的BlockId对象,从BlockId对象中取出name属性,该属性在getFile方法中,经由哈希函数映射取出对应的TachyonFile文件。getBytes方法的代码如下。
getBytes方法接受一个参数BlockId,和putBytes一样,需要调用getFile(blockId)方法,返回blockId经哈希映射所对应的TachyonFile。首先判断getFile方法的返回值,如果该返回值为空或file.getLocationHosts的大小为0,直接返回None;否则得到该文件的输入流,并通过输入流读取数据到数组中,返回读取到的数据,最后关闭输出流。
Tachyon作为一个分布式内存文件系统,向外提供了方便的API,通过这些API可以很方便地整合Tachyon到其他系统中,有关Tachyon的更多信息可以到Tachyon(Alluxio)的官网查看,网址为:http://www.alluxio.org。6.6 小结
本章主要围绕Storage模块中的通信层和存储层进行讲解,在6.1节讲解了Storage的定义、原理及内部使用的设计模式;6.2节是本章最重要的两个小节,在6.2.1讲解了Storage模块的通信层,6.2.2讲解了Storage模块的存储层。
Spark预先定义了StorageLevel,在使用的时候可以直接通过StorageLevel选择数据存储的级别。当然不同的存储级别对性能及硬件设施都有影响,因此在6.3小节详细对比了StorageLevel的不同等级,在6.4小节论述了StorageLevel对性能的影响,在6.5小节简单介绍了目前Spark中存储级别为OFF_HEAP的唯一实现——Tachyon,简单介绍了其API的使用,并通过源代码介绍Tachyon在Spark中的使用。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。