Spark Streaming WAL Internals

WAL stands for write-ahead log, a technique commonly used in databases: because every change is logged before it is applied, data can be recovered from the WAL after a crash. To improve fault tolerance, Spark Streaming also supports a WAL, and it stores the WAL on a reliable file system such as HDFS.

A receiver running on an executor node reads data from the source and, if the WAL option is enabled, writes that data to the WAL first. If the executor node dies, the data can then be recovered from the WAL.

The ReceivedBlockTracker running on the driver node manages block metadata. Whenever it handles a request to add a block, allocate blocks to a batch, or delete blocks, it records the event in the WAL.
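
For illustration, here is a hedged sketch of the kinds of events the tracker records, assuming simplified event fields and plain Java serialization (the real event classes are Spark internals and carry richer types):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Hypothetical, simplified versions of the driver-side tracker events;
// the names mirror Spark's internal ones, the fields here are illustrative.
sealed trait TrackerEvent extends Serializable
case class BlockAdditionEvent(streamId: Int, blockId: String) extends TrackerEvent
case class BatchAllocationEvent(batchTime: Long, blockIds: Seq[String]) extends TrackerEvent
case class BatchCleanupEvent(batchTimes: Seq[Long]) extends TrackerEvent

// Serialize an event into the ByteBuffer form that WriteAheadLog.write expects
def toWalRecord(event: TrackerEvent): ByteBuffer = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(event)
  oos.close()
  ByteBuffer.wrap(bos.toByteArray)
}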

WAL Writer Types

WriteAheadLog is the abstract base class for WAL handling. It declares the following methods for writing, reading, and cleaning the WAL.

public abstract class WriteAheadLog {

  // Write the record to the WAL; record is the data to store and
  // time is the timestamp associated with the record.
  // Returns a WriteAheadLogRecordHandle describing where the record was stored.
  public abstract WriteAheadLogRecordHandle write(ByteBuffer record, long time);

  // Read back a single record using the handle returned by write
  public abstract ByteBuffer read(WriteAheadLogRecordHandle handle);

  // Read all records that have not yet expired
  public abstract Iterator<ByteBuffer> readAll();

  // Remove expired WAL files; threshTime is the cutoff time
  public abstract void clean(long threshTime, boolean waitForCompletion);
}

WriteAheadLog has two subclasses, each with a different storage approach.

The subclass FileBasedWriteAheadLog stores data directly in files.

The subclass BatchedWriteAheadLog builds on top of FileBasedWriteAheadLog and stores data in batches.

How FileBasedWriteAheadLog Works

The WAL on executor nodes is managed by FileBasedWriteAheadLog. To enable the WAL, a checkpoint directory must be configured.
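
A minimal setup sketch, assuming a local run and a placeholder HDFS checkpoint path of your own; the receiver WAL is switched on with spark.streaming.receiver.writeAheadLog.enable and its files are placed under the checkpoint directory:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[2]")   // illustrative; any cluster master works
  .setAppName("wal-demo")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// The receiver WAL is written under <checkpointDir>/receivedData
ssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoint")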

Its WAL directory layout is as follows:

checkpointDir/
├── receivedData
│   ├── streamId0
│   │   ├── log-starttime0-endtime0
│   │   ├── log-starttime1-endtime1
│   │   └── log-starttime2-endtime2
│   ├── streamId1
│   └── streamId2

The WAL root is the receivedData directory under the checkpoint directory.

Each receiver has its own directory, named after its id.

Within each of these directories, WAL files are rolled by time range; each file name contains its start and end time, as the sketch below illustrates.
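
The naming can be reproduced with a one-line helper. This is a sketch that assumes the helper name timeToLogFile used later in the source, with millisecond-epoch timestamps:

// Each rolled file covers the interval [startTime, stopTime)
def timeToLogFile(startTime: Long, stopTime: Long): String =
  s"log-$startTime-$stopTime"

// e.g. timeToLogFile(1688300000000L, 1688300060000L)
//   == "log-1688300000000-1688300060000"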

WAL Creation

WAL records are created by the write method. Before looking at write, we first need to introduce the FileBasedWriteAheadLogWriter class. It performs the actual writes, persisting the data to a reliable file system such as HDFS.

class FileBasedWriteAheadLogWriter(path: String, hadoopConf: Configuration)
  extends Closeable {
  // Depending on the configuration, returns an HDFS or local file output
  // stream and creates the log file
  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)

  // Current offset within the file
  private var nextOffset = stream.getPos()
  private var closed = false

  /** Write the bytebuffer to the log file */
  def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
    data.rewind() // Rewind so the buffer is ready to be read
    val lengthToWrite = data.remaining()
    // Record where this record will live: WAL file path, start offset, length
    val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
    // Write the record length first
    stream.writeInt(lengthToWrite)
    // Then write the bytebuffer contents to the output stream
    Utils.writeByteBuffer(data, stream: OutputStream)
    // Flush buffers
    flush()
    // Update the current file offset
    nextOffset = stream.getPos()
    // Return the handle describing this record
    segment
  }

  private def flush() {
    stream.hflush()
    // Useful for local file system where hflush/sync does not work (HADOOP-7844)
    stream.getWrappedStream.flush()
  }
}

FileBasedWriteAheadLogWriter's write method returns a FileBasedWriteAheadLogSegment, which records where this piece of data was stored in the WAL. Later reads use this handle to locate the data.
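
A sketch of what that handle carries (the real class is internal to Spark; the shape below is assumed from how the reader uses it):

import org.apache.spark.streaming.util.WriteAheadLogRecordHandle

// File path, byte offset within the file, and record length
case class FileBasedWriteAheadLogSegment(path: String, offset: Long, length: Int)
  extends WriteAheadLogRecordHandle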

Next, look at FileBasedWriteAheadLog's write method. It stores records in different files according to their time range.

private[streaming] class FileBasedWriteAheadLog {

  // Path of the current WAL file
  private var currentLogPath: Option[String] = None
  // Current WAL writer
  private var currentLogWriter: FileBasedWriteAheadLogWriter = null
  // Start time of the current WAL file
  private var currentLogWriterStartTime: Long = -1L
  // Stop time of the current WAL file
  private var currentLogWriterStopTime: Long = -1L
  // Info about finished WAL files
  private val pastLogs = new ArrayBuffer[LogInfo]

  def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
    var fileSegment: FileBasedWriteAheadLogSegment = null
    var failures = 0
    var lastException: Exception = null
    var succeeded = false
    // Retry at most maxFailures times
    while (!succeeded && failures < maxFailures) {
      try {
        // First obtain the writer via getLogWriter,
        // then write the data with it
        fileSegment = getLogWriter(time).write(byteBuffer)
        if (closeFileAfterWrite) {
          resetWriter()
        }
        succeeded = true
      } catch {
        case ex: Exception =>
          lastException = ex
          logWarning("Failed to write to write ahead log")
          resetWriter()
          failures += 1
      }
    }
    if (fileSegment == null) {
      logError(s"Failed to write to write ahead log after $failures failures")
      throw lastException
    }
    fileSegment
  }

  private def getLogWriter(currentTime: Long): FileBasedWriteAheadLogWriter = synchronized {
    // Each WAL file covers a time range; once it is exceeded, roll to a new file
    if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
      // Close the current writer
      resetWriter()
      // Record the finished WAL file in pastLogs,
      // including its start time, stop time, and path
      currentLogPath.foreach {
        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
      }
      // Set currentLogWriterStartTime to the new file's start time
      currentLogWriterStartTime = currentTime
      // Set currentLogWriterStopTime to the new file's stop time
      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
      // Build the path of the new WAL file
      val newLogPath = new Path(logDirectory,
        timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
      // Point currentLogPath at the new WAL file
      currentLogPath = Some(newLogPath.toString)
      // Create a new writer for the new file
      currentLogWriter = new FileBasedWriteAheadLogWriter(currentLogPath.get, hadoopConf)
    }
    currentLogWriter
  }
}

WAL Reads

WAL reads are handled by the read method, which simply delegates to FileBasedWriteAheadLogRandomReader. Because FileBasedWriteAheadLogRandomReader supports seek, individual records can be read directly.

class FileBasedWriteAheadLog {
  // The segment parameter is the FileBasedWriteAheadLogSegment returned by write;
  // it contains the location of the data
  def read(segment: WriteAheadLogRecordHandle): ByteBuffer = {
    val fileSegment = segment.asInstanceOf[FileBasedWriteAheadLogSegment]
    var reader: FileBasedWriteAheadLogRandomReader = null
    var byteBuffer: ByteBuffer = null
    try {
      // Create a FileBasedWriteAheadLogRandomReader and read the data
      reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
      byteBuffer = reader.read(fileSegment)
    } finally {
      reader.close()
    }
    byteBuffer
  }
}

class FileBasedWriteAheadLogRandomReader(path: String, conf: Configuration)
  extends Closeable {
  // Open an HDFS input stream for the file
  private val instream = HdfsUtils.getInputStream(path, conf)
  private var closed = (instream == null) // the file may be deleted as we're opening the stream

  def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized {
    // Seek to the record's start offset within the file
    instream.seek(segment.offset)
    // Read the length of the record
    val nextLength = instream.readInt()
    // Allocate a byte array of that length
    val buffer = new Array[Byte](nextLength)
    // Read the record into the array
    instream.readFully(buffer)
    // Wrap it in a ByteBuffer and return it
    ByteBuffer.wrap(buffer)
  }
}
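
Putting write and read together, here is a hedged usage sketch against the public WriteAheadLog API; the concrete wal instance would normally be created by Spark internally, so it is taken as a parameter here:

import java.nio.ByteBuffer
import org.apache.spark.streaming.util.WriteAheadLog

def roundTrip(wal: WriteAheadLog): Unit = {
  val payload = ByteBuffer.wrap("hello wal".getBytes("UTF-8"))
  // write returns a handle (a FileBasedWriteAheadLogSegment for the file-based WAL)
  val handle = wal.write(payload, System.currentTimeMillis())
  // read seeks to the recorded offset and reads exactly `length` bytes back
  val restored = wal.read(handle)
  val bytes = new Array[Byte](restored.remaining())
  restored.get(bytes)
  assert(new String(bytes, "UTF-8") == "hello wal")
}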

WAL Deletion

Spark Streaming processes a continuous stream of data, so it cannot keep everything forever; data that has already been processed is deleted periodically. FileBasedWriteAheadLog therefore provides a clean method to remove expired data.

class FileBasedWriteAheadLog {
  // Info about finished WAL files
  private val pastLogs = new ArrayBuffer[LogInfo]

  def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
    // Find WAL files whose end time is earlier than threshTime
    val oldLogFiles = synchronized {
      val expiredLogs = pastLogs.filter { _.endTime < threshTime }
      pastLogs --= expiredLogs
      expiredLogs
    }

    def deleteFile(walInfo: LogInfo): Unit = {
      try {
        // Path of the WAL file
        val path = new Path(walInfo.path)
        // Get the FileSystem for this path
        val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
        // Delete the WAL file
        fs.delete(path, true)
      } catch {
        case ex: Exception =>
          logWarning(s"Error clearing write ahead log file $walInfo", ex)
      }
    }

    // Walk the expired WAL files and delete each one via deleteFile
    oldLogFiles.foreach { logInfo =>
      if (!executionContext.isShutdown) {
        try {
          // Run the deletion on a thread pool
          val f = Future { deleteFile(logInfo) }(executionContext)
          if (waitForCompletion) {
            import scala.concurrent.duration._
            // scalastyle:off awaitready
            Await.ready(f, 1 second)
            // scalastyle:on awaitready
          }
        } catch {
          case e: RejectedExecutionException =>
            logWarning("Execution context shutdown before deleting old WriteAheadLogs. " +
              "This would not affect recovery correctness.", e)
        }
      }
    }
  }
}
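
A hedged usage sketch of clean; the five-minute retention window is illustrative, not a Spark default, and the wal instance is assumed to exist:

import org.apache.spark.streaming.util.WriteAheadLog

def cleanOldLogs(wal: WriteAheadLog): Unit = {
  val retentionMs = 5 * 60 * 1000L
  // Files whose end time is older than the threshold become eligible for deletion;
  // passing false means we do not block waiting for the deletions to finish
  wal.clean(System.currentTimeMillis() - retentionMs, false)
}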

BatchedWriteAheadLog

BatchedWriteAheadLog runs only on the driver side and requires the Spark option spark.streaming.driver.writeAheadLog.allowBatching to be set to true. It first buffers incoming records, then takes several of them at a time and wraps them into a single batch that is written to the file. This improves the system's throughput.
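
A minimal configuration sketch, assuming the checkpoint directory and receiver-side WAL flag from the earlier example are already in place:

import org.apache.spark.SparkConf

val batchingConf = new SparkConf()
  // Let the driver-side (ReceivedBlockTracker) WAL group records into batches
  .set("spark.streaming.driver.writeAheadLog.allowBatching", "true")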

BatchedWriteAheadLog first puts every write request into a queue.

class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: SparkConf) {
  // Queue of pending WAL records
  private val walWriteQueue = new LinkedBlockingQueue[Record]()

  override def write(byteBuffer: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
    val promise = Promise[WriteAheadLogRecordHandle]()
    val putSuccessfully = synchronized {
      if (active) {
        // Wrap the data in a Record and enqueue it on walWriteQueue
        walWriteQueue.offer(Record(byteBuffer, time, promise))
        true
      } else {
        false
      }
    }
    ......
  }
}

BatchedWriteAheadLog also runs a background thread that keeps taking records from the queue, packs them into a batch, and writes the batch through FileBasedWriteAheadLog.

class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: SparkConf) {

  // Queue of pending WAL records
  private val walWriteQueue = new LinkedBlockingQueue[Record]()
  // Buffer holding the records of the batch about to be written
  private val buffer = new ArrayBuffer[Record]()
  // The batch writer thread
  private val batchedWriterThread = startBatchedWriterThread()

  private def startBatchedWriterThread(): Thread = {
    // Call flushRecords in a loop
    val thread = new Thread(new Runnable {
      override def run(): Unit = {
        while (active) {
          flushRecords()
        }
      }
    }, "BatchedWriteAheadLog Writer")
    thread.setDaemon(true)
    thread.start()
    thread
  }

  /** Write all the records in the buffer to the write ahead log. */
  private def flushRecords(): Unit = {
    try {
      // take() blocks until the queue has at least one record
      buffer += walWriteQueue.take()
      // drainTo efficiently moves the remaining queued records into the buffer
      val numBatched = walWriteQueue.drainTo(buffer.asJava) + 1
    } catch {
      case _: InterruptedException =>
        logWarning("BatchedWriteAheadLog Writer queue interrupted.")
    }
    try {
      var segment: WriteAheadLogRecordHandle = null
      if (buffer.length > 0) {
        // Sort the records by time
        val sortedByTime = buffer.sortBy(_.time)
        // Use the latest time in the batch as the batch's time
        val time = sortedByTime.last.time
        // wrappedLog is the FileBasedWriteAheadLog supplied at construction.
        // aggregate combines this batch of records into a single ByteBuffer,
        // which is then written through FileBasedWriteAheadLog
        segment = wrappedLog.write(aggregate(sortedByTime), time)
      }
      buffer.foreach(_.promise.success(segment))
    } catch {
      ......
    } finally {
      // Clear the buffer
      buffer.clear()
    }
  }
}

Because BatchedWriteAheadLog works at the granularity of a batch, it does not support reading a single record.

override def read(segment: WriteAheadLogRecordHandle): ByteBuffer = {
  throw new UnsupportedOperationException("read() is not supported for BatchedWriteAheadLog " +
    "as the data may require de-aggregation.")
}
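
Recovery instead goes through readAll, where each stored batch has to be split back into individual records. Below is a minimal sketch of the framing idea behind aggregate and its inverse; Spark's actual implementation uses its own serialization utilities, so this only illustrates the length-prefixed layout:

import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Concatenate records as [length][bytes][length][bytes]... into one ByteBuffer
def aggregate(records: Seq[ByteBuffer]): ByteBuffer = {
  val total = records.map(r => 4 + r.remaining()).sum
  val out = ByteBuffer.allocate(total)
  records.foreach { r =>
    out.putInt(r.remaining())
    out.put(r.duplicate())
  }
  out.flip()
  out
}

// Split a batch back into the original records
def deaggregate(batch: ByteBuffer): Seq[ByteBuffer] = {
  val buf = batch.duplicate()
  val records = ArrayBuffer[ByteBuffer]()
  while (buf.hasRemaining) {
    val len = buf.getInt()
    val bytes = new Array[Byte](len)
    buf.get(bytes)
    records += ByteBuffer.wrap(bytes)
  }
  records.toSeq
}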

BatchedWriteAheadLog's clean operation likewise just delegates to FileBasedWriteAheadLog:

override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
  wrappedLog.clean(threshTime, waitForCompletion)
}