Kafka 延迟任务

Kafka 延迟任务

时间轮概念

Kafka在处理请求时,使用了多种延迟任务来处理,比如心跳请求。Kafka自己实现了时间轮,提供任务的延迟定时。关于时间轮的概念,我们把常见的钟表想象成一个时间轮。

时间轮都有着自己的定时范围,以时钟的时针为例,指针每前进一格,代表着时间过去一个小时。它最多有12格,能表示最大延迟时间不超过12个小时的任务。

时间轮可以分等级的,子时间轮的最大延迟时间,刚好为父时间轮的一格。类似于钟表一样。钟表有三个指针,时针,分针,秒针。分钟时间轮能表示的最大延迟时间为60分钟,刚好为小时时间轮的一格。

多级时间轮的精确时间是取决于最下层的时间轮。以钟表为例,它最下层的时间轮为秒针时间轮,精确的时间单位是秒。

任务列表

时间轮的每一格,都保存了对应时间的延迟任务列表。当向时间轮添加任务时,会根据任务的延迟时间,放到不同的时间轮里。比如当前时间是1点钟,现在添加一个延迟任务,它需要延迟1分钟1秒,添加的步骤如下:

  1. 首先试图添加到秒时间轮里,但是秒时间轮最大的延迟时间是60秒,超过了最大延迟范围,所以会将任务尝试添加到分钟时间轮
  2. 因为分钟时间轮的最大范围是60分钟,没有超过分钟时间轮的范围,所以任务添加到分钟时间轮的第二格。

当分针前进一格,到达1点1分,它会检测下一格的任务,会将该时间块的任务列表,取出来添加到秒时间轮。比如刚刚的延迟1分钟1秒的任务,会将它添加秒时间轮的二格。

当时间到达1点1分1秒,秒时间轮会执行该时间块的任务。

相关类介绍

TimerTask类表示延迟任务,继承Runnable,子类需要实现run方法。

TimerTaskEntry类表示链表项,它封装了TimerTask。

TimerTaskList表示延迟任务链表,它支持链表的添加和删除操作。它还提供了flush方法,支持执行延迟任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
    
  private[this] val root = new TimerTaskEntry(null, -1)

  // 传递的函数,用来执行任务
  def flush(f: (TimerTaskEntry)=>Unit): Unit = {
    synchronized {
      // 遍历链表,执行延迟任务
      var head = root.next
      while (head ne root) {
        remove(head)
        f(head)
        head = root.next
      }
      expiration.set(-1L)
    }
  }
}

TimingWheel表示时间轮,它有wheelSize格时间块,每块的时间长度为tickMs。每块时间保存了TimerTaskList。

当添加任务时,任务超过了当前时间轮的范围,TimingWheel会自动创建父时间轮,直到父时间轮的范围可以包含此任务。

我们可以看到TimingWheel仍然使用了Java的DelayQueue类,实现定时作用。既然使用了DelayQueue,那为什么还会用到时间轮。根本原因就是性能,DelayQueue添加任务的复杂度是复杂度是O( n log(n) ),如果任务量太多,那么DelayQueue的性能会不好。Kafka引用了时间轮,使得添加任务的复杂度降低到了O(1),不过它将时间相差不大的任务,都添加到了一个队列里,这样就降低了时间精确性。

TimingWheel提供add方法添加任务,还提供了advanceClock方法更新时间。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// tickMs表示时间轮的时间单位,比如分钟时间轮的时间单位为1分钟
// wheelSize表示有多少格时间,比如分钟时间轮有60格
// startMs表示时间轮的开始时间
// queue是Java自带类DelayQueue类型,它只保存了任务队列
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
  // interval表示延迟的最大时间,比如分钟时间轮为60分钟
  private[this] val interval = tickMs * wheelSize
  // 每格时间都对应着一个任务队列
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
  // 当前时间,它的时间单位为tickMs,这里采用向下取整
  private[this] var currentTime = startMs - (startMs % tickMs)
  // 父时间轮
  @volatile private[this] var overflowWheel: TimingWheel = null

  private[this] def addOverflowWheel(): Unit = {
    synchronized {
      if (overflowWheel == null) {
        overflowWheel = new TimingWheel(
          tickMs = interval,          // 父时间轮的时间单位,为当前时间轮的最大时间
          wheelSize = wheelSize,      // 父时间轮的格数相同
          startMs = currentTime,
          taskCounter = taskCounter,
          queue
        )
      }
    }
  }
    
  // 更新当前时间
  def advanceClock(timeMs: Long): Unit = {
    // 只有过了单位时间,才会更新当前时间
    if (timeMs >= currentTime + tickMs) {
      currentTime = timeMs - (timeMs % tickMs)
      // 更新父时间轮
      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
    }
  }
    
  // 添加任务,返回是否成功
  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
    val expiration = timerTaskEntry.expirationMs
    if (timerTaskEntry.cancelled) {
      // 如果任务在添加之前,已经被取消掉了
      false
    } else if (expiration < currentTime + tickMs) {
      // 如果任务已经过期了
      false
    } else if (expiration < currentTime + interval) {
      // 计算添加到哪个时间格
      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      // 添加任务到对应的任务列表
      bucket.add(timerTaskEntry)

      // 设置任务列表的过期时间
      if (bucket.setExpiration(virtualId * tickMs)) {
        // 将列表添加到DepalyQueue
        queue.offer(bucket)
      }
      true
    } else {
      // 超出了最大延迟时间,需要添加到父时间轮
      if (overflowWheel == null) addOverflowWheel()
      overflowWheel.add(timerTaskEntry)
    }
  }    
}

定时器

时间轮只是保存了任务,而定时器负责管理时间轮和执行过期的任务。 定时器的接口由Timer表示

1
2
3
4
5
6
7
trait Timer {
  // 添加任务
  def add(timerTask: TimerTask): Unit

  // 更新时间,并且提交延迟任务给线程池执行
  def advanceClock(timeoutMs: Long): Boolean
}

SystemTimer实现了Timer接口,它使用DelayQueue查找过期的任务列表,然后提交线程池执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class SystemTimer(executorName: String,
                  tickMs: Long = 1,
                  wheelSize: Int = 20,
                  startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {

  // 执行任务的线程池
  private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
    def newThread(runnable: Runnable): Thread =
      KafkaThread.nonDaemon("executor-"+executorName, runnable)
  })
  // Java自带的延迟队列
  private[this] val delayQueue = new DelayQueue[TimerTaskList]()
  // 时间轮
  private[this] val timingWheel = new TimingWheel(
    tickMs = tickMs,
    wheelSize = wheelSize,
    startMs = startMs,
    taskCounter = taskCounter,
    delayQueue
  )
  
  // 添加任务
  def add(timerTask: TimerTask): Unit = {
    readLock.lock()
    try {
      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
    } finally {
      readLock.unlock()
    }
  }
  
    
  def advanceClock(timeoutMs: Long): Boolean = {
    // delayQueue查找过期的任务队列
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (bucket != null) {
      writeLock.lock()
      try {
        while (bucket != null) {
          // 调用时间轮的advanceClock方法,更新该时间轮的时间
          timingWheel.advanceClock(bucket.getExpiration())
          // 对任务列表依次执行reinsert操作
          bucket.flush(reinsert)
          // 继续查看是否还有超时的任务列表
          bucket = delayQueue.poll()
        }
      } finally {
        writeLock.unlock()
      }
      true
    } else {
      false
    }
  }
  
  private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)
  
  private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
    
    if (!timingWheel.add(timerTaskEntry)) {
      // 如果是因为任务过期,导致添加失败,那么将任务丢到线程池执行
      if (!timerTaskEntry.cancelled)
        taskExecutor.submit(timerTaskEntry.timerTask)
    }
  }
  

延迟任务

DelayedOperation表示延迟任务,它在TimerTask的基础上,提供了支持提前完成的功能。

子类需要实现DelayedOperation的两个重要回调,onComplete 和 onExpire,对应着不同情形的回调

  • 当任务提前完成时,只会调用onComplete方法。
  • 当任务因为到期才执行,会调用onComplete方法和onExpire方法

DelayedOperation提供了tryComplete方法,供使用者调用,来尝试提前完成任务。子类需要实现这个方法,判断是否满足提前完成条件,如果满足则执行forceComplete方法执行任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
abstract class DelayedOperation {
  
  // 这里使用了AtomicBoolean,用来控制并发
  private val completed = new AtomicBoolean(false)
    
  // 执行任务
  def forceComplete(): Boolean = {
    if (completed.compareAndSet(false, true)) {
      // 尝试设置completed的值
      // 从定时器中取消任务
      cancel()
      // 调用onComplete方法,执行回调
      onComplete()
      true
    } else {
      false
    }
  }    
}

DelayedOperation还提供了maybeTryComplete方法,在tryComplete方法的基础之上,提供了多线程的优化。maybeTryComplete方法实现得很精巧,它能保证尽量及时的检测任务是否可以完成。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 是否需要再次尝试
private val tryCompletePending = new AtomicBoolean(false)

private[server] def maybeTryComplete(): Boolean = {
  var retry = false
  var done = false
  do {
    if (lock.tryLock()) {
      // 成功获取锁
      try {
        // 设置tryCompletePending为false,表示不再尝试
        tryCompletePending.set(false)
        
        done = tryComplete()
      } finally {
        lock.unlock()
      }
      // 获取tryCompletePending的值,有可能此时外部线程修改了值
      retry = tryCompletePending.get()
    } else {
      // 如果获取锁失败,设置tryCompletePending为true,通知获取锁的线程再次尝试。
      // 如果tryCompletePending之前为false,表示获取所的线程尝试操作已完成,不能保证。获取所失败的线程,需要自己尝试
      // 如果tryCompletePending之前为true,表示现在已有一个获取锁失败的线程在运行,所以当前线程不用再尝试
      retry = !tryCompletePending.getAndSet(true)
      
    }
  } while (!isCompleted && retry)
  done
}

maybeTryComplete方法实现得很精巧,它能保证尽量及时的检测任务是否可以完成。如果线程A首先获取锁,但是这时没有满足条件。之后线程B获取锁失败,但是此时说不定满足条件,所以这里需要再次检查条件,至于是哪个线程执行都可以。

延迟任务管理

DelayedOperationPurgatory负责管理延迟任务,支持任务分组。分组信息由Watchers类表示,它包含了延迟任务列表和任务类型。Watchers提供了tryCompleteWatched方法,会尝试完成列表中的任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// key为任务类型
private class Watchers(val key: Any) {
    // 延迟任务列表
    private[this] val operations = new ConcurrentLinkedQueue[T]()
    
    def tryCompleteWatched(): Int = {
      var completed = 0
      // 遍历任务列表
      val iter = operations.iterator()
      while (iter.hasNext) {
        val curr = iter.next()
        if (curr.isCompleted) {
          // another thread has completed this operation, just remove it
          iter.remove()
        } else if (curr.maybeTryComplete()) {
          // 调用DelayedOperation的maybeTryComplete方法,尝试完成任务
          iter.remove()
          completed += 1
        }
      }

      if (operations.isEmpty)
        // Watchers是DelayedOperationPurgatory的内部类,这里的removeKeyIfEmpty是属于DelayedOperationPurgatory类的方法
        // 如果当前列表的任务都已经完成,那么将这个分组删除掉
        removeKeyIfEmpty(key, this)
      completed
    }
}

DelayedOperationPurgatory里包含了一个线程,用来更新时间轮的时间,并且执行过期任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
final class DelayedOperationPurgatory[T <: DelayedOperation](...) {
  // 更新时间的线程
  private val expirationReaper = new ExpiredOperationReaper()
  
  private class ExpiredOperationReaper extends ShutdownableThread(
    "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
    false) {
      
    // 该线程调用DelayedOperationPurgatory的advanceClock方法,更新时间轮
    override def doWork() {
      advanceClock(200L)
    }
  }
  
  // timeoutMs参数,表示此次操作的超时时间
  def advanceClock(timeoutMs: Long) {
    // 调用SystemTimer的advanceClock方法,执行过期的任务
    timeoutTimer.advanceClock(timeoutMs)
    // estimatedTotalOperations表示 DelayedOperationPurgatory的任务数,包含已经完成的任务
    // delayed表示时间轮还未完成的任务数
    if (estimatedTotalOperations.get - delayed > purgeInterval) {
      estimatedTotalOperations.getAndSet(delayed)
      // 遍历Watchers列表,清除已经完成的任务
      val purged = allWatchers.map(_.purgeCompleted()).sum
    }
  }  
  
}

DelayedOperationPurgatory提供了两个重要方法,供外部使用。使用者首先调用tryCompleteElseWatch方法,添加延迟任务。添加完后,不定时的调用checkAndComplete方法,尝试提前完成任务。

  • tryCompleteElseWatch方法,提供添加延迟任务
  • checkAndComplete方法,负责尝试提前完成任务
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
final class DelayedOperationPurgatory[T <: DelayedOperation] (...) {

  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))

  // 注意到watchKeys是一个列表,延迟任务与事件类型是多对多的关系
  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
    // 尝试提前完成任务
    var isCompletedByMe = operation.tryComplete()
    if (isCompletedByMe)
      return true

    var watchCreated = false
    for(key <- watchKeys) {
      // 有可能别的线程,提前完成了任务
      if (operation.isCompleted)
        return false
      // 将任务添加到key对应的列表
      watchForOperation(key, operation)

      if (!watchCreated) {
        watchCreated = true
        estimatedTotalOperations.incrementAndGet()
      }
    }
      
    // 再次尝试完成任务
    isCompletedByMe = operation.maybeTryComplete()
    if (isCompletedByMe)
      return true

    if (!operation.isCompleted) {
      if (timerEnabled)
        // 添加到定时器中
        timeoutTimer.add(operation)
      if (operation.isCompleted) {
        // 如果这时候任务已经完成,那么就取消任务
        operation.cancel()
      }
    }
    false
  }

  def checkAndComplete(key: Any): Int = {
    // 获取该事件类型的任务列表
    val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }
    if(watchers == null)
      0
    else
      // 调用Watchers的tryCompleteWatched方法,尝试提前完成任务
      watchers.tryCompleteWatched()
  }  
}
updatedupdated2023-07-022023-07-02