Kafka Java Programming

I’ve been learning Apache Kafka over the past week. I took the Udemy course Apache Kafka Series - Learn Apache Kafka for Beginners v3. I’m using this blog to take notes, and this post focuses specifically on Java programming with Kafka.

Producer

Basics

To set up and use a Kafka producer:

  • Set the key and value serializers, for example StringSerializer for both
  • Call flush() to block until every buffered record has been sent
  • Call close() when done; it also flushes the buffer before shutting down
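
The configuration step can be sketched with nothing more than java.util.Properties, since Kafka client settings are plain string key/value pairs. This is a minimal sketch, not the course's code: the class name ProducerSettings and the broker address localhost:9092 are placeholders I made up for illustration.

```java
import java.util.Properties;

// Minimal sketch of producer settings using only java.util.Properties.
// ProducerSettings and the broker address are hypothetical placeholders.
final class ProducerSettings {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Address the client first connects to in order to discover the cluster.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Keys and values are both plain strings in this example.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092"); // placeholder address
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With the kafka-clients library on the classpath, these properties would be passed to new KafkaProducer<String, String>(props), followed by send(), flush(), and close().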

Callback

A callback lets us check the outcome of each send() once the broker has acknowledged the record or the send has failed.

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            log.info("Received new metadata\n" +
            "Topic: " + metadata.topic() + "\n" +
            "Partition: " + metadata.partition() + "\n"+
            "Offset: " + metadata.offset() + "\n" +
            "Timestamp: " + metadata.timestamp()
            );
        } else {
            log.error("Error while producing", exception);
        }
    }
});

We just need to override the onCompletion() method. If we fail to deliver a message to a topic, the exception will provide details about the error. If we successfully send a message to a topic, we can check the metadata of that message.

Keys

ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, val);

With the default partitioner, messages with the same key are sent to the same partition, as long as the number of partitions in the topic does not change.
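
The idea behind key-based partitioning is "hash the key bytes, then take the result modulo the partition count". The sketch below shows that shape only. Kafka's built-in partitioner uses a murmur2 hash; here Arrays.hashCode is a stand-in I chose for brevity, so the partition numbers it prints will not match a real cluster. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration only: Arrays.hashCode replaces Kafka's murmur2 hash, so the
// partition numbers here will NOT match what a real producer chooses.
final class KeyPartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition for a fixed count.
        for (String key : new String[] {"id_1", "id_3", "id_1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

Because the partition count is part of the calculation, adding partitions to a topic can move an existing key to a different partition.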

Consumer

Basics

To configure a Kafka consumer:

  • Set the key and value deserializer to, for example, StringDeserializer
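  • Set the group ID (group.id). The console consumer generates a random group ID when none is given, but a Java consumer that calls subscribe() needs one set explicitly
  • auto.offset.reset, which applies only when the group has no committed offset for a partition:
    • none - throw an exception if no committed offset is found
    • earliest - read from the beginning of the partition, equivalent to --from-beginning
    • latest - read only messages produced after the consumer starts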
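
The consumer settings listed here can be sketched the same way, as plain string properties. This is a minimal sketch under my own assumptions: the class name ConsumerSettings and the broker address are hypothetical, and the validation of auto.offset.reset covers only the three values described in this post.

```java
import java.util.Properties;
import java.util.Set;

// Minimal sketch of consumer settings using only java.util.Properties.
// ConsumerSettings is a hypothetical helper, not part of the Kafka client.
final class ConsumerSettings {

    // Only the three reset policies discussed in this post.
    private static final Set<String> RESET_POLICIES = Set.of("none", "earliest", "latest");

    static Properties build(String bootstrapServers, String groupId, String offsetReset) {
        if (groupId == null || groupId.isEmpty()) {
            throw new IllegalArgumentException("group.id must be set");
        }
        if (!RESET_POLICIES.contains(offsetReset)) {
            throw new IllegalArgumentException("unknown auto.offset.reset: " + offsetReset);
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", groupId);
        props.setProperty("auto.offset.reset", offsetReset);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address; the group name is taken from the logs in this post.
        Properties props = build("localhost:9092", "my-java-application", "earliest");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With kafka-clients on the classpath, these properties would go to new KafkaConsumer<String, String>(props) before calling subscribe().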

Typical long polling logic:

while (true) {
    log.info("polling...");

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // wait 1 sec

    for (ConsumerRecord<String, String> record : records) {
        log.info("Key: " + record.key() + "| Value: " + record.value());
        log.info("Partition: " + record.partition() + "| Offset: " + record.offset());
    }
}

poll() returns as soon as records are available, and the loop processes them immediately. If there are no new messages, poll() blocks for up to the given timeout (one second here) and then returns an empty batch, so the loop simply polls again.

The log output is quite interesting to read:

[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Successfully joined group with generation Generation{generationId=1, memberId='consumer-my-java-application-1-c88cbce3-6174-490e-89ff-695ac8e55734', protocol='range'}
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Adding newly assigned partitions: demo_java-0, demo_java-1, demo_java-2
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Found no committed offset for partition demo_java-0
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Found no committed offset for partition demo_java-1
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Found no committed offset for partition demo_java-2
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Resetting offset for partition demo_java-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[15.223.178.125:28201 (id: 8 rack: cac1-az4)], epoch=0}}.
[main] INFO ConsumerDemo - Key: id_1| Value: message sequence num 1
[main] INFO ConsumerDemo - Partition: 0| Offset: 0
[main] INFO ConsumerDemo - Key: id_3| Value: message sequence num 3
[main] INFO ConsumerDemo - Partition: 0| Offset: 1
[main] INFO ConsumerDemo - Key: id_6| Value: message sequence num 6
[main] INFO ConsumerDemo - Partition: 0| Offset: 2

This matches exactly what we learned from the theory course:

  1. The consumer joins the group
  2. Since it’s the only consumer and there are three partitions in this topic, all three partitions are assigned to it
  3. Because the group has no committed offsets yet and auto.offset.reset is set to earliest, the consumer resets each partition to offset 0
  4. The consumer starts with offset 0, reads messages, and commits the offset

If we run the consumer program again, we’ll see:

[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Setting offset for partition demo_java-0 to the committed offset FetchPosition{offset=12, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[15.223.178.125:28201 (id: 8 rack: cac1-az4)], epoch=0}}
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Setting offset for partition demo_java-1 to the committed offset FetchPosition{offset=8, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[35.183.47.125:28201 (id: 9 rack: cac1-az1)], epoch=0}}
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Setting offset for partition demo_java-2 to the committed offset FetchPosition{offset=20, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[3.99.146.13:28201 (id: 7 rack: cac1-az2)], epoch=0}}

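The above log shows that the consumer resumes from the offsets the group committed on the previous run instead of starting over. A committed offset is the position of the next message to read, so for partition 1 the group has already processed offsets 0 through 7 and will continue at offset 8.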
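
A small sketch of how to read those committed offsets, using the three values from the log. It assumes offsets are contiguous and start at 0, which holds for this demo topic but is my assumption, not something the log states. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Interprets committed offsets under the assumption of contiguous offsets
// starting at 0 (true for this small demo topic, not for every topic).
final class CommittedOffsetSketch {

    // A committed offset names the NEXT record to read, so the last record
    // already processed sits one position earlier (-1 means none yet).
    static long lastProcessedOffset(long committedOffset) {
        return committedOffset - 1;
    }

    public static void main(String[] args) {
        // Committed offsets copied from the log output above.
        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("demo_java-0", 12L);
        committed.put("demo_java-1", 8L);
        committed.put("demo_java-2", 20L);

        committed.forEach((partition, offset) -> System.out.println(
                partition + ": resumes at " + offset
                        + ", last processed " + lastProcessedOffset(offset)));
    }
}
```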

Partition Rebalance

If we run multiple consumer instances in the same group, the logs show the partition assignments being updated.

Here, new partitions are assigned to this consumer:

[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Notifying assignor about the new Assignment(partitions=[demo_java-0])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Adding newly assigned partitions: demo_java-0

Here are the rebalancing strategies:

Eager Rebalance

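Every consumer stops and gives up all of its partitions, then the whole group rejoins and receives a fresh assignment. There is no guarantee that a consumer gets back the partitions it had before, and no consumer processes messages while the rebalance is in progress.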

Cooperative Rebalance

Only the partitions that need to move are revoked. Consumers whose partitions are not reassigned keep processing messages without interruption, which makes this approach far less disruptive.

Usage

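Kafka provides four built-in assignors: RangeAssignor, RoundRobinAssignor, StickyAssignor, and CooperativeStickyAssignor. The first three are eager rebalance strategies; CooperativeStickyAssignor is the cooperative one.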

When the consumer starts, the log shows the configured strategies:

partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]

To switch to cooperative rebalancing, we only need to change this property:

properties.setProperty("partition.assignment.strategy", CooperativeStickyAssignor.class.getName());

This produces the following log:

Assigned partitions:                       [demo_java-0, demo_java-1, demo_java-2]
Current owned partitions:                  []
Added partitions (assigned - owned):       [demo_java-0, demo_java-1, demo_java-2]
Revoked partitions (owned - assigned):     []

When a second consumer joins, its first assignment is empty:

Assigned partitions:                       []
Current owned partitions:                  []
Added partitions (assigned - owned):       []
Revoked partitions (owned - assigned):     []

After a follow-up rebalance, it receives one partition:

Assigned partitions:                       [demo_java-2]
Current owned partitions:                  []
Added partitions (assigned - owned):       [demo_java-2]
Revoked partitions (owned - assigned):     []

The first consumer's log shows it revoking only demo_java-2 and keeping the other two partitions:

Assigned partitions:                       [demo_java-0, demo_java-1]
Current owned partitions:                  [demo_java-0, demo_java-1, demo_java-2]
Added partitions (assigned - owned):       []
Revoked partitions (owned - assigned):     [demo_java-2]
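
The "assigned - owned" and "owned - assigned" lines in these logs are plain set differences. The sketch below reproduces the first consumer's numbers with java.util sets; it is an illustration of the arithmetic, not the client's internal code, and the class and method names are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Reproduces the set arithmetic behind the rebalance log lines.
// RebalanceDiffSketch is a hypothetical name, not a Kafka class.
final class RebalanceDiffSketch {

    // Elements of 'left' that are not in 'right', keeping insertion order.
    static Set<String> minus(Set<String> left, Set<String> right) {
        Set<String> result = new LinkedHashSet<>(left);
        result.removeAll(right);
        return result;
    }

    public static void main(String[] args) {
        // First consumer's state after the second consumer joined.
        Set<String> owned = new LinkedHashSet<>(
                List.of("demo_java-0", "demo_java-1", "demo_java-2"));
        Set<String> assigned = new LinkedHashSet<>(
                List.of("demo_java-0", "demo_java-1"));

        System.out.println("Added (assigned - owned):   " + minus(assigned, owned));
        System.out.println("Revoked (owned - assigned): " + minus(owned, assigned));
        // For contrast: an eager protocol gives up everything owned first.
        System.out.println("Revoked under eager:        " + owned);
    }
}
```

Only demo_java-2 ends up in the revoked set, so the first consumer never stops reading demo_java-0 and demo_java-1.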

Offset Commit Behavior

You can control how often the consumer commits its offsets:

auto.commit.interval.ms=5000
enable.auto.commit=true

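With these settings, offsets are auto-committed roughly every 5 seconds. The commit happens as part of a poll() call, once at least auto.commit.interval.ms has passed since the previous commit.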
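
The timing can be modelled with a simple deadline check that runs on every poll. This is a simplified model of my own, not the client's implementation: it uses a fake clock passed in as a parameter, and the class name AutoCommitSketch is hypothetical.

```java
// Simplified model of interval-based auto-commit driven by poll() calls.
// Time is passed in explicitly so the behaviour is easy to follow.
final class AutoCommitSketch {

    private final long intervalMs;
    private long nextCommitAtMs;

    AutoCommitSketch(long startMs, long intervalMs) {
        this.intervalMs = intervalMs;
        this.nextCommitAtMs = startMs + intervalMs;
    }

    // Called once per simulated poll(); returns true when this poll commits.
    boolean poll(long nowMs) {
        if (nowMs < nextCommitAtMs) {
            return false; // interval not yet elapsed, nothing committed
        }
        nextCommitAtMs = nowMs + intervalMs;
        return true;
    }

    public static void main(String[] args) {
        // Poll once per second with auto.commit.interval.ms = 5000.
        AutoCommitSketch consumer = new AutoCommitSketch(0, 5000);
        for (long now = 1000; now <= 12000; now += 1000) {
            if (consumer.poll(now)) {
                System.out.println("commit at t=" + now + " ms");
            }
        }
    }
}
```

In this model, if the loop stops polling, no further commits happen, which is why the commit interval is a lower bound on the time between commits.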
