Overview

I’ve been learning Apache Kafka over the past week. I took the Udemy course Apache Kafka Series - Learn Apache Kafka for Beginners v3. I’m using this blog to take notes, and this post focuses specifically on Kafka CLI commands.

Installation and Setup

Client

Installing the Kafka client on macOS is very straightforward:

brew install kafka

Homebrew installs a JDK (as a dependency) along with Kafka. You can verify the installation by running kafka-topics in your terminal.
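For example, printing the version confirms the CLI tools are on your PATH:

# Quick sanity check after the brew install
kafka-topics --version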

Server

Setting up the server is more challenging than the client installation.

Conduktor

The course instructor heavily promoted his startup project Conduktor, but the free playground option is no longer available.

Local Cluster

I can run a local cluster with Docker and connect to it using:

kafka-topics --bootstrap-server localhost:19092 --list

Note that the port number is 19092 instead of the default 9092.
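The course provides a docker-compose file for this. As a rough standalone alternative, here's a minimal single-broker sketch using the official apache/kafka image; the 19092 mapping and every setting below are my own assumptions, not the course's file:

# Minimal single-broker Kafka in KRaft mode, exposed on host port 19092
# (all values are assumptions for a local sandbox)
docker run -d --name kafka -p 19092:19092 \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:19092,CONTROLLER://0.0.0.0:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:19092 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:latest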

Aiven

However, since this localhost setup runs only a single broker, it defeats the purpose of learning Kafka clustering. I ended up creating a free account with Aiven, which gives me a cluster with 3 brokers (3 VM nodes) free for one month.

I followed Aiven's instructions to configure the server, though they weren't very clear.

Here are my notes:

  • Download the CA certificate from Aiven, then rename it from ca.pem to certificate.pem
  • Create a kafka-client.config file locally with the username and password from the Aiven console (see the sketch after this list)
  • Tip: save the long command prefix somewhere reusable: kafka-topics --bootstrap-server <server-uri> --command-config kafka-client.config
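For reference, here's a sketch of what my kafka-client.config looks like, assuming SASL authentication is enabled on the Aiven service; the username, password, and certificate path are placeholders:

# kafka-client.config (sketch; values are placeholders)
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="avnadmin" password="<password>";
ssl.truststore.type=PEM
ssl.truststore.location=/path/to/certificate.pem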

With that setup complete, it’s time to learn Kafka CLI commands.

Commands

Create Topics

# Create a topic with 5 partitions and 2 replicas
kafka-topics ... --create --topic first_topic --partitions 5 --replication-factor 2

# Show the details of this topic
kafka-topics ... --describe --topic first_topic

# Output
Topic: first_topic	TopicId: DWPKB-MRR22-oknWuo5tLA	PartitionCount: 5	ReplicationFactor: 2	Configs: cleanup.policy=delete,segment.bytes=1073741824,unclean.leader.election.enable=false
	Topic: first_topic	Partition: 0	Leader: 1	Replicas: 1,3	Isr: 1,3	Elr: N/A	LastKnownElr: N/A
	Topic: first_topic	Partition: 1	Leader: 2	Replicas: 2,1	Isr: 2,1	Elr: N/A	LastKnownElr: N/A
	Topic: first_topic	Partition: 2	Leader: 3	Replicas: 3,2	Isr: 3,2	Elr: N/A	LastKnownElr: N/A
	Topic: first_topic	Partition: 3	Leader: 1	Replicas: 1,2	Isr: 1,2	Elr: N/A	LastKnownElr: N/A
	Topic: first_topic	Partition: 4	Leader: 2	Replicas: 2,3	Isr: 2,3	Elr: N/A	LastKnownElr: N/A

Producer

To produce messages:

  • Use kafka-console-producer
  • Use --producer.config (not --command-config) for the kafka-client.config file

Advanced options for using keys when sending messages (see the combined command after this list):

  • --property parse.key=true
  • --property key.separator=:
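Putting those options together, a full producer command against the Aiven cluster looks roughly like this (<server-uri> is a placeholder for your bootstrap address):

# Sketch: produce keyed messages, colon-separated
kafka-console-producer --bootstrap-server <server-uri> \
  --producer.config kafka-client.config \
  --topic first_topic \
  --property parse.key=true \
  --property key.separator=: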

Now you can send messages like:

>name:huijing huang

For learning purposes, you can force the producer to use the round-robin partitioner class so that messages are spread across partitions. Otherwise, the default (sticky) partitioner keeps sending keyless messages to the same partition until the current batch fills up.

--producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner

Consumer

To consume messages:

  • Use kafka-console-consumer
  • Use --consumer.config for the kafka-client.config file

Notes:

  • The consumer will only receive new messages by default. To consume historical data, use --from-beginning
  • An interesting experiment: if we force the producer to send messages to different partitions, the consumer will consume messages that appear out of order. However, the message order within each partition remains consistent!

Here are some powerful configuration options for the consumer (combined into a single sketch after this list):

  • --formatter kafka.tools.DefaultMessageFormatter - set the message formatter (this class name might not work in every version; newer releases relocated it to org.apache.kafka.tools.consumer.DefaultMessageFormatter)
  • --property print.timestamp=true - print timestamp
  • --property print.key=true --property print.value=true - print both key and value
  • --property print.partition=true - print the partition number
  • --from-beginning - consume messages from the beginning
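Combining these, a consumer invocation with full output might look like the sketch below (same <server-uri> placeholder as before):

# Sketch: consume from the beginning, printing timestamp, key, value, and partition
kafka-console-consumer --bootstrap-server <server-uri> \
  --consumer.config kafka-client.config \
  --topic first_topic \
  --from-beginning \
  --property print.timestamp=true \
  --property print.key=true \
  --property print.value=true \
  --property print.partition=true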

Consumer Group

This is a very interesting experiment.

Given the 5 partitions we created previously, let’s create a consumer group with 3 consumers:

--group my-first-application
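Concretely, I run the same command in three separate terminals; every process that joins with the same --group value becomes a member of the group (a sketch, with the usual placeholders):

# Run this in three terminals to get three consumers in one group
kafka-console-consumer --bootstrap-server <server-uri> \
  --consumer.config kafka-client.config \
  --topic first_topic \
  --group my-first-application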

Each consumer is assigned a subset of the topic's partitions. Since we're forcing the producer to send messages round-robin, we should see messages appearing across the different consumers.

A message can only be consumed by one consumer within that group.

Keep in mind the “inactive” consumer scenario: when the number of consumers in a group exceeds the number of partitions in a topic, the extra consumers sit idle, since each partition is assigned to at most one consumer in the group.

Also remember that the --from-beginning option only takes effect the first time a consumer group reads a topic. Once the group has caught up with all messages, its committed offsets are stored; if you start the consumer group with --from-beginning again, it won't re-consume the historical data.

Note: When you create a consumer, you can assign it to a consumer group:

kafka-console-consumer ... --topic third_topic --group my-first-application

To check the status of a consumer group, use the consumer group command:

kafka-consumer-groups --bootstrap-server <server-uri> --command-config kafka-client.config --describe --group my-first-application

# Output
GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
my-first-application tri_topic       1          0               6               6               -               -               -
my-first-application tri_topic       2          0               5               5               -               -               -
my-first-application tri_topic       0          0               6               6               -               -               -

  • You can see the current offset, log-end offset, and lag for each partition
  • If consumers are actively connected to the group, you can also see the consumer ID assigned to each partition

Reset Offsets

You can reset offsets using --reset-offsets --to-earliest. There are many other options for choosing the target offset, including --to-datetime for a timestamp cutoff; a full example follows the list below.

  • With the --dry-run option (the default), the command only prints the offsets that would be reset; nothing actually changes
  • With the --execute option, the offsets are actually reset, so the next run of the consumer group re-consumes those messages
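For example, rewinding the whole group to the earliest available offsets on one topic looks roughly like this (placeholders as before):

# Sketch: rewind my-first-application to the earliest offsets on first_topic
kafka-consumer-groups --bootstrap-server <server-uri> \
  --command-config kafka-client.config \
  --group my-first-application --topic first_topic \
  --reset-offsets --to-earliest --execute

Note that the reset only succeeds while the group has no active consumers.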

Additional notes:

  • Offset operations must be performed at the consumer group level
  • If we don’t specify a group ID for a consumer, a random consumer group ID will be created temporarily (e.g., console-consumer-65971)
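You can spot those auto-generated groups by listing all consumer groups (same placeholders as above):

# Sketch: list every consumer group known to the cluster,
# including temporary ones like console-consumer-65971
kafka-consumer-groups --bootstrap-server <server-uri> \
  --command-config kafka-client.config --list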