In Kubernetes, the Kubernetes API can be accessed both from outside and from inside the cluster: from outside by directly calling the API exposed by the kube-apiserver, and from inside through the ClusterIP of the built-in `kubernetes` service. A cluster creates this `kubernetes` service right after initialization; it is created and maintained by kube-apiserver, as shown below.
```
$ kubectl get service
NAME         TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)   AGE
kubernetes   ClusterIP   10.96.0.1    <none>        443/TCP   4d22h

$ kubectl get endpoints kubernetes
NAME         ENDPOINTS             AGE
kubernetes   192.168.99.113:6443   4d22h
```
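For example, a pod can reach this service through its ClusterIP, which client-go resolves via the `KUBERNETES_SERVICE_HOST`/`KUBERNETES_SERVICE_PORT` environment variables. Below is a minimal sketch using the v1.16-era client-go API (where `List` does not yet take a context), assuming the pod's service account is allowed to list namespaces:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	// InClusterConfig only works inside a pod; it points the client at the
	// kubernetes service's ClusterIP shown above
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	nss, err := client.CoreV1().Namespaces().List(metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, ns := range nss.Items {
		fmt.Println(ns.Name)
	}
}
```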
The built-in `kubernetes` service cannot be removed, and its ClusterIP is the first IP in the range specified by the `--service-cluster-ip-range` parameter. The IP and port in the `kubernetes` endpoints can be set with the `--advertise-address` and `--secure-port` startup parameters.
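To make the "first IP" rule concrete, here is a small illustrative sketch (not the apiserver's actual code, which derives the address with the ipallocator package) that computes the ClusterIP the `kubernetes` service would get for a given `--service-cluster-ip-range`:

```go
package main

import (
	"fmt"
	"math/big"
	"net"
)

// firstServiceIP returns the first usable IP in a service CIDR, i.e. the
// ClusterIP that the built-in kubernetes service receives
// (e.g. 10.96.0.0/12 -> 10.96.0.1).
func firstServiceIP(cidr string) (net.IP, error) {
	_, network, err := net.ParseCIDR(cidr)
	if err != nil {
		return nil, err
	}
	n := big.NewInt(0).SetBytes(network.IP)
	n.Add(n, big.NewInt(1)) // skip the network address itself
	return net.IP(n.Bytes()), nil
}

func main() {
	ip, err := firstServiceIP("10.96.0.0/12")
	if err != nil {
		panic(err)
	}
	fmt.Println(ip) // 10.96.0.1
}
```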
The `kubernetes` service is controlled by the bootstrap controller in kube-apiserver, whose main functions are as follows.
- Create the `kubernetes` service.
- Create the default, kube-system, and kube-public namespaces, as well as kube-node-lease if the `NodeLease` feature is enabled.
- Provide repair and inspection for Service ClusterIPs.
- Provide repair and inspection for Service NodePorts.
The `kubernetes` service exposes itself via ClusterIP by default. To use a NodePort instead, specify the desired port at kube-apiserver startup with the `--kubernetes-service-node-port` argument.
bootstrap controller source code analysis
kubernetes version: v1.16
The bootstrap controller is initialized and started in the `InstallLegacyAPI` method of the `CreateKubeAPIServer` call chain. Starting and stopping the bootstrap controller is controlled by the apiserver's `PostStartHook` and `PreShutdownHook`.
k8s.io/kubernetes/pkg/master/master.go:406
```go
func (m *Master) InstallLegacyAPI(......) error {
    legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
    if err != nil {
        return fmt.Errorf("Error building core storage: %v", err)
    }

    // initialize the bootstrap-controller
    controllerName := "bootstrap-controller"
    coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
    bootstrapController := c.NewBootstrapController(......)
    m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
    m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

    if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
        return fmt.Errorf("Error in registering group versions: %v", err)
    }
    return nil
}
```
The PostStartHooks are invoked by `RunPostStartHooks` in the kube-apiserver startup method `prepared.Run`, which starts all registered hooks.
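For reference, the bootstrap controller's hooks are thin wrappers that match the hook signatures expected by the generic apiserver; paraphrased from `pkg/master/controller.go`:

```go
// PostStartHook starts the bootstrap controller once the apiserver is up.
func (c *Controller) PostStartHook(hookContext genericapiserver.PostStartHookContext) error {
    c.Start()
    return nil
}

// PreShutdownHook stops it during graceful shutdown.
func (c *Controller) PreShutdownHook() error {
    c.Stop()
    return nil
}
```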
NewBootstrapController
The bootstrap controller requires several parameters to be set at initialization, mainly PublicIP, ServiceCIDR, and PublicServicePort. PublicIP is specified by the `--advertise-address` command-line parameter; if it is not specified, the system automatically selects a global IP. PublicServicePort is specified by the `--secure-port` startup parameter (default 6443), and ServiceCIDR is specified by the `--service-cluster-ip-range` parameter (default 10.0.0.0/24).
k8s.io/kubernetes/pkg/master/controller.go:89
```go
func (c *completedConfig) NewBootstrapController(......) *Controller {
    // 1. get the PublicServicePort
    _, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
    if err != nil {
        klog.Fatalf("failed to get listener address: %v", err)
    }

    // 2. specify the kube-system and kube-public namespaces that need to be created
    systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic}
    if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
        systemNamespaces = append(systemNamespaces, corev1.NamespaceNodeLease)
    }

    return &Controller{
        ......
        // ServiceClusterIPRegistry is initialized when CreateKubeAPIServer sets up
        // the RESTStorage; it is backed by etcd
        ServiceClusterIPRegistry:          legacyRESTStorage.ServiceClusterIPAllocator,
        ServiceClusterIPRange:             c.ExtraConfig.ServiceIPRange,
        SecondaryServiceClusterIPRegistry: legacyRESTStorage.SecondaryServiceClusterIPAllocator,
        // SecondaryServiceClusterIPRange is only usable when IPv6DualStack is enabled
        SecondaryServiceClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,

        ServiceClusterIPInterval: 3 * time.Minute,

        ServiceNodePortRegistry: legacyRESTStorage.ServiceNodePortAllocator,
        ServiceNodePortRange:    c.ExtraConfig.ServiceNodePortRange,
        ServiceNodePortInterval: 3 * time.Minute,

        // the IP the API server binds to; it becomes the endpoint IP of the kubernetes service
        PublicIP: c.GenericConfig.PublicAddress,

        // the first IP in the clusterIP range
        ServiceIP: c.ExtraConfig.APIServerServiceIP,
        // defaults to 6443
        ServicePort:        c.ExtraConfig.APIServerServicePort,
        ExtraServicePorts:  c.ExtraConfig.ExtraServicePorts,
        ExtraEndpointPorts: c.ExtraConfig.ExtraEndpointPorts,
        // 6443 here
        PublicServicePort: publicServicePort,
        // defaults to the ClusterIP mode, in which case this is 0
        KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
    }
}
```
The code to automatically select the global IP is shown below.
k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/net/interface.go:323
```go
func ChooseHostInterface() (net.IP, error) {
    var nw networkInterfacer = networkInterface{}
    if _, err := os.Stat(ipv4RouteFile); os.IsNotExist(err) {
        return chooseIPFromHostInterfaces(nw)
    }
    routes, err := getAllDefaultRoutes()
    if err != nil {
        return nil, err
    }
    return chooseHostInterfaceFromRoute(routes, nw)
}
```
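This helper is exported, so its behavior can be checked directly; a small usage sketch (the import path is the real one from the location above):

```go
package main

import (
	"fmt"

	utilnet "k8s.io/apimachinery/pkg/util/net"
)

func main() {
	// returns the host IP the apiserver would fall back to
	// when --advertise-address is not set
	ip, err := utilnet.ChooseHostInterface()
	if err != nil {
		panic(err)
	}
	fmt.Println(ip)
}
```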
bootstrapController.Start
The four main functions of the bootstrap controller were mentioned above: repairing ClusterIPs, repairing NodePorts, updating the kubernetes service, and creating the namespaces needed by the system (default, kube-system, kube-public). After startup, the bootstrap controller first runs the ClusterIP repair, NodePort repair, and kubernetes service update once, and then runs these four jobs in asynchronous loops. The following is its `Start` method.
k8s.io/kubernetes/pkg/master/controller.go:146
```go
func (c *Controller) Start() {
    if c.runner != nil {
        return
    }

    // 1. on first startup, remove this instance's own configuration from the
    //    kubernetes endpoints; at this point kube-apiserver may not be ready yet
    endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
    if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
        klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
    }

    // 2. initialize the repairClusterIPs and repairNodePorts objects
    repairClusterIPs := servicecontroller.NewRepair(......)
    repairNodePorts := portallocatorcontroller.NewRepair(......)

    // 3. run repairClusterIPs and repairNodePorts once to initialize them
    if err := repairClusterIPs.RunOnce(); err != nil {
        klog.Fatalf("Unable to perform initial IP allocation check: %v", err)
    }
    if err := repairNodePorts.RunOnce(); err != nil {
        klog.Fatalf("Unable to perform initial service nodePort check: %v", err)
    }

    // 4. periodically run the four main functions of the bootstrap controller
    c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
    c.runner.Start()
}
```
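`async.NewRunner` simply starts each function in its own goroutine and hands it a shared stop channel. Below is a self-contained stand-in that mimics this pattern; the names are illustrative, not the real `k8s.io/kubernetes/pkg/util/async` types:

```go
package main

import (
	"fmt"
	"time"
)

// runner mimics async.Runner: each function receives a stop channel and runs
// its own loop; Stop closes the channel to shut all loops down.
type runner struct {
	fns  []func(stop chan struct{})
	stop chan struct{}
}

func newRunner(fns ...func(stop chan struct{})) *runner {
	return &runner{fns: fns}
}

func (r *runner) Start() {
	r.stop = make(chan struct{})
	for _, fn := range r.fns {
		go fn(r.stop)
	}
}

func (r *runner) Stop() { close(r.stop) }

func main() {
	r := newRunner(func(stop chan struct{}) {
		for {
			select {
			case <-stop:
				return
			case <-time.After(time.Second):
				fmt.Println("tick") // stands in for RunKubernetesService etc.
			}
		}
	})
	r.Start()
	time.Sleep(2500 * time.Millisecond)
	r.Stop()
}
```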
c.RunKubernetesNamespaces
The main job of `c.RunKubernetesNamespaces` is to create the kube-system and kube-public namespaces, plus kube-node-lease if the `NodeLease` feature is enabled, and to recheck them every minute afterwards.
k8s.io/kubernetes/pkg/master/controller.go:199
```go
func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
    wait.Until(func() {
        for _, ns := range c.SystemNamespaces {
            if err := createNamespaceIfNeeded(c.NamespaceClient, ns); err != nil {
                runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
            }
        }
    }, c.SystemNamespacesInterval, ch)
}
```
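`createNamespaceIfNeeded` is essentially "create, and tolerate AlreadyExists"; a close paraphrase of the helper used above (v1.16-era client-go signatures, without contexts):

```go
func createNamespaceIfNeeded(c corev1client.NamespacesGetter, ns string) error {
    if _, err := c.Namespaces().Get(ns, metav1.GetOptions{}); err == nil {
        // the namespace already exists
        return nil
    }
    newNs := &corev1.Namespace{
        ObjectMeta: metav1.ObjectMeta{Name: ns, Namespace: ""},
    }
    _, err := c.Namespaces().Create(newNs)
    if err != nil && errors.IsAlreadyExists(err) {
        // a concurrent creator won the race; that is fine
        err = nil
    }
    return err
}
```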
c.RunKubernetesService
The main purpose of `c.RunKubernetesService` is to check whether the kubernetes service is in a normal state and to perform periodic synchronization. It first polls the `/healthz` endpoint to check whether the apiserver is ready; once it is, it calls `c.UpdateKubernetesService` to update the state of the kubernetes service.
k8s.io/kubernetes/pkg/master/controller.go:210
```go
func (c *Controller) RunKubernetesService(ch chan struct{}) {
    wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
        var code int
        c.healthClient.Get().AbsPath("/healthz").Do().StatusCode(&code)
        return code == http.StatusOK, nil
    }, ch)

    wait.NonSlidingUntil(func() {
        if err := c.UpdateKubernetesService(false); err != nil {
            runtime.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
        }
    }, c.EndpointInterval, ch)
}
```
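The readiness gate above can be reproduced outside the apiserver. Here is a self-contained sketch that polls a hypothetical `/healthz` URL every 100ms until it returns 200 or the stop channel closes; inside the apiserver this goes through the loopback client rather than plain HTTP:

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// waitForHealthz mimics the readiness gate in RunKubernetesService: keep
// polling the /healthz endpoint until it answers 200 OK or we are stopped.
func waitForHealthz(url string, interval time.Duration, stop <-chan struct{}) bool {
	for {
		resp, err := http.Get(url)
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return true
			}
		}
		select {
		case <-stop:
			return false
		case <-time.After(interval):
		}
	}
}

func main() {
	stop := make(chan struct{})
	go func() { time.Sleep(5 * time.Second); close(stop) }()
	if waitForHealthz("https://127.0.0.1:6443/healthz", 100*time.Millisecond, stop) {
		fmt.Println("apiserver ready")
	}
}
```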
c.UpdateKubernetesService
The main logic of `c.UpdateKubernetesService` is as follows.
- Call `createNamespaceIfNeeded` to create the default namespace.
- Call `c.CreateOrUpdateMasterServiceIfNeeded` to create the kubernetes service for the master.
- Call `c.EndpointReconciler.ReconcileEndpoints` to update the master's endpoints.
k8s.io/kubernetes/pkg/master/controller.go:230
```go
func (c *Controller) UpdateKubernetesService(reconcile bool) error {
    if err := createNamespaceIfNeeded(c.NamespaceClient, metav1.NamespaceDefault); err != nil {
        return err
    }

    servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts)
    if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
        return err
    }

    endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
    if err := c.EndpointReconciler.ReconcileEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts, reconcile); err != nil {
        return err
    }
    return nil
}
```
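`createPortAndServiceSpec` is where the `--kubernetes-service-node-port` setting mentioned earlier takes effect; a close paraphrase of that helper: if a nodePort is configured the service type becomes NodePort, otherwise it stays ClusterIP.

```go
func createPortAndServiceSpec(servicePort, targetServicePort, nodePort int, servicePortName string, extraServicePorts []corev1.ServicePort) ([]corev1.ServicePort, corev1.ServiceType) {
    servicePorts := []corev1.ServicePort{{
        Protocol:   corev1.ProtocolTCP,
        Port:       int32(servicePort),
        Name:       servicePortName,
        TargetPort: intstr.FromInt(targetServicePort),
    }}
    serviceType := corev1.ServiceTypeClusterIP
    if nodePort > 0 {
        servicePorts[0].NodePort = int32(nodePort)
        serviceType = corev1.ServiceTypeNodePort
    }
    return append(servicePorts, extraServicePorts...), serviceType
}
```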
c.EndpointReconciler.ReconcileEndpoints
The concrete implementation of the EndpointReconciler is determined by `EndpointReconcilerType`, which is specified by the `--endpoint-reconciler-type` parameter. The possible values are `master-count`, `lease`, and `none`; each type corresponds to a different EndpointReconciler instance. In v1.16 the default is `lease`, so only the lease implementation is analyzed here.
There may be multiple apiserver instances in a cluster, so their endpoints need to be managed in a unified way; `c.EndpointReconciler.ReconcileEndpoints` is what manages the apiserver endpoints. Every apiserver instance in the cluster creates a key in a dedicated directory in etcd and updates that key periodically as a heartbeat; ReconcileEndpoints reads the apiserver instance information from etcd and updates the endpoints accordingly.
k8s.io/kubernetes/pkg/master/reconcilers/lease.go:144
```go
func (r *leaseEndpointReconciler) ReconcileEndpoints(......) error {
    r.reconcilingLock.Lock()
    defer r.reconcilingLock.Unlock()

    if r.stopReconcilingCalled {
        return nil
    }

    // update the lease information
    if err := r.masterLeases.UpdateLease(ip.String()); err != nil {
        return err
    }

    return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
}

func (r *leaseEndpointReconciler) doReconcile(......) error {
    // 1. get the endpoints of the master
    e, err := r.epAdapter.Get(corev1.NamespaceDefault, serviceName, metav1.GetOptions{})
    shouldCreate := false
    if err != nil {
        if !errors.IsNotFound(err) {
            return err
        }
        shouldCreate = true
        e = &corev1.Endpoints{
            ObjectMeta: metav1.ObjectMeta{
                Name:      serviceName,
                Namespace: corev1.NamespaceDefault,
            },
        }
    }

    // 2. list all masters from etcd
    masterIPs, err := r.masterLeases.ListLeases()
    if err != nil {
        return err
    }
    if len(masterIPs) == 0 {
        return fmt.Errorf("no master IPs were listed in storage, refusing to erase all endpoints for the kubernetes service")
    }

    // 3. check the master info in the endpoints; if it is inconsistent with etcd, update it
    formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
    if formatCorrect && ipCorrect && portsCorrect {
        return nil
    }

    if !formatCorrect {
        e.Subsets = []corev1.EndpointSubset{{
            Addresses: []corev1.EndpointAddress{},
            Ports:     endpointPorts,
        }}
    }

    if !formatCorrect || !ipCorrect {
        e.Subsets[0].Addresses = make([]corev1.EndpointAddress, len(masterIPs))
        for ind, ip := range masterIPs {
            e.Subsets[0].Addresses[ind] = corev1.EndpointAddress{IP: ip}
        }
        e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
    }

    if !portsCorrect {
        e.Subsets[0].Ports = endpointPorts
    }

    if shouldCreate {
        if _, err = r.epAdapter.Create(corev1.NamespaceDefault, e); errors.IsAlreadyExists(err) {
            err = nil
        }
    } else {
        _, err = r.epAdapter.Update(corev1.NamespaceDefault, e)
    }
    return err
}
```
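The lease storage the reconciler relies on is a small interface, paraphrased here from `pkg/master/reconcilers/lease.go`. Each apiserver refreshes its own IP key with a TTL, and expired keys simply disappear, so `ListLeases` always returns the currently live masters:

```go
type Leases interface {
    // ListLeases retrieves a list of the current master IPs
    ListLeases() ([]string, error)
    // UpdateLease adds or refreshes a master's lease
    UpdateLease(ip string) error
    // RemoveLease removes a master's lease
    RemoveLease(ip string) error
}
```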
repairClusterIPs.RunUntil
The main problems that repairClusterIPs solves are:
- Ensuring that all ClusterIPs in the cluster are uniquely assigned.
- Ensuring that assigned ClusterIPs do not fall outside the specified range.
- Ensuring that a ClusterIP assigned to a service is still recorded in the allocator even if, due to a crash or similar failure, the allocation was never persisted correctly.
- Automatically migrating services from older versions of Kubernetes to the ipallocator atomicity model.
`repairClusterIPs.RunUntil` ultimately delegates to `repairClusterIPs.runOnce`; the main logic of its code is shown below.
k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller/repair.go:134
```go
func (c *Repair) runOnce() error {
    ......
    // 1. get a snapshot of the ClusterIPs already in use from etcd
    err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
        var err error
        snapshot, err = c.alloc.Get()
        if err != nil {
            return false, err
        }

        if c.shouldWorkOnSecondary() {
            secondarySnapshot, err = c.secondaryAlloc.Get()
            if err != nil {
                return false, err
            }
        }
        return true, nil
    })
    if err != nil {
        return fmt.Errorf("unable to refresh the service IP block: %v", err)
    }

    // 2. check whether the snapshot has been initialized
    if snapshot.Range == "" {
        snapshot.Range = c.network.String()
    }
    if c.shouldWorkOnSecondary() && secondarySnapshot.Range == "" {
        secondarySnapshot.Range = c.secondaryNetwork.String()
    }

    stored, err = ipallocator.NewFromSnapshot(snapshot)
    if c.shouldWorkOnSecondary() {
        secondaryStored, secondaryErr = ipallocator.NewFromSnapshot(secondarySnapshot)
    }
    if err != nil || secondaryErr != nil {
        return fmt.Errorf("unable to rebuild allocator from snapshots: %v", err)
    }

    // 3. get the service list
    list, err := c.serviceClient.Services(metav1.NamespaceAll).List(metav1.ListOptions{})
    if err != nil {
        return fmt.Errorf("unable to refresh the service IP block: %v", err)
    }

    // 4. convert the CIDR into the corresponding IP range format
    var rebuilt, secondaryRebuilt *ipallocator.Range
    rebuilt, err = ipallocator.NewCIDRRange(c.network)
    ......

    // 5. check each service's ClusterIP to make sure it is in a normal state
    for _, svc := range list.Items {
        if !helper.IsServiceIPSet(&svc) {
            continue
        }
        ip := net.ParseIP(svc.Spec.ClusterIP)
        ......
        actualAlloc := c.selectAllocForIP(ip, rebuilt, secondaryRebuilt)
        switch err := actualAlloc.Allocate(ip); err {
        // 6. check whether the IP has leaked
        case nil:
            actualStored := c.selectAllocForIP(ip, stored, secondaryStored)
            if actualStored.Has(ip) {
                actualStored.Release(ip)
            } else {
                ......
            }
            delete(c.leaks, ip.String())
        // 7. the IP has been allocated more than once
        case ipallocator.ErrAllocated:
            ......
        // 8. the IP is out of range
        case err.(*ipallocator.ErrNotInRange):
            ......
        // 9. the IP range is exhausted
        case ipallocator.ErrFull:
            ......
        default:
            ......
        }
    }

    // 10. compare to find leaked IPs
    c.checkLeaked(stored, rebuilt)
    if c.shouldWorkOnSecondary() {
        c.checkLeaked(secondaryStored, secondaryRebuilt)
    }

    // 11. update the snapshot
    err = c.saveSnapShot(rebuilt, c.alloc, snapshot)
    if err != nil {
        return err
    }
    if c.shouldWorkOnSecondary() {
        err := c.saveSnapShot(secondaryRebuilt, c.secondaryAlloc, secondarySnapshot)
        if err != nil {
            return nil
        }
    }
    return nil
}
```
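To make the `Allocate` switch above concrete, here is a toy stand-in for `ipallocator.Range` (illustrative only, not the real implementation): a bitmap of offsets within the service CIDR, where a second `Allocate` of the same IP reports a duplicate, corresponding to the `ErrAllocated` arm above.

```go
package main

import (
	"errors"
	"fmt"
	"math/big"
	"net"
)

// toyRange is a simplified stand-in for ipallocator.Range: a set of
// allocated offsets within a service CIDR.
type toyRange struct {
	base      *big.Int
	size      int
	allocated map[int]bool
}

var errAllocated = errors.New("already allocated")

func newToyRange(cidr string) (*toyRange, error) {
	_, network, err := net.ParseCIDR(cidr)
	if err != nil {
		return nil, err
	}
	ones, bits := network.Mask.Size()
	return &toyRange{
		base:      big.NewInt(0).SetBytes(network.IP),
		size:      1 << uint(bits-ones),
		allocated: map[int]bool{},
	}, nil
}

// offset is the position of ip within the CIDR, counting from the network address.
func (r *toyRange) offset(ip net.IP) int {
	n := big.NewInt(0).SetBytes(ip.To4())
	return int(big.NewInt(0).Sub(n, r.base).Int64())
}

// Allocate marks ip as used, mirroring the nil / ErrAllocated / out-of-range
// cases handled by runOnce.
func (r *toyRange) Allocate(ip net.IP) error {
	off := r.offset(ip)
	if off <= 0 || off >= r.size-1 {
		return fmt.Errorf("%s is not in range", ip)
	}
	if r.allocated[off] {
		return errAllocated
	}
	r.allocated[off] = true
	return nil
}

func main() {
	r, _ := newToyRange("10.96.0.0/24")
	fmt.Println(r.Allocate(net.ParseIP("10.96.0.1"))) // <nil>
	fmt.Println(r.Allocate(net.ParseIP("10.96.0.1"))) // already allocated
}
```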
repairNodePorts.RunUntil
repairNodePorts is mainly used to correct the nodePort information of services and to ensure that the nodePort allocations recorded in the cluster match the ports actually in use; a warning is raised when the two get out of sync. The work is ultimately done by `repairNodePorts.runOnce`, whose main logic is similar to that of the ClusterIP repair.
k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller/repair.go:84
```go
func (c *Repair) runOnce() error {
    // 1. get a snapshot of the nodePorts already in use from etcd
    err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
        var err error
        snapshot, err = c.alloc.Get()
        return err == nil, err
    })
    if err != nil {
        return fmt.Errorf("unable to refresh the port allocations: %v", err)
    }

    // 2. check whether the snapshot has been initialized
    if snapshot.Range == "" {
        snapshot.Range = c.portRange.String()
    }

    // 3. rebuild the allocated nodePort information from the snapshot
    stored, err := portallocator.NewFromSnapshot(snapshot)
    if err != nil {
        return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err)
    }

    // 4. get the service list
    list, err := c.serviceClient.Services(metav1.NamespaceAll).List(metav1.ListOptions{})
    if err != nil {
        return fmt.Errorf("unable to refresh the port block: %v", err)
    }

    rebuilt, err := portallocator.NewPortAllocator(c.portRange)
    if err != nil {
        return fmt.Errorf("unable to create port allocator: %v", err)
    }

    // 5. check every nodePort of each service to make sure it is in a normal state
    for i := range list.Items {
        svc := &list.Items[i]
        ports := collectServiceNodePorts(svc)
        if len(ports) == 0 {
            continue
        }
        for _, port := range ports {
            switch err := rebuilt.Allocate(port); err {
            // 6. check whether the port has leaked
            case nil:
                if stored.Has(port) {
                    stored.Release(port)
                } else {
                    ......
                }
                delete(c.leaks, port)
            // 7. the port has been allocated more than once
            case portallocator.ErrAllocated:
                ......
            // 8. the port is outside the allocation range
            case err.(*portallocator.ErrNotInRange):
                ......
            // 9. the port range is exhausted
            case portallocator.ErrFull:
                ......
            default:
                ......
            }
        }
    }

    // 10. check for leaked ports
    stored.ForEach(func(port int) {
        count, found := c.leaks[port]
        switch {
        case !found:
            ......
            count = numRepairsBeforeLeakCleanup - 1
            fallthrough
        case count > 0:
            c.leaks[port] = count - 1
            if err := rebuilt.Allocate(port); err != nil {
                runtime.HandleError(fmt.Errorf("the node port %d may have leaked, but can not be allocated: %v", port, err))
            }
        default:
            ......
        }
    })

    // 11. update the snapshot
    if err := rebuilt.Snapshot(snapshot); err != nil {
        return fmt.Errorf("unable to snapshot the updated port allocations: %v", err)
    }
    ......
    return nil
}
```
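For context, `collectServiceNodePorts` gathers every port of a service that consumes a nodePort allocation, including the health-check nodePort; a close paraphrase:

```go
func collectServiceNodePorts(svc *corev1.Service) []int {
    var ports []int
    for i := range svc.Spec.Ports {
        if p := svc.Spec.Ports[i].NodePort; p != 0 {
            ports = append(ports, int(p))
        }
    }
    // the health-check nodePort of a LoadBalancer service also consumes an allocation
    if svc.Spec.HealthCheckNodePort != 0 {
        ports = append(ports, int(svc.Spec.HealthCheckNodePort))
    }
    return ports
}
```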
The above covers the main implementation of the bootstrap controller.
Summary
This article analyzed the implementation of the `kubernetes` (apiserver) service in kube-apiserver. The service is controlled by the bootstrap controller, which ensures that the service and its endpoints stay in a normal state. Note that the endpoints of the apiserver service can be managed in one of three ways, selected by a startup parameter; this article only analyzed the lease implementation. If you use the master-count mode, the port, apiserver-count, and other configuration parameters must be set identically on every master instance.