Download Kafka
Download the Kafka installation package from the official website. This article uses “kafka_2.13-3.4.0.tgz”. Extract the archive into your installation directory.
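On Linux or macOS, the extraction step might look like the sketch below. The `extract_kafka` helper name and the target directory in the usage note are illustrative assumptions, not part of the Kafka distribution.

```shell
# Hypothetical helper: unpack a Kafka archive into a target directory.
# Usage: extract_kafka <archive.tgz> <install-dir>
extract_kafka() {
  archive="$1"
  target="$2"
  if [ ! -f "$archive" ]; then
    echo "archive not found: $archive" >&2
    return 1
  fi
  # Create the target if needed, then extract the gzip-compressed tarball into it.
  mkdir -p "$target" && tar -xzf "$archive" -C "$target"
}
```

For example, `extract_kafka kafka_2.13-3.4.0.tgz "$HOME"` followed by `cd "$HOME/kafka_2.13-3.4.0"` would leave you in the directory from which the `bin/...` commands in this article are run.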
Start with ZooKeeper or KRaft
Note: Java 8+ must be installed in your local environment.
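A quick way to verify the Java requirement is to parse the output of `java -version`. The following is a minimal sketch; the helper names `java_major` and `check_java` are made up for this example, and the parsing assumes the usual `version "x.y.z"` format printed by common JDK builds.

```shell
# Hypothetical helper: reduce a Java version string to its major number.
# Legacy strings such as "1.8.0_362" map to 8; modern ones such as "17.0.6" map to 17.
java_major() {
  version="$1"
  case "$version" in
    1.*) version="${version#1.}" ;;   # drop the legacy "1." prefix
  esac
  echo "${version%%[!0-9]*}"          # keep only the leading digits
}

# Hypothetical helper: succeed only if a Java 8+ runtime is on the PATH.
check_java() {
  if ! command -v java >/dev/null 2>&1; then
    echo "java not found on PATH" >&2
    return 1
  fi
  # `java -version` prints to stderr; the version is the first quoted token.
  raw=$(java -version 2>&1 | sed -n 's/.*version "\([^"]*\)".*/\1/p' | head -n 1)
  major=$(java_major "$raw")
  [ -n "$major" ] && [ "$major" -ge 8 ]
}
```

Running `check_java && echo "Java OK"` before starting Kafka gives an early signal if the runtime is missing or too old.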
Apache Kafka can be started using either ZooKeeper or KRaft. Follow one of the two sections below, but do not use both at the same time.
ZooKeeper
Run the following commands to start the services in the correct order, beginning with ZooKeeper.
|
#Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties
|
Among other things, you can customize the directory where the data is stored in the zookeeper.properties file.
|
#dataDir=/tmp/zookeeper
dataDir=D:/data/zookeeper
|
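If you prefer to script such configuration changes rather than edit the file by hand, a small helper along these lines could be used. `set_property` is a hypothetical function written for this article, and the paths in the usage note are placeholders; it assumes the value contains no `|`, `&`, or backslash characters, which would confuse `sed`.

```shell
# Hypothetical helper: set key=value in a Java-style .properties file.
# Replaces an existing uncommented "key=" line, or appends one if none exists.
set_property() {
  file="$1"
  key="$2"
  value="$3"
  if grep -q "^${key}=" "$file"; then
    # "|" is the sed delimiter so that "/" in paths needs no escaping.
    sed "s|^${key}=.*|${key}=${value}|" "$file" > "$file.tmp" && mv "$file.tmp" "$file"
  else
    printf '%s=%s\n' "$key" "$value" >> "$file"
  fi
}
```

For example, `set_property config/zookeeper.properties dataDir /var/lib/zookeeper` changes the ZooKeeper data directory, and the same helper works for other keys in other properties files.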
Open another terminal session and run the following command.
|
#Start the Kafka service
$ bin/kafka-server-start.sh config/server.properties
|
Among other things, you can customize the directory where the logs are stored in the server.properties file.
|
#log.dirs=/tmp/kafka-logs
log.dirs=D:/data/kafka/kafka-logs
|
Once all services are successfully started, there will be a basic Kafka environment running and ready to use.
Note: The commands above are for Linux. On Windows, run the corresponding batch files in the bin/windows directory (bin/windows/*.bat) instead, with each start script in its own terminal session. The commands are as follows.
|
cd bin/windows
zookeeper-server-start.bat ../../config/zookeeper.properties
kafka-server-start.bat ../../config/server.properties
|
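On Linux or macOS, one way to confirm that the broker is actually reachable before moving on is to poll it with `bin/kafka-broker-api-versions.sh`, which ships with Kafka. The sketch below makes several assumptions: the `wait_for_broker` helper and the `KAFKA_HOME` variable are introduced only for this example, and the tool is assumed to exit with a non-zero status when it cannot reach the broker (it may take some time to give up on each attempt).

```shell
# Hypothetical helper: poll until the broker answers an API-versions request.
# Usage: wait_for_broker [bootstrap-server] [attempts] [delay-seconds]
# KAFKA_HOME is an assumed variable pointing at the Kafka directory (default: ".").
wait_for_broker() {
  server="${1:-localhost:9092}"
  attempts="${2:-10}"
  delay="${3:-3}"
  i=1
  while [ "$i" -le "$attempts" ]; do
    if "${KAFKA_HOME:-.}/bin/kafka-broker-api-versions.sh" \
         --bootstrap-server "$server" >/dev/null 2>&1; then
      echo "broker at $server is reachable"
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  echo "broker at $server did not respond after $attempts attempts" >&2
  return 1
}
```

Calling `wait_for_broker` with no arguments from the Kafka directory checks `localhost:9092`, the address used throughout this article.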
KRaft
Generate the cluster UUID.
|
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
|
Format the log directories.
|
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
|
Start the Kafka server.
|
$ bin/kafka-server-start.sh config/kraft/server.properties
|
Once the Kafka server is successfully started, there will be a basic Kafka environment running and ready to use.
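The three KRaft steps can be chained in a small wrapper so that a failure in an early step stops the sequence. This is only a sketch for first-time setup: `start_kraft` and `KAFKA_HOME` are names invented for this example, and running it again against an already formatted log directory would be expected to fail at the format step.

```shell
# Hypothetical wrapper around the three KRaft steps shown above.
# KAFKA_HOME is an assumed variable pointing at the Kafka directory (default: ".").
start_kraft() {
  home="${KAFKA_HOME:-.}"
  config="${1:-$home/config/kraft/server.properties}"
  # 1. Generate a cluster ID.
  cluster_id=$("$home/bin/kafka-storage.sh" random-uuid) || return 1
  # 2. Format the log directories with that ID.
  "$home/bin/kafka-storage.sh" format -t "$cluster_id" -c "$config" || return 1
  # 3. Start the Kafka server in the foreground.
  "$home/bin/kafka-server-start.sh" "$config"
}
```

Run `start_kraft` from the Kafka directory, or pass a different properties file as the first argument.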
Create topics to store events
Kafka is a distributed event streaming platform that allows events (also known as records or messages) to be read, written, stored and processed on multiple machines.
Example events include payment transactions, geolocation updates from cell phones, shipping orders, sensor measurements from IoT devices or medical devices, and more. These events are organized and stored by topic.
A topic is similar to a folder in a file system, and events are files in that folder.
Therefore, before writing the first event, a topic must be created. Open another terminal session and run the following command.
|
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
|
All of Kafka’s command line tools accept additional options. For example, kafka-topics.sh with the --describe option displays details such as the partition count of the new topic.
|
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events TopicId: NPmZHyhbR9y00wMglMH2sg PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
|
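The describe output above shows the defaults of one partition and a replication factor of 1. Both can be set explicitly when the topic is created. In the sketch below, `create_topic`, `KAFKA_HOME`, and `BOOTSTRAP_SERVER` are names assumed for this example; the `--partitions`, `--replication-factor`, and `--if-not-exists` options belong to kafka-topics.sh itself.

```shell
# Hypothetical helper: create a topic with an explicit partition count.
# Usage: create_topic <topic> [partitions]
# The replication factor stays at 1 because this setup runs a single broker.
create_topic() {
  topic="$1"
  partitions="${2:-1}"
  "${KAFKA_HOME:-.}/bin/kafka-topics.sh" --create --if-not-exists \
    --topic "$topic" \
    --partitions "$partitions" \
    --replication-factor 1 \
    --bootstrap-server "${BOOTSTRAP_SERVER:-localhost:9092}"
}
```

For instance, `create_topic orders 3` would create a hypothetical `orders` topic with three partitions, or do nothing if it already exists.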
Writing events to topics
Kafka clients communicate with the Kafka brokers over the network to write (or read) events. Once received, the brokers store the events in a durable and fault-tolerant manner.
Run the console producer client to write some events to the topic. By default, each line of input will result in a separate event being written to the topic.
|
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
|
The producer client can be stopped at any time using Ctrl-C.
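Because the console producer reads from standard input, events can also be piped in instead of typed interactively. This is a minimal sketch; `produce_lines`, `KAFKA_HOME`, and `BOOTSTRAP_SERVER` are names assumed for this example.

```shell
# Hypothetical helper: send each line of standard input as one event.
# Usage: <some-command> | produce_lines <topic>
produce_lines() {
  topic="$1"
  "${KAFKA_HOME:-.}/bin/kafka-console-producer.sh" \
    --topic "$topic" \
    --bootstrap-server "${BOOTSTRAP_SERVER:-localhost:9092}"
}
```

For example, `printf 'first\nsecond\n' | produce_lines quickstart-events` writes two events, and the producer exits when its input ends.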
Reading events from a topic
Open another terminal session and run the console consumer client to read the events just created.
|
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
|
The consumer client can be stopped at any time using Ctrl-C.
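For scripted checks it can be convenient to have the consumer exit on its own rather than wait for Ctrl-C. The console consumer offers `--max-messages` and `--timeout-ms` for this. The sketch below wraps them in a hypothetical `consume_events` helper; `KAFKA_HOME`, `BOOTSTRAP_SERVER`, and the 10-second timeout are assumptions made for this example.

```shell
# Hypothetical helper: read up to <count> events from the beginning of a topic.
# Usage: consume_events <topic> [count]
# Exits after <count> events, or when no event arrives for 10 seconds.
consume_events() {
  topic="$1"
  count="${2:-10}"
  "${KAFKA_HOME:-.}/bin/kafka-console-consumer.sh" \
    --topic "$topic" --from-beginning \
    --max-messages "$count" \
    --timeout-ms 10000 \
    --bootstrap-server "${BOOTSTRAP_SERVER:-localhost:9092}"
}
```

With the two events written earlier, `consume_events quickstart-events 2` should print both and then return control to the shell.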
Error when enabling KRaft configuration under Windows
The error message is as follows.
|
java.io.UncheckedIOException: Error while writing the Quorum status from the file D:\data\kafka\kraft-combined-logs\__cluster_metadata-0\quorum-state
at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:155)
at org.apache.kafka.raft.FileBasedStateStore.writeElectionState(FileBasedStateStore.java:128)
at org.apache.kafka.raft.QuorumState.transitionTo(QuorumState.java:477)
at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:212)
at org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:369)
at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:240)
at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:166)
at kafka.server.SharedServer.start(SharedServer.scala:228)
at kafka.server.SharedServer.startForController(SharedServer.scala:128)
at kafka.server.ControllerServer.startup(ControllerServer.scala:188)
at kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:98)
at kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:98)
at scala.Option.foreach(Option.scala:437)
at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:98)
at kafka.Kafka$.main(Kafka.scala:115)
at kafka.Kafka.main(Kafka.scala)
Caused by: java.nio.file.FileSystemException: D:\data\kafka\kraft-combined-logs\__cluster_metadata-0\quorum-state.tmp -> D:\data\kafka\kraft-combined-logs\__cluster_metadata-0\quorum-state: The process cannot access the file because it is being used by another process.
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
at java.nio.file.Files.move(Files.java:1395)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:949)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:932)
at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
... 15 more
|
At the time of writing, this issue has not been resolved. It is discussed in a pull request on GitHub: https://github.com/apache/kafka/pull/12763.
Ref
https://kafka.apache.org/quickstart