How to consume Kafka messages from Java

Ondrej Kvasnovsky
Jan 22, 2023


In Apache Kafka, we can consume messages from a topic using the Kafka Consumer API in Java. It gives us a simple, easy-to-use way to read records from Kafka topics.

Setup

Follow this tutorial to set up Kafka, create topics, and publish messages.

Dependencies

First, include the following dependencies in your Gradle project.

implementation 'org.apache.kafka:kafka-streams:3.3.1'
implementation 'org.apache.kafka:kafka-clients:3.3.1'
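
If you do not yet have any messages to consume, a minimal producer along these lines can publish a few test records first. This is just a sketch, assuming a local broker on localhost:9092 and an existing topic named my-topic, matching the consumer examples below.

package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MessageProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources closes the producer and releases its network resources
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // Keys and values are plain strings, matching the String deserializers used by the consumers below
                producer.send(new ProducerRecord<>("my-topic", "key-" + i, "value-" + i));
            }
            producer.flush();
        }
    }
}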

Poll for new records

The Kafka consumer's poll() method fetches batches of records, in offset order per partition, from the topics the consumer is subscribed to.

package org.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MessageConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        // Keep polling; each call returns the records that arrived since the previous poll
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
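
The loop above never exits. In a real application you usually want to stop the consumer cleanly; one common pattern, shown here as a sketch rather than part of the original example, is to call wakeup() from a shutdown hook so that the blocked poll() throws WakeupException, and then close the consumer in a finally block.

package org.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ShutdownAwareConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        // wakeup() is the only consumer method that is safe to call from another thread;
        // it makes a blocked poll() throw WakeupException. Joining the main thread gives
        // it time to close the consumer before the JVM exits.
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } catch (WakeupException e) {
            // expected during shutdown, nothing to do
        } finally {
            consumer.close(); // commits offsets (if enabled) and leaves the group cleanly
        }
    }
}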

Seek to an offset

We can also assign a specific partition to the consumer, use the seek method to set its position to a given offset, and then call poll to fetch records starting from that offset.

package org.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MessageConsumerWithSeek {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Manually assign partition 0 of my-topic and start reading at offset 11
        TopicPartition partition = new TopicPartition("my-topic", 0);
        consumer.assign(Collections.singletonList(partition));
        consumer.seek(partition, 11);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
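
Seeking by offset requires already knowing the offset. If you would rather start from a point in time, the consumer's offsetsForTimes method translates a timestamp into the earliest offset whose record timestamp is at or after it, and you can then seek to that offset. The snippet below is a sketch under the same assumptions as the examples above (local broker, topic my-topic, partition 0).

package org.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class SeekByTimestamp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition partition = new TopicPartition("my-topic", 0);
        consumer.assign(Collections.singletonList(partition));

        // Find the earliest offset whose record timestamp is at or after one hour ago
        long oneHourAgo = Instant.now().minus(Duration.ofHours(1)).toEpochMilli();
        Map<TopicPartition, OffsetAndTimestamp> offsets =
                consumer.offsetsForTimes(Collections.singletonMap(partition, oneHourAgo));

        OffsetAndTimestamp offsetAndTimestamp = offsets.get(partition);
        if (offsetAndTimestamp != null) {
            consumer.seek(partition, offsetAndTimestamp.offset());
        } else {
            // No record at or after the timestamp; start from the end of the partition instead
            consumer.seekToEnd(Collections.singletonList(partition));
        }

        // ... poll() in a loop as in the examples above ...
        consumer.close();
    }
}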
