Spark Serialization and Compression Internals

Spark is a distributed computing framework, and both its RPC communication and its caching of intermediate data move objects across process boundaries. To communicate efficiently and to reduce storage space, Spark serializes data first and then processes it.

Serialization Types

This article is based on Spark 2.2, which supports Java's built-in serialization as well as KryoSerializer. The automatic selection of KryoSerializer currently applies only to simple data types; Spark 2.4 extends Kryo support further.

SerializerManager provides a getSerializer method that automatically picks the serialization method; the default is Java serialization. The conditions for choosing Kryo are strict: the data type must be a primitive type (or a primitive array, or String), and autoPick must be allowed (the corresponding block is not a StreamBlock).

As the getSerializer source shows, the types currently eligible for Kryo serialization are String, the primitive types, and their corresponding array types.

def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
  // if auto-picking is allowed and this type is supported by Kryo
  if (autoPick && canUseKryo(ct)) {
    kryoSerializer
  } else {
    // otherwise fall back to the default serializer
    defaultSerializer
  }
}

// the String type
private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]

private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
  // primitive types
  val primitiveClassTags = Set[ClassTag[_]](
    ClassTag.Boolean,
    ClassTag.Byte,
    ClassTag.Char,
    ClassTag.Double,
    ClassTag.Float,
    ClassTag.Int,
    ClassTag.Long,
    ClassTag.Null,
    ClassTag.Short
  )
  // the array types corresponding to the primitives
  val arrayClassTags = primitiveClassTags.map(_.wrap)
  // union of both sets
  primitiveClassTags ++ arrayClassTags
}

def canUseKryo(ct: ClassTag[_]): Boolean = {
  primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
}
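
To make the selection rule concrete, here is a minimal sketch of which serializer getSerializer would return for a few ClassTags, assuming `sm` is a SerializerManager instance (the variable name is made up):

import scala.reflect.ClassTag

// A sketch, assuming `sm` is a SerializerManager instance:
sm.getSerializer(ClassTag.Int, autoPick = true)                        // kryoSerializer
sm.getSerializer(implicitly[ClassTag[Array[Long]]], autoPick = true)   // kryoSerializer
sm.getSerializer(implicitly[ClassTag[String]], autoPick = true)        // kryoSerializer
sm.getSerializer(implicitly[ClassTag[Map[Int, Int]]], autoPick = true) // defaultSerializer
sm.getSerializer(ClassTag.Int, autoPick = false)                       // defaultSerializer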

UML Class Diagram

The serialization process involves several classes and follows the abstract factory pattern. SerializerInstance plays the factory role, SerializationStream represents the serialization stream, and DeserializationStream the deserialization stream.

Serializer is in turn the factory for SerializerInstance: its newInstance method creates the matching SerializerInstance. A short code sketch of this chain follows the diagram.

{% plantuml %}
@startuml spark-serializer

class Serializer {
  newInstance() : SerializerInstance
}

class JavaSerializer
class KryoSerializer

class SerializerInstance {
  serializeStream() : SerializationStream
  deserializeStream() : DeserializationStream
}

class SerializationStream
class DeserializationStream
class JavaSerializerInstance
class JavaSerializationStream
class JavaDeserializationStream
class KryoSerializerInstance
class KryoSerializationStream
class KryoDeserializationStream

Serializer <|-- JavaSerializer
Serializer <|-- KryoSerializer
Serializer --> SerializerInstance
JavaSerializer --> JavaSerializerInstance
KryoSerializer --> KryoSerializerInstance
SerializerInstance --> SerializationStream
SerializerInstance --> DeserializationStream
SerializerInstance <|-- JavaSerializerInstance
SerializationStream <|-- JavaSerializationStream
DeserializationStream <|-- JavaDeserializationStream
JavaSerializerInstance --> JavaSerializationStream
JavaSerializerInstance --> JavaDeserializationStream
SerializerInstance <|-- KryoSerializerInstance
SerializationStream <|-- KryoSerializationStream
DeserializationStream <|-- KryoDeserializationStream
KryoSerializerInstance --> KryoSerializationStream
KryoSerializerInstance --> KryoDeserializationStream

@enduml
{% endplantuml %}
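
A hedged sketch of this factory chain in code, using the public JavaSerializer (the file name is made up):

import java.io.FileOutputStream
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerInstance}

// Serializer -> SerializerInstance -> SerializationStream
val ser: Serializer = new JavaSerializer(new SparkConf())
val instance: SerializerInstance = ser.newInstance()
val stream = instance.serializeStream(new FileOutputStream("data.bin")) // hypothetical path
stream.writeObject("hello").writeObject(Seq(1, 2, 3)).close()          // writeObject chains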

Buffer Streams for Serialization

When serializing data, Spark can write either into an in-memory buffer or into an output stream. The following classes are involved; a sketch of how they compose follows the list.

ByteArrayOutputStream stores data in a byte array and grows the array automatically.

ByteBufferOutputStream extends ByteArrayOutputStream and adds the ability to expose the written data as a ByteBuffer.

BufferedOutputStream provides buffering: data is first written into its internal buffer, and only when the buffer fills up is it flushed to the decorated underlying stream.

ChunkedByteBufferOutputStream maintains an array of small ByteBuffer chunks in which the data is stored. When the current chunk fills up, a new ByteBuffer is allocated and appended to the array.

ChunkedByteBuffer holds an array of ByteBuffers and offers read-only access. It can be produced from a ChunkedByteBufferOutputStream.

ChunkedByteBufferInputStream wraps a ChunkedByteBuffer and exposes a streaming read interface.
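
A minimal sketch of how these streams compose; note that ChunkedByteBufferOutputStream and ChunkedByteBuffer are private[spark], so this only compiles inside Spark's own packages:

import java.io.BufferedOutputStream
import java.nio.ByteBuffer
import org.apache.spark.util.io.ChunkedByteBufferOutputStream

// 4 MB chunks, allocated on the heap
val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
val buffered = new BufferedOutputStream(bbos)
buffered.write(Array.fill[Byte](10 * 1024 * 1024)(1)) // spans multiple chunks
buffered.close()

// freeze the written chunks into a read-only ChunkedByteBuffer ...
val chunked = bbos.toChunkedByteBuffer
// ... and read them back as a stream (a ChunkedByteBufferInputStream underneath)
val in = chunked.toInputStream()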

Java Serialization

JavaSerializerInstance implements Java serialization. Its serialize method serializes a single object, and its serializeStream method returns a serialization stream.

private[spark] class JavaSerializerInstance(
    counterReset: Int, extraDebugInfo: Boolean, defaultClassLoader: ClassLoader)
  extends SerializerInstance {

  // serialize calls serializeStream to build a stream, then writes the object into it
  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    // create a ByteBuffer-backed output stream
    val bos = new ByteBufferOutputStream()
    // decorate it with a serialization stream
    val out = serializeStream(bos)
    // write the object
    out.writeObject(t)
    out.close()
    // return the resulting ByteBuffer
    bos.toByteBuffer
  }

  // returns a JavaSerializationStream
  override def serializeStream(s: OutputStream): SerializationStream = {
    new JavaSerializationStream(s, counterReset, extraDebugInfo)
  }
}

JavaSerializationStream is a stream decorator that adds serialization. It is essentially a thin wrapper around java.io.ObjectOutputStream from the JDK. ObjectOutputStream has a pitfall: to save space it keeps a back-reference to every object it has written, so when a stream serializes many objects these references accumulate in memory and can cause an out-of-memory error. JavaSerializationStream therefore calls reset() after every counterReset writes (counterReset comes from spark.serializer.objectStreamReset, 100 by default) to avoid this problem.
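
A small self-contained demonstration of this behavior using only the JDK classes (not Spark code):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// ObjectOutputStream keeps back-references to every object it has written: a
// repeated object is encoded as a tiny handle, but the reference table also
// pins those objects in memory until reset() is called.
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
val payload = "x" * 1000
oos.writeObject(payload); oos.flush()
val full = bos.size()
oos.writeObject(payload); oos.flush()   // written as a back-reference: a few bytes
val handle = bos.size() - full
oos.reset()                             // drop the reference table, freeing memory
oos.writeObject(payload)                // serialized in full again
oos.close()
println(s"full=$full bytes, handle=$handle bytes")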

private[spark] class JavaSerializationStream(
    out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
  extends SerializationStream {

  private val objOut = new ObjectOutputStream(out)
  // number of objects written since the last reset
  private var counter = 0

  /**
   * Calling reset to avoid memory leak:
   * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
   * But only call it every 100th time to avoid bloated serialization streams (when
   * the stream 'resets' object class descriptions have to be re-written)
   */
  def writeObject[T: ClassTag](t: T): SerializationStream = {
    try {
      objOut.writeObject(t)
    } catch {
      case e: NotSerializableException if extraDebugInfo =>
        throw SerializationDebugger.improveException(t, e)
    }
    counter += 1
    // call reset after every counterReset objects written
    if (counterReset > 0 && counter >= counterReset) {
      objOut.reset()
      counter = 0
    }
    this
  }

  def flush() { objOut.flush() }
  def close() { objOut.close() }
}

The deserialize method performs deserialization. Since deserialization loads classes dynamically, a ClassLoader can be specified. It creates a JavaDeserializationStream and uses it to parse the data.

private[spark] class JavaSerializerInstance(
    counterReset: Int, extraDebugInfo: Boolean, defaultClassLoader: ClassLoader)
  extends SerializerInstance {

  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    val bis = new ByteBufferInputStream(bytes)
    val in = deserializeStream(bis)
    in.readObject()
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
    // create an input stream over the ByteBuffer
    val bis = new ByteBufferInputStream(bytes)
    // decorate it with a deserialization stream
    val in = deserializeStream(bis, loader)
    in.readObject()
  }

  override def deserializeStream(s: InputStream): DeserializationStream = {
    new JavaDeserializationStream(s, defaultClassLoader)
  }

  def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
    new JavaDeserializationStream(s, loader)
  }
}
JavaDeserializationStream works by wrapping java.io.ObjectInputStream, the JDK class that provides deserialization. It overrides the resolveClass method so that classes are loaded through the specified ClassLoader.

private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
  extends DeserializationStream {
  
  private val objIn = new ObjectInputStream(in) {
    override def resolveClass(desc: ObjectStreamClass): Class[_] =
      try { 
        // scalastyle:off classforname
        Class.forName(desc.getName, false, loader)
        // scalastyle:on classforname
      } catch {
        case e: ClassNotFoundException =>
          JavaDeserializationStream.primitiveMappings.getOrElse(desc.getName, throw e)
      }
  }

  def readObject[T: ClassTag](): T = objIn.readObject().asInstanceOf[T]
  def close() { objIn.close() }
}
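
Putting the pieces together, a minimal round-trip sketch with the public JavaSerializer:

import java.nio.ByteBuffer
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

// serialize and deserialize one value; a sketch rather than production code
val instance = new JavaSerializer(new SparkConf()).newInstance()
val bytes: ByteBuffer = instance.serialize(Seq(1, 2, 3))
val back = instance.deserialize[Seq[Int]](bytes)
assert(back == Seq(1, 2, 3))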

Kryo Serialization

Kryo is initialized in the newKryo method of KryoSerializer, which pre-registers the classes that will be serialized.

The newKryoOutput method instantiates a KryoOutput to serve as the buffer that serialized data is written into.

class KryoSerializer(conf: SparkConf)
  extends org.apache.spark.serializer.Serializer {

  // whether classes must be registered before their instances can be serialized
  private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)

  def newKryo(): Kryo = {
    // instantiate Kryo through the EmptyScalaKryoInstantiator factory
    val instantiator = new EmptyScalaKryoInstantiator
    val kryo = instantiator.newKryo()
    kryo.setRegistrationRequired(registrationRequired)
    // if no default ClassLoader is set, use the current thread's ClassLoader
    val oldClassLoader = Thread.currentThread.getContextClassLoader
    val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)

    // register class information with kryo
    .........
    kryo.setClassLoader(classLoader)
    kryo
  }

  // instantiate a KryoOutput with the default configuration
  def newKryoOutput(): KryoOutput =
    if (useUnsafe) {
      new KryoUnsafeOutput(bufferSize, math.max(bufferSize, maxBufferSize))
    } else {
      new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
    }
}
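
As context for registrationRequired, here is a hedged sketch of how an application would normally enable Kryo and register its own classes (MyRecord is a made-up class name):

import org.apache.spark.SparkConf

case class MyRecord(id: Long, name: String) // hypothetical user class

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true") // fail fast on unregistered classes
  .registerKryoClasses(Array(classOf[MyRecord]))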

The serialize method of KryoSerializerInstance serializes a single object:

class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
  extends SerializerInstance {
  // cached Kryo instance
  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()

  // calls KryoSerializer's newKryoOutput to instantiate Kryo's output buffer
  private lazy val output = ks.newKryoOutput()

  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    // clear previously written data
    output.clear()
    // reuse the cached Kryo instance if there is one, otherwise create a new one
    val kryo = borrowKryo()
    try {
      // serialize the object
      kryo.writeClassAndObject(output, t)
    } catch {
      case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
        throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
          "increase spark.kryoserializer.buffer.max value.", e)
    } finally {
      releaseKryo(kryo)
    }
    // return the serialized data; note that output.toBytes allocates a new array
    ByteBuffer.wrap(output.toBytes)
  }
}
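
If serialization hits the "Buffer overflow" path above, the fix is a configuration change. A sketch with illustrative values (not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kryoserializer.buffer", "64k")      // initial KryoOutput size
  .set("spark.kryoserializer.buffer.max", "256m") // upper bound before the overflow error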

The serializeStream method returns a KryoSerializationStream:

class KryoSerializationStream(
    serInstance: KryoSerializerInstance,
    outStream: OutputStream,
    useUnsafe: Boolean) extends SerializationStream {

  // decorate outStream as a KryoOutput
  private[this] var output: KryoOutput =
    if (useUnsafe) new KryoUnsafeOutput(outStream) else new KryoOutput(outStream)

  private[this] var kryo: Kryo = serInstance.borrowKryo()

  override def writeObject[T: ClassTag](t: T): SerializationStream = {
    // let kryo serialize the object and write it through output into outStream
    kryo.writeClassAndObject(output, t)
    this
  }
}
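
A hedged usage sketch of this stream via the public API (the file name is made up):

import java.io.FileOutputStream
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// stream many objects through Kryo; a sketch, not production code
val stream = new KryoSerializer(new SparkConf())
  .newInstance()
  .serializeStream(new FileOutputStream("out.bin")) // hypothetical path
(1 to 1000).foreach(i => stream.writeObject(i))
stream.close()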

The SerializerManager Interface

SerializerManager unifies the Java and Kryo serialization interfaces: calling its methods is enough to serialize data conveniently, without worrying about which serializer is in use.

Serializing Data to Memory

The dataSerialize method serializes data and stores it in memory. A ChunkedByteBufferOutputStream holds the serialized bytes, a BufferedOutputStream wraps it to provide buffering, and the compression and serialization decorators sit on top of that, so the write path is serialization stream → compression stream → BufferedOutputStream → ChunkedByteBufferOutputStream.

/** Serializes into a chunked byte buffer. */
def dataSerialize[T: ClassTag](
    blockId: BlockId,
    values: Iterator[T]): ChunkedByteBuffer = {
  // delegate to dataSerializeWithExplicitClassTag
  dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]])
}

/** Serializes into a chunked byte buffer. */
def dataSerializeWithExplicitClassTag(
    blockId: BlockId,
    values: Iterator[_],
    classTag: ClassTag[_]): ChunkedByteBuffer = {
  // instantiate a ChunkedByteBufferOutputStream to hold the serialized data
  val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
  // decorate bbos with a buffered output stream
  val byteStream = new BufferedOutputStream(bbos)
  // StreamBlock data does not support autoPick
  val autoPick = !blockId.isInstanceOf[StreamBlockId]
  // obtain the serializer
  val ser = getSerializer(classTag, autoPick).newInstance()
  // wrapForCompression adds the compression decorator,
  // then serializeStream adds the serialization decorator
  ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
  // return the serialized data as a ChunkedByteBuffer
  bbos.toChunkedByteBuffer
}

Serializing Data to a Stream

To write serialized data into a stream, the dataSerializeStream method decorates an underlying output stream in the same way and writes the data into it.

def dataSerializeStream[T: ClassTag](
    blockId: BlockId,
    outputStream: OutputStream,
    values: Iterator[T]): Unit = {
  // decorate the output stream with buffering
  val byteStream = new BufferedOutputStream(outputStream)
  // StreamBlock data does not support autoPick
  val autoPick = !blockId.isInstanceOf[StreamBlockId]
  // obtain the serializer
  val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
  // wrapForCompression adds the compression decorator,
  // then serializeStream adds the serialization decorator
  ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}

Data Compression

When the spark.io.compression.codec configuration is set, Spark selects the corresponding compression codec.

Three codecs are currently supported: lz4, lzf, and snappy.

Compression supports both input streams and output streams, expressed by the CompressionCodec trait.

trait CompressionCodec {

  def compressedOutputStream(s: OutputStream): OutputStream

  def compressedInputStream(s: InputStream): InputStream
}
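
A hedged sketch of obtaining a codec; CompressionCodec and its companion object are private[spark], so this only compiles inside Spark's own packages:

import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec

// pick the codec named by spark.io.compression.codec (lz4 here)
val conf = new SparkConf().set("spark.io.compression.codec", "lz4")
val codec = CompressionCodec.createCodec(conf)

// decorate any OutputStream with compression
val out = codec.compressedOutputStream(new java.io.ByteArrayOutputStream())
out.write("hello".getBytes("UTF-8"))
out.close()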

Each codec implements this trait. Taking lz4 as an example:

class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {

  override def compressedOutputStream(s: OutputStream): OutputStream = {
    val blockSize = conf.getSizeAsBytes("spark.io.compression.lz4.blockSize", "32k").toInt
    // wrap the output stream in an LZ4BlockOutputStream
    new LZ4BlockOutputStream(s, blockSize)
  }

  // wrap the input stream in an LZ4BlockInputStream
  override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s)
}

When serializing data, Spark decides whether to compress based on the type of block being stored:

private[spark] class SerializerManager( /* ... constructor parameters omitted ... */ ) {

  // whether to compress broadcast data
  private[this] val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
  // whether to compress shuffle data
  private[this] val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
  // whether to compress RDD data
  private[this] val compressRdds = conf.getBoolean("spark.rdd.compress", false)
  // whether to compress shuffle data that is spilled to disk
  private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)

  // decide, from the block's type, whether the configuration allows compression
  private def shouldCompress(blockId: BlockId): Boolean = {
    blockId match {
      case _: ShuffleBlockId => compressShuffle
      case _: BroadcastBlockId => compressBroadcast
      case _: RDDBlockId => compressRdds
      case _: TempLocalBlockId => compressShuffleSpill
      case _: TempShuffleBlockId => compressShuffle
      case _ => false
    }
  }
}
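
Note that RDD compression is the only one of these that is off by default. A sketch of turning it on, trading CPU time for memory when caching serialized RDDs:

import org.apache.spark.SparkConf

// spark.rdd.compress defaults to false; the other three flags default to true
val conf = new SparkConf().set("spark.rdd.compress", "true")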