kube-apiserver is a component in kubernetes that interacts directly with etcd and controls changes to core resources in kubernetes. It provides the following main functions.
- Providing Kubernetes API, including authentication authorization, data validation, and cluster state changes, for clients and other components to call.
- Proxy some additional component components in the cluster, such as Kubernetes UI, metrics-server, npd, etc.
- Creating kubernetes services, i.e., Service that provides apiserver, kubernetes Service.
- conversion of resources between different versions.
kube-apiserver processing flow
kube-apiserver interacts with other components mainly by providing APIs to the outside world, which can be obtained by calling kube-apiserver’s interface $ curl -k https://<masterIP>:6443
or through its provided swagger-ui, which has the following three main APIs.
- core groups: mainly under
/api/v1
.
- named groups: whose path is
/apis/$NAME/$VERSION
.
- Some APIs that expose system state: such as
/metrics
, /healthz
, etc.
The URL of the API consists roughly of /apis/group/version/namespaces/my-ns/myresource
, where the structure of the API is roughly as shown below.
After understanding the API of kube-apiserver, the following section describes how kube-apiserver handles an API request. The complete flow of a request is shown in the following diagram.
An example of a POST request is shown here. When the request arrives at kube-apiserver, kube-apiserver first executes the filter chain registered in the http filter chain, which performs a series of filtering operations, mainly authentication, authentication and other checking operations. When the filter chain is finished, the request is routed to the corresponding handler, which mainly interacts with etcd. The main operations in the handler are as follows
Decoder
Most resources in kubernetes will have an internal version
, because a resource may have multiple versions throughout the development process. e.g. a deployment will have extensions/v1beta1
, apps/v1
. To avoid problems, kube-apiserver has to know how to convert between each pair of versions (e.g., v1⇔v1alpha1, v1⇔v1beta1, v1beta1⇔v1alpha1). Therefore it uses a special internal version
, which is a generic version that contains all the fields of the version and has the functionality of all versions. Decoder will first convert the creater object to internal version
and then convert it to storage version
, which is another version when stored in etcd.
When decoding, it first gets the expected version from the HTTP path, then uses scheme to create a matching empty object with the correct version, and uses a JSON or protobuf decoder to convert it, and in the first step of the conversion, if the user omits some fields, Decoder sets them to default values.
Admission
After the decoding is done, you need to check if the objects can be created or updated by verifying the global constraints of the cluster and setting the default values according to the cluster configuration. You can see all the global constraint plugins that kube-apiserver can use in the k8s.io/kubernetes/plugin/pkg/admission
directory. kube-apiserver is started by setting the -enable-admission-plugins
parameter to The plugins added via -ValidatingAdmissionWebhook
or -MutatingAdmissionWebhook
will also work here.
Validation
Mainly check the legality of the fields in the object.
After executing the above operations in the handler, the last operation related to etcd will be executed, and the POST operation will write the data to etcd. The main processing flow of the above in the handler is shown below.
1
2
|
v1beta1 ⇒ internal ⇒ | ⇒ | ⇒ v1 ⇒ json/yaml ⇒ etcd
admission validation
|
Components in kube-apiserver
The kube-apiserver consists of 3 components (Aggregator, KubeAPIServer, APIExtensionServer), which handle requests in turn through Delegation.
- Aggregator: exposes functions similar to a seven-tier load balancing, intercepting requests from users for forwarding to other servers, and is responsible for the Discovery function of the entire APIServer.
- KubeAPIServer: responsible for some general processing of requests, authentication, authentication, etc., as well as handling the REST services for each built-in resource.
- APIExtensionServer: mainly handles CustomResourceDefinition (CRD) and CustomResource (CR) REST requests, and is the last link of Delegation, returning 404 if the corresponding CR cannot be processed.
Aggregator and APIExtensionsServer correspond to the two main ways of extending APIServer resources, i.e. AA and CRD respectively.
Aggregator
Aggregator forwards requests by associating APIServices objects to a Service, and the type of Service it is associated with further determines the form of request forwarding. Aggregator consists of a GenericAPIServer
and a Controller that maintains its own state. The GenericAPIServer
mainly handles requests for APIService resources under the apiregistration.k8s.io
group.
Aggregator contains several controllers in addition to handling resource requests:
apiserviceRegistrationController
: responsible for the registration and deletion of resources in APIServices.
availableConditionController
: maintains the availability status of APIServices, including whether its referenced Service is available, etc.
autoRegistrationController
: used to maintain a specific set of APIServices that exist in the API.
crdRegistrationController
: responsible for the automatic registration of CRD GroupVersions into APIServices.
openAPIAggregationController
: synchronizes changes to APIServices resources to the provided OpenAPI documentation.
Some additional components in kubernetes, such as metrics-server are extended by way of Aggregator, which can be used in a real environment by using apiserver-builder tool to easily create custom resources as Aggregator extensions.
Enabling API Aggregation
The following configuration needs to be added to kube-apiserver to enable API Aggregation.
1
2
3
4
5
6
7
|
--proxy-client-cert-file=/etc/kubernetes/certs/proxy.crt
--proxy-client-key-file=/etc/kubernetes/certs/proxy.key
--requestheader-client-ca-file=/etc/kubernetes/certs/proxy-ca.crt
--requestheader-allowed-names=aggregator
--requestheader-extra-headers-prefix=X-Remote-Extra-
--requestheader-group-headers=X-Remote-Group
--requestheader-username-headers=X-Remote-User
|
KubeAPIServer
KubeAPIServer mainly provides requests for API Resource operations, registers routing information for many APIs in kubernetes, exposes RESTful APIs and provides kubernetes services to the public.
It enables services in and outside the cluster to manipulate resources in kubernetes through RESTful APIs.
APIExtensionServer
The APIExtensionServer is the final layer of the Delegation chain and is the resource server that handles all the resources defined by users through the Custom Resource Definition.
The controllers included and their functions are shown below.
openapiController
: synchronizes changes to crd resources to the provided OpenAPI documentation, which can be viewed by visiting /openapi/v2
.
crdController
: responsible for registering crd information into apiVersions and apiResources, both of which can be viewed via $ kubectl api-versions
and $ kubectl api-resources
.
namingController
: check for naming conflicts in crd obj, viewable in crd .status.conditions
.
establishingController
: checks if crd is in normal status, available in crd .status.conditions
.
nonStructuralSchemaController
: checks if the crd obj structure is normal, available in crd .status.conditions
.
apiApprovalController
: checks if the crd follows the kubernetes API declaration policy, available in crd .status.conditions
.
finalizingController
: a function similar to finalizes, related to the removal of CRs.
kube-apiserver boot process analysis
kubernetes version: v1.16
First, we analyze how kube-apiserver is started. kube-apiserver also starts the main logic through its Run
method, and before the Run
method is called, it parses command line parameters, sets default values, etc.
Run
The main logic of the Run
method is.
- Call
CreateServerChain
to build the service call chain and determine whether to start the non-secure http server. The http server chain contains the three servers to be started by apiserver, and the routes to register the corresponding resources for each server.
- Call
server.PrepareRun
to prepare the service for running, which mainly completes the health check, survival check and registration of the OpenAPI
route.
- Call
prepared.Run
to start the https server.
The server is initialized using the Delegation pattern, and the basic API Server, CustomResource, and Aggregator services are chained together through the DelegationTarget interface to provide services to the outside world.
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:147
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
server, err := CreateServerChain(completeOptions, stopCh)
if err != nil {
return err
}
prepared, err := server.PrepareRun()
if err != nil {
return err
}
return prepared.Run(stopCh)
}
|
CreateServerChain
CreateServerChain
is the method to complete the server initialization, which contains all the processes of APIExtensionsServer
, KubeAPIServer
, AggregatorServer
initialization, and finally returns aggregatorapiserver. APIAggregator
instance, the initialization process mainly includes: the configuration of http filter chain, the registration of API Group, the association of http path with handler and the configuration of handler backend storage etcd. The main logic is as follows.
- Create the configuration required by KubeAPIServer by calling
CreateKubeAPIServerConfig
, mainly creating master.Config
, which will call buildGenericConfig
to generate genericConfig, which contains the core configuration of the core configuration of apiserver.
- determine if the extended API server is enabled and call
createAPIExtensionsConfig
to create a configuration for it. apiExtensions server is a proxy service for other servers in the kubeapiserver, such as the metric-server.
- Call
createAPIExtensionsServer
to create an instance of apiExtensionsServer.
- Call
CreateKubeAPIServer
to initialize the kubeAPIServer.
- Call
createAggregatorConfig
to create a configuration for the aggregatorServer and call createAggregatorServer
to initialize the aggregatorServer.
- configure and determine whether to start a non-secure http server.
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:165
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
if err != nil {
return nil, err
}
// 1、为 kubeAPIServer 创建配置
kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
if err != nil {
return nil, err
}
// 2、判断是否配置了 APIExtensionsServer,创建 apiExtensionsConfig
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
if err != nil {
return nil, err
}
// 3、初始化 APIExtensionsServer
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
if err != nil {
return nil, err
}
// 4、初始化 KubeAPIServer
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
if err != nil {
return nil, err
}
// 5、创建 AggregatorConfig
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig. ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
// 6、初始化 AggregatorServer
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
return nil, err
}
// 7、判断是否启动非安全端口的 http server
if insecureServingInfo != nil {
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
return nil, err
}
}
return aggregatorServer, nil
}
|
CreateKubeAPIServerConfig
The main purpose of CreateKubeAPIServerConfig
is to call buildGenericConfig
to create genericConfig and build the master.Config object.
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:271
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
func CreateKubeAPIServerConfig(
s completedServerRunOptions,
nodeTunneler tunneler.Tunneler,
proxyTransport *http.Transport,
) (......) {
// 1、构建 genericConfig
genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, lastErr = buildGenericConfig(s.ServerRunOptions, proxyTransport)
if lastErr != nil {
return
}
......
// 2、初始化所支持的 capabilities
capabilities.Initialize(capabilities.Capabilities{
AllowPrivileged: s.AllowPrivileged,
PrivilegedSources: capabilities.PrivilegedSources{
HostNetworkSources: []string{},
HostPIDSources: []string{},
HostIPCSources: []string{},
},
PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
})
// 3、获取 service ip range 以及 api server service IP
serviceIPRange, apiServerServiceIP, lastErr := master.DefaultServiceIPRange(s.PrimaryServiceClusterIPRange)
if lastErr != nil {
return
}
......
// 4、构建 master.Config 对象
config = &master.Config{......}
if nodeTunneler != nil {
config.ExtraConfig.KubeletClientConfig.Dial = nodeTunneler.Dial
}
if config.GenericConfig.EgressSelector != nil {
config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup
}
return
}
|
buildGenericConfig
The main logic is as follows.
- Call
genericapiserver.NewConfig
to generate the default genericConfig, genericConfig mainly configures DefaultBuildHandlerChain
, DefaultBuildHandlerChain
contains a series of http filter chains for authentication, authentication, authentication, and a series of http filter chains.
- calls
master.DefaultAPIResourceConfigSource
to load the API Resource that needs to be enabled, all API Resources in the cluster can be seen in the k8s.io/api
directory of the code, which will also keep changing as the version iterates.
- Setting default values for some of the fields in genericConfig.
- Call
completedStorageFactoryConfig.New
to create the storageFactory, which will be used later to create the corresponding RESTStorage for each API Resource.
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:386
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,
) (......) {
// 1、为 genericConfig 设置默认值
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource()
if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
return
}
......
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(......)
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
kubeVersion := version.Get()
genericConfig.Version = &kubeVersion
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.ApiResourceConfig = genericConfig.MergedResourceConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
if err != nil {
lastErr = err
return
}
// 初始化 storageFactory
storageFactory, lastErr = completedStorageFactoryConfig.New()
if lastErr != nil {
return
}
if genericConfig.EgressSelector != nil {
storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
}
// 2、初始化 RESTOptionsGetter,后期根据其获取操作 Etcd 的句柄,同时添加 etcd 的健康检查方法
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
}
// 3、设置使用 protobufs 用来内部交互,并且禁用压缩功能
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
genericConfig.LoopbackClientConfig.DisableCompression = true
// 4、创建 clientset
kubeClientConfig := genericConfig.LoopbackClientConfig
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
return
}
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
// 5、创建认证实例,支持多种认证方式:请求 Header 认证、Auth 文件认证、CA 证书认证、Bearer token 认证、
// ServiceAccount 认证、BootstrapToken 认证、WebhookToken 认证等
genericConfig.Authentication.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, clientgoExternalClient, versionedInformers)
if err != nil {
lastErr = fmt.Errorf("invalid authentication config: %v", err)
return
}
// 6、创建鉴权实例,包含:Node、RBAC、Webhook、ABAC、AlwaysAllow、AlwaysDeny
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, versionedInformers)
......
serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
authInfoResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, genericConfig.LoopbackClientConfig)
// 7、审计插件的初始化
lastErr = s.Audit.ApplyTo(......)
if lastErr != nil {
return
}
// 8、准入插件的初始化
pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, serviceResolver)
if err != nil {
lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
return
}
err = s.Admission.ApplyTo(......)
if err != nil {
lastErr = fmt.Errorf("failed to initialize admission: %v", err)
}
return
}
|
The above analysis focuses on the initialization of KubeAPIServerConfig. The initialization of the other two server configs will not be analyzed in detail for now, so we will continue to analyze the initialization of the server.
createAPIExtensionsServer
APIExtensionsServer is the first to be initialized, and the server is initialized by calling apiextensionsConfig.Complete().New
in createAPIExtensionsServer
with the following logic.
- First call
c.GenericConfig.New
to initialize the Container according to the go-restful
pattern, and in c.GenericConfig.New
call NewAPIServerHandler
to initialize the handler, APIServerHandler Handler type used by the API Server, including go-restful
and non-go-restful
, and a Director object of your choice between the two, with go-restful
used to handle registered handlers and non-go restful
is used to handle non-existent handlers, and the selection process for API URI handling is: FullHandlerChain-> Director -> {GoRestfulContainer, NonGoRestfulMux}
. In c.GenericConfig.New
, installAPI
is also called to add routing information including /
, /debug/*
, /metrics
, /version
, etc. All three types of servers are initialized by calling c.GenericConfig.New
to initialize a genericServer and then register the API.
- Call
s.GenericAPIServer.InstallAPIGroup
to register the API Resources in the route, the call chain of this method is very deep, mainly to register the API Resource to be exposed to the server, so that the REST operation of the resource can be performed through the http interface. Several other servers also perform the corresponding InstallAPI
during initialization.
- The controllers that need to be used in the initialization server, mainly
openapiController
, crdController
, namingController
, establishingController
, nonStructuralSchemaController
, apiApprovalController
, finalizingControlle
r.
- Add the controllers and informer that need to be started to the PostStartHook.
k8s.io/kubernetes/cmd/kube-apiserver/app/apiextensions.go:94
1
2
3
|
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (* apiextensionsapiserver.CustomResourceDefinitions, error) {
return apiextensionsConfig.Complete().New(delegateAPIServer)
}
|
k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go:132
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
// 1、初始化 genericServer
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
if err != nil {
return nil, err
}
s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}
// 2、初始化 APIGroup Info,APIGroup 指该 server 需要暴露的 API
apiResourceConfig := c.GenericConfig.MergedResourceConfig
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
storage := map[string]rest.Storage{}
customResourceDefintionStorage := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
storage["customresourcedefinitions"] = customResourceDefintionStorage
storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage)
apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
}
if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
......
}
// 3、注册 APIGroup
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
// 4、初始化需要使用的 controller
crdClient, err := internalclientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
if err != nil {
return nil, fmt.Errorf("failed to create clientset: %v", err)
}
s.Informers = internalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
......
establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().InternalVersion(). CustomResourceDefinitions(), crdClient.Apiextensions())
crdHandler, err := NewCustomResourceDefinitionHandler(......)
if err != nil {
return nil, err
}
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().InternalVersion(). CustomResourceDefinitions(), crdClient.Apiextensions())
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions(). InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
finalizingController := finalizer.NewCRDFinalizer(
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
crdClient.Apiextensions(),
crdHandler,
)
var openapiController *openapicontroller.Controller
if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
openapiController = openapicontroller.NewController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions())
}
// 5、将 informer 以及 controller 添加到 PostStartHook 中
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
s.Informers.Start(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
......
go crdController.Run(context.StopCh)
go namingController.Run(context.StopCh)
go establishingController.Run(context.StopCh)
go nonStructuralSchemaController.Run(5, context.StopCh)
go apiApprovalController.Run(5, context.StopCh)
go finalizingController.Run(5, context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
return s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Informer().HasSynced(), nil
}, context.StopCh)
})
return s, nil
}
|
The above is the initialization process of APIExtensionsServer, where the most core method is s.GenericAPIServer.InstallAPIGroup, which is the registration process of APIs, and the registration process of APIs in all three servers is its core.
CreateKubeAPIServer
This section continues the analysis of the initialization of the KubeAPIServer, where kubeAPIServerConfig.Complete().New
is called in CreateKubeAPIServer
to complete the initialization operations.
kubeAPIServerConfig.Complete().New
The main logic is as follows.
- Initialize GenericAPIServer with a call to
c.GenericConfig.New
, whose main implementation was analyzed above.
- Determine whether logs-related routes are supported, and if so, add the
/logs
route.
- Call
m.InstallLegacyAPI
to add the core API Resource to the route, which corresponds to the apiserver as a resource starting with /api
.
- call
m.InstallAPIs
to add the extended API Resource to the route, which in apiserver is the resource starting with /apis
.
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:214
1
2
3
4
5
6
7
8
9
10
|
func CreateKubeAPIServer(......) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
if err != nil {
return nil, err
}
kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook)
return kubeAPIServer, nil
}
|
k8s.io/kubernetes/pkg/master/master.go:325
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
......
// 1、初始化 GenericAPIServer
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
if err != nil {
return nil, err
}
// 2、注册 logs 相关的路由
if c.ExtraConfig.EnableLogsSupport {
routes.Logs{}.Install(s.Handler.GoRestfulContainer)
}
m := &Master{
GenericAPIServer: s,
}
// 3、安装 LegacyAPI
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
ProxyTransport: c.ExtraConfig.ProxyTransport,
......
}
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}
restStorageProviders := []RESTStorageProvider{
auditregistrationrest.RESTStorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig. Authentication.APIAudiences},
......
}
// 4、安装 APIs
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
if c.ExtraConfig.Tunneler != nil {
m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
}
m.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook)
return m, nil
}
|
m.InstallLegacyAPI
The main function of this method is to register the core API to the route, which is one of the most core methods in the apiserver initialization process, but its call chain is very deep, and will be analyzed in depth below. The ultimate purpose of registering the API to the route is to provide an external RESTful API to operate the corresponding resource, the registration API is mainly divided into two steps, the first step is to initialize RESTStorage for each resource in the API to operate the change of data in the back-end storage, the second step is to build the corresponding route for each resource according to its verbs The second step is to build the corresponding route for each resource based on its verbs. The main logic of m.InstallLegacyAPI
is as follows.
-
Call legacyRESTStorageProvider.NewLegacyRESTStorage
to create RESTStorage for each resource in LegacyAPI. The purpose of RESTStorage is to correspond the access paths of each resource and its operations in the backend storage.
-
Initialize bootstrap-controller
and add it to PostStartHook, bootstrap-controller
is a controller in apiserver, the main function is to create some namespace needed by the system and to create kubernetes service and periodically trigger the corresponding sync operation. apiserver will start bootstrap-controller
by calling PostStartHook after it starts.
-
After creating RESTStorage for the resource, call m.GenericAPIServer.InstallLegacyAPIGroup
to register the routing information for the APIGroup. The call chain for the InstallLegacyAPIGroup
method is very deep, mainly InstallLegacyAPIGroup --> installAPIResources --> InstallREST --> Install --> registerResourceHandlers
, and the final core route construction is in the registerResourceHandlers
method, which is more complex, its main function is to determine which operations (such as create, update, etc.) can be performed by the resource through the REST Storage constructed in the previous step, and store the corresponding operations into the action, each action corresponds to a standard Finally, according to the actions array, each action will add a handler method and register to the route, and then the route will be registered to the webservice. The webservice will eventually be registered to the container, following the go-restful design pattern.
The details of the legacyRESTStorageProvider.NewLegacyRESTStorage
and m.GenericAPIServer.InstallLegacyAPIGroup
methods will be continued in a later section.
k8s.io/kubernetes/pkg/master/master.go:406
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
func (m *Master) InstallLegacyAPI(......) error {
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
if err != nil {
return fmt.Errorf("Error building core storage: %v", err)
}
controllerName := "bootstrap-controller"
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("Error in registering group versions: %v", err)
}
return nil
}
|
The main processes of InstallAPIs
and InstallLegacyAPI
are similar, so we won’t go into them here for the sake of space.
createAggregatorServer
AggregatorServer
is mainly used for custom aggregation controllers to enable CRDs to be automatically registered to the cluster.
The main logic is as follows.
- Call
aggregatorConfig.Complete().NewWithDelegate
to create the aggregatorServer.
- Initialize
crdRegistrationController
and autoRegistrationController
. crdRegistrationController
is responsible for registering CRDs and autoRegistrationController
is responsible for automatically registering the CRD The crdRegistrationController
is responsible for registering the CRD, and the autoRegistrationController
is responsible for automatically registering the corresponding APIServices to the apiserver.
- Add
autoRegistrationController
and crdRegistrationController
to the PostStartHook.
k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go:124
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
func createAggregatorServer(......) (*aggregatorapiserver.APIAggregator, error) {
// 1、初始化 aggregatorServer
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
if err != nil {
return nil, err
}
// 2、初始化 auto-registration controller
apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}
autoRegistrationController := autoregister.NewAutoRegisterController(......)
apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
crdRegistrationController := crdregistration.NewCRDRegistrationController(......)
err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
go crdRegistrationController.Run(5, context.StopCh)
go func() {
if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
crdRegistrationController.WaitForInitialSync()
}
autoRegistrationController.Run(5, context.StopCh)
}()
return nil
})
if err != nil {
return nil, err
}
err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks(
makeAPIServiceAvailableHealthCheck(
"autoregister-completion",
apiServices,
aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
),
)
if err != nil {
return nil, err
}
return aggregatorServer, nil
}
|
aggregatorConfig.Complete().NewWithDelegate
NewWithDelegate` is the method that initializes the aggregatorServer, the main logic is as follows.
- call
c.GenericConfig.New
to initialize the GenericAPIServer, whose internal main functionality has been analyzed above.
- Call
apiservicerest.NewRESTStorage
to create RESTStorage for APIServices resources, the purpose of RESTStorage is to correspond the access paths of each resource and its back-end storage operations.
- call
s.GenericAPIServer.InstallAPIGroup
to register routing information for the APIGroup.
k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go:158
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
openAPIConfig := c.GenericConfig.OpenAPIConfig
c.GenericConfig.OpenAPIConfig = nil
// 1、初始化 genericServer
genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
if err != nil {
return nil, err
}
apiregistrationClient, err := clientset.NewForConfig(c.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}
informerFactory := informers.NewSharedInformerFactory(
apiregistrationClient,
5*time.Minute,
)
s := &APIAggregator{
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
......
}
// 2、为 API 注册路由
apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
// 3、初始化 apiserviceRegistrationController、availableController
apisHandler := &apisHandler{
codecs: aggregatorscheme.Codecs,
lister: s.lister,
}
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)
availableController, err := statuscontrollers.NewAvailableConditionController(
......
)
if err != nil {
return nil, err
}
// 4、添加 PostStartHook
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(context.StopCh)
c.GenericConfig.SharedInformerFactory.Start(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
go apiserviceRegistrationController.Run(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
go availableController.Run(5, context.StopCh)
return nil
})
return s, nil
}
|
The above is an analysis of the AggregatorServer initialization process, it can be seen that when creating APIExtensionsServer, KubeAPIServer and AggregatorServer, the pattern is similar, first call c.GenericConfig.New
to initialize the Container according to the go- New
to initialize the Container in go-restful
mode, then create RESTStorage for the resources that need to be registered in the server, and finally register the APIGroup information of the resource to the route.
At this point, the process in CreateServerChain has been analyzed and the call chain is shown below.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
|--> CreateNodeDialer
|
|--> CreateKubeAPIServerConfig
|
CreateServerChain --|--> createAPIExtensionsConfig
|
| |--> c.GenericConfig.New
|--> createAPIExtensionsServer --> apiextensionsConfig.Complete().New --|
| |--> s.GenericAPIServer.InstallAPIGroup
|
| |--> c.GenericConfig.New --> legacyRESTStorageProvider.NewLegacyRESTStorage
| |
|--> CreateKubeAPIServer --> kubeAPIServerConfig.Complete().New --|--> m.InstallLegacyAPI
| |
| |--> m.InstallAPIs
|
|
|--> createAggregatorConfig
|
| |--> c.GenericConfig.New
| |
|--> createAggregatorServer --> aggregatorConfig.Complete().NewWithDelegate --|--> apiservicerest.NewRESTStorage
|
|--> s.GenericAPIServer.InstallAPIGroup
|
prepared.Run
The Run
method first calls CreateServerChain
to complete the initialization of each server, then calls server.PrepareRun
to complete the preparation work before the service starts, and finally calls the prepared.Run
method to start the secure http server.
serverPrepare.Run
mainly completes the health check, survival check and OpenAPI
route registration, and then continues to analyze the flow of prepared.Run
, in which s.NonBlockingRun
is called to complete the startup.
k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go:269
1
2
3
|
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
return s.runnable.Run(stopCh)
}
|
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:316
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
delayedStopCh := make(chan struct{})
go func() {
defer close(delayedStopCh)
<-stopCh
time.Sleep(s.ShutdownDelayDuration)
}()
// 调用 s.NonBlockingRun 完成启动流程
err := s.NonBlockingRun(delayedStopCh)
if err != nil {
return err
}
// 当收到退出信号后完成一些收尾工作
<-stopCh
err = s.RunPreShutdownHooks()
if err != nil {
return err
}
<-delayedStopCh
s.HandlerChainWaitGroup.Wait()
return nil
}
|
s.NonBlockingRun
The main logic of s.NonBlockingRun
is as follows.
- Determine whether to start the audit logging service.
- Calling
s.SecureServingInfo.Serve
to configure and start the https server.
- Execute postStartHooks.
- Sending the ready signal to systemd.
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:351
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
auditStopCh := make(chan struct{})
// 1、判断是否要启动审计日志
if s.AuditBackend != nil {
if err := s.AuditBackend.Run(auditStopCh); err != nil {
return fmt.Errorf("failed to run the audit backend: %v", err)
}
}
// 2、启动 https server
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
if err != nil {
close(internalStopCh)
close(auditStopCh)
return err
}
}
go func() {
<-stopCh
close(s.readinessStopCh)
close(internalStopCh)
if stoppedCh != nil {
<-stoppedCh
}
s.HandlerChainWaitGroup.Wait()
close(auditStopCh)
}()
// 3、执行 postStartHooks
s.RunPostStartHooks(stopCh)
// 4、向 systemd 发送 ready 信号
if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
}
return nil
}
|
The above is the initialization of the server and the analysis of the start-up process, as mentioned above, the most important part of the server initialization process is the initialization of the API Resource RESTStorage and the registration of the route, as the process is more complex, the following will be told separately.
StorageFactory construction
As mentioned above, the backend data corresponding to the handler eventually implemented by apiserver is stored in a Store structure. Here, for example, the routes starting with /api
are used to create the RESTStorage for each resource via the NewLegacyRESTStorage
method. RESTStorage is a structure defined under k8s.io/apiserver/pkg/registry/generic/registry/store.go
, which mainly contains NewFunc
to return resource-specific information, NewListFunc
to return resource-specific list and CreateStrategy
for resource creation, UpdateStrategy
for update, and DeleteStrategy
for deletion. Inside NewLegacyRESTStorage
, you can see the RESTStorage where multiple resources are created.
The call chain for NewLegacyRESTStorage
is CreateKubeAPIServer --> kubeAPIServerConfig.Complete().New --> m.InstallLegacyAPI --> legacyRESTStorageProvider.NewLegacyRESTStorage
.
NewLegacyRESTStorage
All resources under an API Group have their own REST implementation, and all groups under k8s.io/kubernetes/pkg/registry
have a rest directory that stores the RESTStorage of the corresponding resources. In the NewLegacyRESTStorage
method, the storage corresponding to each resource will be generated by NewREST
or NewStorage
, and the pod is used as an example here.
k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:102
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver. APIGroupInfo, error) {
apiGroupInfo := genericapiserver.APIGroupInfo{
PrioritizedVersions: legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
Scheme: legacyscheme.Scheme,
ParameterCodec: legacyscheme.ParameterCodec,
NegotiatedSerializer: legacyscheme.Codecs,
}
var podDisruptionClient policyclient.PodDisruptionBudgetsGetter
if policyGroupVersion := (schema.GroupVersion{Group: "policy", Version: "v1beta1"}); legacyscheme.Scheme. IsVersionRegistered(policyGroupVersion) {
var err error
podDisruptionClient, err = policyclient.NewForConfig(c.LoopbackClientConfig)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
}
// 1、LegacyAPI 下的 resource RESTStorage 的初始化
restStorage := LegacyRESTStorage{}
podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
......
endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
// 2、pod RESTStorage 的初始化
podStorage, err := podstore.NewStorage(......)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
......
serviceClusterIPAllocator, err := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) (allocator. Interface, error) {
......
})
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err)
}
restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry
var secondaryServiceClusterIPAllocator ipallocator.Interface
if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && c.SecondaryServiceIPRange.IP != nil {
......
}
var serviceNodePortRegistry rangeallocation.RangeRegistry
serviceNodePortAllocator, err := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) {
......
})
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err)
}
restStorage.ServiceNodePortAllocator = serviceNodePortRegistry
controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
serviceRest, serviceRestProxy := servicestore.NewREST(......)
// 3、restStorageMap 保存 resource http path 与 RESTStorage 对应关系
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
......
"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
}
......
}
|
podstore.NewStorage
podstore.NewStorage
is a method that generates storage for the pod. The main function of this method is to create backend storage for the pod and eventually return a RESTStorage object, which calls store.CompleteWithOptions
to create the backend storage.
k8s.io/kubernetes/pkg/registry/core/pod/storage/storage.go:71
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
func NewStorage(......) (PodStorage, error) {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.Pod{} },
NewListFunc: func() runtime.Object { return &api.PodList{} },
......
}
options := &generic.StoreOptions{
RESTOptions: optsGetter,
AttrFunc: pod.GetAttrs,
TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": pod.NodeNameTriggerFunc},
}
// 调用 store.CompleteWithOptions
if err := store.CompleteWithOptions(options); err != nil {
return PodStorage{}, err
}
statusStore := *store
statusStore.UpdateStrategy = pod.StatusStrategy
ephemeralContainersStore := *store
ephemeralContainersStore.UpdateStrategy = pod.EphemeralContainersStrategy
bindingREST := &BindingREST{store: store}
// PodStorage 对象
return PodStorage{
Pod: &REST{store, proxyTransport},
Binding: &BindingREST{store: store},
LegacyBinding: &LegacyBindingREST{bindingREST},
Eviction: newEvictionStorage(store, podDisruptionBudgetClient),
Status: &StatusREST{store: &statusStore},
EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
Log: &podrest.LogREST{Store: store, KubeletConn: k},
Proxy: &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
Exec: &podrest.ExecREST{Store: store, KubeletConn: k},
Attach: &podrest.AttachREST{Store: store, KubeletConn: k},
PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
}, nil
}
|
You can see that the different operations on the pod in the final returned object are all one REST object.
The genericregistry.Store
object is automatically integrated in REST, and the store.CompleteWithOptions
method initializes the store instance in the `genericregistry.
1
2
3
4
5
6
7
8
9
|
type REST struct {
*genericregistry.Store
proxyTransport http.RoundTripper
}
type BindingREST struct {
store *genericregistry.Store
}
......
|
store.CompleteWithOptions
CompleteWithOptions` is mainly used to set some default values for the configuration in the store and to update the store based on the options provided, the main one being to initialize the backend storage instance of the store.
Within the CompleteWithOptions
method, the options.RESTOptions.GetRESTOptions
method is called, which eventually returns the generic.RESTOptions
object, which contains some configuration for the etcd The generic.RESTOptions
object contains some configuration for the initialization of etcd, data serialization methods, and a storage. The StorageWithCacher-->NewRawStorage-->Create
method is called sequentially to create the final dependent backend storage.
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1192
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
......
var isNamespaced bool
switch {
case e.CreateStrategy != nil:
isNamespaced = e.CreateStrategy.NamespaceScoped()
case e.UpdateStrategy != nil:
isNamespaced = e.UpdateStrategy.NamespaceScoped()
default:
return fmt.Errorf("store for %s must have CreateStrategy or UpdateStrategy set", e.DefaultQualifiedResource.String())
}
......
// 1、调用 options.RESTOptions.GetRESTOptions
opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
if err != nil {
return err
}
// 2、设置 ResourcePrefix
prefix := opts.ResourcePrefix
if !strings.HasPrefix(prefix, "/") {
prefix = "/" + prefix
}
if prefix == "/" {
return fmt.Errorf("store for %s has an invalid prefix %q", e.DefaultQualifiedResource.String(), opts.ResourcePrefix)
}
if e.KeyRootFunc == nil && e.KeyFunc == nil {
......
}
keyFunc := func(obj runtime.Object) (string, error) {
......
}
// 3、以下操作主要是将 opts 对象中的值赋值到 store 对象中
if e.DeleteCollectionWorkers == 0 {
e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers
}
e.EnableGarbageCollection = opts.EnableGarbageCollection
if e.ObjectNameFunc == nil {
......
}
if e.Storage.Storage == nil {
e.Storage.Codec = opts.StorageConfig.Codec
var err error
e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
opts.StorageConfig,
prefix,
keyFunc,
e.NewFunc,
e.NewListFunc,
attrFunc,
options.TriggerFunc,
)
if err != nil {
return err
}
e.StorageVersioner = opts.StorageConfig.EncodeVersioner
if opts.CountMetricPollPeriod > 0 {
stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
previousDestroy := e.DestroyFunc
e.DestroyFunc = func() {
stopFunc()
if previousDestroy != nil {
previousDestroy()
}
}
}
}
return nil
}
|
options.RESTOptions
is an interface, and to find the implementation of its GetRESTOptions
method you must know the instance corresponding to options.RESTOptions
when it is initialized in the CreateKubeAPIServerConfig --> buildGenericConfig --> s.Etcd.ApplyWithStorageFactoryTo
method, the instance corresponding to RESTOptions
is StorageFactoryRestOptionsFactory
, so PodStorage The actual object type of genericserver.Config.RESTOptionsGetter
in the store object constructed at initialization is StorageFactoryRestOptionsFactory
, and its GetRESTOptions
method is shown below.
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go:253
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
storageConfig, err := f.StorageFactory.NewConfig(resource)
if err != nil {
return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
}
ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
}
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
return generic.RESTOptions{}, err
}
cacheSize, ok := sizes[resource]
if !ok {
cacheSize = f.Options.DefaultWatchCacheSize
}
// 调用 generic.StorageDecorator
ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
}
return ret, nil
}
|
In genericregistry.StorageWithCacher
a different method is called which eventually calls factory.Create
to initialize the storage instance, the call chain is: genericregistry.StorageWithCacher --> generic. NewRawStorage --> factory.Create
.
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go:30
1
2
3
4
5
6
7
8
9
10
11
|
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case "etcd2":
return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
// 目前 k8s 只支持使用 etcd v3
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
|
newETCD3Storage
In newETCD3Storage
, the client of etcd is first created by calling newETCD3Client
, and the client is finally created using the official etcd client tool clientv3.
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go:209
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
if err != nil {
return nil, nil, err
}
client, err := newETCD3Client(c.Transport)
if err != nil {
stopCompactor()
return nil, nil, err
}
var once sync.Once
destroyFunc := func() {
once.Do(func() {
stopCompactor()
client.Close()
})
}
transformer := c.Transformer
if transformer == nil {
transformer = value.IdentityTransformer
}
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
|
At this point, the analysis of the construction of the store in the pod resource is basically completed, different resource corresponds to a REST object, which in turn references the genericregistry.Store
object, and finally the initialization of the `genericregistry. After analyzing the initialization of the store there is another important step which is the registration of the route. The main process of route registration is to build an http path for the resource according to different verbs and to bind the path to the corresponding handler.
Routing Registration
The construction of RESTStorage above corresponds to the legacyRESTStorageProvider.NewLegacyRESTStorage
method in the InstallLegacyAPI
. The following analysis continues with the implementation of the m.GenericAPIServer.InstallLegacyAPIGroup
method in the InstallLegacyAPI
.
k8s.io/kubernetes/pkg/master/master.go:406
1
2
3
4
5
6
7
8
9
10
11
12
|
func (m *Master) InstallLegacyAPI(......) error {
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
if err != nil {
return fmt.Errorf("Error building core storage: %v", err)
}
......
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("Error in registering group versions: %v", err)
}
return nil
}
|
The call chain for m.GenericAPIServer.InstallLegacyAPIGroup
is very deep and ends up registering handler and routing information for each API resource under the Group, which is: m.GenericAPIServer. InstallLegacyAPIGroup --> s.installAPIResources --> apiGroupVersion.InstallREST --> installer.Install --> a.registerResourceHandlers
. Several of these methods work as shown below.
s.installAPIResources
: adds a route for each API resource call to apiGroupVersion.InstallREST
.
apiGroupVersion.InstallREST
: adds the restful.WebServic
object to the container.
installer.Install
: returns the final restful.WebService
object
a.registerResourceHandlers
This method implements the conversion from rest.Storage
to restful.Route
. It first determines the REST interface supported by the API Resource, then adds the corresponding handler for the REST interface, and finally registers it with the route.
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go:181
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
admit := a.group.Admit
......
// 1、判断该 resource 实现了哪些 REST 操作接口,以此来判断其支持的 verbs 以便为其添加路由
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
watcher, isWatcher := storage.(rest.Watcher)
connecter, isConnecter := storage.(rest.Connecter)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
if !isMetadata {
storageMeta = defaultStorageMetadata{}
}
exporter, isExporter := storage.(rest.Exporter)
if !isExporter {
exporter = nil
}
......
// 2、为 resource 添加对应的 actions 并根据是否支持 namespace
switch {
case !namespaceScoped:
......
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)
actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
if getSubpath {
actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
}
actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
default:
......
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)
actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
......
}
// 3、根据 action 创建对应的 route
kubeVerbs := map[string]struct{}{}
reqScope := handlers.RequestScope{
Serializer: a.group.Serializer,
ParameterCodec: a.group.ParameterCodec,
Creater: a.group.Creater,
Convertor: a.group.Convertor,
......
}
......
// 4、从 rest.Storage 到 restful.Route 映射
// 为每个操作添加对应的 handler
for _, action := range actions {
......
verbOverrider, needOverride := storage.(StorageMetricsOverride)
switch action.Verb {
case "GET": ......
case "LIST":
case "PUT":
case "PATCH":
// 此处以 POST 操作进行说明
case "POST":
var handler restful.RouteFunction
// 5、初始化 handler
if isNamedCreater {
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
article := GetArticleForNoun(kind, " ")
doc := "create" + article + kind
if isSubresource {
doc = "create " + subresource + " of" + article + kind
}
// 6、route 与 handler 进行绑定
route := ws.POST(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Returns(http.StatusCreated, "Created", producedObject).
Returns(http.StatusAccepted, "Accepted", producedObject).
Reads(defaultVersionedObject).
Writes(producedObject)
if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
return nil, err
}
addParams(route, action.Params)
// 7、添加到路由中
routes = append(routes, route)
case "DELETE":
case "DELETECOLLECTION":
case "WATCH":
case "WATCHLIST":
case "CONNECT":
default:
}
......
return &apiResource, nil
}
|
restfulCreateNamedResource
restfulCreateNamedResource
is the handler for the POST operation, which will eventually be done by calling the createHandler
method.
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go:1087
1
2
3
4
5
6
7
8
9
|
func restfulCreateNamedResource(r rest.NamedCreater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.CreateNamedResource(r, &scope, admit)(res.ResponseWriter, req.Request)
}
}
func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(r, scope, admission, true)
}
|
createHandler
The createHandler
is a method to write data to the backend storage, and there is a permission control for the operation of the resource, in the createHandler
the decoder
and admission
operations will be executed first, then the create
method will be called to complete the creation of the resource, in the create
method, validate
and finally save the data to the backend storage. The admit
operation executes the admission-plugins in the kube-apiserver. The admission-plugins are initialized to the admissionChain in CreateKubeAPIServerConfig
, and the initialization call chain is CreateKubeAPIServerConfig --> buildGenericConfig --> s.Admission.ApplyTo --> a.GenericAdmission.ApplyTo --> a.Plugins.NewFromPlugins
, and finally in NewFromPlugins
wraps all enabled plugins as admissionChain, and the admit operation to be performed here is the admit operation in admission-plugins.
The create method called in createHandler
is a method of the genericregistry.Store
object. The genericregistry.Store
object is introduced at each resource initialization of RESTStorage.
All the operations in createHandler
are the request process mentioned at the beginning of this article, as follows.
1
2
|
v1beta1 ⇒ internal ⇒ | ⇒ | ⇒ v1 ⇒ json/yaml ⇒ etcd
admission validation
|
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go:46
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
trace := utiltrace.New("Create", utiltrace.Field{"url", req.URL.Path})
defer trace.LogIfLong(500 * time.Millisecond)
......
gv := scope.Kind.GroupVersion()
// 1、得到合适的SerializerInfo
s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
if err != nil {
scope.err(err, w, req)
return
}
// 2、找到合适的 decoder
decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)
body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
if err != nil {
scope.err(err, w, req)
return
}
......
defaultGVK := scope.Kind
original := r.New()
trace.Step("About to convert to expected version")
// 3、decoder 解码
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
......
ae := request.AuditEventFrom(ctx)
admit = admission.WithAudit(admit, ae)
audit.LogRequestObject(ae, obj, scope.Resource, scope.Subresource, scope.Serializer)
userInfo, _ := request.UserFrom(ctx)
if len(name) == 0 {
_, name, _ = scope.Namer.ObjectName(obj)
}
// 4、执行 admit 操作,即执行 kube-apiserver 启动时加载的 admission-plugins,
admissionAttributes := admission.NewAttributesRecord(......)
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
err = mutatingAdmission.Admit(ctx, admissionAttributes, scope)
if err != nil {
scope.err(err, w, req)
return
}
}
......
// 5、执行 create 操作
result, err := finishRequest(timeout, func() (runtime.Object, error) {
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
})
......
}
}
|
Summary
This article analyzes the startup process of kube-apiserver. kube-apiserver contains three servers, namely KubeAPIServer, APIExtensionsServer and AggregatorServer, which are connected together through the delegate mode. The initialization process is similar, first create the corresponding config for each server, then initialize the http server, the http server initialization process is to first initialize GoRestfulContainer
, then install the API included in the server, when installing the API first create the corresponding backend storage RES for each API When installing the API, first create the corresponding back-end storage RESTStorage for each API Resource, then add the corresponding handler for each API Resource supported verbs, and register the handler to the route, and finally register the route to the webservice, the implementation process of the RESTFul API is the core of the startup process. The core of the startup process is the implementation of the RESTFul API. As for the implementation of filters such as authentication and authentication in kube-apiserver, multi-version resource conversion, kubernetes service implementation, and other details, we will continue to analyze them in later articles.