向量化计算需要数据是列式存储格式的。gluten作为spark和本地向量化引擎的连接层,实现了列式存储数据的交互和转换。下面会先介绍spark的列式存储原理,然后介绍了本地向量化层的列式存储,最后两者转换在spark sql运行的原理。
Spark 列式存储
ColumnBatch表示列式存储的数据,它的每列数据由ColumnVector表示。定义如下:
|
|
ColumnVector是抽象类,存储了具体的数据,它可以有多种实现方式,每种方式对应着不同的格式。
目前spark sql内置了arrow、orc等格式,这些格式都是java程序实现的。gluten项目作为java和native c++的连接层,底层向量化引擎层也是使用了列式
它也定义了新的格式。
Gluten 列式存储
gluten也实现了自己的Columnarbatch,它只是作为中间层,连接底层的native c++和上层的java。它的实际数据都会存储在native底层里。它的ColumnVector组成如下:
gluten columnar batch的第一个ColumnVector必须是GlutenIndicatorVector类型,里面包含了唯一标识符native handler id,handler id是读取native数据的凭证。
后面的ColumnVector都是GlutenPlaceholderVector类型,它的内容都是空的。gluten这样设计只是为了满足了ColumnBatch的规范。
底层存储 NativeBatch
gluten columnar batch的数据存储在底层,是由c++编写的。它的存储方式分为两种,一种是基于velox,一种是基于arrow的。
基于velox的,对应的实现类是VeloxColumnarBatch,它使用velox RowVectorPtr来存储数据。
基于arrow的,对应的实现类有ArrowColumnarBatch和ArrowCStructColumnarBatch。ArrowColumnarBatch使用了arrow RecordBatch存储数据,ArrowCStructColumnarBatch使用arrow ArrowSchema和arrow ArrowArray存储数据。
Gluten 列式数据读取
对于gluten native columnar batch,java程序不能直接读取,因为java不清楚native columnar batch的存储格式,而且native columnar batch的存储格式也有多种。
java程序想要读取,那么需要先将native columnar batch转换为Spark InternalRow格式。
java在访问native的数据,会经过一次内存复制和转换。为了减少java和native层的交互成本,native层会将数据按照协商好的格式,保存到内存中。然后java 层直接根据内存地址,读取数据。协商的数据格式就是Spark UnsafeRow。它的内存分布:
|
|
null bitset必须按照8 bytes对齐,每个bit位对应该列值是否为null。
values存储着基础类型的字段值,每个值占用8 bytes。
variable length portion存储着变长类型的字段值。
NativeBatch 数据转换 UnsafeRow
native层的数据存储如上所述,分为velox和arrow两种,对应了不同的转换方式。c++设计了ColumnarToRowConverter类,定义了转换接口。
|
|
对于velox 类型的数据,VeloxColumnarToRowConverter负责生成UnsafaRow format memory。velox对这种场景有着优化,它会将底层存储RowVectorPtr,直接转换。
对于其他类型的数据,都会将其先转换为arrow格式,然后调用ArrowColumnarToRowConverter负责转换。
Java 层调用转换接口
NativeColumnarToRowJniWrapper接口由java定义,负责将columnar转换为row,可以观察到,它采用jni c 实现
|
|
转换后的数据仍然存储在底层里,这些数据会按照Spark UnsafeRow的格式,顺序存储。返回的结果由NativeColumnarToRowInfo定义,包含了每条数据的内存地址和长度。
|
|
这样在读取数据时,调用UnsafeRow.pointTo,即可直接读取内存,不需要c和java之间的数据转换。
InternalRow 转换 ColumnarBatch
目前gluten只支持将InternalRow转换为两种类型的ColumnarBatch,分别对应不同的实现。
如果是Spark ColumnarBatch(ArrowWritableColumnVector[])类型,那么直接使用java程序实现转换。
如果是Spark ColumnarBatch(GlutenIndicatorVector)类型,那么使用native c++ 程序实现转换。
java 实现
java会创建ArrowWritableColumnVector列表,每个ArrowWritableColumnVector代表着一列数据。ArrowWritableColumnVector继承WritableColumnVector接口,支持修改数据。
转换原理也很简单,InternalRow表示一行数据,会将每列的值写入到对应的ArrowWritableColumnVector。
native 实现
然后从InternalRow的内存位置开始,将数据拷贝到缓存中,并且记录每行数据的长度。
然后交给c++实现的RowToColumnarConverter类处理转换。
转换后的ColumnBatch为gluten格式的。
RowToColumnarConverter根据本地向量化引擎不同,转换后的Native ColumnarBatch也不相同。这里以velox引擎为例,转换实现逻辑在VeloxRowToColumnarConverter,生成native columnar batch的类型是VeloxColumnarBatch。
VeloxRowToColumnarConverter的实现,最后也是调用了velox UnsafeRowDeserializer方法,返回的结果类型为VeloxColumnarBatch。
SparkPlan
spark sql都是生成SparkPlan Tree,才能执行。对于Row和ColumnarBatch的相互转换,对应了GlutenColumnarToRowExecBase和GlutenColumnarToRowExecBase两种Spark Plan。
gluten对这些SparkPlan的生成,封装了SparkPlanExecApi接口。不同的本地向量化引擎,可以有着不同的实现。如下所示,以velox为例
classDiagram
class SparkPlanExecApi {
<<interface>>
}
class GlutenSparkPlanExecApi {
<<abstract>>
}
class VeloxSparkPlanExecApi
SparkPlanExecApi <|.. GlutenSparkPlanExecApi
GlutenSparkPlanExecApi <|-- VeloxSparkPlanExecApi
GlutenSparkPlanExecApi的两个方法,
|
|
VeloxSparkPlanExecApi的两个方法实现,继承了GlutenSparkPlanExecApi。
|
|