Gluten Columnar 数据格式

向量化计算需要数据是列式存储格式的。gluten作为spark和本地向量化引擎的连接层,实现了列式存储数据的交互和转换。下面会先介绍spark的列式存储原理,然后介绍了本地向量化层的列式存储,最后两者转换在spark sql运行的原理。

Spark 列式存储

ColumnBatch表示列式存储的数据,它的每列数据由ColumnVector表示。定义如下:

1
2
3
4
public final class ColumnarBatch implements AutoCloseable {  
  private int numRows;  
  private final ColumnVector[] columns;
}

ColumnVector是抽象类,存储了具体的数据,它可以有多种实现方式,每种方式对应着不同的格式。

目前spark sql内置了arroworc等格式,这些格式都是java程序实现的。gluten项目作为javanative c++的连接层,底层向量化引擎层也是使用了列式 它也定义了新的格式。

Gluten 列式存储

gluten也实现了自己的Columnarbatch,它只是作为中间层,连接底层的native c++和上层的java。它的实际数据都会存储在native底层里。它的ColumnVector组成如下:

gluten columnar batch的第一个ColumnVector必须是GlutenIndicatorVector类型,里面包含了唯一标识符native handler idhandler id是读取native数据的凭证。 后面的ColumnVector都是GlutenPlaceholderVector类型,它的内容都是空的。gluten这样设计只是为了满足了ColumnBatch的规范。

底层存储 NativeBatch

gluten columnar batch的数据存储在底层,是由c++编写的。它的存储方式分为两种,一种是基于velox,一种是基于arrow的。

基于velox的,对应的实现类是VeloxColumnarBatch,它使用velox RowVectorPtr来存储数据。

基于arrow的,对应的实现类有ArrowColumnarBatchArrowCStructColumnarBatchArrowColumnarBatch使用了arrow RecordBatch存储数据,ArrowCStructColumnarBatch使用arrow ArrowSchemaarrow ArrowArray存储数据。

Gluten 列式数据读取

对于gluten native columnar batchjava程序不能直接读取,因为java不清楚native columnar batch的存储格式,而且native columnar batch的存储格式也有多种。 java程序想要读取,那么需要先将native columnar batch转换为Spark InternalRow格式。 java在访问native的数据,会经过一次内存复制和转换。为了减少javanative层的交互成本,native层会将数据按照协商好的格式,保存到内存中。然后java 层直接根据内存地址,读取数据。协商的数据格式就是Spark UnsafeRow。它的内存分布:

1
2
3
---------------------------------------------------------------------------------------------------------
   null bitset   |   values    |  variable length portion  | number fields bits |   8 * number fields   |
---------------------------------------------------------------------------------------------------------

null bitset必须按照8 bytes对齐,每个bit位对应该列值是否为nullvalues存储着基础类型的字段值,每个值占用8 bytesvariable length portion存储着变长类型的字段值。

NativeBatch 数据转换 UnsafeRow

native层的数据存储如上所述,分为veloxarrow两种,对应了不同的转换方式。c++设计了ColumnarToRowConverter类,定义了转换接口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class ColumnarToRowConverter {

  // 子类需要实现 int 和 write 接口
  virtual arrow::Status init();
  virtual arrow::Status write();
  uint8_t* getBufferAddress();

  // 返回结果的位置信息,内存地址、每条数据的offset和length
  std::vector<int32_t>& getOffsets();
  std::vector<int32_t, boost::alignment::aligned_allocator<int32_t, 32>>& getLengths();
}

对于velox 类型的数据,VeloxColumnarToRowConverter负责生成UnsafaRow format memoryvelox对这种场景有着优化,它会将底层存储RowVectorPtr,直接转换。 对于其他类型的数据,都会将其先转换为arrow格式,然后调用ArrowColumnarToRowConverter负责转换。

Java 层调用转换接口

NativeColumnarToRowJniWrapper接口由java定义,负责将columnar转换为row,可以观察到,它采用jni c 实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public class NativeColumnarToRowJniWrapper extends JniInitialized {  
  
  public NativeColumnarToRowJniWrapper() throws IOException {  
  }  
  
  public native NativeColumnarToRowInfo nativeConvertColumnarToRow(  
      long batchHandle, long allocatorId)  
      throws RuntimeException;  
  
  public native void nativeClose(long instanceID);  
  
}

转换后的数据仍然存储在底层里,这些数据会按照Spark UnsafeRow的格式,顺序存储。返回的结果由NativeColumnarToRowInfo定义,包含了每条数据的内存地址和长度。

1
2
3
4
5
6
public class NativeColumnarToRowInfo {  
  public long instanceID;  
  public int[] offsets;  
  public int[] lengths;  
  public long memoryAddress;
}

这样在读取数据时,调用UnsafeRow.pointTo,即可直接读取内存,不需要cjava之间的数据转换。

InternalRow 转换 ColumnarBatch

目前gluten只支持将InternalRow转换为两种类型的ColumnarBatch,分别对应不同的实现。

如果是Spark ColumnarBatch(ArrowWritableColumnVector[])类型,那么直接使用java程序实现转换。

如果是Spark ColumnarBatch(GlutenIndicatorVector)类型,那么使用native c++ 程序实现转换。

java 实现

java会创建ArrowWritableColumnVector列表,每个ArrowWritableColumnVector代表着一列数据。ArrowWritableColumnVector继承WritableColumnVector接口,支持修改数据。

转换原理也很简单,InternalRow表示一行数据,会将每列的值写入到对应的ArrowWritableColumnVector

native 实现

然后从InternalRow的内存位置开始,将数据拷贝到缓存中,并且记录每行数据的长度。 然后交给c++实现的RowToColumnarConverter类处理转换。 转换后的ColumnBatchgluten格式的。

RowToColumnarConverter根据本地向量化引擎不同,转换后的Native ColumnarBatch也不相同。这里以velox引擎为例,转换实现逻辑在VeloxRowToColumnarConverter,生成native columnar batch的类型是VeloxColumnarBatch

VeloxRowToColumnarConverter的实现,最后也是调用了velox UnsafeRowDeserializer方法,返回的结果类型为VeloxColumnarBatch

SparkPlan

spark sql都是生成SparkPlan Tree,才能执行。对于RowColumnarBatch的相互转换,对应了GlutenColumnarToRowExecBaseGlutenColumnarToRowExecBase两种Spark Plangluten对这些SparkPlan的生成,封装了SparkPlanExecApi接口。不同的本地向量化引擎,可以有着不同的实现。如下所示,以velox为例

classDiagram
class SparkPlanExecApi {
	<<interface>>
}
class GlutenSparkPlanExecApi {
	<<abstract>>
}
class VeloxSparkPlanExecApi
SparkPlanExecApi <|.. GlutenSparkPlanExecApi
GlutenSparkPlanExecApi <|-- VeloxSparkPlanExecApi

GlutenSparkPlanExecApi的两个方法,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
  
abstract class GlutenSparkPlanExecApi extends SparkPlanExecApi {  
  
	override def genColumnarToRowExec(child: SparkPlan): GlutenColumnarToRowExecBase =  
		new VeloxColumnarToRowExec(child)  
	  
	
	override def genRowToColumnarExec(child: SparkPlan): GlutenRowToColumnarExec =  
		new GlutenRowToArrowColumnarExec(child)

}

VeloxSparkPlanExecApi的两个方法实现,继承了GlutenSparkPlanExecApi

1
2
3
4
5
6
7
8
abstract class GlutenSparkPlanExecApi extends SparkPlanExecApi {

  override def genColumnarToRowExec(child: SparkPlan): GlutenColumnarToRowExecBase =  
    new VeloxColumnarToRowExec(child)

  override def genRowToColumnarExec(child: SparkPlan): GlutenRowToColumnarExec =  
    new GlutenRowToArrowColumnarExec(child)
}
updatedupdated2023-07-122023-07-12