Creating Kafka Consumer in Spring Boot Microservice

In this tutorial, I will guide you through setting up a Kafka Consumer in a Spring Boot application. You will learn how to configure the consumer properties, create a listener class using the @KafkaListener annotation, and how to process messages with the @KafkaHandler annotation.

Let’s start with the configuration.

If you are interested in video lessons then check my video course Apache Kafka for Event-Driven Spring Boot Microservices.

Kafka Consumer Configuration in application.properties

In a Spring Boot application, the application.properties file is where you define settings for your application components. For Kafka Consumer, you will need to set properties that define the connection to the Kafka broker, the group ID, and other consumer settings like concurrency levels. Here is an example of what this configuration might look like:

server.port=0
spring.kafka.consumer.bootstrap-servers=localhost:9092,localhost:9094
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
consumer.group-id=product-created-events
spring.kafka.consumer.properties.spring.json.trusted.packages=com.appsdeveloperblog.ws.core

Below I will explain each of this configuration property in details.

server.port

In your Spring Boot application’s application.properties, the server port is a basic configuration that decides where your application runs. Without setting it, Spring Boot defaults to port 8080. If this port is busy, your application won’t start. To avoid this, you can set the port to 0, which tells Spring Boot to use any available port. This is useful when running multiple instances of the same service:

server.port=0

spring.kafka.consumer.bootstrap-servers

Now, let’s set up the Kafka consumer configurations. You’ll need to connect to Kafka, and that’s where bootstrap-servers come in. This property lists the Kafka broker addresses that your consumer will connect to:

spring.kafka.consumer.bootstrap-servers=localhost:9092,localhost:9094

Here, localhost:9092 and localhost:9094 are the addresses of the Kafka brokers. Including more than one address helps your consumer stay connected if one broker is down.

key-deserializer and value-deserializer

Next are the key-deserializer and value-deserializer properties. Kafka messages have keys and values, which are sent as byte arrays. The deserializers convert these byte arrays back into objects:

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

StringDeserializer is for the message keys, which are strings. JsonDeserializer is for the message values, which will be in JSON format.

consumer.group-id

When you set up a Kafka consumer, you can make multiple instances of a Microservice work as a team to handle messages from a Kafka topic. This team is what Kafka calls a consumer group. Each consumer in the group reads messages from the topic, ensuring that, together, they process messages faster.

The consumer.group-id property is how you name your team. All consumers with the same group-id are in the same group. Here’s how you might set it in your application.properties file:

consumer.group-id=product-created-events

In this case, product-created-events is the name of your consumer group.

If you have multiple instances of your application, and you want them all to share the workload of processing messages, you give them all the same group-id. Kafka does the rest, distributing the messages among the consumers in the group.

If every consumer had a different group-id, they wouldn’t be working as a team. Instead, each one would get all the messages, leading to duplicate processing.

spring.kafka.consumer.properties.spring.json.trusted.packages

This configuration property is quite important for security when you’re deserializing JSON messages in your Kafka consumer. It specifies which Java packages are ‘trusted’ for JSON deserialization by your application.

Here’s why it’s needed: When converting JSON to Java objects, there’s a risk. If an attacker sends a JSON that matches a class in your application, they might be able to execute unwanted code. To prevent this, Spring allows you to specify exactly which packages can be used for deserialization.

spring.kafka.consumer.properties.spring.json.trusted.packages=com.appsdeveloperblog.ws.core

This means that only the classes in the com.appsdeveloperblog.ws.core package can be deserialized.

If you were to set it to *, like this:

spring.kafka.consumer.properties.spring.json.trusted.packages=*

This would mean that your application trusts all packages, which is not safe. An attacker could craft a JSON payload that could exploit your application by deserializing to a malicious class.

You can also specify multiple packages by separating them with commas. For instance:

spring.kafka.consumer.properties.spring.json.trusted.packages=com.appsdeveloperblog.ws.core,com.appsdeveloperblog.ws.models

This way, both com.appsdeveloperblog.ws.core and com.appsdeveloperblog.ws.models packages are trusted for JSON deserialization.

For beginners, it’s like telling your application, “Only accept JSON that looks like the classes in the packages I trust. Ignore anything else.” This keeps your application secure.

Kafka Consumer Class with @KafkaListener

For your Microservice to be able to read messages from Kafka topic, you will need to create a new class and annotate it with @KafkaListener annotation.

The @KafkaListener annotation marks a method or a class to be the target of a Kafka message listener on the specified topics. This is how you tell Spring where to send messages from a topic. For example:

@Component
@KafkaListener(topics="product-created-events-topic")
public class ProductCreatedEventHandler {

    @KafkaHandler
    public void handle(ProductCreatedEvent productCreatedEvent) {
        // processing logic goes here
    }
}

In this class:

  • @Component makes it a Spring-managed bean.
  • @KafkaListener(topics="product-created-events-topic") specifies that this bean should listen to the “product-created-events-topic” topic in Kafka.
  • The handle method is where you will process each ProductCreatedEvent message that comes in.

Consuming Messages with @KafkaHandler

The @KafkaHandler annotation designates the method within a class annotated with @KafkaListener that will handle messages. When a message arrives, Spring will invoke the method annotated with @KafkaHandler that has a compatible parameter type.

For instance, in the ProductCreatedEventHandler class, the handle method is annotated with @KafkaHandler and takes a ProductCreatedEvent as a parameter:

@KafkaHandler
public void handle(ProductCreatedEvent productCreatedEvent) {
    // Logic to process the event
}

When a ProductCreatedEvent message is received from the “product-created-events-topic”, this handle method will be called with the message payload. The event class is below.

Event Class – ProductCreatedEvent

The ProductCreatedEvent class is designed to match the structure of the JSON message that the Kafka consumer will receive.

package com.appsdeveloperblog.ws.core;

import java.math.BigDecimal;

public class ProductCreatedEvent {
    
    private String productId;
    private String title;
    private BigDecimal price;
    private Integer quantity;
    
    public ProductCreatedEvent() {
        
    }
    
    public ProductCreatedEvent(String productId, String title, BigDecimal price, Integer quantity) {
        this.productId = productId;
        this.title = title;
        this.price = price;
        this.quantity = quantity;
    }

    // Standard getters, and setters omitted for brevity
    
}

When a message arrives from Kafka, Spring uses the JsonDeserializer specified in your application.properties to turn that JSON into a ProductCreatedEvent object. It looks at the JSON keys (title, price, quantity) and maps them to the corresponding fields in the class.

For the deserialization to work, the JSON message must have keys that exactly match the field names in the ProductCreatedEvent class. If the message is in the right format, like the example provided:

{
    "title": "iPad Pro",
    "price": 1250,
    "quantity": 19
}

Then the @KafkaHandler annotated method will receive an instance of ProductCreatedEvent with its title set to "iPad Pro", its price set to 1250, and quantity set to 19. You can then use this object within the method to process the message as needed.

Conclusion

I hope this tutorial was helpful to you.

To learn more about how Apache Kafka can be used to build Event-Driven Spring Boot Microservices, please check my Kafka tutorials for beginners page.

If you are interested in video lessons then check my video course Apache Kafka for Event-Driven Spring Boot Microservices.