Background
I’ve written a previous article, [“Kubernetes in Action - Smooth Node Removal”], about how to remove nodes from a K8s cluster; today I’ll take a look at what the kubectl drain command does and how it does it.
kubectl
K8s uses cobra as its command-line builder (I don’t find cobra very useful, and its documentation is unclear). The actual processing logic lives in pkg/kubectl/cmd/cmd.go, with a unified entry point in cmd/kubectl/kubectl.go.
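To get a feel for what cobra does for kubectl, here is a minimal, generic cobra program (my own sketch, not kubectl code): you declare *cobra.Command values, attach subcommands to a root command, and Execute dispatches to the matching Run function.

```go
package main

import (
    "fmt"
    "os"

    "github.com/spf13/cobra"
)

func main() {
    root := &cobra.Command{Use: "demo"}

    // A subcommand with one flag; cobra parses the arguments and calls Run.
    var force bool
    drainCmd := &cobra.Command{
        Use:   "drain NODE",
        Short: "Pretend to drain a node",
        Args:  cobra.ExactArgs(1),
        Run: func(cmd *cobra.Command, args []string) {
            fmt.Printf("draining %s (force=%v)\n", args[0], force)
        },
    }
    drainCmd.Flags().BoolVar(&force, "force", false, "continue even if errors occur")

    root.AddCommand(drainCmd)
    if err := root.Execute(); err != nil {
        os.Exit(1)
    }
}
```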
```go
...
groups := templates.CommandGroups{
    {
        Message: "Basic Commands (Beginner):",
        ...
    },
    {
        Message: "Deploy Commands:",
        ...
    },
    {
        Message: "Cluster Management Commands:",
        Commands: []*cobra.Command{
            certificates.NewCmdCertificate(f, ioStreams),
            clusterinfo.NewCmdClusterInfo(f, ioStreams),
            top.NewCmdTop(f, ioStreams),
            drain.NewCmdCordon(f, ioStreams),
            drain.NewCmdUncordon(f, ioStreams),
            drain.NewCmdDrain(f, ioStreams),
            taint.NewCmdTaint(f, ioStreams),
        },
    },
    ...
}
groups.Add(cmds)
```
As you can see from the kubectl entry point that registers all the subcommands, the cordon, uncordon, and drain commands we’re looking at today are all grouped under the cluster management commands.
cordon
Let’s start with the cordon command. Its purpose is to mark a node as unschedulable so that K8s stops scheduling new resources onto that node while it is undergoing maintenance.
```go
func NewCmdCordon(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
    o := NewDrainCmdOptions(f, ioStreams)
    cmd := &cobra.Command{
        Use:                   "cordon NODE",
        DisableFlagsInUseLine: true,
        Short:                 i18n.T("Mark node as unschedulable"),
        Long:                  cordonLong,
        Example:               cordonExample,
        Run: func(cmd *cobra.Command, args []string) {
            cmdutil.CheckErr(o.Complete(f, cmd, args))
            cmdutil.CheckErr(o.RunCordonOrUncordon(true))
        },
    }
    cmd.Flags().StringVarP(&o.drainer.Selector, "selector", "l", o.drainer.Selector, "Selector (label query) to filter on")
    cmdutil.AddDryRunFlag(cmd)
    return cmd
}
```
Looking directly at the contents of Run, and ignoring cmdutil.CheckErr for the moment, two main methods are executed here: o.Complete and o.RunCordonOrUncordon. The fundamental job of kubectl is to send the corresponding HTTP requests to the APIServer, and kubectl adds a layer of encapsulation through Builder and Visitor so that each subcommand’s implementation stays consistent and concise.
```go
// Builder provides convenience functions for taking arguments and parameters
// from the command line and converting them to a list of resources to iterate
// over using the Visitor interface.
type Builder struct {
    ...
}

// Visitor lets clients walk a list of resources.
type Visitor interface {
    Visit(VisitorFunc) error
}
```
The corresponding builder is constructed in o.Complete:
```go
...
// Build the builder instance from the command-line arguments
builder := f.NewBuilder().
    WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
    NamespaceParam(o.Namespace).DefaultNamespace().
    ResourceNames("nodes", args...).
    SingleResourceType().
    Flatten()
if len(o.drainer.Selector) > 0 {
    builder = builder.LabelSelectorParam(o.drainer.Selector).
        ResourceTypes("nodes")
}
// builder.Do returns a Result object that carries a Visitor
r := builder.Do()
```
Next, let’s see what builder.Do() does to return the Result object.
```go
func (b *Builder) Do() *Result {
    // Call visitorResult to get the Result
    r := b.visitorResult()
    ...
    return r
}

...

func (b *Builder) visitorResult() *Result {
    ...
    // Skip the other branches and look at the simplest path: building the Result by Name
    if len(b.names) != 0 {
        return b.visitByName()
    }
    ...
}

...

func (b *Builder) visitByName() *Result {
    // Declare the Result object
    result := &Result{
        singleItemImplied:  len(b.names) == 1,
        targetsSingleItems: true,
    }
    ...
    // Get the K8s client
    client, err := b.getClient(mapping.GroupVersionKind.GroupVersion())
    ...
    visitors := []Visitor{}
    for _, name := range b.names {
        info := &Info{
            Client:    client,
            Mapping:   mapping,
            Namespace: selectorNamespace,
            Name:      name,
            Export:    b.export,
        }
        visitors = append(visitors, info)
    }
    // VisitorList also implements the Visit interface; it iterates over the Visitors and calls their Visit methods
    result.visitor = VisitorList(visitors)
    result.sources = visitors
    return result
}
```
Having seen how an object of type Result is obtained, let’s look at how o.Complete handles it by passing in a VisitorFunc. The visitors held by the Result all implement the Visit interface, whose role is to receive a VisitorFunc and execute it.
```go
return r.Visit(func(info *resource.Info, err error) error {
    ...
})

...

func (v DecoratedVisitor) Visit(fn VisitorFunc) error {
    return v.visitor.Visit(func(info *Info, err error) error {
        ...
        for i := range v.decorators {
            if err := v.decorators[i](info, nil); err != nil {
                return err
            }
        }
        return fn(info, nil)
    })
}
```
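To make the Builder/Visitor pairing more concrete, here is a small self-contained sketch of the pattern using simplified, hypothetical types (not kubectl’s actual code): a VisitorFunc does the per-item work, a list-based Visitor drives it over every item, and a decorating Visitor wraps another Visitor to run extra steps first — which is roughly how o.Complete ends up collecting the node Infos.

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for resource.Info / Visitor / VisitorFunc.
type Info struct{ Name string }

type VisitorFunc func(*Info) error

type Visitor interface {
    Visit(VisitorFunc) error
}

// VisitorList walks every Info and applies the function to each one.
type VisitorList []*Info

func (l VisitorList) Visit(fn VisitorFunc) error {
    for _, info := range l {
        if err := fn(info); err != nil {
            return err
        }
    }
    return nil
}

// DecoratedVisitor wraps another Visitor and runs its decorators before the final fn.
type DecoratedVisitor struct {
    delegate   Visitor
    decorators []VisitorFunc
}

func (d DecoratedVisitor) Visit(fn VisitorFunc) error {
    return d.delegate.Visit(func(info *Info) error {
        for _, decorate := range d.decorators {
            if err := decorate(info); err != nil {
                return err
            }
        }
        return fn(info)
    })
}

func main() {
    nodes := VisitorList{{Name: "node-1"}, {Name: "node-2"}}
    v := DecoratedVisitor{
        delegate:   nodes,
        decorators: []VisitorFunc{func(i *Info) error { i.Name = "cluster-a/" + i.Name; return nil }},
    }
    // The final VisitorFunc is where something like o.Complete would collect nodeInfos.
    _ = v.Visit(func(i *Info) error { fmt.Println(i.Name); return nil })
}
```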
Next, see what o.RunCordonOrUncordon does.
```go
func (o *DrainCmdOptions) RunCordonOrUncordon(desired bool) error {
    cordonOrUncordon := "cordon"
    if !desired {
        cordonOrUncordon = "un" + cordonOrUncordon
    }
    // Iterate over the nodeInfos list collected via Visit
    for _, nodeInfo := range o.nodeInfos {
        ...
        gvk := nodeInfo.ResourceMapping().GroupVersionKind
        if gvk.Kind == "Node" {
            c, err := drain.NewCordonHelperFromRuntimeObject(nodeInfo.Object, scheme.Scheme, gvk)
            if updateRequired := c.UpdateIfRequired(desired); !updateRequired {
                ...
            } else {
                if o.drainer.DryRunStrategy != cmdutil.DryRunClient {
                    ...
                    // Modify the corresponding node's configuration
                    err, patchErr := c.PatchOrReplace(o.drainer.Client, o.drainer.DryRunStrategy == cmdutil.DryRunServer)
                    ...
                }
            }
        }
        ...
    }
    return nil
}

...

func (c *CordonHelper) PatchOrReplace(clientset kubernetes.Interface, serverDryRun bool) (error, error) {
    client := clientset.CoreV1().Nodes()
    oldData, err := json.Marshal(c.node)
    // Update the Unschedulable field in the node Spec
    c.node.Spec.Unschedulable = c.desired
    newData, err := json.Marshal(c.node)
    // Build the merge patch by diffing the old and new data
    patchBytes, patchErr := strategicpatch.CreateTwoWayMergePatch(oldData, newData, c.node)
    if patchErr == nil {
        ...
        _, err = client.Patch(context.TODO(), c.node.Name, types.StrategicMergePatchType, patchBytes, patchOptions)
    }
    ...
}
```
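For comparison, here is a hedged client-go sketch that cordons a node with the same diff-then-strategic-merge-patch approach; it assumes in-cluster configuration and a node named node-1, and it is an illustration rather than the kubectl implementation itself.

```go
package main

import (
    "context"
    "encoding/json"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/strategicpatch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

// cordonNode marks a node unschedulable with a strategic merge patch,
// mirroring the diff-then-patch flow of CordonHelper.PatchOrReplace.
func cordonNode(ctx context.Context, client kubernetes.Interface, nodeName string) error {
    node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
    if err != nil {
        return err
    }

    oldData, err := json.Marshal(node)
    if err != nil {
        return err
    }
    node.Spec.Unschedulable = true
    newData, err := json.Marshal(node)
    if err != nil {
        return err
    }

    // The patch only contains what changed, e.g. {"spec":{"unschedulable":true}}.
    patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
    if err != nil {
        return err
    }
    _, err = client.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
    return err
}

func main() {
    cfg, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(cfg)
    if err := cordonNode(context.TODO(), clientset, "node-1"); err != nil {
        panic(err)
    }
    fmt.Println("node-1 cordoned")
}
```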
Drain
After Cordon, on to Drain.
```go
func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
    ...
    cmd := &cobra.Command{
        ...
        Run: func(cmd *cobra.Command, args []string) {
            cmdutil.CheckErr(o.Complete(f, cmd, args))
            cmdutil.CheckErr(o.RunDrain())
        },
    }
    ...
```
Looking directly at o.RunDrain, the first thing it does is execute o.RunCordonOrUncordon, which marks the node as unschedulable. So the blog post I wrote earlier is actually inaccurate: if you want to take a node offline, simply running kubectl drain is enough.
```go
func (o *DrainCmdOptions) RunDrain() error {
    if err := o.RunCordonOrUncordon(true); err != nil {
        return err
    }
    ...
    drainedNodes := sets.NewString()
    var fatal error
    for _, info := range o.nodeInfos {
        // Evict the Pods
        if err := o.deleteOrEvictPodsSimple(info); err == nil {
            drainedNodes.Insert(info.Name)
            printObj(info.Object, o.Out)
        } else {
            // If evicting the Pods fails, print the corresponding node information
            if len(remainingNodes) > 0 {
                fmt.Fprintf(o.ErrOut, "There are pending nodes to be drained:\n")
                for _, nodeName := range remainingNodes {
                    fmt.Fprintf(o.ErrOut, " %s\n", nodeName)
                }
            }
            break
        }
    }
}
```
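As an aside, this logic is also exposed as a library in k8s.io/kubectl/pkg/drain, so your own controller can reuse it instead of shelling out to kubectl. A rough sketch (field names differ slightly across versions, and the kubeconfig setup and node name here are assumptions):

```go
package main

import (
    "context"
    "os"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/kubectl/pkg/drain"
)

func main() {
    // Hypothetical setup: load the kubeconfig from the default path.
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    helper := &drain.Helper{
        Client:              client,
        Force:               true, // also evict bare Pods that have no controller
        IgnoreAllDaemonSets: true, // DaemonSet Pods would be recreated immediately anyway
        GracePeriodSeconds:  -1,   // use each Pod's own terminationGracePeriodSeconds
        Timeout:             2 * time.Minute,
        Out:                 os.Stdout,
        ErrOut:              os.Stderr,
    }

    node, err := client.CoreV1().Nodes().Get(context.TODO(), "node-1", metav1.GetOptions{})
    if err != nil {
        panic(err)
    }
    // The same two steps as kubectl drain: cordon first, then evict.
    if err := drain.RunCordonOrUncordon(helper, node, true); err != nil {
        panic(err)
    }
    if err := drain.RunNodeDrain(helper, "node-1"); err != nil {
        panic(err)
    }
}
```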
In deleteOrEvictPodsSimple, the corresponding Pod information is first obtained by node name, and then the eviction is performed.
```go
func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error {
    list, errs := o.drainer.GetPodsForDeletion(nodeInfo.Name)
    ...
    if err := o.drainer.DeleteOrEvictPods(list.Pods()); err != nil {
        ...
    }
}
```
Here GetPodsForDeletion applies a set of filters covering the following scenarios; note that the filters are applied in a strict order.
```go
func (d *Helper) makeFilters() []podFilter {
    return []podFilter{
        // Pods already marked for deletion (DeletionTimestamp is non-zero)
        d.skipDeletedFilter,
        // Pods belonging to a DaemonSet
        d.daemonSetFilter,
        // Mirror pods are really static pods: the Pods defined in
        // /etc/kubernetes/manifests/ whose lifecycle is managed by the kubelet;
        // their `Annotations` contain `kubernetes.io/config.mirror`
        d.mirrorPodFilter,
        // Pods using local storage, i.e. Pods whose Volumes field is not empty
        d.localStorageFilter,
        // Pods not managed by a replication controller, i.e. pods whose `Controlled By` is empty
        d.unreplicatedFilter,
    }
}
```
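To illustrate why the ordering of such a filter chain matters, here is a simplified toy version (the real podFilter works on corev1.Pod and returns a richer PodDeleteStatus; the types and filters below are made up for illustration):

```go
package main

import (
    "fmt"
    "strings"
)

// Simplified stand-in: the real filters operate on corev1.Pod.
type pod struct{ Name string }

// A filter decides whether a pod may be evicted and can explain why not.
type podFilter func(pod) (allowed bool, reason string)

func skipDaemonSetPods(p pod) (bool, string) {
    if strings.HasPrefix(p.Name, "ds-") {
        return false, "owned by a DaemonSet"
    }
    return true, ""
}

func skipMirrorPods(p pod) (bool, string) {
    if strings.HasPrefix(p.Name, "mirror-") {
        return false, "mirror pod, managed by the kubelet"
    }
    return true, ""
}

// filterPods applies the filters in order; the first filter that rejects a pod wins,
// which is why the ordering of the chain matters.
func filterPods(pods []pod, filters []podFilter) (deletable []pod) {
    for _, p := range pods {
        ok := true
        for _, f := range filters {
            allowed, reason := f(p)
            if !allowed {
                fmt.Printf("skipping %s: %s\n", p.Name, reason)
                ok = false
                break
            }
        }
        if ok {
            deletable = append(deletable, p)
        }
    }
    return deletable
}

func main() {
    pods := []pod{{"web-1"}, {"ds-logging-abc"}, {"mirror-etcd-node1"}}
    fmt.Println(filterPods(pods, []podFilter{skipDaemonSetPods, skipMirrorPods}))
}
```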
Once the filtered list of Pods is obtained, the eviction is performed: a goroutine is started for each Pod, which then waits until that Pod’s eviction completes.
```go
func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
    returnCh := make(chan error, 1)
    ...
    ctx, cancel := context.WithTimeout(d.getContext(), globalTimeout)
    defer cancel()
    for _, pod := range pods {
        go func(pod corev1.Pod, returnCh chan error) {
            for {
                ...
                select {
                case <-ctx.Done():
                    // Eviction timed out
                    returnCh <- fmt.Errorf("error when evicting pod %q: global timeout reached: %v", pod.Name, globalTimeout)
                    return
                default:
                }
                // Evict the Pod; this eventually calls d.Client.PolicyV1beta1().Evictions(eviction.Namespace).Evict(eviction)
                err := d.EvictPod(pod, policyGroupVersion)
                ...
            }
            ...
            params := waitForDeleteParams{
                ...
            }
            // Wait for the eviction to complete
            _, err := waitForDelete(params)
            if err == nil {
                returnCh <- nil
            } else {
                returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
            }
        }(pod, returnCh)
    }
```
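For reference, posting an Eviction directly with client-go looks roughly like the sketch below; it uses the policy/v1beta1 eviction subresource mentioned in the comment above (recent client-go versions take a context), and it assumes a clientset has already been constructed elsewhere.

```go
package example

import (
    "context"

    policyv1beta1 "k8s.io/api/policy/v1beta1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// evictPod posts an Eviction for the given pod; the API server then applies
// PodDisruptionBudget checks before the pod is actually deleted.
func evictPod(ctx context.Context, client kubernetes.Interface, namespace, name string) error {
    eviction := &policyv1beta1.Eviction{
        ObjectMeta: metav1.ObjectMeta{
            Name:      name,
            Namespace: namespace,
        },
        DeleteOptions: &metav1.DeleteOptions{},
    }
    return client.PolicyV1beta1().Evictions(namespace).Evict(ctx, eviction)
}
```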
If the deletion does not complete immediately, waitForDelete passes a ConditionFunc into the WaitFor loop; the ConditionFunc considers the Pod gone once it no longer exists, or once a Pod with the same name but a different ObjectMeta UID shows up (i.e. the original Pod has been replaced).
```go
func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
    stopCh := make(chan struct{})
    defer close(stopCh)
    c := wait(stopCh)
    for {
        select {
        case _, open := <-c:
            ok, err := runConditionWithCrashProtection(fn)
            if err != nil {
                return err
            }
            if ok {
                return nil
            }
            if !open {
                return ErrWaitTimeout
            }
        case <-done:
            return ErrWaitTimeout
        }
    }
}
```
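As an illustration of what such a ConditionFunc can look like, here is a hedged sketch that polls a Pod and treats it as gone once it no longer exists or its UID has changed; it uses wait.PollImmediate for brevity rather than the WaitFor machinery above, and it is not the exact kubectl code.

```go
package example

import (
    "context"
    "time"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
)

// waitForPodGone polls until the pod is deleted or replaced by a new pod
// (same name, different UID), mirroring the condition used after an eviction.
func waitForPodGone(client kubernetes.Interface, namespace, name string, originalUID types.UID) error {
    return wait.PollImmediate(time.Second, 2*time.Minute, func() (bool, error) {
        pod, err := client.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
        switch {
        case apierrors.IsNotFound(err):
            // The pod is gone: eviction finished.
            return true, nil
        case err != nil:
            return false, err
        case pod.ObjectMeta.UID != originalUID:
            // A different pod now owns this name, so the original one was deleted.
            return true, nil
        default:
            // Still terminating; keep polling.
            return false, nil
        }
    })
}
```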
Summary
The implementation of the kubectl drain command is quite simple, with no particularly complex logic. An important reason K8s can keep it this simple is that every action is declarative: you declare the desired state and wait for it to be carried out, with no dirty imperative work to do afterwards. Bear in mind that during a Pod eviction not every Pod ends up rescheduled onto another node, so before taking a node offline you need to double-check whether any bare Pods are still running on it, whether any Pods rely on local storage, and so on.
It has been a while since I last looked at the kind of design patterns combined here, and I’d like to find a chance to revisit them.