As the cloud-native ecosystem continues to evolve, most Kubernetes-based cloud-native technologies adopt the CRD + Controller model. Even without a custom CRD, there is usually still a need for a controller that watches the resources of interest and performs the required business logic when their state changes.
controller-runtime is a library provided by the Kubernetes community for quickly building controllers that watch the ApiServer. This article gives a brief introduction to how controller-runtime works and how it is used in different scenarios.
Architecture
The architecture of controller-runtime can be summarized in the following diagram. Note: webhooks are outside the scope of this article, so they are omitted from the diagram.
The main components are the Manager and Reconciler, which are created by the user, and the Cache and Controller, which are started by controller-runtime itself. On the user side, the Manager is created during initialization and is used to start the controller-runtime components; the Reconciler is the component the user provides to handle their own business logic.
On the controller-runtime side, the Cache, as the name implies, is a cache: it establishes the Informer's connection to the ApiServer to watch resources and pushes the watched objects into the queue. The Controller, on the one hand, registers the eventHandler with the Informer; on the other hand, it takes data from the queue and executes the user-side Reconciler.
The entire workflow on the controller-runtime side is as follows.
First, the Controller registers a resource-specific eventHandler with the Informer; then the Cache starts the Informer, which sends a request to the ApiServer to establish the connection. When the Informer detects a resource change, it uses the eventHandler registered by the Controller to decide whether the object should be pushed into the queue; when an element is pushed into the queue, the Controller takes it out and executes the user-side Reconciler.
Usage
The following describes several different scenarios of usage.
General Usage
Most of us are already familiar with the basic usage of controller-runtime; the simplest usage can be expressed in the following code.
func start() {
    scheme := runtime.NewScheme()
    _ = corev1.AddToScheme(scheme)

    // 1. init Manager
    mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme: scheme,
        Port:   9443,
    })

    // 2. init Reconciler(Controller)
    _ = ctrl.NewControllerManagedBy(mgr).
        For(&corev1.Pod{}).
        Complete(&ApplicationReconciler{})

    // 3. start Manager
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        panic(err)
    }
}

type ApplicationReconciler struct {
}

func (a ApplicationReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
    return reconcile.Result{}, nil
}
The first step is to initialize the Manager, which also generates a default configuration for the Cache.
The second step is to initialize the Controller:
ctrl.NewControllerManagedBy: creates the Controller and injects some of the configuration of the Manager generated in the first step into it.
For: a shortcut method provided by controller-runtime to specify the resource type to watch.
Owns: the Owns method is sometimes used to declare that a resource is a secondary resource owned by a resource we care about, so that its events also go into the Controller's queue (see the sketch after these steps).
Complete: also a shortcut method; it generates the Controller, registers the user's Reconciler into it, generates the default eventHandler for the watched resource, and then executes the Controller's watch function.
The user's Reconciler only needs to implement the reconcile.Reconciler interface.
The last step is to start the Manager, which starts the Cache (i.e. the Informers) and the Controller at the same time.
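As a quick illustration, here is a rough sketch (not from the original code) of a Controller that uses Owns, assuming a Deployment as the primary resource whose Pods are the owned secondary resources; appsv1 (k8s.io/api/apps/v1) would also need to be registered in the scheme for this to work.

// Sketch: Deployment is the primary resource; Pods that are owned
// (controller-referenced) by a Deployment are secondary resources whose
// events are also enqueued, keyed by the owning Deployment.
_ = ctrl.NewControllerManagedBy(mgr).
    For(&appsv1.Deployment{}).
    Owns(&corev1.Pod{}).
    Complete(&ApplicationReconciler{})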
Setting up EventHandler
In the overall architecture, the Informer plays the role of list/watch against the ApiServer: when it detects a change to a resource it is interested in, it processes the change with the registered eventHandler and decides whether the object needs to be pushed into the queue.
So, when creating the Controller, we can register the Informer's eventHandler functions ourselves, as follows.
func start() {
    scheme := runtime.NewScheme()
    _ = corev1.AddToScheme(scheme)

    // 1. init Manager
    mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme: scheme,
        Port:   9443,
    })

    // 2. init Reconciler(Controller)
    c, _ := controller.New("app", mgr, controller.Options{
        Reconciler: &ApplicationReconciler{}, // controller.New requires a Reconciler
    })
    _ = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
        CreateFunc: func(event event.CreateEvent) bool {
            // ... decide whether the create event should be enqueued
            return true
        },
        UpdateFunc: func(updateEvent event.UpdateEvent) bool {
            // ... decide whether the update event should be enqueued
            return true
        },
        DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
            // ... decide whether the delete event should be enqueued
            return true
        },
    })

    // 3. start Manager
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        panic(err)
    }
}
Adding filtering logic in the predicate before resources are pushed into the queue can effectively prevent the queue from being filled with objects we do not care about. If our Reconciler needs to watch multiple resources, the Controller can call Watch once for each resource type and register a different eventHandler each time, as in the sketch below.
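For example, building on the snippet above, one Controller can watch both Pods and Nodes with different eventHandlers and predicates. The following is only a sketch, not from the original article; the Node filter shown here is an arbitrary assumption.

// Sketch: the same Controller c watches two resource types.
// Pod events are enqueued as-is; Node events are filtered so that only
// label changes are enqueued.
_ = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{})
_ = c.Watch(&source.Kind{Type: &corev1.Node{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
    UpdateFunc: func(e event.UpdateEvent) bool {
        // only enqueue the Node when its labels change
        return !labels.Equals(labels.Set(e.ObjectOld.GetLabels()), labels.Set(e.ObjectNew.GetLabels()))
    },
})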
Setting the Cache selector
In addition, we can add a LabelSelector or FieldSelector to the Informer's ListWatch function to further filter out resources we do not care about, which also helps reduce the pressure on the ApiServer when the cluster holds a large number of resources. The details are as follows.
func start() {
    scheme := runtime.NewScheme()
    _ = corev1.AddToScheme(scheme)

    // 1. init Manager
    mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme: scheme,
        Port:   9443,
        NewCache: cache.BuilderWithOptions(cache.Options{
            Scheme: scheme,
            SelectorsByObject: cache.SelectorsByObject{
                &corev1.Pod{}: {
                    Label: labels.SelectorFromSet(labels.Set{}),
                },
                &corev1.Node{}: {
                    Field: fields.SelectorFromSet(fields.Set{"metadata.name": "node01"}),
                },
            },
        }),
    })

    // 2. init Reconciler(Controller)
    c, _ := controller.New("app", mgr, controller.Options{
        Reconciler: &ApplicationReconciler{}, // controller.New requires a Reconciler
    })
    _ = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
        CreateFunc: func(event event.CreateEvent) bool {
            // ... decide whether the create event should be enqueued
            return true
        },
        UpdateFunc: func(updateEvent event.UpdateEvent) bool {
            // ... decide whether the update event should be enqueued
            return true
        },
        DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
            // ... decide whether the delete event should be enqueued
            return true
        },
    })

    // 3. start Manager
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        panic(err)
    }
}
Note that setting the cache selector is only available in controller-runtime v0.11.0 and later.
The way to do this is to use the cache.BuilderWithOptions function to register the LabelSelector or FieldSelector when initializing the Manager, and to register the scheme, so that the Informers generated by the Cache also know the resource's scheme when they make requests to the ApiServer.
Looking at the source code, you can see that the Cache generates three types of Informers: structured, unstructured, and metadata, and all three are started at the same time, as shown below.
func NewInformersMap(config *rest.Config,
    scheme *runtime.Scheme,
    mapper meta.RESTMapper,
    resync time.Duration,
    namespace string,
    selectors SelectorsByGVK,
    disableDeepCopy DisableDeepCopyByGVK,
) *InformersMap {
    return &InformersMap{
        structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
        unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
        metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),

        Scheme: scheme,
    }
}

// Start calls Run on each of the informers and sets started to true. Blocks on the context.
func (m *InformersMap) Start(ctx context.Context) error {
    go m.structured.Start(ctx)
    go m.unstructured.Start(ctx)
    go m.metadata.Start(ctx)
    <-ctx.Done()
    return nil
}
Among them, structured is for typed (structured) resources, whose types must be registered in the scheme; unstructured is for untyped resources; and metadata sends its requests to the ApiServer in protobuf form.
Take structured as an example:
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
    // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
    // groupVersionKind to the Resource API we will use.
    mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
    if err != nil {
        return nil, err
    }

    client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
    if err != nil {
        return nil, err
    }
    listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
    listObj, err := ip.Scheme.New(listGVK)
    if err != nil {
        return nil, err
    }

    // TODO: the functions that make use of this ListWatch should be adapted to
    // pass in their own contexts instead of relying on this fixed one here.
    ctx := context.TODO()

    // Create a new ListWatch for the obj
    return &cache.ListWatch{
        ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
            ip.selectors(gvk).ApplyToList(&opts)
            res := listObj.DeepCopyObject()
            namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
            isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
            err := client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
            return res, err
        },
        // Setup the watch function
        WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
            ip.selectors(gvk).ApplyToList(&opts)
            // Watch needs to be set to true separately
            opts.Watch = true
            namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
            isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
            return client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
        },
    }, nil
}
You can see that in the Informer's ListWatch functions, ip.selectors(gvk).ApplyToList(&opts) adds the selector we registered at the beginning to the subsequent list/watch requests.
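By contrast, to go through the unstructured informers, the watched object only needs to be an *unstructured.Unstructured with its GroupVersionKind set. The following is a rough sketch, not from the original article, reusing the Controller c created earlier and an arbitrary example GVK.

// Sketch: watching a resource as unstructured.Unstructured
// (k8s.io/apimachinery/pkg/apis/meta/v1/unstructured); the Cache routes this
// object to its unstructured informers map.
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(schema.GroupVersionKind{
    Group:   "apps",
    Version: "v1",
    Kind:    "Deployment",
})
_ = c.Watch(&source.Kind{Type: u}, &handler.EnqueueRequestForObject{})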
As mentioned above, metadata uses protobuf serialization to talk to the ApiServer, which is more efficient than the default json serialization and performs better in large-scale environments. However, not all resource types support the protobuf format; custom resources (CRDs), for example, do not.
Another point to note is that with the metadata informers, only the object's metadata is watched, not its spec and status.
func start() {
    scheme := runtime.NewScheme()
    _ = corev1.AddToScheme(scheme)

    // 1. init Manager
    mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme: scheme,
        Port:   9443,
        NewCache: cache.BuilderWithOptions(cache.Options{
            Scheme: scheme,
            SelectorsByObject: cache.SelectorsByObject{
                &corev1.Pod{}: {
                    Label: labels.SelectorFromSet(labels.Set{}),
                },
                &corev1.Node{}: {
                    Field: fields.SelectorFromSet(fields.Set{"metadata.name": "node01"}),
                },
            },
        }),
    })

    // 2. init Reconciler(Controller)
    c, _ := controller.New("app", mgr, controller.Options{
        Reconciler: &ApplicationReconciler{}, // controller.New requires a Reconciler
    })
    _ = ctrl.NewControllerManagedBy(mgr).
        For(&corev1.Pod{}).
        Complete(&ApplicationReconciler{})

    // watch only the metadata of Pods
    u := &metav1.PartialObjectMetadata{}
    u.SetGroupVersionKind(schema.GroupVersionKind{
        Kind:    "Pod",
        Group:   "",
        Version: "v1",
    })
    _ = c.Watch(&source.Kind{Type: u}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
        CreateFunc: func(event event.CreateEvent) bool {
            return true
        },
        UpdateFunc: func(updateEvent event.UpdateEvent) bool {
            return true
        },
        DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
            return true
        },
    })

    // 3. start Manager
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        panic(err)
    }
}
In the Cache's metadata informers, the data format used is metav1.PartialObjectMetadata. The premise is that the user only cares about a resource's metadata, not its spec and status, so the ListWatch functions fetch only the metadata from the ApiServer. The source code is as follows.
// PartialObjectMetadata is a generic representation of any object with ObjectMeta. It allows clients
// to get access to a particular ObjectMeta schema without knowing the details of the version.
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type PartialObjectMetadata struct {
    TypeMeta `json:",inline"`
    // Standard object's metadata.
    // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
    // +optional
    ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
}

func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
    // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
    // groupVersionKind to the Resource API we will use.
    mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
    if err != nil {
        return nil, err
    }

    // Always clear the negotiated serializer and use the one
    // set from the metadata client.
    cfg := rest.CopyConfig(ip.config)
    cfg.NegotiatedSerializer = nil

    // grab the metadata client
    client, err := metadata.NewForConfig(cfg)
    if err != nil {
        return nil, err
    }
    ctx := context.TODO()

    // create the relevant listwatch
    return &cache.ListWatch{
        ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
            ip.selectors(gvk).ApplyToList(&opts)

            var (
                list *metav1.PartialObjectMetadataList
                err  error
            )
            namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
            if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
                list, err = client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
            } else {
                list, err = client.Resource(mapping.Resource).List(ctx, opts)
            }
            if list != nil {
                for i := range list.Items {
                    list.Items[i].SetGroupVersionKind(gvk)
                }
            }
            return list, err
        },
        // Setup the watch function
        WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
            ip.selectors(gvk).ApplyToList(&opts)
            // Watch needs to be set to true separately
            opts.Watch = true

            var (
                watcher watch.Interface
                err     error
            )
            namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
            if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
                watcher, err = client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
            } else {
                watcher, err = client.Resource(mapping.Resource).Watch(ctx, opts)
            }
            if watcher != nil {
                watcher = newGVKFixupWatcher(gvk, watcher)
            }
            return watcher, err
        },
    }, nil
}
As you can see, controller-runtime uses client-go's metadata client, and the data returned by this client's interface is of type PartialObjectMetadata.
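As an aside, the metadata client can also be used directly. Below is a minimal, self-contained sketch (not from the original article) that lists only the metadata of Pods in the default namespace; loading the config via ctrl.GetConfigOrDie() is just a convenient assumption here.

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/metadata"
    ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
    // Load the kubeconfig / in-cluster config.
    cfg := ctrl.GetConfigOrDie()

    // The metadata client only ever returns PartialObjectMetadata objects.
    mc, err := metadata.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    gvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
    list, err := mc.Resource(gvr).Namespace("default").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    for _, item := range list.Items {
        // Only metadata is available here; spec and status are not fetched.
        fmt.Println(item.Namespace, item.Name, item.ResourceVersion)
    }
}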
Summary
controller-runtime is a very useful tool for building resource controllers, and in everyday development we can use it to quickly generate the controllers we need. At the same time, it provides many configuration options, so controllers can not only be built quickly but also be tuned flexibly for different business needs.