How to consume Kafka messages from Java
2 min readJan 22, 2023
In Apache Kafka, we can consume messages from a topic using the Kafka Consumer API in Java. The Kafka Consumer API provides a simple and easy-to-use API for consuming messages from Kafka topics.
Setup
Follow this tutorial to setup Kafka, create topics and publish messages.
Dependencies
First, include the following dependencies in your Gradle project.
implementation 'org.apache.kafka:kafka-streams:3.3.1'
implementation 'org.apache.kafka:kafka-clients:3.3.1'
Poll for new records
The Kafka consumer poll()
method fetches records in sequential order from a specified topic and partitions.
package org.example;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class MessageConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer
.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
Seek to
We can also use the seek
method to set the offset for a specific partition and then call poll
to fetch records starting from that offset.
package org.example;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class MessageConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
TopicPartition partition = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(partition));
consumer.seek(partition, 11);
while (true) {
ConsumerRecords<String, String> records = consumer
.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}