Recently I had a requirement at work: millions of timer tasks are created within a short period of time. When each task is created, the corresponding amount is added to a running total to prevent overselling. Half an hour later the data is checked again, and if it does not match, the added amount is subtracted back.
Doing this with Go's built-in Timer performs poorly, because it is backed by a min-heap and both creation and deletion cost O(log n). A time wheel performs much better, since both operations are O(1).
Time wheels are in widespread use, for example in Netty, Akka, Quartz, ZooKeeper and Kafka.
Introduction
Simple Time Wheel
A time wheel stores its tasks in a circular queue backed by an array, and each array element holds a list of timed tasks. That list is a circular doubly linked list, and each item in it is a timed task entry that wraps the real task.
The time wheel consists of a number of time slots, each representing the basic time span (tickMs) of the wheel. The number of slots is fixed and is given by wheelSize, so the overall time span (interval) of the wheel is tickMs x wheelSize.
The wheel also has a dial pointer (currentTime) that indicates the current time of the wheel. currentTime is an integer multiple of tickMs. It points to the time slot that is due, which means that all the tasks in that slot's list need to be processed.
The following diagram shows a time wheel with a tickMs of 1s and a wheelSize of 10. Each slot contains a list of timed tasks, and the list contains the real task items.
Initially the dial pointer currentTime points to time slot 0. With a tickMs of 1s and a wheelSize of 10, the interval is 10s. The following diagram shows a task timed at 2s being inserted and stored in the task list of time slot 2, marked in red. As time passes, the pointer advances. Once 2s have elapsed, currentTime points to time slot 2 and the task list of that slot is retrieved for processing.
If currentTime points to slot 2 and a 9s task is inserted, the task expires at 11s. The wheel wraps around, so the new task reuses a slot that the pointer has already passed and is stored in time slot 1.
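The slot arithmetic above can be written down in a few lines. This is only a sketch: the function names interval and slotIndex are my own, and all times are in milliseconds.

```go
package main

// interval returns the total span covered by a single-layer wheel.
func interval(tickMs, wheelSize int64) int64 {
	return tickMs * wheelSize
}

// slotIndex returns the slot that holds a task expiring at expirationMs.
// It counts whole ticks since time zero and wraps around the ring.
func slotIndex(expirationMs, tickMs, wheelSize int64) int64 {
	return (expirationMs / tickMs) % wheelSize
}
```

With tickMs = 1000 and wheelSize = 10, a task expiring at 2000 lands in slot 2, and a 9s task inserted when currentTime is 2s expires at 11000 and wraps around to slot 1.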
The time wheel described here is a simple, single-layer time wheel whose overall time range lies between currentTime and currentTime+interval. To hold a 15s timed task you would have to create a new time wheel with a time span of at least 15s. But there is no limit to this kind of expansion: a 10,000 second time wheel needs an array of that size, which not only takes up a lot of memory but also makes traversing such a large array inefficient.
This is why the concept of a hierarchical time wheel was introduced.
Tiered time wheel
The diagram shows a two-layer time wheel. The second layer also consists of 10 time slots, each spanning 10s, because the tickMs of the second layer is the interval of the first layer, i.e. 10s. The wheelSize of every layer is fixed at 10, so the overall time span (interval) of the second layer is 100s.
The diagram shows the expiry time range of each time slot, and we can see that the range for slot 0 of the second layer is [0,9]. That is, one slot of the second layer can represent all 10 slots of the first layer.
If a 15s task is added to this time wheel, the first layer cannot accommodate it, so it enters the second layer and is inserted into the time slot with an expiry range of [10,19].
As time passes and 5s remain of the original 15s, the task is demoted. At that point the overall time span of the first layer is sufficient, so the task is added to the first-layer slot with an expiry time of 5. After another 5s the task actually expires and its expiry operation is finally executed.
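To make the promotion and demotion concrete, here is a small sketch that computes which layer a task lands in and which expiry range its slot covers. The placement type and the place function are hypothetical helpers for illustration only. Times are in milliseconds, and the range end is exclusive, so [10000, 20000) corresponds to [10,19] in whole seconds.

```go
package main

// placement describes where a task lands in a layered wheel.
type placement struct {
	level   int   // 1 is the finest layer
	startMs int64 // inclusive start of the slot's expiry range
	endMs   int64 // exclusive end of the slot's expiry range
}

// place walks up the layers until one can cover expirationMs.
// Every layer has wheelSize slots, and each layer's tick is the
// interval of the layer below it.
func place(expirationMs, currentMs, tickMs, wheelSize int64) placement {
	for level := 1; ; level++ {
		// Each layer's pointer is the current time trimmed to its own tick.
		layerNow := currentMs - currentMs%tickMs
		if expirationMs < layerNow+tickMs*wheelSize {
			start := expirationMs - expirationMs%tickMs
			return placement{level: level, startMs: start, endMs: start + tickMs}
		}
		tickMs *= wheelSize
	}
}
```

Calling place(15000, 0, 1000, 10) puts the 15s task in layer 2, in the slot covering 10s to 20s. Calling it again with currentMs = 10000, when 5s remain, puts the same task in layer 1.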
Code Implementation
Our Go version of the TimingWheel is modelled on Kafka's, so there are a few implementation details worth noting.
- Each list in a time slot of the TimingWheel has a root node that simplifies the boundary conditions. It is an additional list node that acts as the first node and stores nothing in its value field; it exists only to make the operations more convenient.
- The start time (startMs) of every higher-level wheel is set to the currentTime of the wheel below it at the moment the higher layer is created. The currentTime of each layer must be an integer multiple of tickMs; if it is not, it is trimmed to one using currentTime = startMs - (startMs % tickMs).
- The timer in Kafka only holds a reference to the first-level TimingWheel and does not hold the higher-level wheels directly, but each TimingWheel has a reference (overflowWheel) to the wheel one level above it.
- The timer in Kafka uses a DelayQueue to help advance the time wheel. Every slot list that is in use is added to the DelayQueue. The DelayQueue is sorted by the expiry time of the corresponding slot, with the entry that expires soonest at the head of the queue, and a separate thread fetches the entries that expire.
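The trimming rule from the list above is a one-liner. The sketch below uses a helper name of my own choosing, truncate, and adds a guard against a non-positive tick, which is my assumption rather than something the text requires.

```go
package main

// truncate rounds x down to the nearest multiple of m.
// A non-positive m leaves x unchanged, which avoids a division by zero.
func truncate(x, m int64) int64 {
	if m <= 0 {
		return x
	}
	return x - x%m
}
```

For example, a startMs of 12345 with a tickMs of 1000 is trimmed to 12000.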
Structs
The tick, wheelSize, interval and currentTime fields are easy to understand. The buckets field is the list of time slots, queue is the delay queue through which all tasks are triggered, and overflowWheel is a reference to the upper-level time wheel.
The bucket encapsulates the task queue of one time slot. It holds the tasks that share the same expiry time, and its timers are taken out for processing once that time is reached. An interesting point here is that multiple goroutines access the bucket concurrently, so its int64 value is read through the sync/atomic package, and the field must be 64-bit aligned so that 64-bit reads stay consistent on 32-bit systems.
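A minimal sketch of such a bucket is shown below. The field and method names are illustrative and are not copied from the original listing. I am assuming that the int64 in question is the bucket's expiry time, and I rely on the sync/atomic guarantee that the first word of an allocated struct is 64-bit aligned.

```go
package main

import (
	"container/list"
	"sync"
	"sync/atomic"
)

// bucket is a sketch of one time slot.
type bucket struct {
	// expiration is only accessed through sync/atomic. Keeping it as the
	// first field guarantees 64-bit alignment on 32-bit platforms.
	expiration int64

	mu     sync.Mutex
	timers *list.List // tasks that share this slot
}

func newBucket() *bucket {
	return &bucket{expiration: -1, timers: list.New()}
}

// Expiration reads the expiry time atomically.
func (b *bucket) Expiration() int64 {
	return atomic.LoadInt64(&b.expiration)
}

// SetExpiration stores a new expiry time and reports whether it changed.
func (b *bucket) SetExpiration(expiration int64) bool {
	return atomic.SwapInt64(&b.expiration, expiration) != expiration
}

// Add appends a task to the slot under the lock.
func (b *bucket) Add(task func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.timers.PushBack(task)
}
```

Returning a boolean from SetExpiration lets a caller tell whether the bucket was given a new expiry time, which is useful for deciding whether it needs to be queued again.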
Timer is the smallest execution unit of the time wheel. It wraps a timed task and calls task to execute it when it expires.
Initializing the time wheel
As an example, let's initialise a time wheel with a tick of 1s and a wheelSize of 10.
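As a rough sketch of what such an initialisation involves, the code below builds a single layer using the field names described earlier. It is not the original listing: the delay queue field is left out, and the constructor name newTimingWheel and its parameters are assumptions of mine.

```go
package main

import "container/list"

// TimingWheel is a sketch of one layer of the wheel.
type TimingWheel struct {
	tick          int64        // span of one slot, in milliseconds
	wheelSize     int64        // number of slots
	interval      int64        // tick * wheelSize
	currentTime   int64        // dial pointer, always a multiple of tick
	buckets       []*list.List // one task list per slot
	overflowWheel *TimingWheel // next layer up, created on demand
}

// newTimingWheel creates a layer whose pointer starts at startMs,
// trimmed down to a multiple of tickMs.
func newTimingWheel(tickMs, wheelSize, startMs int64) *TimingWheel {
	buckets := make([]*list.List, wheelSize)
	for i := range buckets {
		buckets[i] = list.New()
	}
	return &TimingWheel{
		tick:        tickMs,
		wheelSize:   wheelSize,
		interval:    tickMs * wheelSize,
		currentTime: startMs - startMs%tickMs,
		buckets:     buckets,
	}
}
```

Calling newTimingWheel(1000, 10, startMs) gives a wheel with a tick of 1s, 10 slots and an interval of 10s.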
The initialisation is very simple; the code comments above explain it.
Starting the time wheel
Here we look at the start method.
This method starts a goroutine that executes the function passed in asynchronously; you can see the source code at the link above.
The first goroutine calls the Poll method of the delay queue, which keeps looping over the data in the queue and puts the expired data into the queue's channel C. The second goroutine loops forever over channel C; any data that arrives there has expired. For each expired bucket it first calls the advanceClock method to move currentTime forward to the bucket's expiry time, then calls the Flush method to take out the timers in the bucket and passes each one to the addOrRun method.
The advanceClock method advances the time wheel by setting currentTime according to the expiry time passed in.
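The idea can be sketched as follows. This is a simplified version under my own assumptions: the wheel type is a stripped-down stand-in, the overflow pointer is a plain field rather than an atomically loaded one, and I assume that advancing a layer also advances the layer above it.

```go
package main

import "sync/atomic"

// wheel is a stripped-down stand-in for one layer of the time wheel.
type wheel struct {
	currentTime int64 // accessed atomically; first field for alignment
	tick        int64
	overflow    *wheel // next layer up, may be nil
}

// advanceClock moves the pointer to the slot that contains expiration.
func (w *wheel) advanceClock(expiration int64) {
	current := atomic.LoadInt64(&w.currentTime)
	if expiration < current+w.tick {
		return // still inside the current slot, nothing to do
	}
	// Trim the expiry time down to a multiple of this layer's tick.
	current = expiration - expiration%w.tick
	atomic.StoreInt64(&w.currentTime, current)

	// Keep the upper layer in step with this one.
	if w.overflow != nil {
		w.overflow.advanceClock(current)
	}
}
```

With a 1s layer on top of a 10s layer, advancing to 2500 moves the lower pointer to 2000 and leaves the upper one at 0, while advancing to 12300 moves them to 12000 and 10000.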
The Flush method iterates through the list of timers in the bucket and collects them into the ts slice, then calls the reinsert function on each of them, which in this case is the addOrRun method.
addOrRun calls the add method to check whether the incoming timer has expired and, if so, calls its task asynchronously to execute it directly. The add method is analysed below.
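The interplay between Flush and addOrRun can be sketched like this. The types are minimal stand-ins, locking is omitted for brevity, and add is passed in as a function so the sketch stays self-contained; none of this is the original listing.

```go
package main

import "container/list"

// Timer wraps a task together with its absolute expiry time.
type Timer struct {
	expiration int64 // milliseconds
	task       func()
}

// bucket holds the timers of one slot.
type bucket struct {
	timers *list.List
}

func newBucket() *bucket { return &bucket{timers: list.New()} }

func (b *bucket) Add(t *Timer) { b.timers.PushBack(t) }

// Flush empties the bucket, then hands every timer to reinsert.
func (b *bucket) Flush(reinsert func(*Timer)) {
	var ts []*Timer
	for e := b.timers.Front(); e != nil; {
		next := e.Next() // read before Remove, which detaches e
		ts = append(ts, b.timers.Remove(e).(*Timer))
		e = next
	}
	for _, t := range ts {
		reinsert(t)
	}
}

// addOrRun parks the timer again through add. When add reports that the
// timer is already due, the task runs in its own goroutine.
func addOrRun(t *Timer, add func(*Timer) bool) {
	if !add(t) {
		go t.task()
	}
}
```

Collecting the timers first and reinserting them afterwards means the bucket is already empty when a timer is parked again, so a timer can safely end up back in the same slot.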
The entire start execution flow is shown in the diagram.
- The start method starts a goroutine that calls Poll to process the data due in the DelayQueue and puts that data into channel C.
- The start method starts a second goroutine that loops over channel C of the DelayQueue. Each item in C is a bucket; the goroutine iterates through the bucket's list of timers, executing a task asynchronously if it is due and putting it back into the DelayQueue if it is not.
Adding a task
We add a 15s timed task via the AfterFunc method; when it expires, the function passed in is executed.
The AfterFunc method calls addOrRun with the task's expiry time and the function to execute on expiry. As we saw above, addOrRun decides from the expiry time whether the timed task needs to be executed right away.
Let’s look at the add method.
The add method has three branches based on the expiry time. In the first, the expiry time is less than the current time + tick, which means the timer has already expired; add returns false and the caller executes the task.
In the second, the expiry time falls within the span of the current time wheel, so the timed task can be stored in this wheel. The matching slot in buckets is found with a modulo operation and the timer is put into that bucket's queue. The SetExpiration method uses the value passed in to determine whether the bucket has already been added to the delay queue, which prevents repeated insertion.
In the third, the expiry time lies beyond the span of the current time wheel, so the timer is escalated to the upper layer. Note that the tick of the upper wheel is the interval of the current wheel and that the delay queue is shared. The upper wheel is stored in the overflowWheel pointer, and add is called recursively on it.
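The three branches can be sketched in a compact form. This is a simplified stand-in and not the original add method: slots hold plain expiry times instead of timers, and the SetExpiration call and the delay queue are left out entirely.

```go
package main

// wheel is a simplified layer; each slot just records expiry times.
type wheel struct {
	tick, wheelSize, interval, currentTime int64

	slots    [][]int64
	overflow *wheel // upper layer, created lazily
}

func newWheel(tick, wheelSize, start int64) *wheel {
	return &wheel{
		tick:        tick,
		wheelSize:   wheelSize,
		interval:    tick * wheelSize,
		currentTime: start - start%tick,
		slots:       make([][]int64, wheelSize),
	}
}

// add returns false when the timer is already due, and true once it has
// been parked in this layer or in a layer above it.
func (w *wheel) add(expiration int64) bool {
	switch {
	case expiration < w.currentTime+w.tick:
		// Branch 1: already expired, the caller should run the task.
		return false
	case expiration < w.currentTime+w.interval:
		// Branch 2: fits in this layer, pick the slot by modulo.
		idx := (expiration / w.tick) % w.wheelSize
		w.slots[idx] = append(w.slots[idx], expiration)
		return true
	default:
		// Branch 3: too far away, escalate to the upper layer, whose
		// tick is this layer's interval.
		if w.overflow == nil {
			w.overflow = newWheel(w.interval, w.wheelSize, w.currentTime)
		}
		return w.overflow.add(expiration)
	}
}
```

On a wheel with a 1s tick and 10 slots starting at time zero, add(500) reports the timer as due, add(2000) parks it in slot 2, and add(15000) creates an upper wheel with a 10s tick and parks the timer in its slot 1.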
That completes the time wheel, but one point is still worth noting: the implementation combines a DelayQueue with the ring queue described above. Inserting and deleting timed task items in the TimingWheel is O(1). The DelayQueue is backed by a priority queue with a time complexity of O(log n), but it holds buckets rather than individual tasks and the number of buckets is actually very small, so this does not affect performance.