Generic Queues
In Kubernetes, Go's native channels cannot satisfy Kubernetes' application scenarios, such as delaying and rate limiting; there are three kinds of queues in Kubernetes: the common queue, the delaying queue, and the rate-limiting queue.
Interface
Interface is defined as an abstraction of all queues.
```go
type Interface interface {
	Add(item interface{})
	Len() int
	Get() (item interface{}, shutdown bool)
	Done(item interface{})
	ShutDown()
	ShuttingDown() bool
}
```
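Before diving into the implementation, a minimal usage sketch of the generic queue (workqueue.New is the real client-go constructor; the item value is invented):

```go
q := workqueue.New()
q.Add("node-sync") // enqueue an item; duplicates are deduplicated while queued

item, shutdown := q.Get() // blocks until an item is available or the queue shuts down
if !shutdown {
	// ... process item ...
	q.Done(item) // mark processing finished so the item may be re-added later
}
q.ShutDown()
```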
Implementation
```go
type Type struct { // a work queue
	queue []t      // the queue itself, backed by a slice
	dirty set      // the "dirty bit": items that still need processing, analogous to an OS dirty bit marking "modified but not yet written back"
	processing set // the set of items currently being processed
	cond *sync.Cond
	shuttingDown bool
	metrics queueMetrics
	unfinishedWorkUpdatePeriod time.Duration
	clock clock.Clock
}

type empty struct{}
type t interface{}   // t is the element type of the queue
type set map[t]empty // the element type of dirty and processing
```
You can see that the core properties are queue, dirty, and processing.
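To make the interplay of the three concrete, here is a simplified sketch (metrics omitted; the has/insert/delete helpers on set are assumed) of how Add, Get, and Done cooperate through dirty and processing:

```go
// Add enqueues an item unless it is already queued; an item being processed
// is only marked dirty and re-queued later by Done.
func (q *Type) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if q.shuttingDown {
		return
	}
	if q.dirty.has(item) {
		return // already queued for processing: deduplicate
	}
	q.dirty.insert(item)
	if q.processing.has(item) {
		return // currently being processed; Done will re-queue it
	}
	q.queue = append(q.queue, item)
	q.cond.Signal()
}

// Get blocks for the next item, moving it from dirty to processing.
func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		return nil, true // shutting down
	}
	item, q.queue = q.queue[0], q.queue[1:]
	q.processing.insert(item)
	q.dirty.delete(item)
	return item, false
}

// Done clears the processing mark and re-queues the item if it was
// re-added (marked dirty) while it was being processed.
func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	q.processing.delete(item)
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	}
}
```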
Delay queue
Before looking at the priority queue, you need some understanding of the Heap, because the delaying queue is built on a heap.
Heap
A Heap is a special tree-based data structure: a complete binary tree that comes in two variants.
- For example, if B is a child node of A, then $key(A) \geq key(B)$. This means the element with the maximum key is always at the root node; this type of heap is called a MaxHeap.
- A heap where each parent node's value is less than or equal to the values of its left and right children is called a MinHeap.
Storage rules for a binary heap:
- Each node contains elements that are greater than or equal to the elements of the node’s children.
- The tree is a complete binary tree.
Implementation of heap
Example: the process of adding an element with the value 42 to the heap.
Step 1: Put the new element into the first available position in the heap. This will keep the structure as a complete binary tree, but it may no longer be a heap, as the new element may have a larger value than its parent.
Step 2: If the value of the new element is greater than that of its parent, swap the new element with its parent; repeat until the new element reaches the root or its parent's value is greater than or equal to the new element's.
This process is called reheapification upward.
Example: Removing the root
Step 1: Copy the root element into the variable used to return the value, copy the last element at the deepest level to the root, and then remove that last node from the tree. The element now at the root is called the out-of-place element.
Step 2: Repeatedly swap the out-of-place element with its larger-valued child until it is no longer smaller than either child, then return the value saved in Step 1.
This process is called reheapification downward.
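Both operations can be sketched on a plain int slice treated as a MinHeap (the variant the delaying queue uses), where the children of index $i$ live at indexes $2i+1$ and $2i+2$; the helper names are illustrative:

```go
// siftUp implements reheapification upward: bubble the element at index i
// toward the root while it is smaller than its parent.
func siftUp(h []int, i int) {
	for i > 0 {
		parent := (i - 1) / 2
		if h[i] >= h[parent] {
			break // heap property restored
		}
		h[i], h[parent] = h[parent], h[i]
		i = parent
	}
}

// siftDown implements reheapification downward: push the element at index i
// toward the leaves while it is larger than its smallest child.
func siftDown(h []int, i int) {
	n := len(h)
	for {
		smallest := i
		if l := 2*i + 1; l < n && h[l] < h[smallest] {
			smallest = l
		}
		if r := 2*i + 2; r < n && h[r] < h[smallest] {
			smallest = r
		}
		if smallest == i {
			return // both children are >= h[i]
		}
		h[i], h[smallest] = h[smallest], h[i]
		i = smallest
	}
}
```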
Priority queue
Behavior of a priority queue:
- Elements are placed in the queue and then taken out.
- Each element in the priority queue has a number associated with it, called the priority.
- When an element leaves the priority queue, the element with the highest priority leaves first.
How it is implemented:
- In the priority queue, each node of the heap contains an element along with the element's priority, and the tree is maintained so that it follows the heap storage rules, using the elements' priorities to compare nodes:
- Each node contains an element whose priority is greater than or equal to the priority of the node’s child elements.
- The tree is a complete binary tree.
- Implemented code: golang priorityQueue
Reference: heap
Client-go’s delaying queue
The design of the delaying queue in Kubernetes is beautifully done: a delayed queue implemented with a heap, combined with the pass-through generic queue described above.
```go
// The source comments describe a hot loop; delaying is implemented through that loop.
type DelayingInterface interface {
	Interface // inherits the workqueue's functionality
	AddAfter(item interface{}, duration time.Duration) // adds the item to the work queue after the given duration
}
```
delayingType is the concrete instance that implements DelayingInterface:
```go
type delayingType struct {
	Interface                     // the generic queue
	clock clock.Clock             // clock used for time comparison, with timer functionality
	stopCh chan struct{}          // stops the loop
	stopOnce sync.Once            // guarantees shutdown is triggered only once
	heartbeat clock.Ticker        // a ticker that bounds the loop's maximum wait between events
	waitingForAddCh chan *waitFor // a plain buffered channel that receives items to insert into the delaying queue
	metrics retryMetrics          // retry metrics
}

// For reference, clock.Clock bundles timer functionality on top of PassiveClock:
type Clock interface {
	PassiveClock
	After(time.Duration) <-chan time.Time
	NewTimer(time.Duration) Timer
	Sleep(time.Duration)
	NewTicker(time.Duration) Ticker
}

type PassiveClock interface {
	Now() time.Time
	Since(time.Time) time.Duration
}
```
The overall data structure of the delaying queue is shown in the figure below. As mentioned in the section above, the core of this delaying queue is a priority queue, which must satisfy:
- Each element in the priority queue has an associated number, called the priority.
- When an element leaves the priority queue, the element with the highest priority is the first to leave.
And waitFor is the data structure held by this priority queue:
```go
type waitFor struct {
	data t            // the data
	readyAt time.Time // the time at which the item should be added to the work queue
	index int         // the index in the priority queue
}
```
The waitForPriorityQueue is an implementation of container/heap's Interface, whose data structure is a MinHeap that keeps the smallest readyAt at the root.
```go
type Interface interface {
	sort.Interface
	Push(x interface{}) // add x as element Len()
	Pop() interface{}   // remove and return element Len() - 1.
}
```
And the implementation of this is waitForPriorityQueue:
```go
type waitForPriorityQueue []*waitFor

func (pq waitForPriorityQueue) Len() int {
	return len(pq)
}

// Less is the most important method: it decides which field is the sort key,
// and it is what heap.down and heap.up use to compare elements.
func (pq waitForPriorityQueue) Less(i, j int) bool {
	return pq[i].readyAt.Before(pq[j].readyAt)
}

func (pq waitForPriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}

// Push and Pop must be invoked through heap.Push and heap.Pop, never directly.
func (pq *waitForPriorityQueue) Push(x interface{}) {
	n := len(*pq)
	item := x.(*waitFor)
	item.index = n
	*pq = append(*pq, item)
}

func (pq *waitForPriorityQueue) Pop() interface{} {
	n := len(*pq)
	item := (*pq)[n-1]
	item.index = -1
	*pq = (*pq)[0:(n - 1)]
	return item
}

// Peek returns the item at the beginning of the queue, without removing the
// item or otherwise mutating the queue. It is safe to call directly.
func (pq waitForPriorityQueue) Peek() interface{} {
	return pq[0]
}
```
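A hypothetical usage sketch tying the pieces together: items pushed through the heap package keep the smallest readyAt at the root (the item names are invented):

```go
pq := &waitForPriorityQueue{}
heap.Init(pq)

now := time.Now()
heap.Push(pq, &waitFor{data: "b", readyAt: now.Add(2 * time.Second)})
heap.Push(pq, &waitFor{data: "a", readyAt: now.Add(time.Second)})

entry := pq.Peek().(*waitFor)
fmt.Println(entry.data) // "a": the smallest readyAt always sits at the root
```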
The core of the whole delaying queue is waitingLoop, which serves as its main logic: it watches waitingForAddCh for content to be delayed, places incoming delayed items into the heap, moves due items to the work queue, and bounds the maximum blocking period.
```go
func (q *delayingType) waitingLoop() {
	defer utilruntime.HandleCrash()

	never := make(<-chan time.Time)  // a placeholder channel that never fires
	var nextReadyAtTimer clock.Timer // timer for the entry that becomes ready soonest

	waitingForQueue := &waitForPriorityQueue{} // the priority queue (heap)
	heap.Init(waitingForQueue)

	waitingEntryByData := map[t]*waitFor{} // used to detect repeated adds of the same data

	for {
		if q.Interface.ShuttingDown() {
			return
		}

		now := q.clock.Now()
		for waitingForQueue.Len() > 0 {
			entry := waitingForQueue.Peek().(*waitFor)
			if entry.readyAt.After(now) {
				break // not due yet, stop draining
			}
			entry = heap.Pop(waitingForQueue).(*waitFor) // pop the due entry from the priority queue
			q.Add(entry.data)                            // hand it over to the underlying work queue
			delete(waitingEntryByData, entry.data)       // drop it from the dedup map
		}

		// If entries remain, arm a timer for the next one to become ready.
		nextReadyAt := never
		if waitingForQueue.Len() > 0 {
			if nextReadyAtTimer != nil {
				nextReadyAtTimer.Stop()
			}
			entry := waitingForQueue.Peek().(*waitFor)                  // peek at [0]
			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) // create a timer for the remaining wait
			nextReadyAt = nextReadyAtTimer.C()
		}

		select {
		case <-q.stopCh: // shut down
			return
		case <-q.heartbeat.C(): // maximum idle period elapsed without any event; run the loop again
		case <-nextReadyAt: // an entry became due; loop again so the drain above processes it
		case waitEntry := <-q.waitingForAddCh:
			if waitEntry.readyAt.After(q.clock.Now()) { // not due yet: readyAt compared against the current time
				// Insert into both delay structures: waitingForQueue and waitingEntryByData.
				insert(waitingForQueue, waitingEntryByData, waitEntry)
			} else {
				q.Add(waitEntry.data)
			}

			drained := false // make sure q.waitingForAddCh is read until empty
			for !drained {
				select {
				// The channel is buffered, so keep reading until it has been drained.
				case waitEntry := <-q.waitingForAddCh:
					if waitEntry.readyAt.After(q.clock.Now()) {
						insert(waitingForQueue, waitingEntryByData, waitEntry)
					} else {
						q.Add(waitEntry.data)
					}
				default:
					// Nothing left to read: the channel is drained, exit the inner loop.
					drained = true
				}
			}
		}
	}
}
```
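Putting the pieces together, a hypothetical end-to-end usage of the delaying queue (workqueue.NewDelayingQueue is the real client-go constructor; the item name is invented):

```go
q := workqueue.NewDelayingQueue()
q.AddAfter("reconcile-pod", 5*time.Second) // goes through waitingForAddCh into the heap

item, shutdown := q.Get() // blocks until waitingLoop moves the item over, roughly 5s later
if !shutdown {
	q.Done(item)
}
```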
Rate Limiting Queue
The rate-limiting queue RateLimitingInterface is an extension of the delaying queue:
```go
type RateLimitingInterface interface {
	DelayingInterface // inherits the delaying queue
	// AddRateLimited adds the item to the queue once the rate limiter says it is OK (i.e. once compliant).
	AddRateLimited(item interface{})
	// Forget drops the item's tracking, whether it succeeded or failed.
	Forget(item interface{})
	// NumRequeues returns how many times the item has been requeued.
	NumRequeues(item interface{}) int
}
```
You can see that any queue built on a delaying queue that satisfies AddRateLimited(), Forget(), and NumRequeues() is a rate-limiting queue. With the abstraction and its rules understood, the concrete implementation can be analyzed.
```go
type rateLimitingType struct {
	DelayingInterface

	rateLimiter RateLimiter
}

func (q *rateLimitingType) AddRateLimited(item interface{}) {
	q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

func (q *rateLimitingType) NumRequeues(item interface{}) int {
	return q.rateLimiter.NumRequeues(item)
}

func (q *rateLimitingType) Forget(item interface{}) {
	q.rateLimiter.Forget(item)
}
```
rateLimitingType is the implementation of the abstract specification RateLimitingInterface; as can be seen, it simply adds a rate limiter RateLimiter on top of the delaying queue.
```go
type RateLimiter interface {
	// When decides how long the item has to wait.
	When(item interface{}) time.Duration
	// Forget drops the item, either because it failed
	// or for success, we'll stop tracking it.
	Forget(item interface{})
	// NumRequeues returns how many times the item has been requeued.
	NumRequeues(item interface{}) int
}
```
The abstract rate limiter has five implementations: BucketRateLimiter, ItemBucketRateLimiter, ItemExponentialFailureRateLimiter, ItemFastSlowRateLimiter, and MaxOfRateLimiter; each is analyzed below.
BucketRateLimiter
BucketRateLimiter is a token bucket: it adapts rate.Limiter from golang.org/x/time/rate to the abstract RateLimiter. Initialization is done through workqueue.DefaultControllerRateLimiter():
```go
func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}
```
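For reference, BucketRateLimiter's When does little more than reserve a token from the shared bucket and report how long the caller has to wait for it; a sketch that mirrors the client-go implementation:

```go
// When reserves one token and returns the delay until that reservation
// becomes valid (zero while the bucket still holds tokens).
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
	return r.Limiter.Reserve().Delay()
}
```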
ItemBucketRateLimiter
The ItemBucketRateLimiter keeps a distinct token bucket per item: each key in its map gets its own rate.Limiter.
```go
type ItemBucketRateLimiter struct {
	r     rate.Limit
	burst int

	limitersLock sync.Mutex
	limiters     map[interface{}]*rate.Limiter
}

func NewItemBucketRateLimiter(r rate.Limit, burst int) *ItemBucketRateLimiter {
	return &ItemBucketRateLimiter{
		r:        r,
		burst:    burst,
		limiters: make(map[interface{}]*rate.Limiter),
	}
}
```
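Its When then lazily creates one rate.Limiter per key and reserves from that item's own bucket; a sketch that mirrors the client-go implementation:

```go
// When looks up (or creates) the per-item limiter and reserves a token from it.
func (r *ItemBucketRateLimiter) When(item interface{}) time.Duration {
	r.limitersLock.Lock()
	defer r.limitersLock.Unlock()

	limiter, ok := r.limiters[item]
	if !ok {
		limiter = rate.NewLimiter(r.r, r.burst)
		r.limiters[item] = limiter
	}
	return limiter.Reserve().Delay()
}
```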
ItemExponentialFailureRateLimiter
As the name suggests, ItemExponentialFailureRateLimiter is an exponential-failure limiter: the delay grows exponentially with an item's error count. You can see that When() alone determines the traffic-shaping delay, extending the retry interval exponentially with the number of failures.
```go
type ItemExponentialFailureRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int // per-item failure count

	baseDelay time.Duration // the base delay
	maxDelay  time.Duration // the maximum delay
}

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	exp := r.failures[item]
	r.failures[item] = r.failures[item] + 1

	// The backoff is capped such that 'calculated' value never overflows.
	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
	if backoff > math.MaxInt64 {
		return r.maxDelay
	}

	calculated := time.Duration(backoff)
	if calculated > r.maxDelay {
		return r.maxDelay
	}

	return calculated
}

func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}
```
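A worked example under the DefaultControllerRateLimiter parameters shown earlier (baseDelay 5ms, maxDelay 1000s): after $n$ prior failures the delay is $\min(5\,\mathrm{ms} \cdot 2^{n}, 1000\,\mathrm{s})$, and Forget resets the count. A sketch:

```go
r := NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second)
r.When("x") // 5ms  (0 prior failures)
r.When("x") // 10ms (1 prior failure)
r.When("x") // 20ms (2 prior failures)
r.Forget("x")
r.When("x") // 5ms again: the failure count was reset
```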
ItemFastSlowRateLimiter
With ItemFastSlowRateLimiter, the limiter retries quickly for a fixed number of attempts and then switches to slow retries.
```go
type ItemFastSlowRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int

	maxFastAttempts int           // the maximum number of fast attempts
	fastDelay       time.Duration // the fast delay
	slowDelay       time.Duration // the slow delay
}

func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
	return &ItemFastSlowRateLimiter{
		failures:        map[interface{}]int{},
		fastDelay:       fastDelay,
		slowDelay:       slowDelay,
		maxFastAttempts: maxFastAttempts,
	}
}

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	r.failures[item] = r.failures[item] + 1

	// Use the fast delay while the failure count is within the fast threshold, otherwise the slow delay.
	if r.failures[item] <= r.maxFastAttempts {
		return r.fastDelay
	}

	return r.slowDelay
}

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}
```
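A worked sketch with illustrative parameters (fast delay 5ms, slow delay 10s, three fast attempts):

```go
r := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)
r.When("x") // 5ms (attempt 1: fast)
r.When("x") // 5ms (attempt 2: fast)
r.When("x") // 5ms (attempt 3: fast)
r.When("x") // 10s (attempt 4: beyond maxFastAttempts, slow from now on)
```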
MaxOfRateLimiter
MaxOfRateLimiter returns the largest delay produced by any limiter in its list.
```go
type MaxOfRateLimiter struct {
	limiters []RateLimiter
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
	ret := time.Duration(0)
	for _, limiter := range r.limiters {
		curr := limiter.When(item)
		if curr > ret {
			ret = curr
		}
	}
	return ret
}

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
	return &MaxOfRateLimiter{limiters: limiters}
}

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
	ret := 0
	// Collect NumRequeues (the failure count) from every limiter in the list and keep the largest.
	for _, limiter := range r.limiters {
		curr := limiter.NumRequeues(item)
		if curr > ret {
			ret = curr
		}
	}
	return ret
}

func (r *MaxOfRateLimiter) Forget(item interface{}) {
	for _, limiter := range r.limiters {
		limiter.Forget(item)
	}
}
```
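For example, in the DefaultControllerRateLimiter shown earlier, the exponential limiter dominates for early failures (the bucket still has tokens, so its delay is about zero), while under a sustained burst the bucket's delay takes over. A sketch:

```go
r := NewMaxOfRateLimiter(
	NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
	&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
d := r.When("x") // the larger of the two limiters' delays wins (5ms here)
_ = d
```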
How to use Kubernetes’ rate-limiter
Rate-limited queues are built around traffic shaping: items can arrive in large bursts, but each add is deferred by the wait time computed in When(), as designed. Different queues implement the delay in different ways.
```go
package main

import (
	"fmt"
	"log"
	"strconv"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	stopCh := make(chan string)
	timeLayout := "2006-01-02:15:04:05.0000"
	limiter := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	length := 20 // 20 requests in total
	chs := make([]chan string, length)
	for i := 0; i < length; i++ {
		chs[i] = make(chan string, 1)
		go func(taskId string, ch chan string) {
			item := "Task-" + taskId + time.Now().Format(timeLayout)
			log.Println(item + " Added.")
			limiter.AddRateLimited(item) // deferred into the work queue according to When()
		}(strconv.FormatInt(int64(i), 10), chs[i])

		go func() {
			for {
				key, quit := limiter.Get()
				if quit {
					return
				}
				log.Println(fmt.Sprintf("%s process done", key))
				limiter.Done(key) // Done must be called per item; a defer inside this loop would never run
			}
		}()
	}
	<-stopCh
}
```
Because the default rate limiter does not support configuring the QPS at initialization, the bucket inside the source code was modified to $BT(1, 5)$ (rate 1, burst 5); the execution results show that once a large burst exhausts the tokens in the bucket, the remaining items are released at the token-generation rate.
In the figure, the tasks are added in a burst and the "Added" logs all print at the same moment (the log line is written just before the actual add), while the consumer side shows the items were in fact delayed: with a configuration of one token per second, the traffic is indeed released at one item per second.