Overview
In Kubernetes, kube-controller-manager, kube-scheduler, and Operators built on controller-runtime all support leader election for high availability. This article focuses on how leader election in controller-runtime (implemented on top of client-go) works in Kubernetes controllers.
Background
When running kube-controller-manager, several flags configure leader election for the controller manager (cm); see the official documentation for what each flag does.
|
--leader-elect Default: true
--leader-elect-renew-deadline duration Default: 10s
--leader-elect-resource-lock string Default: "leases"
--leader-elect-resource-name string Default: "kube-controller-manager"
--leader-elect-resource-namespace string Default: "kube-system"
--leader-elect-retry-period duration Default: 2s
...
|
I had assumed that these components ran their elections through etcd, but when I studied controller-runtime I found no etcd-related configuration at all, which made me curious about the election mechanism. Searching for Kubernetes leader election, I found that the official website describes it in the article "Simple leader election with Kubernetes".
From the article, we learn that the Kubernetes API provides the building blocks for an election mechanism that any container running in the cluster can implement.
The Kubernetes API provides two properties that make the election possible:
- ResourceVersions: every API object has a unique ResourceVersion, which can be used to perform compare-and-swap on Kubernetes objects.
- Annotations: every API object can be annotated with arbitrary key/value pairs for clients to use.
Note: this kind of election increases the load on the APIServer, and therefore also on etcd.
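To make the ResourceVersion idea concrete, here is a minimal sketch of a compare-and-swap update against a toy in-memory store. Everything in it is made up for illustration (the object and store types, the integer version, the "leader" annotation key, tryClaim); a real ResourceVersion is an opaque string managed by the APIServer.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// object is a hypothetical stand-in for an API object: annotations plus a
// version counter that the store bumps on every successful write.
type object struct {
	ResourceVersion int
	Annotations     map[string]string
}

// errConflict mimics the error returned when an update carries a stale version.
var errConflict = errors.New("conflict: object has been modified")

// store is a toy in-memory replacement for the APIServer.
type store struct {
	mu  sync.Mutex
	obj object
}

// get returns a copy of the stored object.
func (s *store) get() object {
	s.mu.Lock()
	defer s.mu.Unlock()
	cp := object{ResourceVersion: s.obj.ResourceVersion, Annotations: map[string]string{}}
	for k, v := range s.obj.Annotations {
		cp.Annotations[k] = v
	}
	return cp
}

// update succeeds only if the caller's copy carries the current version.
func (s *store) update(o object) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if o.ResourceVersion != s.obj.ResourceVersion {
		return errConflict
	}
	o.ResourceVersion++
	s.obj = o
	return nil
}

// tryClaim writes the candidate's name into the leader annotation of a
// previously read snapshot and submits it to the store.
func tryClaim(s *store, snapshot object, id string) error {
	snapshot.Annotations["leader"] = id
	return s.update(snapshot)
}

func main() {
	s := &store{obj: object{ResourceVersion: 1, Annotations: map[string]string{}}}
	// Both candidates read the same version before either of them writes.
	a, b := s.get(), s.get()
	fmt.Println("candidate-a:", tryClaim(s, a, "candidate-a")) // first write wins
	fmt.Println("candidate-b:", tryClaim(s, b, "candidate-b")) // stale version is rejected
	fmt.Println("leader:", s.get().Annotations["leader"])
}
```

Only one of the two concurrent writers can succeed, which is exactly the property an election needs: the loser sees a conflict and has to read the object again before retrying.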
With this information, let's see who the cm leader is in a Kubernetes cluster (this cluster has only one node, so that node is the leader). Every service with leader election enabled in this cluster has an Endpoints object, and that object carries the annotation mentioned above to identify who the leader is.
|
$ kubectl get ep -n kube-system
NAME ENDPOINTS AGE
kube-controller-manager <none> 3d4h
kube-dns 3d4h
kube-scheduler <none> 3d4h
|
Here’s an example of kube-controller-manager to see what information is available in this EndPoint.
|
[root@master-machine ~]# kubectl describe ep kube-controller-manager -n kube-system
Name: kube-controller-manager
Namespace: kube-system
Labels: <none>
Annotations: control-plane.alpha.kubernetes.io/leader:
{"holderIdentity":"master-machine_06730140-a503-487d-850b-1fe1619f1fe1","leaseDurationSeconds":15,"acquireTime":"2022-06-27T15:30:46Z","re...
Subsets:
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal LeaderElection 2d22h kube-controller-manager master-machine_76aabcb5-49ff-45ff-bd18-4afa61fbc5af became leader
Normal LeaderElection 9m kube-controller-manager master-machine_06730140-a503-487d-850b-1fe1619f1fe1 became leader
|
Taking kube-controller-manager as an example, the annotation control-plane.alpha.kubernetes.io/leader identifies which instance is the leader.
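The annotation value is plain JSON, so it can be decoded with the standard library. The sketch below is only an illustration: the leaderRecord type and parseLeader helper are my own names, the sample value is invented rather than copied from a cluster, and the renewTime and leaderTransitions keys are assumptions, since the describe output above is truncated before them.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// leaderRecord mirrors the JSON keys visible in the annotation.
// renewTime and leaderTransitions are assumed key names.
type leaderRecord struct {
	HolderIdentity       string    `json:"holderIdentity"`
	LeaseDurationSeconds int       `json:"leaseDurationSeconds"`
	AcquireTime          time.Time `json:"acquireTime"`
	RenewTime            time.Time `json:"renewTime"`
	LeaderTransitions    int       `json:"leaderTransitions"`
}

// parseLeader decodes the value of the leader annotation.
func parseLeader(annotation string) (leaderRecord, error) {
	var r leaderRecord
	err := json.Unmarshal([]byte(annotation), &r)
	return r, err
}

func main() {
	// Illustrative value with a made-up identity and timestamps.
	const sample = `{"holderIdentity":"example-host_0001","leaseDurationSeconds":15,` +
		`"acquireTime":"2022-06-27T15:30:46Z","renewTime":"2022-06-27T15:31:00Z","leaderTransitions":2}`
	r, err := parseLeader(sample)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("leader=%s lease=%ds transitions=%d\n",
		r.HolderIdentity, r.LeaseDurationSeconds, r.LeaderTransitions)
}
```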
Election in controller-runtime
The leader election code in controller-runtime lives in pkg/leaderelection and is about 100 lines in total, so let's see what it does.
As you can see, it only provides a few options for creating the resource lock:
|
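type Options struct {
	// LeaderElection determines whether to run an election when the manager starts.
	LeaderElection bool
	// LeaderElectionResourceLock selects the kind of resource lock; the default is lease.
	LeaderElectionResourceLock string
	// LeaderElectionNamespace is the namespace in which the election takes place.
	LeaderElectionNamespace string
	// LeaderElectionID determines the name of the resource that holds the leader lock.
	LeaderElectionID string
}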
|
Following NewResourceLock leads into client-go/tools/leaderelection, which also ships an example showing how to use it.
As the example shows, the entry point to the election is the RunOrDie() function.
|
// A Lease lock is used here; the upstream comment gives the reason that
// fewer clients in the cluster watch Leases.
lock := &resourcelock.LeaseLock{
	LeaseMeta: metav1.ObjectMeta{
		Name:      leaseLockName,
		Namespace: leaseLockNamespace,
	},
	Client: client.CoordinationV1(),
	LockConfig: resourcelock.ResourceLockConfig{
		Identity: id,
	},
}
// Start the election loop.
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
	Lock: lock,
	// Any code protected by the lease must terminate before cancel() is called;
	// otherwise a background loop may still be running.
	ReleaseOnCancel: true,
	LeaseDuration:   60 * time.Second,
	RenewDeadline:   15 * time.Second,
	RetryPeriod:     5 * time.Second,
	Callbacks: leaderelection.LeaderCallbacks{
		OnStartedLeading: func(ctx context.Context) {
			// Put the code that should run while leading here.
			run(ctx)
		},
		OnStoppedLeading: func() {
			// Clean up here once the lease is lost.
			klog.Infof("leader lost: %s", id)
			os.Exit(0)
		},
		OnNewLeader: func(identity string) {
			// we're notified when new leader elected
			if identity == id {
				// I just got the lock
				return
			}
			klog.Infof("new leader elected: %s", identity)
		},
	},
})
|
Now that we understand the concept of a lock and how to start an election with one, let's look at which locks client-go provides.
The file tools/leaderelection/resourcelock/interface.go defines the lock abstraction: Interface is a generic interface for the resources used as locks in a leader election.
|
type Interface interface {
	// Get returns the election record
	Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
	// Create creates a LeaderElectionRecord
	Create(ctx context.Context, ler LeaderElectionRecord) error
	// Update will update an existing LeaderElectionRecord
	Update(ctx context.Context, ler LeaderElectionRecord) error
	// RecordEvent is used to record events
	RecordEvent(string)
	// Identity returns the identity of the lock
	Identity() string
	// Describe is used to convert details on current resource lock into a string
	Describe() string
}
|
The implementations of this interface are the resource locks themselves. client-go provides four kinds of resource lock:
- leaselock
- configmaplock
- multilock
- endpointlock
leaselock
Lease is a resource in the Kubernetes control plane, stored in etcd like other API objects, that provides a control mechanism for distributed leases. A description of this API can be found at: Lease.
In a Kubernetes cluster, we can use the following commands to view the corresponding leases:
|
$ kubectl get leases -A
NAMESPACE NAME HOLDER AGE
kube-node-lease master-machine master-machine 3d19h
kube-system kube-controller-manager master-machine_06730140-a503-487d-850b-1fe1619f1fe1 3d19h
kube-system kube-scheduler master-machine_1724e2d9-c19c-48d7-ae47-ee4217b27073 3d19h
$ kubectl describe leases kube-controller-manager -n kube-system
Name: kube-controller-manager
Namespace: kube-system
Labels: <none>
Annotations: <none>
API Version: coordination.k8s.io/v1
Kind: Lease
Metadata:
Creation Timestamp: 2022-06-24T11:01:51Z
Managed Fields:
API Version: coordination.k8s.io/v1
Fields Type: FieldsV1
fieldsV1:
f:spec:
f:acquireTime:
f:holderIdentity:
f:leaseDurationSeconds:
f:leaseTransitions:
f:renewTime:
Manager: kube-controller-manager
Operation: Update
Time: 2022-06-24T11:01:51Z
Resource Version: 56012
Self Link: /apis/coordination.k8s.io/v1/namespaces/kube-system/leases/kube-controller-manager
UID: 851a32d2-25dc-49b6-a3f7-7a76f152f071
Spec:
Acquire Time: 2022-06-27T15:30:46.000000Z
Holder Identity: master-machine_06730140-a503-487d-850b-1fe1619f1fe1
Lease Duration Seconds: 15
Lease Transitions: 2
Renew Time: 2022-06-28T06:09:26.837773Z
Events: <none>
|
Now let's look at the implementation of leaselock, which implements the resource lock abstraction.
|
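type LeaseLock struct {
	// LeaseMeta is like the metadata of other resource types; it holds the name,
	// namespace and other attributes of the lease.
	LeaseMeta metav1.ObjectMeta
	Client    coordinationv1client.LeasesGetter // Client is the typed client used to access Lease objects
	// LockConfig holds the Identity seen in the describe output above and the
	// recorder used to record changes to the resource lock.
	LockConfig ResourceLockConfig
	// lease is the Lease resource from the API; see the API reference given above.
	lease *coordinationv1.Lease
}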
|
Here are the methods that leaselock implements.
Get
Get (leaselock.go#L41-L53) returns the election record, built from the spec of the Lease.
|
func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
recordByte, err := json.Marshal(*record)
if err != nil {
return nil, nil, err
}
return record, recordByte, nil
}
// As you can see, it returns the values populated in the spec of this resource.
func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElectionRecord {
var r LeaderElectionRecord
if spec.HolderIdentity != nil {
r.HolderIdentity = *spec.HolderIdentity
}
if spec.LeaseDurationSeconds != nil {
r.LeaseDurationSeconds = int(*spec.LeaseDurationSeconds)
}
if spec.LeaseTransitions != nil {
r.LeaderTransitions = int(*spec.LeaseTransitions)
}
if spec.AcquireTime != nil {
r.AcquireTime = metav1.Time{spec.AcquireTime.Time}
}
if spec.RenewTime != nil {
r.RenewTime = metav1.Time{spec.RenewTime.Time}
}
return &r
}
|
Create
Create (leaselock.go#L56-L66) attempts to create a Lease in the Kubernetes cluster. As you can see, Client is the REST client for the corresponding resource provided by the API, and the result is that this Lease is created in the Kubernetes cluster.
|
func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: ll.LeaseMeta.Name,
Namespace: ll.LeaseMeta.Namespace,
},
Spec: LeaderElectionRecordToLeaseSpec(&ler),
}, metav1.CreateOptions{})
return err
}
|
Update
Update updates the spec of the Lease.
RecordEvent
RecordEvent records the events that occur during the election. This takes us back to the earlier section: when we looked at the ep information in the Kubernetes cluster, the event list contained a became leader event, and this is where that event is generated and attached to the object identified by the lock's metadata.
|
func (ll *LeaseLock) RecordEvent(s string) {
if ll.LockConfig.EventRecorder == nil {
return
}
events := fmt.Sprintf("%v %v", ll.LockConfig.Identity, s)
subject := &coordinationv1.Lease{ObjectMeta: ll.lease.ObjectMeta}
// Populate the type meta, so we don't have to get it from the schema
subject.Kind = "Lease"
subject.APIVersion = coordinationv1.SchemeGroupVersion.String()
ll.LockConfig.EventRecorder.Eventf(subject, corev1.EventTypeNormal, "LeaderElection", events)
}
|
Here we have a general understanding of what a resource lock is. Other types of resource locks are implemented in the same way, so we won’t elaborate too much here; let’s look at the election process below.
Election workflow
The entry code for the election is in leaderelection.go, picking up from the example above.
We saw earlier that the entry point to the election is the RunOrDie() function (leader-election/main.go#L122), so let's continue from there. RunOrDie is only a few lines long: it uses the provided configuration to start the election client, then blocks until ctx is cancelled or the client stops holding the leader lease.
|
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
le, err := NewLeaderElector(lec)
if err != nil {
panic(err)
}
if lec.WatchDog != nil {
lec.WatchDog.SetLeaderElection(le)
}
le.Run(ctx)
}
|
What does NewLeaderElector do? LeaderElector is a struct, and NewLeaderElector simply validates the configuration and creates it. The struct provides everything we need in an election (LeaderElector is the election client created by RunOrDie).
|
func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
if lec.LeaseDuration <= lec.RenewDeadline {
return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
}
if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
}
if lec.LeaseDuration < 1 {
return nil, fmt.Errorf("leaseDuration must be greater than zero")
}
if lec.RenewDeadline < 1 {
return nil, fmt.Errorf("renewDeadline must be greater than zero")
}
if lec.RetryPeriod < 1 {
return nil, fmt.Errorf("retryPeriod must be greater than zero")
}
if lec.Callbacks.OnStartedLeading == nil {
return nil, fmt.Errorf("OnStartedLeading callback must not be nil")
}
if lec.Callbacks.OnStoppedLeading == nil {
return nil, fmt.Errorf("OnStoppedLeading callback must not be nil")
}
if lec.Lock == nil {
return nil, fmt.Errorf("Lock must not be nil.")
}
le := LeaderElector{
config: lec,
clock: clock.RealClock{},
metrics: globalMetricsFactory.newLeaderMetrics(),
}
le.metrics.leaderOff(le.config.Name)
return &le, nil
}
|
LeaderElector is the election client that gets created:
|
type LeaderElector struct {
	config LeaderElectionConfig // the election configuration, including timing parameters and the health check
	// fields holding the observed record
	observedRecord    rl.LeaderElectionRecord
	observedRawRecord []byte
	observedTime      time.Time
	// used to implement OnNewLeader(), may lag slightly from the
	// value observedRecord.HolderIdentity if the transition has
	// not yet been reported.
	reportedLeader string
	// clock is wrapper around time to allow for less flaky testing
	clock clock.Clock
	// guards observedRecord
	observedRecordLock sync.Mutex
	metrics            leaderMetricsAdapter
}
|
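Run drives the election with the callbacks passed in when the client was initialized: it blocks in acquire until leadership is obtained, starts OnStartedLeading in a goroutine, keeps renewing the lease, and calls OnStoppedLeading when it exits.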
|
func (le *LeaderElector) Run(ctx context.Context) {
	defer runtime.HandleCrash()
	defer func() { // on exit, invoke the OnStoppedLeading callback
		le.config.Callbacks.OnStoppedLeading()
	}()
	if !le.acquire(ctx) {
		return
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx) // once elected, run OnStartedLeading
	le.renew(ctx)
}
|
Run calls acquire, which loops calling tryAcquireOrRenew until it succeeds or ctx signals that it is done.
|
func (le *LeaderElector) acquire(ctx context.Context) bool {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	desc := le.config.Lock.Describe()
	klog.Infof("attempting to acquire leader lease %v...", desc)
	// wait.JitterUntil runs a function periodically; the func() literal is the periodic task.
	// RetryPeriod is the interval between runs.
	// JitterFactor is the jitter coefficient, similar to the factor in a delay queue (duration + maxFactor * duration).
	// sliding decides whether the task's execution time is counted within the interval.
	// ctx.Done() is passed along as the stop signal.
	wait.JitterUntil(func() {
		succeeded = le.tryAcquireOrRenew(ctx)
		le.maybeReportTransition()
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("became leader")
		le.metrics.leaderOn(le.config.Name)
		klog.Infof("successfully acquired lease %v", desc)
		cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}
|
The actual election happens in tryAcquireOrRenew, so let's look at it next. It tries to acquire the leader lease, or to renew the lease if it already holds it. It returns true if the lease was acquired or renewed, and false otherwise.
|
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
	now := metav1.Now() // current time
	leaderElectionRecord := rl.LeaderElectionRecord{ // build an election record
		HolderIdentity:       le.config.Lock.Identity(),                   // identity of the candidate; in the ep above it is derived from the hostname
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), // 15s by default
		RenewTime:            now,                                         // renewal time
		AcquireTime:          now,                                         // acquisition time
	}
	// 1. Get the record from the API or create it: if Get succeeds a lease already exists, otherwise create a new one.
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		// Creating the lease means creating the corresponding resource. The lock is one of the
		// four lock types provided by leaderelection, depending on what was passed to RunOrDie.
		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		// At this point the lease has been created; record its attributes (LeaderElectionRecord).
		le.setObservedRecord(&leaderElectionRecord)
		return true
	}
	// 2. A record was obtained; check its identity and time.
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
		le.setObservedRecord(oldLeaderElectionRecord)
		le.observedRawRecord = oldLeaderElectionRawRecord
	}
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() { // someone else holds the lock and the lease has not expired, so it is not yet time to campaign
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}
	// 3. We are going to try to update. leaderElectionRecord is set to its default values here; correct it before updating.
	if le.IsLeader() { // we are the leader: keep the original acquire time and transition count
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else { // LeaderTransitions counts how many times leadership has changed hands;
		// when the leader is unchanged the value is kept (branch above),
		// otherwise it is incremented by one.
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}
	// Finally, update the lock resource in the APIServer, i.e. the attributes of the corresponding resource.
	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}
	// setObservedRecord replaces the locally observed record with the new one.
	// It is safe: it takes a lock so that only one goroutine is in the critical section at a time.
	le.setObservedRecord(&leaderElectionRecord)
	return true
}
|
Summary
At this point you have a complete picture of how leader election works in Kubernetes. Here is a brief review of the steps described above.
- The instance that acquires the lock first becomes the leader; the lock can be a resource such as lease, endpoint, etc.
- The instance that is already the leader keeps renewing the lease. The default lease duration (leaseDuration) is 15 seconds, and each renewal updates the renewal time (renewTime).
- The other instances, the followers, constantly check the corresponding resource lock. If there is already a leader, they check renewTime; if the lease duration has been exceeded, the leader is assumed to have a problem and the election restarts until a follower is promoted to leader.
- To avoid contention over the resource, the Kubernetes API uses ResourceVersion to prevent conflicting modifications: if the version in the request does not match the current version, the object has already been modified and the APIServer returns an error.