Kafka Java Programming
I’ve been learning Apache Kafka over the past week. I took the Udemy course Apache Kafka Series - Learn Apache Kafka for Beginners v3. I’m using this blog to take notes, and this post focuses specifically on Java programming with Kafka.
Producer
Basics
To configure a Kafka producer:
- Set the key and value serializer to, for example, StringSerializer
- flush() the buffer
- close() the producer, which will flush the buffer automatically
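Putting these steps together, here is a minimal sketch of a producer. The broker address (localhost:9092) and the class name ProducerDemo are placeholders; demo_java is the topic used throughout this post:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder, point at your cluster
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // send() is asynchronous: the record only goes into the producer's in-memory buffer here
        producer.send(new ProducerRecord<>("demo_java", "hello world"));

        producer.flush(); // block until buffered records have actually been sent
        producer.close(); // close() flushes as well, then releases resources
    }
}
```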
Callback
Callbacks allow us to examine the result of each send.
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // called asynchronously when the send completes, successfully or not
        if (exception == null) {
            log.info("Received new metadata\n" +
                    "Topic: " + metadata.topic() + "\n" +
                    "Partition: " + metadata.partition() + "\n" +
                    "Offset: " + metadata.offset() + "\n" +
                    "Timestamp: " + metadata.timestamp()
            );
        } else {
            log.error("Error while producing", exception);
        }
    }
});
We just need to override the onCompletion() method.
If we fail to deliver a message to a topic, the exception will provide details about the error.
If we successfully send a message to a topic, we can check the metadata of that message.
Keys
ProducerRecord<String, String> record = new ProducerRecord<String,String>(topic, key, val);
Messages with the same key will be sent to the same partition.
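To see this in action, here is a small sketch (reusing the producer and log from the snippets above, and the demo_java topic) that sends records with a handful of keys and logs which partition each one lands on; records with the same key always report the same partition:

```java
// Sketch: records sharing a key always land in the same partition.
// Assumes `producer` (KafkaProducer<String, String>) and `log` are set up as above.
String topic = "demo_java";
for (int i = 0; i < 10; i++) {
    String key = "id_" + (i % 3); // only three distinct keys
    String value = "message sequence num " + i;
    producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
        if (exception == null) {
            log.info("Key: " + key + " -> Partition: " + metadata.partition());
        } else {
            log.error("Error while producing", exception);
        }
    });
}
producer.flush();
```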
Consumer
Basics
To configure a Kafka consumer:
- Set the key and value deserializer to, for example, StringDeserializer
- Set the group ID. As mentioned earlier, if the group ID is not set, Kafka will randomly assign one
- Set auto.offset.reset:
  - none: the program will throw an exception if no previously committed offset exists for the group
  - earliest: equivalent to --from-beginning in the CLI
  - latest: read only new messages
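A minimal consumer configuration sketch, again with localhost:9092 as a placeholder broker address; the group ID and topic match the logs shown below:

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        properties.setProperty("group.id", "my-java-application");
        properties.setProperty("auto.offset.reset", "earliest"); // none | earliest | latest

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(List.of("demo_java"));

        // ... then poll in a loop, as shown below
    }
}
```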
Typical long polling logic:
while (true) {
    log.info("polling...");
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // wait 1 sec
    for (ConsumerRecord<String, String> record : records) {
        log.info("Key: " + record.key() + "| Value: " + record.value());
        log.info("Partition: " + record.partition() + "| Offset: " + record.offset());
    }
}
If new messages are available, poll() returns them right away and the loop processes them.
If there are no new messages, poll() blocks for up to the specified Duration (one second here) before returning an empty batch.
The log output is quite interesting to read:
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Successfully joined group with generation Generation{generationId=1, memberId='consumer-my-java-application-1-c88cbce3-6174-490e-89ff-695ac8e55734', protocol='range'}
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Adding newly assigned partitions: demo_java-0, demo_java-1, demo_java-2
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Found no committed offset for partition demo_java-0
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Found no committed offset for partition demo_java-1
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Found no committed offset for partition demo_java-2
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Resetting offset for partition demo_java-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[15.223.178.125:28201 (id: 8 rack: cac1-az4)], epoch=0}}.
[main] INFO ConsumerDemo - Key: id_1| Value: message sequence num 1
[main] INFO ConsumerDemo - Partition: 0| Offset: 0
[main] INFO ConsumerDemo - Key: id_3| Value: message sequence num 3
[main] INFO ConsumerDemo - Partition: 0| Offset: 1
[main] INFO ConsumerDemo - Key: id_6| Value: message sequence num 6
[main] INFO ConsumerDemo - Partition: 0| Offset: 2
This matches exactly what we learned from the theory course:
- The consumer joins the group
- Since it’s the only consumer and there are three partitions in this topic, all three partitions are assigned to it
- The consumer resets the offset to 0 for all partitions because auto.offset.reset is set to earliest
- The consumer starts with offset 0, reads messages, and commits the offset
If we run the consumer program again, we’ll see:
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Setting offset for partition demo_java-0 to the committed offset FetchPosition{offset=12, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[15.223.178.125:28201 (id: 8 rack: cac1-az4)], epoch=0}}
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Setting offset for partition demo_java-1 to the committed offset FetchPosition{offset=8, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[35.183.47.125:28201 (id: 9 rack: cac1-az1)], epoch=0}}
[main] INFO - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Setting offset for partition demo_java-2 to the committed offset FetchPosition{offset=20, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[3.99.146.13:28201 (id: 7 rack: cac1-az2)], epoch=0}}
The above log shows that the consumer group has caught up with all messages. For example, the committed offset for partition demo_java-1 is 8, so the consumer resumes that partition from offset 8.
Partition Rebalance
If we run multiple consumer instances, we should be able to see that partition assignments are updated.
Here, new partitions are assigned to this consumer:
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Notifying assignor about the new Assignment(partitions=[demo_java-0])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-java-application-1, groupId=my-java-application] Adding newly assigned partitions: demo_java-0
Here are the rebalancing strategies:
Eager Rebalance
All partition assignments are revoked, every consumer stops consuming for the duration of the rebalance ("stop the world"), and partitions are then reassigned, with no guarantee that a consumer gets its previous partitions back.
Cooperative Rebalance
Only the partitions that actually need to move are reassigned; consumers that keep their partitions continue processing messages without interruption. This approach is much better.
Usage
Kafka's built-in assignors are RangeAssignor, RoundRobinAssignor, StickyAssignor, and CooperativeStickyAssignor. The first three are eager rebalance strategies; CooperativeStickyAssignor is a cooperative one.
When the consumer runs, the log shows the strategy:
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
All we need to do is change the property setting:
properties.setProperty("partition.assignment.strategy", CooperativeStickyAssignor.class.getName());
This produces the following log:
Assigned partitions: [demo_java-0, demo_java-1, demo_java-2]
Current owned partitions: []
Added partitions (assigned - owned): [demo_java-0, demo_java-1, demo_java-2]
Revoked partitions (owned - assigned): []
When a new consumer joins:
Assigned partitions: []
Current owned partitions: []
Added partitions (assigned - owned): []
Revoked partitions (owned - assigned): []
But then it becomes:
Assigned partitions: [demo_java-2]
Current owned partitions: []
Added partitions (assigned - owned): [demo_java-2]
Revoked partitions (owned - assigned): []
The first consumer now shows:
Assigned partitions: [demo_java-0, demo_java-1]
Current owned partitions: [demo_java-0, demo_java-1, demo_java-2]
Added partitions (assigned - owned): []
Revoked partitions (owned - assigned): [demo_java-2]
Offset Commit Behavior
You can control how often offsets are committed:
auto.commit.interval.ms=5000
enable.auto.commit=true
With these settings, offsets will be auto-committed every 5 seconds.
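For reference, the same settings expressed in Java on the consumer's Properties object (a sketch; both are standard consumer configs):

```java
// Auto-commit is driven by poll(): once at least auto.commit.interval.ms has passed
// since the last commit, offsets are committed asynchronously on the next poll()
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "5000");
```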