Kafka Consumer Configuration in the @Bean Method

In this tutorial, I will guide you through the process of setting up a Kafka Consumer using the @Configuration class and the @Bean method. This approach offers more flexibility and control compared to using the application.properties file.

Kafka consumer configuration can be achieved in two primary ways in a Spring Boot application:

  1. Using the application.properties file: This is a straightforward method where you define all your Kafka Consumer configurations in the application.properties or application.yml file of your Spring Boot project. If you’re interested in this method, I recommend reading the tutorial “Creating Kafka Consumer in a Spring Boot Microservice” for a comprehensive guide.
  2. Using the @Configuration class and @Bean method: This method, which is our focus in this tutorial, involves creating a dedicated configuration class annotated with @Configuration. Inside this class, you define methods annotated with @Bean to set up your Kafka Consumer. This method provides a programmatic way to define your configurations, giving you more control and flexibility, especially in complex applications.

If you are new to Kafka, you might wonder why choose one method over the other. The answer lies in your project’s requirements. If you need a quick and simple setup, the application.properties method is suitable. However, if your project demands more dynamic and complex configurations, using @Configuration and @Bean annotations is the way to go.

In the following sections, I will walk you through creating your @Configuration class and configuring your Kafka Consumer using the @Bean method.

If you prefer video lessons, check out my video course Apache Kafka for Event-Driven Spring Boot Microservices.

Creating the @Configuration Class

In Spring Boot, an @Configuration class is a special type of class that holds bean definitions. Beans are objects that form the backbone of your application and are managed by the Spring container. When you annotate a class with @Configuration, you’re telling Spring Boot that this class contains methods that produce beans. These beans are then used by Spring Boot to perform various actions in your application.

The @Configuration class is where you’ll define your Kafka Consumer configurations.

package com.yourpackage;

import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConsumerConfig {
    // Beans will be defined here
}

Configuring Kafka Consumer with @Bean Method

Now that you have your @Configuration class ready, it’s time to configure the Kafka Consumer within it. This is where the @Bean annotation comes into play.

The @Bean annotation tells Spring Boot that a method produces a bean to be managed by the Spring container. When you annotate a method with @Bean, the object the method returns is registered in the Spring application context, where other parts of the application can use it. Read more about the @Bean annotation.

Inside your @Configuration class, define a method that returns a ConsumerFactory. This factory is responsible for creating Kafka Consumer instances. Annotate this method with @Bean.

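@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    // Configuration properties will be added to the map here
    return new DefaultKafkaConsumerFactory<>(config);
}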

Inside this method, you’ll specify various properties that configure the Kafka Consumer.

Bootstrap Servers

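The bootstrap servers property (bootstrap.servers in Kafka, bootstrap-servers in Spring Boot's application.properties) lists the broker addresses the consumer connects to first. From these it discovers the rest of the cluster, so listing more than one address lets the consumer start even if one of the brokers is unavailable.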

config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9094");
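
Hard-coded addresses are fine for a local demo, but because this configuration is plain Java, the value can also be computed at startup. Below is a minimal sketch, using only the JDK, that reads the broker list from an environment variable and falls back to the local addresses used in this tutorial. The class name BootstrapServersResolver and the variable name KAFKA_BOOTSTRAP_SERVERS are made up for illustration. "bootstrap.servers" is the string value behind ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG.

```java
import java.util.HashMap;
import java.util.Map;

final class BootstrapServersResolver {

    // Hypothetical environment variable name, chosen for this example only.
    static final String ENV_NAME = "KAFKA_BOOTSTRAP_SERVERS";

    // Local addresses used elsewhere in this tutorial.
    static final String DEFAULT_SERVERS = "localhost:9092,localhost:9094";

    // Returns the broker list from the given environment, or the default when unset or blank.
    static String resolve(Map<String, String> env) {
        String value = env.get(ENV_NAME);
        return (value == null || value.trim().isEmpty()) ? DEFAULT_SERVERS : value.trim();
    }

    // Builds a config map that contains only the bootstrap servers entry.
    static Map<String, Object> baseConfig(Map<String, String> env) {
        Map<String, Object> config = new HashMap<>();
        // "bootstrap.servers" is the key that ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG resolves to.
        config.put("bootstrap.servers", resolve(env));
        return config;
    }
}
```

Inside consumerFactory() you could then pass BootstrapServersResolver.resolve(System.getenv()) instead of a literal string.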

Key Deserializer

This property tells the consumer how to deserialize the key of a message. Kafka messages have a key and a value, and you can specify a different deserializer for each. StringDeserializer is a common choice when the keys are plain strings.

config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
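
To make the idea of a deserializer concrete: Kafka stores keys and values as raw bytes, and the deserializer turns those bytes back into a Java object. The sketch below shows roughly what that means for string keys, using only the JDK. The class Utf8KeyDecoding is a stand-in written for this example, not the Kafka StringDeserializer itself, which uses UTF-8 by default.

```java
import java.nio.charset.StandardCharsets;

// Illustration only: mimics the effect of decoding a string key from bytes.
final class Utf8KeyDecoding {

    // Converts the raw key bytes into a String; a missing key (null) stays null.
    static String decode(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }
}
```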

Value Deserializer

Similar to the key, this property specifies how to deserialize the message value. Since Kafka can handle various types of data, the deserializer depends on the type of data your messages will have. In our case, we are using JsonDeserializer for JSON formatted message values.

config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

Group ID

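The group-id property sets the consumer group this consumer belongs to. Kafka tracks the offsets of processed messages per group, so a restarted consumer resumes where its group left off. Kafka also divides a topic's partitions among the consumers that share a group ID, so each message is handled by only one of them.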

config.put(ConsumerConfig.GROUP_ID_CONFIG, "product-created-events");
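
One way to picture what the group ID does is a toy model in which the topic remembers a read position for each group. The sketch below is an in-memory illustration written for this tutorial, not the Kafka client API. The class ToyTopic is hypothetical, it models a single partition, and it starts every new group at the beginning of the log, whereas a real consumer's starting position depends on the auto.offset.reset setting.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a single-partition topic; it is not the Kafka client API.
final class ToyTopic {

    private final List<String> log = new ArrayList<>();

    // Next offset to read, stored per consumer group ID.
    private final Map<String, Integer> committedOffsets = new HashMap<>();

    void append(String message) {
        log.add(message);
    }

    // Returns the messages the given group has not seen yet and commits the new position.
    List<String> poll(String groupId) {
        int from = committedOffsets.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        committedOffsets.put(groupId, log.size());
        return batch;
    }
}
```

Polling twice with the same group ID returns each message only once, while a different group ID gets its own copy of every message.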

Trusted Packages

When using JsonDeserializer, you need to specify which packages are trusted for deserialization. This is a security measure to ensure only known classes are deserialized.

config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.appsdeveloperblog.ws.core");

When JSON is transformed into Java objects, there’s a potential security risk. An attacker could craft a JSON payload that names an unexpected class and, when deserialized, executes unwanted code. To mitigate this, you specify which packages are allowed for deserialization.

The equivalent setting in application.properties is spring.kafka.consumer.properties.spring.json.trusted.packages=com.appsdeveloperblog.ws.core. Either way, only classes in the com.appsdeveloperblog.ws.core package are deemed safe for deserialization. It’s a way to tell your application, “Only convert JSON into Java objects if they match the classes in these trusted packages.” To trust more than one package, provide a comma-separated list.

Setting this property to *, which implies trusting all packages, is risky and opens up your application to potential security vulnerabilities. It’s advisable to explicitly list the packages you trust, ensuring that your application remains secure against deserialization attacks.
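
The check described above can be pictured with a short sketch. This is a simplified illustration using only the JDK, not Spring Kafka's actual implementation. The class TrustedPackagesCheck is written for this example, and ProductCreatedEvent in the usage note is a hypothetical event class.

```java
import java.util.Set;

// Simplified illustration of a trusted-packages check; not Spring Kafka's actual implementation.
final class TrustedPackagesCheck {

    private final Set<String> trustedPackages;

    TrustedPackagesCheck(Set<String> trustedPackages) {
        this.trustedPackages = Set.copyOf(trustedPackages);
    }

    // Returns true when the class's package is listed, or when "*" trusts everything.
    boolean isTrusted(String fullyQualifiedClassName) {
        if (trustedPackages.contains("*")) {
            return true;
        }
        int lastDot = fullyQualifiedClassName.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : fullyQualifiedClassName.substring(0, lastDot);
        return trustedPackages.contains(packageName);
    }
}
```

With com.appsdeveloperblog.ws.core as the only trusted package, a class such as com.appsdeveloperblog.ws.core.ProductCreatedEvent passes the check and java.lang.Runtime does not. With * in the set, both pass, which is why the wildcard is risky.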

Final version

Here’s the final version of the KafkaConsumerConfig class with the consumerFactory method included. This class is fully configured to create a Kafka Consumer in your Spring Boot application:

package com.appsdeveloperblog.ws.emailnotification;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        // Kafka Bootstrap Server configuration
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9094");

        // Key Deserializer
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Value Deserializer
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.appsdeveloperblog.ws.core");

        // Consumer Group ID
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "product-created-events");

        return new DefaultKafkaConsumerFactory<>(config);
    }
}

Conclusion

I hope this tutorial was helpful to you.

To learn more about how Apache Kafka can be used to build Event-Driven Spring Boot Microservices, please check my Kafka tutorials for beginners page.
