Every Kubernetes component runs timed tasks, such as log processing, task queries, and cache maintenance. Kubernetes implements these timed tasks with the wait package (k8s.io/apimachinery/pkg/util/wait). For example, the kubelet uses it to start its probe status checks.
```go
// Start syncing probe status. This should only be called once.
func (m *manager) Start() {
	// Start syncing readiness.
	go wait.Forever(m.updateReadiness, 0)
	// Start syncing startup.
	go wait.Forever(m.updateStartup, 0)
}
```
Golang’s Timed Tasks
Before we look at the Kubernetes wait library, let's see how a timed task is implemented in plain Golang.
The time package in Golang's standard library contains many time-related tools, including two timers: Ticker and Timer.
A Ticker starts firing at a fixed interval as soon as it is created and needs no further action.
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	d := 5 * time.Second
	ticker := time.NewTicker(d)
	defer ticker.Stop()

	for {
		<-ticker.C // blocks until the next tick arrives
		fmt.Println("Hello World")
	}
}
```
A Timer, by contrast, fires only once, so it must be reset after each timeout to keep triggering.
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	d := 5 * time.Second
	timer := time.NewTimer(d)
	defer timer.Stop()

	for {
		<-timer.C // blocks until the timer fires
		fmt.Println("Hello World")
		timer.Reset(d) // re-arm the timer; the channel was just drained
	}
}
```
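Note: for both Ticker and Timer, .C is a receive-only channel of type <-chan Time. In the version of the time package shown here, it is created with a capacity of 1, and the runtime sends to it without blocking: if a value is already waiting in the buffer, the new one is dropped, so a slow receiver never sees more than one pending trigger.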
```go
func NewTimer(d Duration) *Timer {
	c := make(chan Time, 1)
	t := &Timer{
		C: c,
		r: runtimeTimer{
			when: when(d),
			f:    sendTime,
			arg:  c,
		},
	}
	startTimer(&t.r)
	return t
}
```
Kubernetes’ wait library
Common APIs
The wait library provides several common APIs for executing functions at regular intervals.
executes a function periodically and never stops
```go
// Forever calls f every period for ever.
//
// Forever is syntactic sugar on top of Until.
func Forever(f func(), period time.Duration)
```
Forever takes a function and a period, and calls the function once every period without ever stopping.
executes a function periodically and can accept a stop signal
```go
// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{})
```
Until takes a function, a period, and a stop channel. It behaves like Forever, but the loop ends when stopCh is closed.
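To make the stop semantics concrete, here is a minimal standard-library sketch of an Until-style loop. The names `until` and `runThenStop` are hypothetical, and this is not the wait package's code; it only illustrates how closing the stop channel ends the loop.

```go
package main

import (
	"fmt"
	"time"
)

// until is a hypothetical, simplified stand-in for an Until-style helper:
// it runs f, waits period, and repeats until stopCh is closed.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		// Check for a stop request before every run of f.
		select {
		case <-stopCh:
			return
		default:
		}

		f()

		// Wait for the next period, or return early on stop.
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

// runThenStop drives until with a counter and closes stopCh on the nth call.
func runThenStop(n int) int {
	stopCh := make(chan struct{})
	calls := 0
	until(func() {
		calls++
		if calls == n {
			close(stopCh) // closing the channel is the stop signal
		}
	}, time.Millisecond, stopCh)
	return calls
}

func main() {
	fmt.Println(runThenStop(3)) // prints 3
}
```

Closing the channel, rather than sending a value on it, means every goroutine that selects on the same stopCh observes the stop.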
periodically checks a condition until it succeeds or times out
```go
// Poll tries a condition func until it returns true, an error, or the timeout
// is reached.
//
// Poll always waits the interval before the run of 'condition'.
// 'condition' will always be invoked at least once.
//
// Some intervals may be missed if the condition takes too long or the time
// window is too short.
//
// If you want to Poll something forever, see PollInfinite.
func Poll(interval, timeout time.Duration, condition ConditionFunc) error
```
Poll calls condition once per interval until it returns true, returns an error, or the timeout is reached.
periodically checks a condition until it succeeds or a stop signal is received
```go
// PollUntil tries a condition func until it returns true, an error or stopCh is
// closed.
//
// PollUntil always waits interval before the first run of 'condition'.
// 'condition' will always be invoked at least once.
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error
```
Core code
The wait library's timed task API is built on the JitterUntil implementation.
```go
// JitterUntil loops until stop channel is closed, running f every period.
//
// If jitterFactor is positive, the period is jittered before every run of f.
// If jitterFactor is not positive, the period is unchanged and not jittered.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
//
// Close stopCh to stop. f may not be invoked if stop channel is already
// closed. Pass NeverStop to if you don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
	var t *time.Timer
	var sawTimeout bool

	for {
		select {
		case <-stopCh:
			return
		default:
		}

		jitteredPeriod := period
		if jitterFactor > 0.0 {
			jitteredPeriod = Jitter(period, jitterFactor)
		}

		if !sliding {
			t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
		}

		func() {
			defer runtime.HandleCrash()
			f()
		}()

		if sliding {
			t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
		}

		// NOTE: b/c there is no priority selection in golang
		// it is possible for this to race, meaning we could
		// trigger t.C and stopCh, and t.C select falls through.
		// In order to mitigate we re-check stopCh at the beginning
		// of every loop to prevent extra executions of f().
		select {
		case <-stopCh:
			return
		case <-t.C:
			sawTimeout = true
		}
	}
}
```
The five parameters of JitterUntil:

| parameter name | type | role |
| --- | --- | --- |
| f | func() | the function to execute at regular intervals |
| period | time.Duration | the time interval of the timed task |
| jitterFactor | float64 | if greater than 0.0, each interval becomes a random value between period and period + jitterFactor * period |
| sliding | bool | if true, the interval is timed from when f returns, so the execution time of f is not counted in it; if false, the interval includes the execution time of f |
| stopCh | <-chan struct{} | the channel that signals the loop to stop when it is closed |