首页 理论教育 MLlib数据类型详解:本地向量、含标签点、矩阵和分布式矩阵

MLlib数据类型详解:本地向量、含标签点、矩阵和分布式矩阵

时间:2023-06-21 理论教育 版权反馈
【摘要】:MLlib提供的数据类型包括本地向量、含标签的点、本地矩阵和分布式矩阵。本节将会对以上的数据类型进行详细的介绍。本地向量和本地聚类是对外公开接口中的的简单数据模型。import org.apache.spark.mllib.linalg.Vectorsimport org.apache.spark.mllib.regression.LabeledPoint//创建一个带着正标签和密集向量的点val pos=LabeledPoint//创建一个带着负标签和稀疏向量的点val neg=LabeledPoint实际运用中,稀疏数据是很常见的。import org.apache.spark.mllib.linalg.{Matrix,Matrices}//创建一个密集矩阵val dm:Matrix=Matrices.dense4.分布式矩阵一个分布式矩阵由long型行列索引数据和对应的double型值数据组成。

MLlib数据类型详解:本地向量、含标签点、矩阵和分布式矩阵

MLlib提供的数据类型包括本地向量(Local vector)、含标签的点(Labeled Point)、本地矩阵(Local matrix)和分布式矩阵(Distributed Matrix)。其中分布式矩阵包括面向行矩阵、行索引矩阵、三元组矩阵。本节将会对以上的数据类型进行详细的介绍。

MLlib支持存储在单个机器中的本地向量(Local Vectors)和本地矩阵(Local Matirces)以及由一个或多个RDD支撑实现的分布式矩阵。本地向量和本地聚类是对外公开接口中的的简单数据模型。底层线性代数操作通过Breeze和jablas来实现。在MLlib中,监督学习的一个训练实例称为“含有类标签的点”。

1.本地向量

一个本地向量有interger类型和0-based指数,而且这两个值都存储在一个机器中。MLlib支持两种类型的本地向量:密集和稀疏。密集的向量其输入值是由一系列数值型作为代表,而稀疏向量是指两个平行的数组:指数和值。举个例子,一个向量(1.0,0.0,3.0)能够表示为密集向量[1.0,0.0,3.0]或者稀疏向量(3,[0,2],[1.0,3.0]),其中3代表向量的大小。

本地向量的基类是Vector,而官方也提供了两个实现基类Vector的具体类DenseVector和SparseVector。建议通过Vectors中实现的工厂方法来创建本地向量。【例4-38】为本地向量的具体实现。

例4-38】创建本地向量示例。

import org.apache.spark.mllib.linalg.{Vector,Vectors}

//创建一个密集向量dv(1.0,0.0,3.0).

val dv:Vector=Vectors.dense(1.0,0.0,3.0)

val sv1:Vector=Vectors.sparse(3,Array(0,2),Array(1.0,3.0))

val sv2:Vector=Vectors.sparse(3,Seq((0,1.0),(2,3.0)))

注意:Scala语言默认引入的是scala.collection.immutable.Vector,为了使用MLlib的Vector,则必须先引入org.apache.spark.mllib.linalg.Vector。

2.标记点

一个标记点是一个本地向量,是密集或稀疏向量,与一个标签/响应有关。在MLlib中,标记点用于监督学习算法。由于使用双精度存储一个标签,所以可以在回归和分类中使用标记点。对于二元分类,一个标签应该是0(负数)或者是1(正数)。对于多元分类,标签的类索引应该从0开始。一个标记点的案例类LabeledPoint,如【例4-39】所示。

例4-39】创建标记点示例。

import org.apache.spark.mllib.linalg.Vectors

import org.apache.spark.mllib.regression.LabeledPoint

//创建一个带着正标签和密集向量的点

val pos=LabeledPoint(1.0,Vectors.dense(1.0,0.0,3.0))

//创建一个带着负标签和稀疏向量的点

val neg=LabeledPoint(0.0,Vectors.sparse(3,Array(0,2),Array(1.0,3.0)))

实际运用中,稀疏数据是很常见的。MLlib可以读取以LIBSVM格式存储的训练样例,LIBSVM格式是LIBSVM和LIBLINEAR的默认格式,这是一种文本格式,每行代表一个含类标签的稀疏特征向量。格式为:label index1:value1 index2:value2...

索引是从1开始并且递增。加载完成后,索引被转换为从0开始。

通过MLUtils.loadLibSVMFile读取训练实例并以LIBSVM格式存储,如【例4-40】所示。

例4-40】读取训练示例。

import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.util.MLUtils

import org.apache.spark.rdd.RDD

val examples:RDD[LabeledPoint]=MLUtils.loadLibSVMFile(sc,"data/mllib/sample_libsvm_data.txt")

3.本地矩阵

一个本地矩阵由整型的行列索引数据和对应的double型值数据组成,存储在某一个机器中。MLlib支持密集矩阵,实体值以列优先的方式存储在一个double数组中,比如下面的矩阵:

其存储方式是一个一维数组[1.0,3.0,5.0,2.0,4.0,6.0]和矩阵大小(3,2)。

本地矩阵的基类是Matrix,官方也提供了一个具体的实现类提供了一个实现DenseMatrix。建议通过Matrices中实现的工厂模式方法来创建本地矩阵,如【例4-41】所示。

例4-41】创建本地矩阵示例。

import org.apache.spark.mllib.linalg.{Matrix,Matrices}

//创建一个密集矩阵((1.0,2.0),(3.0,4.0),(5.0,6.0))

val dm:Matrix=Matrices.dense(3,2,Array(1.0,3.0,5.0,2.0,4.0,6.0))

4.分布式矩阵

一个分布式矩阵由long型行列索引数据和对应的double型值数据组成。矩阵分布式存储在一个或多个RDD中。对于巨大的分布式的矩阵来说,选择正确的存储格式非常重要。将一个分布式矩阵转换为另一个不同格式需要全局Shuffle,所以代价很高。

目前已经实现了3类分布式矩阵存储格式,包含面向行的分布式矩阵(RowMatrix),行索引矩阵(IndexedRowMatrix)以及三元矩阵(CoordinateMatrix),之后将逐一介绍。最基本的类型是RowMatrix。一个RowMatrix是一个面向行的分布式矩阵,其行索引是没有具体含义的。比如一系列特征向量的一个集合。通过一个RDD来代表所有的行,每一行就是一个本地向量。对于RowMatrix,假定其列数量并不巨大,所以一个本地向量可以恰当地与驱动程序交换信息,并且能够在某一节点中存储和操作。IndexedRowMatrix与RowMatrix相似,但有行索引,可以用来识别行和进行join操作。而CoordinateMatrix是一个以元组列表格式(COO)存储的分布式矩阵,其实体集合是一个RDD。需要注意的是,因为需要缓存矩阵大小,所以分布式矩阵的底层RDD必须是确定的。通常来说,使用非确定RDD会导致错误

(1)面向行的分布式矩阵

一个RowMatrix是一个面向行的分布式矩阵,其行索引是没有具体含义的,比如一系列特征向量的一个集合。通过一个RDD代表所有的行,每一行就是一个本地向量。既然每一行由一个本地向量表示,所以其列数就被整型数据大小所限制,其在实践中列数是一个很小的数值。

一个RowMatrix可从一个RDD[Vector]实例创建,如【例4-42】所示,然后就可以计算出其概要统计信息了。

例4-42】RowMatrix的创建方法示例。

import org.apache.spark.mllib.linalg.Vector

import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows:RDD[Vector]=...//一个本地向量的RDD

//从RDD[Vector]创建一个RowMatrix

val mat:RowMatrix=new RowMatrix(rows)

//得到mat的大小

val m=mat.numRows()

val n=mat.numCols()

(2)行索引矩阵

IndexedRowMatrix与RowMatrix相似,但其行索引具有特定含义,本质上是一个含有索引信息的行数据集合(RDD of indexed rows)。每一行由long型索引和一个本地向量组成。

一个IndexedRowMatrix可从一个RDD[IndexedRow]实例创建,如【例4-43】所示,这里的IndexedRow是(Long,Vector)的封装类,剔除IndexedRowMatrix中的行索引信息就变成一个RowMatrix。

例4-43】IndexedRowMatrix创建方法示例。

import org.apache.spark.mllib.linalg.distributed.{IndexedRow,IndexedRowMatrix,RowMatrix}

val rows:RDD[IndexedRow]=...//一个具有行索引的RDD

//根据RDD[IndexedRow]创建一个IndexedRowMatrix

val mat:IndexedRowMatrix=new IndexedRowMatrix(rows)

//得到mat的大小

val m=mat.numRows()

val n=mat.numCols()

//删除其索引

val rowMat:RowMatrix=mat.toRowMatrix()

(3)三元矩阵

一个CoordinateMatrix是一个分布式矩阵,其实体集合是一个RDD。每一个实体是一个(i:Long,j:Long,value:Double)三元组,其中i代表行索引,j代表列索引,value代表实体的值。只有当矩阵的行和列数目都很巨大且矩阵很稀疏时,才使用CoordinateMatrix。

一个CoordinateMatrix可从一个RDD[MatrixEntry]实例创建,如【例4-44】所示,这里的MatrixEntry是(Long,Long,Double)的封装类。通过调用toIndexedRowMatrix可以将一个CoordinateMatrix转变为一个IndexedRowMatrix(其行是稀疏的)。目前暂不支持其他计算操作。

例4-44】CoordinateMatrix创建方法示例。

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix,MatrixEntry}

val entries:RDD[MatrixEntry]=...//一个具有matrix entries的RDD

//根据RDD[MatrixEntry]创建一个CoordinateMatrix

val mat:CoordinateMatrix=new CoordinateMatrix(entries)

//得到mat大小

val m=mat.numRows()

val n=mat.numCols()

//将行为稀疏的CoordinateMatrix转换为IndexRowMatrix(www.xing528.com)

val indexedRowMatrix=mat.toIndexedRowMatrix()

(4)块矩阵(BlockMatrix)

一个块矩阵是一个由MatrixBlocks的RDD支持的分布式矩阵。其中块矩阵是一个如((int,int),Matrix)的元组,而且其中的Matrix的子矩阵指数的大小是rowsPerBlock×colsPerBlock。块矩阵间的运算支持的操作有加法和乘法。块矩阵也有一个函数validate,可以用来检查BlockMatrix是否设置正确。

一个BlockMatrix能够很方便地从IndexedRowMatrix或者CoordinateMatrix中调用toBlockMatrix方法实现。toBlockMatrix方法默认创建大小为1024×1024的块,用户也可以通过调用toBlockMatrix(rowsPerBlock,colsPerBlock)方法,改变其中的值来改变块的大小。

例4-45】创建块矩阵示例。

import org.apache.spark.mllib.linalg.distributed.{BlockMatrix,CoordinateMatrix,MatrixEntry}

val entries:RDD[MatrixEntry]=...//一个具有(i,j,v)matrix entries的RDD

//根据RDD[MatrixEntry]创建一个CoordinateMatrix

val coordMat:CoordinateMatrix=new CoordinateMatrix(entries)

//将CoordinateMatrix转化为BlockMatrix

val matA:BlockMatrix=coordMat.toBlockMatrix().cache()

//验证BlockMatrix是否被创建,如果没有创建将会抛出一个异常

matA.validate()

//计算A^T A.

val ata=matA.transpose.multiply(matA)

其存储方式是一个一维数组[1.0,3.0,5.0,2.0,4.0,6.0]和矩阵大小(3,2)。

本地矩阵的基类是Matrix,官方也提供了一个具体的实现类提供了一个实现DenseMatrix。建议通过Matrices中实现的工厂模式方法来创建本地矩阵,如【例4-41】所示。

例4-41】创建本地矩阵示例。

import org.apache.spark.mllib.linalg.{Matrix,Matrices}

//创建一个密集矩阵((1.0,2.0),(3.0,4.0),(5.0,6.0))

val dm:Matrix=Matrices.dense(3,2,Array(1.0,3.0,5.0,2.0,4.0,6.0))

4.分布式矩阵

一个分布式矩阵由long型行列索引数据和对应的double型值数据组成。矩阵分布式存储在一个或多个RDD中。对于巨大的分布式的矩阵来说,选择正确的存储格式非常重要。将一个分布式矩阵转换为另一个不同格式需要全局Shuffle,所以代价很高。

目前已经实现了3类分布式矩阵存储格式,包含面向行的分布式矩阵(RowMatrix),行索引矩阵(IndexedRowMatrix)以及三元矩阵(CoordinateMatrix),之后将逐一介绍。最基本的类型是RowMatrix。一个RowMatrix是一个面向行的分布式矩阵,其行索引是没有具体含义的。比如一系列特征向量的一个集合。通过一个RDD来代表所有的行,每一行就是一个本地向量。对于RowMatrix,假定其列数量并不巨大,所以一个本地向量可以恰当地与驱动程序交换信息,并且能够在某一节点中存储和操作。IndexedRowMatrix与RowMatrix相似,但有行索引,可以用来识别行和进行join操作。而CoordinateMatrix是一个以三元组列表格式(COO)存储的分布式矩阵,其实体集合是一个RDD。需要注意的是,因为需要缓存矩阵大小,所以分布式矩阵的底层RDD必须是确定的。通常来说,使用非确定RDD会导致错误。

(1)面向行的分布式矩阵

一个RowMatrix是一个面向行的分布式矩阵,其行索引是没有具体含义的,比如一系列特征向量的一个集合。通过一个RDD代表所有的行,每一行就是一个本地向量。既然每一行由一个本地向量表示,所以其列数就被整型数据大小所限制,其在实践中列数是一个很小的数值。

一个RowMatrix可从一个RDD[Vector]实例创建,如【例4-42】所示,然后就可以计算出其概要统计信息了。

例4-42】RowMatrix的创建方法示例。

import org.apache.spark.mllib.linalg.Vector

import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows:RDD[Vector]=...//一个本地向量的RDD

//从RDD[Vector]创建一个RowMatrix

val mat:RowMatrix=new RowMatrix(rows)

//得到mat的大小

val m=mat.numRows()

val n=mat.numCols()

(2)行索引矩阵

IndexedRowMatrix与RowMatrix相似,但其行索引具有特定含义,本质上是一个含有索引信息的行数据集合(RDD of indexed rows)。每一行由long型索引和一个本地向量组成。

一个IndexedRowMatrix可从一个RDD[IndexedRow]实例创建,如【例4-43】所示,这里的IndexedRow是(Long,Vector)的封装类,剔除IndexedRowMatrix中的行索引信息就变成一个RowMatrix。

例4-43】IndexedRowMatrix创建方法示例。

import org.apache.spark.mllib.linalg.distributed.{IndexedRow,IndexedRowMatrix,RowMatrix}

val rows:RDD[IndexedRow]=...//一个具有行索引的RDD

//根据RDD[IndexedRow]创建一个IndexedRowMatrix

val mat:IndexedRowMatrix=new IndexedRowMatrix(rows)

//得到mat的大小

val m=mat.numRows()

val n=mat.numCols()

//删除其索引

val rowMat:RowMatrix=mat.toRowMatrix()

(3)三元矩阵

一个CoordinateMatrix是一个分布式矩阵,其实体集合是一个RDD。每一个实体是一个(i:Long,j:Long,value:Double)三元组,其中i代表行索引,j代表列索引,value代表实体的值。只有当矩阵的行和列数目都很巨大且矩阵很稀疏时,才使用CoordinateMatrix。

一个CoordinateMatrix可从一个RDD[MatrixEntry]实例创建,如【例4-44】所示,这里的MatrixEntry是(Long,Long,Double)的封装类。通过调用toIndexedRowMatrix可以将一个CoordinateMatrix转变为一个IndexedRowMatrix(其行是稀疏的)。目前暂不支持其他计算操作。

例4-44】CoordinateMatrix创建方法示例。

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix,MatrixEntry}

val entries:RDD[MatrixEntry]=...//一个具有matrix entries的RDD

//根据RDD[MatrixEntry]创建一个CoordinateMatrix

val mat:CoordinateMatrix=new CoordinateMatrix(entries)

//得到mat大小

val m=mat.numRows()

val n=mat.numCols()

//将行为稀疏的CoordinateMatrix转换为IndexRowMatrix

val indexedRowMatrix=mat.toIndexedRowMatrix()

(4)块矩阵(BlockMatrix)

一个块矩阵是一个由MatrixBlocks的RDD支持的分布式矩阵。其中块矩阵是一个如((int,int),Matrix)的元组,而且其中的Matrix的子矩阵指数的大小是rowsPerBlock×colsPerBlock。块矩阵间的运算支持的操作有加法和乘法。块矩阵也有一个函数validate,可以用来检查BlockMatrix是否设置正确。

一个BlockMatrix能够很方便地从IndexedRowMatrix或者CoordinateMatrix中调用toBlockMatrix方法实现。toBlockMatrix方法默认创建大小为1024×1024的块,用户也可以通过调用toBlockMatrix(rowsPerBlock,colsPerBlock)方法,改变其中的值来改变块的大小。

例4-45】创建块矩阵示例。

import org.apache.spark.mllib.linalg.distributed.{BlockMatrix,CoordinateMatrix,MatrixEntry}

val entries:RDD[MatrixEntry]=...//一个具有(i,j,v)matrix entries的RDD

//根据RDD[MatrixEntry]创建一个CoordinateMatrix

val coordMat:CoordinateMatrix=new CoordinateMatrix(entries)

//将CoordinateMatrix转化为BlockMatrix

val matA:BlockMatrix=coordMat.toBlockMatrix().cache()

//验证BlockMatrix是否被创建,如果没有创建将会抛出一个异常

matA.validate()

//计算A^T A.

val ata=matA.transpose.multiply(matA)

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈