In previous articles we discussed how to design and implement an operator within a single Kubernetes cluster. But as an application scales up, or for other organizational reasons (permissions, isolation, and so on), we often have to adopt multiple Kubernetes clusters. How does our operator fit into this multi-cluster scenario?
Of course, there are many solutions for multi-cluster scenarios, such as Clusternet, Karmada, and so on. But sometimes we still need an operator that watches the resources of multiple clusters directly, perhaps because of permissions, or because we don't want to take on such a heavyweight solution.
Multi-Cluster Operator Practices
Requirements
First, let's define the requirements and the environment:
- We have two clusters, main and sub, where main is the main cluster and sub is the sub-cluster.
- We have a CRD in the main cluster whose only function is to create a Job (a minimal sketch of such a type follows this list).
- In this multi-cluster environment, the operator in the main cluster watches for the creation of this custom resource and automatically creates the Job in both the main cluster and the sub-cluster.
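For reference, the Test type and its Job() helper used later in the code could look roughly like the sketch below. This is an assumption for illustration only; the real CRD's spec fields (and the usual generated deepcopy code and kubebuilder markers) are omitted.

```go
package v1

import (
    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TestSpec is a guess at a minimal spec: just the image the Job should run.
type TestSpec struct {
    Image string `json:"image,omitempty"`
}

// TestStatus carries a simple phase string, updated by the controller later on.
type TestStatus struct {
    Phase string `json:"phase,omitempty"`
}

// Test is the custom resource watched by the operator in the main cluster.
type Test struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   TestSpec   `json:"spec,omitempty"`
    Status TestStatus `json:"status,omitempty"`
}

// Job builds the batch/v1 Job that the controller creates in every cluster.
func (t *Test) Job() *batchv1.Job {
    return &batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{Name: t.Name, Namespace: t.Namespace},
        Spec: batchv1.JobSpec{
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    RestartPolicy: corev1.RestartPolicyNever,
                    Containers:    []corev1.Container{{Name: "main", Image: t.Spec.Image}},
                },
            },
        },
    }
}
```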
Creating an experimental environment
Build the two clusters using kind.
```bash
# Create the main cluster
kind create cluster --name main
# Create the sub-cluster
kind create cluster --name sub
```
Code
The main logic is shown below; essentially we just add the sub-cluster clients to TestReconciler.
```go
// TestReconciler reconciles a Test object
type TestReconciler struct {
    // Main cluster client
    client.Client
    // List of clients of all clusters
    Clients map[string]client.Client
    Scheme  *runtime.Scheme
}

// NewTestReconciler ...
func NewTestReconciler(mgr ctrl.Manager, clusters map[string]cluster.Cluster) (*TestReconciler, error) {
    r := TestReconciler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
        Clients: map[string]client.Client{
            "main": mgr.GetClient(),
        },
    }
    for name, cluster := range clusters {
        r.Clients[name] = cluster.GetClient()
    }
    err := r.SetupWithManager(mgr)
    return &r, err
}

func (r *TestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    var test jobv1.Test
    var res ctrl.Result
    err := r.Get(ctx, req.NamespacedName, &test)
    if err != nil {
        return res, client.IgnoreNotFound(err)
    }
    job := test.Job()
    for _, c := range r.Clients {
        err := c.Create(ctx, job.DeepCopy())
        if err != nil {
            return res, err
        }
    }
    return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *TestReconciler) SetupWithManager(mgr ctrl.Manager) error {
    builder := ctrl.NewControllerManagedBy(mgr).
        For(&jobv1.Test{})
    return builder.Complete(r)
}
```
Note that in main.go we need to use mgr.Add() to add each sub-cluster to the manager, so that the sub-cluster's cache is started together with the manager; this is what the watches on sub-cluster resources rely on later.
```go
// NewSubClusters initializes the sub-clusters.
// Every context passed in must be present in the ~/.kube/config file.
func NewSubClusters(mgr ctrl.Manager, clientContexts ...string) map[string]cluster.Cluster {
    clusters := map[string]cluster.Cluster{}
    for _, v := range clientContexts {
        conf, err := config.GetConfigWithContext(v)
        checkErr(err, "get client config fail", "context", v)
        c, err := cluster.New(conf)
        checkErr(err, "new cluster fail", "context", v)
        err = mgr.Add(c)
        checkErr(err, "add cluster in manager", "context", v)
        clusters[v] = c
    }
    return clusters
}
```
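For completeness, the wiring in main.go might then look roughly like the sketch below. This is an assumption, not the author's actual main.go: the scheme setup follows the usual kubebuilder scaffolding, checkErr is the small log-and-exit helper already used above, the jobv1 import path is hypothetical, and the sub-cluster context name kind-sub comes from kind's kind-&lt;cluster-name&gt; naming convention.

```go
package main

import (
    "os"

    "k8s.io/apimachinery/pkg/runtime"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/log/zap"

    jobv1 "example.com/test-operator/api/v1" // hypothetical module path for the Test CRD
)

var scheme = runtime.NewScheme()

func init() {
    utilruntime.Must(clientgoscheme.AddToScheme(scheme))
    utilruntime.Must(jobv1.AddToScheme(scheme))
}

// checkErr logs the error with the given message and key/values, then exits.
func checkErr(err error, msg string, keysAndValues ...interface{}) {
    if err != nil {
        ctrl.Log.Error(err, msg, keysAndValues...)
        os.Exit(1)
    }
}

func main() {
    ctrl.SetLogger(zap.New())

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
    checkErr(err, "unable to start manager")

    // kind names its kubeconfig contexts "kind-<cluster-name>",
    // so the sub-cluster created earlier is reachable via the "kind-sub" context.
    clusters := NewSubClusters(mgr, "kind-sub")

    _, err = NewTestReconciler(mgr, clusters)
    checkErr(err, "unable to create controller")

    checkErr(mgr.Start(ctrl.SetupSignalHandler()), "problem running manager")
}
```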
How do we watch resources in multiple clusters at the same time?
Above we showed how to create resources in multiple clusters. This is actually very simple and could be achieved even without controller-runtime. But, as in the example above, creating resources alone usually does not solve the whole problem: we also need to follow the status of the resources we created.
Suppose we have a requirement: if any of the Jobs fails, the status of the custom resource should be set to Error. How do we implement that?
Official example
The design document Move cluster-specific code out of the manager contains a simple example, but I don't think it is a very good one, because it is a bit too crude:
- When setting up the watches, it watches the Secret resource in both clusters directly.
- Inside the Reconcile method, since there is no way to know which cluster the resource came from, it has to try the first cluster and then fall back to the second.
So, can we tell at Reconcile time which cluster an event came from?
```go
type secretMirrorReconciler struct {
    referenceClusterClient, mirrorClusterClient client.Client
}

func (r *secretMirrorReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
    s := &corev1.Secret{}
    if err := r.referenceClusterClient.Get(ctx, req.NamespacedName, s); err != nil {
        if kerrors.IsNotFound(err) {
            return reconcile.Result{}, nil
        }
        return reconcile.Result{}, err
    }

    if err := r.mirrorClusterClient.Get(ctx, req.NamespacedName, &corev1.Secret{}); err != nil {
        if !kerrors.IsNotFound(err) {
            return reconcile.Result{}, err
        }
        mirrorSecret := &corev1.Secret{
            ObjectMeta: metav1.ObjectMeta{Namespace: s.Namespace, Name: s.Name},
            Data:       s.Data,
        }
        return reconcile.Result{}, r.mirrorClusterClient.Create(ctx, mirrorSecret)
    }

    return reconcile.Result{}, nil
}

func NewSecretMirrorReconciler(mgr manager.Manager, mirrorCluster cluster.Cluster) error {
    return ctrl.NewControllerManagedBy(mgr).
        // Watch Secrets in the reference cluster
        For(&corev1.Secret{}).
        // Watch Secrets in the mirror cluster
        Watches(
            source.NewKindWithCache(&corev1.Secret{}, mirrorCluster.GetCache()),
            &handler.EnqueueRequestForObject{},
        ).
        Complete(&secretMirrorReconciler{
            referenceClusterClient: mgr.GetClient(),
            mirrorClusterClient:    mirrorCluster.GetClient(),
        })
}

// ... main function omitted
```
Code
The hard part of the implementation is distinguishing which cluster an event came from. The Reconcile parameter ctrl.Request contains only two fields, namespace and name, so those two fields are the only place where we can encode the cluster.
```go
Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error)
```
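For reference, the relevant definitions in controller-runtime and apimachinery are roughly the following; there really is nothing in a request besides the namespace and the name:

```go
// From sigs.k8s.io/controller-runtime/pkg/reconcile (ctrl.Request is an alias for this):
type Request struct {
    // NamespacedName is the name and namespace of the object to reconcile.
    types.NamespacedName
}

// From k8s.io/apimachinery/pkg/types:
type NamespacedName struct {
    Namespace string
    Name      string
}
```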
Obviously namespace is a better fit than name, so we add a rule to the namespace: the actual value of the namespace field becomes ${cluster}/${namespace} (for example, sub/default for the default namespace of the sub cluster). In other words, we tag each event with its cluster when it is enqueued, and then inside Reconcile we pick the corresponding client according to that cluster.
First, when setting up the watches, we can use a custom handler and inject the cluster name into it.
```go
// MuiltClustersEnqueue appends the cluster name to the Namespace
func MuiltClustersEnqueue(clusterName string) handler.EventHandler {
    return handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
        return []reconcile.Request{
            {
                NamespacedName: types.NamespacedName{
                    Name:      o.GetName(),
                    Namespace: clusterName + "/" + o.GetNamespace(),
                },
            },
        }
    })
}

// SetupWithManager sets up the controller with the Manager.
func (r *TestJobReconciler) SetupWithManager(mgr ctrl.Manager, cs map[string]cluster.Cluster) error {
    build := ctrl.NewControllerManagedBy(mgr).
        For(&batchv1.Job{})
    // Listening to multiple clusters
    for name, cluster := range cs {
        build = build.Watches(
            source.NewKindWithCache(&batchv1.Job{}, cluster.GetCache()),
            MuiltClustersEnqueue(name),
        )
    }
    return build.Complete(r)
}
```
Then, inside Reconcile, we just pick the right cluster client to operate with.
```go
func (r *TestJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    var res ctrl.Result
    logger := log.FromContext(ctx)

    var job batchv1.Job
    cluster, ns := GetClusterNameNs(req.Namespace)
    req.Namespace = ns
    logger.Info("get job", "cluster", cluster)

    err := r.GetClient(cluster).Get(ctx, req.NamespacedName, &job)
    if err != nil {
        return res, client.IgnoreNotFound(err)
    }
    if job.Status.CompletionTime.IsZero() {
        return res, nil
    }
    logger.Info("job complete", "cluster", cluster)

    var test jobv1.Test
    err = r.Get(ctx, clusterx.GetOwnerNameNs(&job), &test)
    if err != nil {
        return res, client.IgnoreNotFound(err)
    }

    test.Status.Phase = "finished"
    err = r.Client.Status().Update(ctx, &test)
    return ctrl.Result{}, err
}
```
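The snippet above relies on two helpers that are not shown: GetClusterNameNs and GetClient. A minimal sketch of what they might look like is given below, assuming TestJobReconciler embeds the main cluster client and carries the same Clients map as TestReconciler; the exact implementation in the original project may differ.

```go
import (
    "strings"

    "sigs.k8s.io/controller-runtime/pkg/client"
)

// GetClusterNameNs splits a "<cluster>/<namespace>" value back into the cluster
// name and the real namespace. Namespaces without a prefix (events coming from
// the main cluster's own watch) are attributed to the main cluster.
func GetClusterNameNs(ns string) (string, string) {
    parts := strings.SplitN(ns, "/", 2)
    if len(parts) == 2 {
        return parts[0], parts[1]
    }
    return "main", ns
}

// GetClient returns the client of the given cluster, falling back to the main
// cluster's client when the name is unknown.
func (r *TestJobReconciler) GetClient(cluster string) client.Client {
    if c, ok := r.Clients[cluster]; ok {
        return c
    }
    return r.Client
}
```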
Summary
The final implementation given here is just a simple demo. In a real project it is best to add another layer of abstraction and gather the multi-cluster operations in one place, which makes the code easier to maintain.
Here is a simple example of that idea, which I won't walk through in detail for reasons of space.
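Purely for illustration (a hypothetical sketch, not necessarily the example the author has in mind), such an abstraction could be as small as a type that hides the per-cluster clients behind a few methods:

```go
import (
    "context"

    "sigs.k8s.io/controller-runtime/pkg/client"
)

// ClusterClients gathers all per-cluster clients behind one type so that
// reconcilers never touch the map directly.
type ClusterClients struct {
    clients map[string]client.Client
}

func NewClusterClients(main client.Client) *ClusterClients {
    return &ClusterClients{clients: map[string]client.Client{"main": main}}
}

// Register adds a named sub-cluster client.
func (c *ClusterClients) Register(name string, cl client.Client) {
    c.clients[name] = cl
}

// Get returns the client for a cluster, defaulting to the main cluster.
func (c *ClusterClients) Get(name string) client.Client {
    if cl, ok := c.clients[name]; ok {
        return cl
    }
    return c.clients["main"]
}

// CreateAll creates a copy of the object in every registered cluster.
func (c *ClusterClients) CreateAll(ctx context.Context, obj client.Object) error {
    for _, cl := range c.clients {
        if err := cl.Create(ctx, obj.DeepCopyObject().(client.Object)); err != nil {
            return err
        }
    }
    return nil
}
```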