Kevin Wan

Implement a timing wheel for millions of concurrent tasks.

Systems with many delayed in-process tasks face a problem: if we use a separate timer for every task, there will be lots of idle goroutines and lots of memory consumed. Scheduling that many goroutines also costs extra CPU.

This article introduces the TimingWheel in go-zero, which lets developers schedule large numbers of delayed tasks. Two options are usually available for delayed tasks:

  1. Timer: used for one-off tasks. A timer represents a single event in the future. You tell it how long you want to wait, and it provides a channel that is notified at that time.
  2. TimingWheel: maintains an array of slots, each holding a linked list of tasks. Once started, a ticker fires at a fixed interval and the tasks in one slot are executed on each tick.

Option 2 reduces the cost of maintaining a task from O(log n) per insertion or removal in a priority queue (O(n log n) for n tasks) to O(1) in a doubly linked list. Executing tasks only requires walking the tasks in the current slot on each tick, O(N) for the N tasks in that slot, without the heap push and pop operations a priority queue needs.
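To make the O(1) insertion concrete, here is a minimal sketch of a single-level wheel. It is not go-zero's implementation: miniWheel, add and tick are hypothetical names, and the sketch assumes delays never exceed one rotation.

```go
package main

import (
	"container/list"
	"fmt"
)

// miniWheel is a hypothetical, stripped-down wheel: a fixed ring of slots,
// each holding a doubly linked list of task names.
type miniWheel struct {
	slots []*list.List
	pos   int // slot visited by the most recent tick
}

func newMiniWheel(numSlots int) *miniWheel {
	w := &miniWheel{slots: make([]*list.List, numSlots), pos: numSlots - 1}
	for i := range w.slots {
		w.slots[i] = list.New()
	}
	return w
}

// add schedules name to fire after the given number of ticks
// (assumed to satisfy 1 <= ticks <= numSlots). It is a single PushBack,
// so its cost does not depend on how many tasks are already stored.
func (w *miniWheel) add(name string, ticks int) {
	idx := (w.pos + ticks) % len(w.slots)
	w.slots[idx].PushBack(name)
}

// tick advances one slot and returns the tasks stored there.
func (w *miniWheel) tick() []string {
	w.pos = (w.pos + 1) % len(w.slots)
	l := w.slots[w.pos]
	var due []string
	for e := l.Front(); e != nil; e = e.Next() {
		due = append(due, e.Value.(string))
	}
	l.Init() // empty the slot for the next rotation
	return due
}

func main() {
	w := newMiniWheel(4)
	w.add("a", 1)
	w.add("b", 3)
	w.add("c", 1)
	for i := 1; i <= 3; i++ {
		fmt.Println(i, w.tick())
	}
	// Output:
	// 1 [a c]
	// 2 []
	// 3 [b]
}
```

Each tick only touches the tasks in one slot, which is the O(N) polling cost described above.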

Let's look at our own use of TimingWheel in go-zero:

TimingWheel in cache

Let's start with how the cache in collection uses TimingWheel.

timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v interface{}) {
  key, ok := k.(string)
  if !ok {
    return
  }
  cache.Del(key)
})
if err != nil {
  return nil, err
}

cache.timingWheel = timingWheel

This is the initialization of the cache, which also creates a TimingWheel to clean up expired keys. NewTimingWheel takes three arguments:

  • interval: the time interval at which tasks are checked
  • numSlots: the number of time slots
  • execute: the function that processes a task when it is due

The execute function in the cache deletes the expired key, and TimingWheel controls when that call happens.

Initialization

// The initialization of TimingWheel
func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (
  *TimingWheel, error) {
  tw := &TimingWheel{
    interval: interval, // time frame interval
    ticker: ticker, // the ticker to trigger the execution
    slots: make([]*list.List, numSlots), // the slots to put tasks
    timers: NewSafeMap(), // map to store task with {key, value}
    tickedPos: numSlots - 1, // the previous ticked position
    execute: execute, // execute function
    numSlots: numSlots, // the number of slots
    setChannel: make(chan timingEntry), // the channel to set tasks
    moveChannel: make(chan baseEntry), // the channel to move tasks
    removeChannel: make(chan interface{}), // the channel to remove tasks
    drainChannel: make(chan func(key, value interface{})), // the channel to drain tasks
    stopChannel: make(chan lang.PlaceholderType), // the channel to stop TimingWheel
  }
  // Prepare all the lists stored in the slot
  tw.initSlots()
  // start asynchronous concurrent process, use channel for task communication and passing
  go tw.run()

  return tw, nil
}

The above is a more visual representation of the "time wheel" of TimingWheel, and the details will be explained later around this diagram.

go tw.run() creates a goroutine to do the tick notification.

func (tw *TimingWheel) run() {
  for {
    select {
    // the ticker fires -> onTick() -> scanAndRunTasks()
    case <-tw.ticker.Chan():
      tw.onTick()
    // adding a task sends it to setChannel
    case task := <-tw.setChannel:
      tw.setTask(&task)
      ...
    }
  }
}

As you can see, the ticker starts at initialization and drives the wheel from slot to slot. On each tick, the tasks in the current slot's list are fetched and handed to execute.
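The pattern in run(), where one goroutine owns all state and everything else talks to it through channels, can be sketched in isolation. The following is an illustrative simplification, not go-zero's code: loop, setReq and the countdown map are hypothetical stand-ins for the wheel, and the tick channel is driven by hand instead of by a real ticker.

```go
package main

import "fmt"

// setReq is a hypothetical request to schedule key after a number of ticks.
type setReq struct {
	key   string
	ticks int
}

// loop owns the state; callers never touch the map directly, so no lock is needed.
func loop(ticks <-chan struct{}, set <-chan setReq, fired chan<- string, stop <-chan struct{}) {
	remaining := map[string]int{} // stand-in for the slots of a real wheel
	for {
		select {
		case <-ticks:
			// Count every pending task down by one tick and fire the due ones.
			for k := range remaining {
				remaining[k]--
				if remaining[k] <= 0 {
					delete(remaining, k)
					fired <- k
				}
			}
		case r := <-set:
			remaining[r.key] = r.ticks
		case <-stop:
			return
		}
	}
}

func main() {
	ticks := make(chan struct{})
	set := make(chan setReq)
	fired := make(chan string, 1)
	stop := make(chan struct{})
	go loop(ticks, set, fired, stop)

	set <- setReq{key: "session", ticks: 2}
	ticks <- struct{}{} // first tick: one tick remaining
	ticks <- struct{}{} // second tick: the task fires
	fmt.Println(<-fired) // prints "session"
	close(stop)
}
```

Because every mutation goes through the select loop, adding, moving, and firing tasks are serialized without explicit locking.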

Task Operation

The next step is to set the cache key.

func (c *Cache) Set(key string, value interface{}) {
  c.lock.Lock()
  _, ok := c.data[key]
  c.data[key] = value
  c.lruCache.add(key)
  c.lock.Unlock()

  expiry := c.unstableExpiry.AroundDuration(c.expiry)
  if ok {
    c.timingWheel.MoveTimer(key, expiry)
  } else {
    c.timingWheel.SetTimer(key, value, expiry)
  }
}

Set does the following:

  1. Check whether the key already exists in the data map.
  2. If it exists, update its expiry by calling MoveTimer().
  3. Otherwise, set the key with its expiry by calling SetTimer().

So using TimingWheel is straightforward: developers can add or update timers as needed.

Also, reading the source code shows that SetTimer() and MoveTimer() simply send the task to a channel, and the goroutine created in run() continuously receives these operations from the channels.

SetTimer() -> setTask():

  • task does not exist: getPositionAndCircle -> PushBack to list -> setTimerPosition
  • task exists: get from timers -> moveTask()

MoveTimer() -> moveTask()

In the call chains above, one function is reached by both paths: moveTask()

func (tw *TimingWheel) moveTask(task baseEntry) {
  // timers: Map => get [positionEntry "pos, task"] by key
  val, ok := tw.timers.Get(task.key)
  if !ok {
    return
  }

  timer := val.(*positionEntry)
  // {delay < interval} => delay is less than a time frame interval,
  // the task should be executed immediately
  if task.delay < tw.interval {
    threading.GoSafe(func() {
      tw.execute(timer.item.key, timer.item.value)
    })
    return
  }
  // otherwise, compute the new pos and circle from the delay
  pos, circle := tw.getPositionAndCircle(task.delay)
  if pos >= timer.pos {
    timer.item.circle = circle
    // Record the offset of the move
    timer.item.diff = pos - timer.pos
  } else if circle > 0 {
    // move to the level of (circle-1)
    circle--
    timer.item.circle = circle
    // because it's an array, add numSlots [which is the equivalent of going to the next level]
    timer.item.diff = tw.numSlots + pos - timer.pos
  } else {
    // If offset is ahead of schedule, task is still on the first level
    // mark the old task for deletion and requeue it for execution
    timer.item.removed = true
    newItem := &timingEntry{
      baseEntry: task,
      value: timer.item.value,
    }
    tw.slots[pos].PushBack(newItem)
    tw.setTimerPosition(pos, newItem)
  }
}

The above process has the following cases.

  • delay < interval: the delay is smaller than the wheel's time precision, so the task is executed immediately
  • delay changed:
    • new pos >= old pos: record <newPos, newCircle, diff>
    • new pos < old pos and newCircle > 0: decrement circle by one and compensate by adding numSlots to diff
    • otherwise (the delay was simply shortened): mark the old entry as removed, push a new entry onto the list, and wait for the wheel to reach it
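The three "delay changed" branches can be isolated into a pure function to see what each one records. This is a sketch that mirrors the branching in moveTask() above. planMove and moveResult are hypothetical names, not part of go-zero.

```go
package main

import "fmt"

// moveResult describes what a move would record on the existing entry.
type moveResult struct {
	circle  int
	diff    int
	requeue bool // true when the old entry is marked removed and a new one is pushed
}

// planMove mirrors the three branches that follow the delay < interval check.
func planMove(oldPos, newPos, newCircle, numSlots int) moveResult {
	switch {
	case newPos >= oldPos:
		// Same rotation count, relocate forward by diff slots.
		return moveResult{circle: newCircle, diff: newPos - oldPos}
	case newCircle > 0:
		// New slot is behind the old one: spend one rotation as numSlots of diff.
		return moveResult{circle: newCircle - 1, diff: numSlots + newPos - oldPos}
	default:
		// New slot comes up before the old one: lazy-delete and re-add.
		return moveResult{requeue: true}
	}
}

func main() {
	fmt.Printf("%+v\n", planMove(5, 9, 0, 16))  // {circle:0 diff:4 requeue:false}
	fmt.Printf("%+v\n", planMove(10, 3, 1, 16)) // {circle:0 diff:9 requeue:false}
	fmt.Printf("%+v\n", planMove(10, 3, 0, 16)) // {circle:0 diff:0 requeue:true}
}
```

In the first two cases the entry stays in its old slot and is relocated later, when the wheel reaches that slot and sees a non-zero diff.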

Execute

As shown in the initialization, the ticker in run() keeps advancing, and each advance mainly hands the tasks in the current slot's list to the execute function. Let's start with what happens on each tick.

// onTick runs once every interval
func (tw *TimingWheel) onTick() {
  // update the current tick position on every execution
  tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
  // Get the chain of tasks stored in the tick position at this time
  l := tw.slots[tw.tickedPos]
  tw.scanAndRunTasks(l)
}

Next comes how the tasks are handed to execute.

func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
  // store the tasks {key, value} that currently need to be executed [the arguments needed by execute, passed in turn to execute]
  var tasks []timingTask

  for e := l.Front(); e != nil; {
    task := e.Value.(*timingEntry)
    // mark for deletion, do the real deletion in scan, delete data from map
    if task.removed {
      next := e.Next()
      l.Remove(e)
      tw.timers.Del(task.key)
      e = next
      continue
    } else if task.circle > 0 {
      // the wheel has reached this slot, but the task still has full rotations left,
      // so decrement circle and leave the task where it is;
      // pos is not modified
      task.circle--
      e = e.Next()
      continue
    } else if task.diff > 0 {
      // the task was moved earlier: relocate it to the slot diff positions ahead
      next := e.Next()
      l.Remove(e)
      pos := (tw.tickedPos + task.diff) % tw.numSlots
      tw.slots[pos].PushBack(task)
      tw.setTimerPosition(pos, task)
      task.diff = 0
      e = next
      continue
    }
    // the above cases are all non-executable cases, those that can be executed will be added to tasks
    tasks = append(tasks, timingTask{
      key: task.key,
      value: task.value,
    })
    next := e.Next()
    l.Remove(e)
    tw.timers.Del(task.key)
    e = next
  }
  // for range tasks, then just execute each task->execute
  tw.runTasks(tasks)
}

The branches are explained in the comments and can be read alongside moveTask() above: decrementing circle and computing diff are where the two functions meet.
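The circle countdown and the lazy deletion can be seen in a reduced version of the scan. This sketch leaves out the diff branch and the timers map. entry, newSlot and scan are hypothetical names used only for illustration.

```go
package main

import (
	"container/list"
	"fmt"
)

// entry is a simplified task record.
type entry struct {
	key     string
	circle  int  // full rotations still to wait
	removed bool // lazily deleted, dropped on the next scan
}

// newSlot builds a slot list from the given entries.
func newSlot(entries ...*entry) *list.List {
	l := list.New()
	for _, en := range entries {
		l.PushBack(en)
	}
	return l
}

// scan visits one slot and returns the keys that are due on this pass.
func scan(l *list.List) []string {
	var due []string
	for e := l.Front(); e != nil; {
		next := e.Next() // capture before Remove, which clears the element's links
		en := e.Value.(*entry)
		switch {
		case en.removed:
			l.Remove(e)
		case en.circle > 0:
			en.circle-- // not due yet, stays in this slot
		default:
			due = append(due, en.key)
			l.Remove(e)
		}
		e = next
	}
	return due
}

func main() {
	slot := newSlot(
		&entry{key: "a"},
		&entry{key: "b", circle: 1},
		&entry{key: "c", removed: true},
	)
	fmt.Println(scan(slot), slot.Len()) // [a] 1
	fmt.Println(scan(slot), slot.Len()) // [b] 0
}
```

The entry with circle 1 survives the first pass and fires on the second, which is how one slot can hold tasks from several rotations at once.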

The diff calculation depends on how pos and circle are calculated.

// interval: 4min, d: 60min, numSlots: 16, tickedPos = 15
// steps = 15, pos = 14, circle = 0
func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
  steps := int(d / tw.interval)
  pos = (tw.tickedPos + steps) % tw.numSlots
  circle = (steps - 1) / tw.numSlots
  return
}

The above process can be summarized as follows.

steps = d / interval
pos = (tickedPos + steps) % numSlots
circle = (steps - 1) / numSlots
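To check the numbers from the comment, the calculation can be run as a standalone function. This sketch takes tickedPos and numSlots as parameters instead of reading them from a TimingWheel. positionAndCircle is a hypothetical name, and the 120-minute case is an extra example not taken from the original.

```go
package main

import (
	"fmt"
	"time"
)

// positionAndCircle applies the same arithmetic as getPositionAndCircle,
// with the wheel state passed in explicitly.
func positionAndCircle(d, interval time.Duration, tickedPos, numSlots int) (pos, circle int) {
	steps := int(d / interval)
	pos = (tickedPos + steps) % numSlots
	circle = (steps - 1) / numSlots
	return
}

func main() {
	// interval: 4min, d: 60min, numSlots: 16, tickedPos: 15
	pos, circle := positionAndCircle(60*time.Minute, 4*time.Minute, 15, 16)
	fmt.Println(pos, circle) // 14 0

	// A delay longer than one rotation (16 * 4min = 64min) gets circle > 0.
	pos, circle = positionAndCircle(120*time.Minute, 4*time.Minute, 15, 16)
	fmt.Println(pos, circle) // 13 1
}
```

In the second case steps is 30, so the task lands in slot (15 + 30) % 16 = 13 and waits one extra rotation before it fires.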

Summary

  1. TimingWheel relies on a ticker to drive time forward. On each tick it takes the tasks from the doubly linked list in the current slot and passes them to execute. Because it advances in fixed steps of interval, some ticks do no useful work: for a 60s task with interval = 1s, the first 59 ticks are no-ops for that task.

  2. For delays longer than one rotation, tasks are layered by circle, so the original numSlots slots are reused again and again as the wheel loops around. Any number of tasks can be put into the fixed set of slots, and long delays are supported without creating additional data structures.

go-zero also contains many other practical toolkits that help improve service performance and development efficiency.

Project address

https://github.com/zeromicro/go-zero

Welcome to use go-zero and star to support us!

Discussion (5)

Marcelloh

why not work with a pool of go-routines (and perhaps a queue to manage them)?

Kevin Wan Author

It’s customizable by using different execute functions.

Marcelloh

I think the idea is nice but a controller, queue and pool will do the trick too, and is perhaps more maintainable.

Kevin Wan Author

The tasks are delay time dependent. Would like to know how to handle it with queue?

Marcelloh

Some queues have timing inside, but you can make your own queue which has that. (or even multiple ones, for the various tasks)