Recently, while testing log collection, I found that Elasticsearch was struggling to keep up with the volume of log data, and optimizing ES is not something that can be done overnight. So I decided to add an intermediate layer: export the logs to Kafka first, then consume them from Kafka with Logstash and write them into Elasticsearch. There is no Kafka cluster in the test environment yet, so the first step is to build one.
The relevant environment versions used in this article are as follows.
$ kubectl version
Client Version: version.Info{Major:"1", Minor:"14", GitVersion:"v1.14.2", GitCommit:"66049e3b21efe110454d67df4fa62b08ea79a19b", GitTreeState:"clean", BuildDate:"2019-05-16T18:55:03Z", GoVersion:"go1.12.5", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"16", GitVersion:"v1.16.2", GitCommit:"c97fe5036ef3df2967d086711e6c0c405941e14b", GitTreeState:"clean", BuildDate:"2019-10-15T19:09:08Z", GoVersion:"go1.12.10", Compiler:"gc", Platform:"linux/amd64"}
$ helm version
version.BuildInfo{Version:"v3.0.1", GitCommit:"7c22ef9ce89e0ebeb7125ba2ebf7d421f3e82ffa", GitTreeState:"clean", GoVersion:"go1.13.4"}
$ # kafka helm chart package version: kafka-0.20.8.tgz
Again, for simplicity, we use Helm 3 to install Kafka. First we need to add the incubator repository, since the stable repository does not have a suitable Kafka Chart package.
$ helm repo add incubator http://mirror.azure.cn/kubernetes/charts-incubator/
$ helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "incubator" chart repository
...Successfully got an update from the "stable" chart repository
Update Complete. ⎈ Happy Helming!⎈
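If you want to confirm which chart version the repository currently serves (this article uses kafka-0.20.8), an optional quick check is:

$ helm search repo incubator/kafka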
Download Kafka’s Helm Chart package locally to see how the Chart is put together; you can also skip this step.
$ helm fetch incubator/kafka
$ ls kafka-0.20.8.tgz
$ tar -xvf kafka-0.20.8.tgz
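Alternatively, if you only want to see which values can be overridden, you can dump the chart's default values without unpacking the package (the target file name here is just an example):

$ helm show values incubator/kafka > kafka-default-values.yaml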
Then create a new values file called kafka.yaml with the following contents.
resources:
  limits:
    cpu: 200m
    memory: 1536Mi
  requests:
    cpu: 100m
    memory: 1024Mi
livenessProbe:
  initialDelaySeconds: 60
persistence:
  storageClass: "rook-ceph-block"
Since Kafka is slow to start up, set the initial delay of the liveness probe to something generous; here we use livenessProbe.initialDelaySeconds=60. The resource requests and limits should be declared according to the actual capacity of your cluster. Finally, if you want to persist Kafka data, you also need to provide a StorageClass. Kafka is quite demanding on disk IO, so a Local PV is usually the best choice; here, however, we use a Ceph RBD StorageClass resource object: (storageclass.yaml)
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: rook-ceph-block
provisioner: rook-ceph.rbd.csi.ceph.com
parameters:
  # clusterID is the namespace the rook cluster is running in
  clusterID: rook-ceph
  # the storage pool to use
  pool: k8s-test-pool
  # RBD image (the actual storage medium) format. Defaults to "2".
  imageFormat: "2"
  # RBD image features. CSI RBD currently only supports `layering`.
  imageFeatures: layering
  # Ceph admin credentials, automatically generated in the clusterID namespace
  csi.storage.k8s.io/provisioner-secret-name: rook-csi-rbd-provisioner
  csi.storage.k8s.io/provisioner-secret-namespace: rook-ceph
  csi.storage.k8s.io/node-stage-secret-name: rook-csi-rbd-node
  csi.storage.k8s.io/node-stage-secret-namespace: rook-ceph
  # filesystem type of the volume; if not specified, csi-provisioner defaults to `ext4`
  csi.storage.k8s.io/fstype: ext4
reclaimPolicy: Retain
Which storage solution to use depends on your own environment; I use a Ceph cluster built with Rook here, which is relatively simple to work with.
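If you would rather follow the Local PV route mentioned above, a minimal sketch of a local StorageClass could look like the following; note that kubernetes.io/no-provisioner means the local PersistentVolumes have to be created by hand (or by a local-volume provisioner), they are not provisioned dynamically:

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: local-storage
provisioner: kubernetes.io/no-provisioner
# delay binding until a pod is scheduled, so the PV on the right node is picked
volumeBindingMode: WaitForFirstConsumer

If you go that way, point persistence.storageClass in the values file at this class instead of rook-ceph-block.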
Once the custom values file is ready, you can use Helm to install it directly.
$ kubectl create ns kafka
$ helm install -f kafka.yaml kfk incubator/kafka --namespace kafka
NAME: kfk
LAST DEPLOYED: Tue Mar 17 11:49:51 2020
NAMESPACE: kafka
STATUS: deployed
REVISION: 1
NOTES:
### Connecting to Kafka from inside Kubernetes
You can connect to Kafka by running a simple pod in the K8s cluster like this with a configuration like this:
apiVersion: v1
kind: Pod
metadata:
  name: testclient
  namespace: kafka
spec:
  containers:
  - name: kafka
    image: confluentinc/cp-kafka:5.0.1
    command:
    - sh
    - -c
    - "exec tail -f /dev/null"
Once you have the testclient pod above running, you can list all kafka
topics with:
kubectl -n kafka exec testclient -- kafka-topics --zookeeper kfk-zookeeper:2181 --list
To create a new topic:
kubectl -n kafka exec testclient -- kafka-topics --zookeeper kfk-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1
To listen for messages on a topic:
kubectl -n kafka exec -ti testclient -- kafka-console-consumer --bootstrap-server kfk-kafka:9092 --topic test1 --from-beginning
To stop the listener session above press: Ctrl+C
To start an interactive message producer session:
kubectl -n kafka exec -ti testclient -- kafka-console-producer --broker-list kfk-kafka-headless:9092 --topic test1
To create a message in the above session, simply type the message and press "enter"
To end the producer session try: Ctrl+C
If you specify "zookeeper.connect" in configurationOverrides, please replace "kfk-zookeeper:2181" with the value of "zookeeper.connect", or you will get error.
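The configurationOverrides mentioned in the NOTES above are broker settings placed in the custom values file; as a rough, illustrative example (the specific keys below are placeholders, not settings used in this article):

configurationOverrides:
  "offsets.topic.replication.factor": 3
  "auto.create.topics.enable": false

Any change there would be applied with a normal helm upgrade -f kafka.yaml kfk incubator/kafka --namespace kafka.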
After successful installation, you can check the status of the Release.
$ helm ls -n kafka
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION
kfk kafka 1 2020-03-17 14:50:41.595746 +0800 CST deployed kafka-0.20.8 5.0.1
After a short while, a cluster of 3 Kafka instances and 3 ZooKeeper instances should be deployed and running normally.
$ kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
kfk-kafka-0 1/1 Running 0 3h52m
kfk-kafka-1 1/1 Running 0 3h50m
kfk-kafka-2 1/1 Running 0 3h48m
kfk-zookeeper-0 1/1 Running 0 3h55m
kfk-zookeeper-1 1/1 Running 0 3h54m
kfk-zookeeper-2 1/1 Running 0 3h54m
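Since persistence was enabled against the rook-ceph-block StorageClass, it is also worth checking that the data volumes were bound (the exact PVC names depend on the chart's volume claim template):

$ kubectl get pvc -n kafka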
After deployment, create a test client to verify that the Kafka cluster is working properly: (testclient.yaml)
apiVersion: v1
kind: Pod
metadata:
  name: testclient
  namespace: kafka
spec:
  containers:
  - name: kafka
    image: confluentinc/cp-kafka:5.0.1
    command:
    - sh
    - -c
    - "exec tail -f /dev/null"
Deploying the resource object above is straightforward as well.
$ kubectl apply -f testclient.yaml
$ kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
testclient 1/1 Running 0 3h44m
......
After the test client is created, create a new topic with the following command:
$ kubectl -n kafka exec testclient -- kafka-topics --zookeeper kfk-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1
Created topic "test1".
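As an optional sanity check (not part of the original walkthrough), you can also describe the topic to confirm its partition and replica assignment:

$ kubectl -n kafka exec testclient -- kafka-topics --zookeeper kfk-zookeeper:2181 --describe --topic test1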
You can see that the topic test1 was created successfully. Then you can run the following command to listen for messages on test1:
$ kubectl -n kafka exec -ti testclient -- kafka-console-consumer --bootstrap-server kfk-kafka:9092 --topic test1 --from-beginning
Then open a new command-line terminal and produce a message.
$ kubectl -n kafka exec -ti testclient -- kafka-console-producer --broker-list kfk-kafka-headless:9092 --topic test1
>Hello kafka on k8s
>
At this point, the corresponding message shows up in the consumer listening on the topic test1.
$ kubectl -n kafka exec -ti testclient -- kafka-console-consumer --bootstrap-server kfk-kafka:9092 --topic test1 --from-beginning
Hello kafka on k8s
This shows that our Kafka deployment is running successfully on top of the Kubernetes cluster. Of course, we are only using it in a test environment; running Kafka on Kubernetes in production requires much more consideration, and for stateful applications like this an Operator, such as Confluent's Kafka Operator, is generally the better choice. In short, if you can handle the operational burden, go ahead and use it 🤣