MLlib提供的数据类型包括本地向量(Local vector)、含标签的点(Labeled Point)、本地矩阵(Local matrix)和分布式矩阵(Distributed Matrix)。其中分布式矩阵包括面向行矩阵、行索引矩阵、三元组矩阵。本节将会对以上的数据类型进行详细的介绍。
MLlib支持存储在单个机器中的本地向量(Local Vectors)和本地矩阵(Local Matirces)以及由一个或多个RDD支撑实现的分布式矩阵。本地向量和本地聚类是对外公开接口中的的简单数据模型。底层线性代数操作通过Breeze和jablas来实现。在MLlib中,监督学习的一个训练实例称为“含有类标签的点”。
1.本地向量
一个本地向量有interger类型和0-based指数,而且这两个值都存储在一个机器中。MLlib支持两种类型的本地向量:密集和稀疏。密集的向量其输入值是由一系列数值型作为代表,而稀疏向量是指两个平行的数组:指数和值。举个例子,一个向量(1.0,0.0,3.0)能够表示为密集向量[1.0,0.0,3.0]或者稀疏向量(3,[0,2],[1.0,3.0]),其中3代表向量的大小。
本地向量的基类是Vector,而官方也提供了两个实现基类Vector的具体类DenseVector和SparseVector。建议通过Vectors中实现的工厂方法来创建本地向量。【例4-38】为本地向量的具体实现。
【例4-38】创建本地向量示例。
import org.apache.spark.mllib.linalg.{Vector,Vectors}
//创建一个密集向量dv(1.0,0.0,3.0).
val dv:Vector=Vectors.dense(1.0,0.0,3.0)
val sv1:Vector=Vectors.sparse(3,Array(0,2),Array(1.0,3.0))
val sv2:Vector=Vectors.sparse(3,Seq((0,1.0),(2,3.0)))
注意:Scala语言默认引入的是scala.collection.immutable.Vector,为了使用MLlib的Vector,则必须先引入org.apache.spark.mllib.linalg.Vector。
2.标记点
一个标记点是一个本地向量,是密集或稀疏向量,与一个标签/响应有关。在MLlib中,标记点用于监督学习算法。由于使用双精度存储一个标签,所以可以在回归和分类中使用标记点。对于二元分类,一个标签应该是0(负数)或者是1(正数)。对于多元分类,标签的类索引应该从0开始。一个标记点的案例类LabeledPoint,如【例4-39】所示。
【例4-39】创建标记点示例。
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
//创建一个带着正标签和密集向量的点
val pos=LabeledPoint(1.0,Vectors.dense(1.0,0.0,3.0))
//创建一个带着负标签和稀疏向量的点
val neg=LabeledPoint(0.0,Vectors.sparse(3,Array(0,2),Array(1.0,3.0)))
实际运用中,稀疏数据是很常见的。MLlib可以读取以LIBSVM格式存储的训练样例,LIBSVM格式是LIBSVM和LIBLINEAR的默认格式,这是一种文本格式,每行代表一个含类标签的稀疏特征向量。格式为:label index1:value1 index2:value2...
索引是从1开始并且递增。加载完成后,索引被转换为从0开始。
通过MLUtils.loadLibSVMFile读取训练实例并以LIBSVM格式存储,如【例4-40】所示。
【例4-40】读取训练示例。
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
val examples:RDD[LabeledPoint]=MLUtils.loadLibSVMFile(sc,"data/mllib/sample_libsvm_data.txt")
3.本地矩阵
一个本地矩阵由整型的行列索引数据和对应的double型值数据组成,存储在某一个机器中。MLlib支持密集矩阵,实体值以列优先的方式存储在一个double数组中,比如下面的矩阵:
其存储方式是一个一维数组[1.0,3.0,5.0,2.0,4.0,6.0]和矩阵大小(3,2)。
本地矩阵的基类是Matrix,官方也提供了一个具体的实现类提供了一个实现DenseMatrix。建议通过Matrices中实现的工厂模式方法来创建本地矩阵,如【例4-41】所示。
【例4-41】创建本地矩阵示例。
import org.apache.spark.mllib.linalg.{Matrix,Matrices}
//创建一个密集矩阵((1.0,2.0),(3.0,4.0),(5.0,6.0))
val dm:Matrix=Matrices.dense(3,2,Array(1.0,3.0,5.0,2.0,4.0,6.0))
4.分布式矩阵
一个分布式矩阵由long型行列索引数据和对应的double型值数据组成。矩阵分布式存储在一个或多个RDD中。对于巨大的分布式的矩阵来说,选择正确的存储格式非常重要。将一个分布式矩阵转换为另一个不同格式需要全局Shuffle,所以代价很高。
目前已经实现了3类分布式矩阵存储格式,包含面向行的分布式矩阵(RowMatrix),行索引矩阵(IndexedRowMatrix)以及三元矩阵(CoordinateMatrix),之后将逐一介绍。最基本的类型是RowMatrix。一个RowMatrix是一个面向行的分布式矩阵,其行索引是没有具体含义的。比如一系列特征向量的一个集合。通过一个RDD来代表所有的行,每一行就是一个本地向量。对于RowMatrix,假定其列数量并不巨大,所以一个本地向量可以恰当地与驱动程序交换信息,并且能够在某一节点中存储和操作。IndexedRowMatrix与RowMatrix相似,但有行索引,可以用来识别行和进行join操作。而CoordinateMatrix是一个以三元组列表格式(COO)存储的分布式矩阵,其实体集合是一个RDD。需要注意的是,因为需要缓存矩阵大小,所以分布式矩阵的底层RDD必须是确定的。通常来说,使用非确定RDD会导致错误。
(1)面向行的分布式矩阵
一个RowMatrix是一个面向行的分布式矩阵,其行索引是没有具体含义的,比如一系列特征向量的一个集合。通过一个RDD代表所有的行,每一行就是一个本地向量。既然每一行由一个本地向量表示,所以其列数就被整型数据大小所限制,其在实践中列数是一个很小的数值。
一个RowMatrix可从一个RDD[Vector]实例创建,如【例4-42】所示,然后就可以计算出其概要统计信息了。
【例4-42】RowMatrix的创建方法示例。
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val rows:RDD[Vector]=...//一个本地向量的RDD
//从RDD[Vector]创建一个RowMatrix
val mat:RowMatrix=new RowMatrix(rows)
//得到mat的大小
val m=mat.numRows()
val n=mat.numCols()
(2)行索引矩阵
IndexedRowMatrix与RowMatrix相似,但其行索引具有特定含义,本质上是一个含有索引信息的行数据集合(RDD of indexed rows)。每一行由long型索引和一个本地向量组成。
一个IndexedRowMatrix可从一个RDD[IndexedRow]实例创建,如【例4-43】所示,这里的IndexedRow是(Long,Vector)的封装类,剔除IndexedRowMatrix中的行索引信息就变成一个RowMatrix。
【例4-43】IndexedRowMatrix创建方法示例。
import org.apache.spark.mllib.linalg.distributed.{IndexedRow,IndexedRowMatrix,RowMatrix}
val rows:RDD[IndexedRow]=...//一个具有行索引的RDD
//根据RDD[IndexedRow]创建一个IndexedRowMatrix
val mat:IndexedRowMatrix=new IndexedRowMatrix(rows)
//得到mat的大小
val m=mat.numRows()
val n=mat.numCols()
//删除其索引
val rowMat:RowMatrix=mat.toRowMatrix()
(3)三元矩阵
一个CoordinateMatrix是一个分布式矩阵,其实体集合是一个RDD。每一个实体是一个(i:Long,j:Long,value:Double)三元组,其中i代表行索引,j代表列索引,value代表实体的值。只有当矩阵的行和列数目都很巨大且矩阵很稀疏时,才使用CoordinateMatrix。
一个CoordinateMatrix可从一个RDD[MatrixEntry]实例创建,如【例4-44】所示,这里的MatrixEntry是(Long,Long,Double)的封装类。通过调用toIndexedRowMatrix可以将一个CoordinateMatrix转变为一个IndexedRowMatrix(其行是稀疏的)。目前暂不支持其他计算操作。
【例4-44】CoordinateMatrix创建方法示例。
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix,MatrixEntry}
val entries:RDD[MatrixEntry]=...//一个具有matrix entries的RDD
//根据RDD[MatrixEntry]创建一个CoordinateMatrix
val mat:CoordinateMatrix=new CoordinateMatrix(entries)
//得到mat大小
val m=mat.numRows()
val n=mat.numCols()
//将行为稀疏的CoordinateMatrix转换为IndexRowMatrix(www.xing528.com)
val indexedRowMatrix=mat.toIndexedRowMatrix()
(4)块矩阵(BlockMatrix)
一个块矩阵是一个由MatrixBlocks的RDD支持的分布式矩阵。其中块矩阵是一个如((int,int),Matrix)的元组,而且其中的Matrix的子矩阵指数的大小是rowsPerBlock×colsPerBlock。块矩阵间的运算支持的操作有加法和乘法。块矩阵也有一个函数validate,可以用来检查BlockMatrix是否设置正确。
一个BlockMatrix能够很方便地从IndexedRowMatrix或者CoordinateMatrix中调用toBlockMatrix方法实现。toBlockMatrix方法默认创建大小为1024×1024的块,用户也可以通过调用toBlockMatrix(rowsPerBlock,colsPerBlock)方法,改变其中的值来改变块的大小。
【例4-45】创建块矩阵示例。
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix,CoordinateMatrix,MatrixEntry}
val entries:RDD[MatrixEntry]=...//一个具有(i,j,v)matrix entries的RDD
//根据RDD[MatrixEntry]创建一个CoordinateMatrix
val coordMat:CoordinateMatrix=new CoordinateMatrix(entries)
//将CoordinateMatrix转化为BlockMatrix
val matA:BlockMatrix=coordMat.toBlockMatrix().cache()
//验证BlockMatrix是否被创建,如果没有创建将会抛出一个异常
matA.validate()
//计算A^T A.
val ata=matA.transpose.multiply(matA)
其存储方式是一个一维数组[1.0,3.0,5.0,2.0,4.0,6.0]和矩阵大小(3,2)。
本地矩阵的基类是Matrix,官方也提供了一个具体的实现类提供了一个实现DenseMatrix。建议通过Matrices中实现的工厂模式方法来创建本地矩阵,如【例4-41】所示。
【例4-41】创建本地矩阵示例。
import org.apache.spark.mllib.linalg.{Matrix,Matrices}
//创建一个密集矩阵((1.0,2.0),(3.0,4.0),(5.0,6.0))
val dm:Matrix=Matrices.dense(3,2,Array(1.0,3.0,5.0,2.0,4.0,6.0))
4.分布式矩阵
一个分布式矩阵由long型行列索引数据和对应的double型值数据组成。矩阵分布式存储在一个或多个RDD中。对于巨大的分布式的矩阵来说,选择正确的存储格式非常重要。将一个分布式矩阵转换为另一个不同格式需要全局Shuffle,所以代价很高。
目前已经实现了3类分布式矩阵存储格式,包含面向行的分布式矩阵(RowMatrix),行索引矩阵(IndexedRowMatrix)以及三元矩阵(CoordinateMatrix),之后将逐一介绍。最基本的类型是RowMatrix。一个RowMatrix是一个面向行的分布式矩阵,其行索引是没有具体含义的。比如一系列特征向量的一个集合。通过一个RDD来代表所有的行,每一行就是一个本地向量。对于RowMatrix,假定其列数量并不巨大,所以一个本地向量可以恰当地与驱动程序交换信息,并且能够在某一节点中存储和操作。IndexedRowMatrix与RowMatrix相似,但有行索引,可以用来识别行和进行join操作。而CoordinateMatrix是一个以三元组列表格式(COO)存储的分布式矩阵,其实体集合是一个RDD。需要注意的是,因为需要缓存矩阵大小,所以分布式矩阵的底层RDD必须是确定的。通常来说,使用非确定RDD会导致错误。
(1)面向行的分布式矩阵
一个RowMatrix是一个面向行的分布式矩阵,其行索引是没有具体含义的,比如一系列特征向量的一个集合。通过一个RDD代表所有的行,每一行就是一个本地向量。既然每一行由一个本地向量表示,所以其列数就被整型数据大小所限制,其在实践中列数是一个很小的数值。
一个RowMatrix可从一个RDD[Vector]实例创建,如【例4-42】所示,然后就可以计算出其概要统计信息了。
【例4-42】RowMatrix的创建方法示例。
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val rows:RDD[Vector]=...//一个本地向量的RDD
//从RDD[Vector]创建一个RowMatrix
val mat:RowMatrix=new RowMatrix(rows)
//得到mat的大小
val m=mat.numRows()
val n=mat.numCols()
(2)行索引矩阵
IndexedRowMatrix与RowMatrix相似,但其行索引具有特定含义,本质上是一个含有索引信息的行数据集合(RDD of indexed rows)。每一行由long型索引和一个本地向量组成。
一个IndexedRowMatrix可从一个RDD[IndexedRow]实例创建,如【例4-43】所示,这里的IndexedRow是(Long,Vector)的封装类,剔除IndexedRowMatrix中的行索引信息就变成一个RowMatrix。
【例4-43】IndexedRowMatrix创建方法示例。
import org.apache.spark.mllib.linalg.distributed.{IndexedRow,IndexedRowMatrix,RowMatrix}
val rows:RDD[IndexedRow]=...//一个具有行索引的RDD
//根据RDD[IndexedRow]创建一个IndexedRowMatrix
val mat:IndexedRowMatrix=new IndexedRowMatrix(rows)
//得到mat的大小
val m=mat.numRows()
val n=mat.numCols()
//删除其索引
val rowMat:RowMatrix=mat.toRowMatrix()
(3)三元矩阵
一个CoordinateMatrix是一个分布式矩阵,其实体集合是一个RDD。每一个实体是一个(i:Long,j:Long,value:Double)三元组,其中i代表行索引,j代表列索引,value代表实体的值。只有当矩阵的行和列数目都很巨大且矩阵很稀疏时,才使用CoordinateMatrix。
一个CoordinateMatrix可从一个RDD[MatrixEntry]实例创建,如【例4-44】所示,这里的MatrixEntry是(Long,Long,Double)的封装类。通过调用toIndexedRowMatrix可以将一个CoordinateMatrix转变为一个IndexedRowMatrix(其行是稀疏的)。目前暂不支持其他计算操作。
【例4-44】CoordinateMatrix创建方法示例。
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix,MatrixEntry}
val entries:RDD[MatrixEntry]=...//一个具有matrix entries的RDD
//根据RDD[MatrixEntry]创建一个CoordinateMatrix
val mat:CoordinateMatrix=new CoordinateMatrix(entries)
//得到mat大小
val m=mat.numRows()
val n=mat.numCols()
//将行为稀疏的CoordinateMatrix转换为IndexRowMatrix
val indexedRowMatrix=mat.toIndexedRowMatrix()
(4)块矩阵(BlockMatrix)
一个块矩阵是一个由MatrixBlocks的RDD支持的分布式矩阵。其中块矩阵是一个如((int,int),Matrix)的元组,而且其中的Matrix的子矩阵指数的大小是rowsPerBlock×colsPerBlock。块矩阵间的运算支持的操作有加法和乘法。块矩阵也有一个函数validate,可以用来检查BlockMatrix是否设置正确。
一个BlockMatrix能够很方便地从IndexedRowMatrix或者CoordinateMatrix中调用toBlockMatrix方法实现。toBlockMatrix方法默认创建大小为1024×1024的块,用户也可以通过调用toBlockMatrix(rowsPerBlock,colsPerBlock)方法,改变其中的值来改变块的大小。
【例4-45】创建块矩阵示例。
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix,CoordinateMatrix,MatrixEntry}
val entries:RDD[MatrixEntry]=...//一个具有(i,j,v)matrix entries的RDD
//根据RDD[MatrixEntry]创建一个CoordinateMatrix
val coordMat:CoordinateMatrix=new CoordinateMatrix(entries)
//将CoordinateMatrix转化为BlockMatrix
val matA:BlockMatrix=coordMat.toBlockMatrix().cache()
//验证BlockMatrix是否被创建,如果没有创建将会抛出一个异常
matA.validate()
//计算A^T A.
val ata=matA.transpose.multiply(matA)
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。