向量化计算需要数据是列式存储格式的。gluten
作为spark
和本地向量化引擎的连接层,实现了列式存储数据的交互和转换。下面会先介绍spark
的列式存储原理,然后介绍了本地向量化层的列式存储,最后两者转换在spark sql
运行的原理。
Spark 列式存储
ColumnBatch
表示列式存储的数据,它的每列数据由ColumnVector
表示。定义如下:
|
|
ColumnVector
是抽象类,存储了具体的数据,它可以有多种实现方式,每种方式对应着不同的格式。
目前spark sql
内置了arrow
、orc
等格式,这些格式都是java
程序实现的。gluten
项目作为java
和native c++
的连接层,底层向量化引擎层也是使用了列式
它也定义了新的格式。
Gluten 列式存储
gluten
也实现了自己的Columnarbatch
,它只是作为中间层,连接底层的native c++
和上层的java
。它的实际数据都会存储在native
底层里。它的ColumnVector
组成如下:
gluten columnar batch
的第一个ColumnVector
必须是GlutenIndicatorVector
类型,里面包含了唯一标识符native handler id
,handler id
是读取native
数据的凭证。
后面的ColumnVector
都是GlutenPlaceholderVector
类型,它的内容都是空的。gluten
这样设计只是为了满足了ColumnBatch
的规范。
底层存储 NativeBatch
gluten columnar batch
的数据存储在底层,是由c++
编写的。它的存储方式分为两种,一种是基于velox
,一种是基于arrow
的。
基于velox
的,对应的实现类是VeloxColumnarBatch
,它使用velox RowVectorPtr
来存储数据。
基于arrow
的,对应的实现类有ArrowColumnarBatch
和ArrowCStructColumnarBatch
。ArrowColumnarBatch
使用了arrow RecordBatch
存储数据,ArrowCStructColumnarBatch
使用arrow ArrowSchema
和arrow ArrowArray
存储数据。
Gluten 列式数据读取
对于gluten native columnar batch
,java
程序不能直接读取,因为java
不清楚native columnar batch
的存储格式,而且native columnar batch
的存储格式也有多种。
java
程序想要读取,那么需要先将native columnar batch
转换为Spark InternalRow
格式。
java
在访问native
的数据,会经过一次内存复制和转换。为了减少java
和native
层的交互成本,native
层会将数据按照协商好的格式,保存到内存中。然后java
层直接根据内存地址,读取数据。协商的数据格式就是Spark UnsafeRow
。它的内存分布:
|
|
null bitset
必须按照8 bytes
对齐,每个bit
位对应该列值是否为null
。
values
存储着基础类型的字段值,每个值占用8 bytes
。
variable length portion
存储着变长类型的字段值。
NativeBatch 数据转换 UnsafeRow
native
层的数据存储如上所述,分为velox
和arrow
两种,对应了不同的转换方式。c++
设计了ColumnarToRowConverter
类,定义了转换接口。
|
|
对于velox
类型的数据,VeloxColumnarToRowConverter
负责生成UnsafaRow format memory
。velox
对这种场景有着优化,它会将底层存储RowVectorPtr
,直接转换。
对于其他类型的数据,都会将其先转换为arrow
格式,然后调用ArrowColumnarToRowConverter
负责转换。
Java 层调用转换接口
NativeColumnarToRowJniWrapper
接口由java
定义,负责将columnar
转换为row
,可以观察到,它采用jni c
实现
|
|
转换后的数据仍然存储在底层里,这些数据会按照Spark UnsafeRow
的格式,顺序存储。返回的结果由NativeColumnarToRowInfo
定义,包含了每条数据的内存地址和长度。
|
|
这样在读取数据时,调用UnsafeRow.pointTo
,即可直接读取内存,不需要c
和java
之间的数据转换。
InternalRow 转换 ColumnarBatch
目前gluten
只支持将InternalRow
转换为两种类型的ColumnarBatch
,分别对应不同的实现。
如果是Spark ColumnarBatch(ArrowWritableColumnVector[])
类型,那么直接使用java
程序实现转换。
如果是Spark ColumnarBatch(GlutenIndicatorVector)
类型,那么使用native c++
程序实现转换。
java 实现
java
会创建ArrowWritableColumnVector
列表,每个ArrowWritableColumnVector
代表着一列数据。ArrowWritableColumnVector
继承WritableColumnVector
接口,支持修改数据。
转换原理也很简单,InternalRow
表示一行数据,会将每列的值写入到对应的ArrowWritableColumnVector
。
native 实现
然后从InternalRow
的内存位置开始,将数据拷贝到缓存中,并且记录每行数据的长度。
然后交给c++
实现的RowToColumnarConverter
类处理转换。
转换后的ColumnBatch
为gluten
格式的。
RowToColumnarConverter
根据本地向量化引擎不同,转换后的Native ColumnarBatch
也不相同。这里以velox
引擎为例,转换实现逻辑在VeloxRowToColumnarConverter
,生成native columnar batch
的类型是VeloxColumnarBatch
。
VeloxRowToColumnarConverter
的实现,最后也是调用了velox UnsafeRowDeserializer
方法,返回的结果类型为VeloxColumnarBatch
。
SparkPlan
spark sql
都是生成SparkPlan Tree
,才能执行。对于Row
和ColumnarBatch
的相互转换,对应了GlutenColumnarToRowExecBase
和GlutenColumnarToRowExecBase
两种Spark Plan
。
gluten
对这些SparkPlan
的生成,封装了SparkPlanExecApi
接口。不同的本地向量化引擎,可以有着不同的实现。如下所示,以velox
为例
classDiagram
class SparkPlanExecApi {
<<interface>>
}
class GlutenSparkPlanExecApi {
<<abstract>>
}
class VeloxSparkPlanExecApi
SparkPlanExecApi <|.. GlutenSparkPlanExecApi
GlutenSparkPlanExecApi <|-- VeloxSparkPlanExecApi
GlutenSparkPlanExecApi
的两个方法,
|
|
VeloxSparkPlanExecApi
的两个方法实现,继承了GlutenSparkPlanExecApi
。
|
|