Each Kubernetes component runs timed tasks, such as processing logs, querying task status, and maintaining caches. Kubernetes implements these timed tasks with the wait package. For example, the kubelet's probe manager uses it to start syncing probe status.

// Start syncing probe status. This should only be called once.
func (m *manager) Start() {
	// Start syncing readiness.
	go wait.Forever(m.updateReadiness, 0)
	// Start syncing startup.
	go wait.Forever(m.updateStartup, 0)
}

Golang’s Timed Tasks

Before looking at the Kubernetes wait library, let's see how a timed task can be implemented in plain Go.

Go's time package contains many time-related tools, including two timers: Ticker and Timer.

A Ticker starts running as soon as it is created and fires at a fixed interval, with no further action required.

package main

import (
	"fmt"
	"time"
)

func main() {

	d := 5 * time.Second
	ticker := time.NewTicker(d)

	defer ticker.Stop()

	for {
		<-ticker.C

		fmt.Println("Hello World")
	}

}

A Timer, by contrast, fires only once. To keep triggering, it must be reset after each timeout.

package main

import (
	"fmt"
	"time"
)

func main() {

	d := 5 * time.Second
	timer := time.NewTimer(d)

	defer timer.Stop()

	for {
		<-timer.C

		fmt.Println("Hello World")
		timer.Reset(d)
	}

}

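Note: for both Ticker and Timer, C is exposed as a receive-only channel of type <-chan Time, and in the implementation shown here it is created with a buffer capacity of 1. The runtime sends to it without blocking, so when a value is already waiting in the buffer the new event is dropped rather than queued. An unread timer therefore never accumulates more than one pending event.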

func NewTimer(d Duration) *Timer {
	c := make(chan Time, 1)
	t := &Timer{
		C: c,
		r: runtimeTimer{
			when: when(d),
			f:    sendTime,
			arg:  c,
		},
	}
	startTimer(&t.r)
	return t
}

Kubernetes’ wait library

Common APIs

The wait library provides several common APIs for executing functions at regular intervals.

Execute a function periodically without ever stopping

// Forever calls f every period for ever.
//
// Forever is syntactic sugar on top of Until.
func Forever(f func(), period time.Duration)

Forever takes a function and an interval, and calls the function once per interval without ever stopping.

Execute a function periodically until a stop signal arrives

// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) 

Until takes a function, an interval, and a stop channel. It behaves like Forever, except that it returns once stopCh is closed.

Periodically check a condition

// Poll tries a condition func until it returns true, an error, or the timeout
// is reached.
//
// Poll always waits the interval before the run of 'condition'.
// 'condition' will always be invoked at least once.
//
// Some intervals may be missed if the condition takes too long or the time
// window is too short.
//
// If you want to Poll something forever, see PollInfinite.
func Poll(interval, timeout time.Duration, condition ConditionFunc) error

Poll evaluates the condition once per interval until the condition returns true, returns an error, or the timeout expires.

Periodically check a condition until it succeeds or a stop signal arrives

// PollUntil tries a condition func until it returns true, an error or stopCh is
// closed.
//
// PollUntil always waits interval before the first run of 'condition'.
// 'condition' will always be invoked at least once.
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error

Core code

The timed-task APIs in the wait library are built on top of JitterUntil.

// JitterUntil loops until stop channel is closed, running f every period.
//
// If jitterFactor is positive, the period is jittered before every run of f.
// If jitterFactor is not positive, the period is unchanged and not jittered.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
//
// Close stopCh to stop. f may not be invoked if stop channel is already
// closed. Pass NeverStop to if you don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
	var t *time.Timer
	var sawTimeout bool

	for {
		select {
		case <-stopCh:
			return
		default:
		}

		jitteredPeriod := period
		if jitterFactor > 0.0 {
			jitteredPeriod = Jitter(period, jitterFactor)
		}

		if !sliding {
			t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
		}

		func() {
			defer runtime.HandleCrash()
			f()
		}()

		if sliding {
			t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
		}

		// NOTE: b/c there is no priority selection in golang
		// it is possible for this to race, meaning we could
		// trigger t.C and stopCh, and t.C select falls through.
		// In order to mitigate we re-check stopCh at the beginning
		// of every loop to prevent extra executions of f().
		select {
		case <-stopCh:
			return
		case <-t.C:
			sawTimeout = true
		}
	}
}

JitterUntil takes five parameters:

| Parameter | Type | Role |
| --- | --- | --- |
| f | func() | the function to execute at regular intervals |
| period | time.Duration | the interval between runs of the timed task |
| jitterFactor | float64 | if greater than 0.0, each interval becomes a random value between period and period + jitterFactor * period |
| sliding | bool | if true, the interval is measured from the moment f returns, so the execution time of f is not counted in it; if false, the interval includes the execution time of f |
| stopCh | <-chan struct{} | the channel that delivers the stop signal; closing it ends the loop |