Overview
I’ve been learning Apache Kafka over the past week. I took the Udemy course Apache Kafka Series - Learn Apache Kafka for Beginners v3. I’m using this blog to take notes, and this post focuses specifically on Kafka CLI commands.
Installation and Setup
Client
Installing the Kafka client on macOS is very straightforward:
brew install kafka
Homebrew will install both the JDK and Kafka for you.
You can verify the installation by running `kafka-topics` in your terminal.
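As a quick sanity check (assuming a Homebrew install), both of these work without a running broker:

```shell
# Prints the full usage/help text, confirming the client is on the PATH
kafka-topics

# Prints the installed Kafka version
kafka-topics --version
```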
Server
Setting up the server is more challenging than the client installation.
Conduktor
The course instructor heavily promoted his startup project Conduktor, but the free playground option is no longer available.
Local Cluster
I can run a local cluster with Docker and connect to it using:
kafka-topics.sh --bootstrap-server localhost:19092 --list
Note that the port number is `19092` instead of the default `9092`.
Aiven
However, since my localhost setup runs only a single broker, it defeats the purpose of learning Kafka clustering. I ended up creating a free account with Aiven, which gives me a cluster with 3 brokers (3 VM nodes) for free for one month.
I followed the instructions to configure the server, though the instructions weren’t very clear.
Here are my notes:
- Download the certificate from Aiven, but rename it from `ca.pem` to `certificate.pem`
- Create a `kafka-client.config` file on your OS and use the username and credentials from the website
- Tip: save the command prefix in a file:
kafka-topics --bootstrap-server <server-uri> --command-config kafka-client.config
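For reference, here is a sketch of what `kafka-client.config` might contain for a SASL_SSL Aiven service. The mechanism and placeholder values are assumptions on my part, so copy the exact settings from your Aiven console:

```properties
# Sketch only - substitute the values shown in the Aiven console
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="<aiven-username>" password="<aiven-password>";
# PEM truststores are supported by the Java client from Kafka 2.7 onward
ssl.truststore.type=PEM
ssl.truststore.location=certificate.pem
```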
With that setup complete, it’s time to learn Kafka CLI commands.
Commands
Create Topics
# Create a topic with 5 partitions and 2 replicas
kafka-topics ... --create --topic first_topic --partitions 5 --replication-factor 2
# Show the details of this topic
kafka-topics ... --describe --topic first_topic
# Output
Topic: first_topic TopicId: DWPKB-MRR22-oknWuo5tLA PartitionCount: 5 ReplicationFactor: 2 Configs: cleanup.policy=delete,segment.bytes=1073741824,unclean.leader.election.enable=false
Topic: first_topic Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3 Elr: N/A LastKnownElr: N/A
Topic: first_topic Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1 Elr: N/A LastKnownElr: N/A
Topic: first_topic Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2 Elr: N/A LastKnownElr: N/A
Topic: first_topic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: N/A LastKnownElr: N/A
Topic: first_topic Partition: 4 Leader: 2 Replicas: 2,3 Isr: 2,3 Elr: N/A LastKnownElr: N/A
Producer
To produce messages:
- Use `kafka-console-producer`
- Use `--producer.config` (not `--command-config`) for the `kafka-client.config` file
Advanced options for using keys when sending messages:
--property parse.key=true
--property key.separator=:
Now you can send messages like:
>name:huijing huang
For learning purposes, you can force the producer to use the round-robin partitioner class to send messages to different partitions. Otherwise, the default (sticky) partitioner keeps sending messages to the same partition until the current batch is full.
--producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
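Putting the options together, a full producer invocation might look like this (the server URI is a placeholder):

```shell
kafka-console-producer \
  --bootstrap-server <server-uri> \
  --producer.config kafka-client.config \
  --topic first_topic \
  --property parse.key=true \
  --property key.separator=: \
  --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
```

Note that with `parse.key=true`, typing a line without the separator makes the producer throw an exception.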
Consumer
To consume messages:
- Use `kafka-console-consumer`
- Use `--consumer.config` for the `kafka-client.config` file
Notes:
- The consumer will only receive new messages by default. To consume historical data, use `--from-beginning`
- An interesting experiment: if we force the producer to send messages to different partitions, the consumer will consume messages that appear out of order. However, the message order within each partition remains consistent!
Here are some powerful configuration options for the consumer:
- `--formatter kafka.tools.DefaultMessageFormatter` - set the formatter (this might not work in all versions)
- `--property print.timestamp=true` - print the timestamp
- `--property print.key=true --property print.value=true` - print both key and value
- `--property print.partition=true` - print the partition number
- `--from-beginning` - consume messages from the beginning
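Combined into one command, a verbose consumer might look like this (again, the server URI is a placeholder):

```shell
kafka-console-consumer \
  --bootstrap-server <server-uri> \
  --consumer.config kafka-client.config \
  --topic first_topic \
  --from-beginning \
  --formatter kafka.tools.DefaultMessageFormatter \
  --property print.timestamp=true \
  --property print.key=true \
  --property print.value=true \
  --property print.partition=true
```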
Consumer Group
This is a very interesting experiment.
Given the 5 partitions we created previously, let’s create a consumer group with 3 consumers:
--group my-first-application
Each consumer is assigned a subset of the partitions in the topic. Since we’re forcing the producer to send messages using a round-robin mechanism, we should see messages appearing in different consumers.
A message can only be consumed by one consumer within that group.
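To reproduce the experiment, run the same command in three separate terminals with the same group ID; Kafka rebalances the 5 partitions across the 3 consumers (for example 2+2+1):

```shell
# Run this in three terminals; each instance joins the same group
kafka-console-consumer \
  --bootstrap-server <server-uri> \
  --consumer.config kafka-client.config \
  --topic first_topic \
  --group my-first-application
```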
Keep in mind the “inactive” consumer scenario where the number of consumers in a group exceeds the number of partitions in a topic.
Also remember that the `--from-beginning` option for a consumer group is only valid once.
When a consumer group catches up with all messages, its committed offsets are stored. If you start the consumer group with the `--from-beginning` option again, it won’t re-consume historical data.
Note: When you create a consumer, you can assign it to a consumer group:
kafka-console-consumer --topic third_topic --group my-first-application
To check the status of a consumer group, use the consumer group command:
kafka-consumer-groups.sh --command-config playground.config --describe --group my-second-application
# Output
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-first-application tri_topic 1 0 6 6 - - -
my-first-application tri_topic 2 0 5 5 - - -
my-first-application tri_topic 0 0 6 6 - - -
- You can see the `offset` and `lag` for each partition
- If there are active consumer connections to the consumer group, you can see the `consumer id` of the consumer that connects to each partition
Reset Offsets
You can reset offsets using `--reset-offsets --to-earliest`.
There are many options for setting specific offsets, including by timestamp cutoff.
- With the `--dry-run` option: the command only previews which offsets would be reset; nothing changes
- With the `--execute` option: the offsets are actually reset
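For example (all consumers in the group must be stopped before a reset is allowed):

```shell
# Preview the reset without changing any offsets
kafka-consumer-groups \
  --bootstrap-server <server-uri> \
  --command-config kafka-client.config \
  --group my-first-application \
  --topic first_topic \
  --reset-offsets --to-earliest --dry-run

# Perform the reset for real
kafka-consumer-groups \
  --bootstrap-server <server-uri> \
  --command-config kafka-client.config \
  --group my-first-application \
  --topic first_topic \
  --reset-offsets --to-earliest --execute
```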
Additional notes:
- Offset operations must be performed at the consumer group level
- If we don’t specify a group ID for a consumer, a random consumer group ID will be created temporarily (e.g., `console-consumer-65971`)