Apache Kafka Producer @Bean Configuration

In this tutorial, you will learn how to use the @Bean configuration for the Kafka producer in a Spring Boot application.

If you are interested in video lessons then check my video course Apache Kafka for Event-Driven Spring Boot Microservices.

Introduction

A Kafka producer is a client that sends messages to a Kafka topic, which is a logical name for a stream of records. A Kafka producer can specify various configuration properties to control how the messages are sent, such as the bootstrap servers, the key and value serializers, the acknowledgment mode, the delivery timeout, the linger time, and the request timeout.

By default, Spring Boot provides a convenient way to configure the Kafka producer properties in the application.properties or application.yml file. However, this approach has some limitations, such as:

  • The auto-configuration creates a single producer factory, so every producer built from it shares the same properties,
  • Only the most common settings have dedicated spring.kafka.producer.* keys; any other Kafka producer setting has to be passed through the generic spring.kafka.producer.properties.* map.

To overcome these limitations, you can use the @Bean annotation to create and configure the Kafka producer beans programmatically in a Spring Boot application. The @Bean annotation tells Spring that a method returns an object that should be registered as a bean in the Spring application context. The benefits of using the @Bean configuration over the application.properties file are:

  • You can refresh the properties at runtime using the @RefreshScope annotation from Spring Cloud, which rebuilds the bean after a configuration refresh is triggered (this requires Spring Cloud on the classpath),
  • You can create multiple producers with different properties in the same application,
  • You can use any properties that are supported by the Kafka producer API.
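
To make the "multiple producers" point more concrete, here is a minimal plain-Java sketch of two configuration maps that share the same base properties but differ in acks and linger.ms. The class and method names are hypothetical, and the literal keys are the standard Kafka producer property names; in a Spring application you would normally use the ProducerConfig constants instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring or Kafka: builds per-producer
// configuration maps from a shared set of base properties.
final class MultiProducerConfigSketch {

    // Properties shared by every producer in the application.
    static Map<String, Object> baseConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092,localhost:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }

    // A producer tuned for durability: waits for all in-sync replicas.
    static Map<String, Object> reliableProducerConfigs() {
        Map<String, Object> props = new HashMap<>(baseConfigs());
        props.put("acks", "all");
        props.put("linger.ms", 0);
        return props;
    }

    // A producer tuned for throughput: leader-only acknowledgment and a
    // short batching delay (the value 20 is an arbitrary illustration).
    static Map<String, Object> fastProducerConfigs() {
        Map<String, Object> props = new HashMap<>(baseConfigs());
        props.put("acks", "1");
        props.put("linger.ms", 20);
        return props;
    }
}
```

Each map could then be passed to its own DefaultKafkaProducerFactory, so that the two producers are configured independently.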

In this tutorial, you will learn how to configure a Kafka producer in methods annotated with @Bean.

Step 1. Create a Kafka Producer Configuration Class

To configure the Kafka producer in Java code, first create a new configuration class. This class will hold the Kafka producer configuration and expose it through @Bean methods.

For example, you can create a class named KafkaProducerConfig.java as follows:

@Configuration
@RefreshScope
public class KafkaProducerConfig {
  // ...
}

Step 2. Inject the Kafka properties from application.properties file

In this section, you will learn how to inject configuration properties from the application.properties file into the KafkaProducerConfig class.

Inject properties using @Value annotation

You can still use the application.properties file to store the Kafka properties, but instead of letting Spring Boot auto-configure them, you can inject them into the Kafka producer configuration class using the @Value annotation.

The @Value annotation allows you to inject a single property value using the ${…} placeholder. For example, you can inject the following properties from the application.properties file into the Kafka producer configuration class using the @Value annotation.

application.properties file

spring.kafka.producer.bootstrap-servers=localhost:9092,localhost:9094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.acks=all
spring.kafka.producer.properties.delivery.timeout.ms=120000
spring.kafka.producer.properties.linger.ms=0
spring.kafka.producer.properties.request.timeout.ms=30000

KafkaProducerConfig.java 

@Configuration
@RefreshScope
public class KafkaProducerConfig {

  @Value("${spring.kafka.producer.bootstrap-servers}")
  private String bootstrapServers;

  @Value("${spring.kafka.producer.key-serializer}")
  private String keySerializer;

  @Value("${spring.kafka.producer.value-serializer}")
  private String valueSerializer;

  @Value("${spring.kafka.producer.acks}")
  private String acks;

  @Value("${spring.kafka.producer.properties.delivery.timeout.ms}")
  private Integer deliveryTimeout;

  @Value("${spring.kafka.producer.properties.linger.ms}")
  private Integer linger;

  @Value("${spring.kafka.producer.properties.request.timeout.ms}")
  private Integer requestTimeout;

  // ...
}
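
To illustrate which values the placeholders above resolve to, here is a rough plain-Java analogy that reads a few of the same keys with java.util.Properties and converts them to the target types. This is only a sketch: Spring performs the lookup and type conversion itself, and the class and method names below are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical illustration only; Spring does not use this class.
final class PropertyLookupSketch {

    // A subset of the application.properties entries shown above.
    static final String SAMPLE = String.join("\n",
        "spring.kafka.producer.bootstrap-servers=localhost:9092,localhost:9094",
        "spring.kafka.producer.acks=all",
        "spring.kafka.producer.properties.linger.ms=0",
        "spring.kafka.producer.properties.request.timeout.ms=30000");

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    // Roughly analogous to @Value("${key}") on a String field:
    // a missing key is treated as an error.
    static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value.trim();
    }

    // Roughly analogous to @Value("${key}") on an Integer field.
    static int requiredInt(Properties props, String key) {
        return Integer.parseInt(required(props, key));
    }

    // The bootstrap servers value is a comma-separated list of host:port pairs.
    static List<String> brokers(Properties props) {
        String value = required(props, "spring.kafka.producer.bootstrap-servers");
        return Arrays.asList(value.split("\\s*,\\s*"));
    }
}
```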

Inject Properties using the @ConfigurationProperties annotation

Alternatively, instead of using the @Value annotation, you can use the @ConfigurationProperties annotation to bind all the properties with the prefix spring.kafka.producer to a class named KafkaProducerProperties as follows:

@ConfigurationProperties(prefix = "spring.kafka.producer")
public class KafkaProducerProperties {

  private String bootstrapServers;
  private String keySerializer;
  private String valueSerializer;
  private String acks;
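  // String values keep dotted keys such as delivery.timeout.ms as flat map entries;
  // with Object values, Spring Boot binds dotted keys into nested maps
  private Map<String, String> properties;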

  // getters and setters
}

Then, you can inject the KafkaProducerProperties bean into the Kafka producer configuration class using the @EnableConfigurationProperties and @Autowired annotations as follows:

@Configuration
@EnableConfigurationProperties(KafkaProducerProperties.class)
@RefreshScope
public class KafkaProducerConfig {

  @Autowired
  private KafkaProducerProperties kafkaProducerProperties;

  // ...
}
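
Step 3 below builds the producer configuration from the @Value fields. If you chose the @ConfigurationProperties variant instead, the typed fields and the free-form properties map have to be merged into one map. The following plain-Java sketch shows one way to do that. The Settings class is a simplified, hypothetical stand-in for KafkaProducerProperties, and it assumes that the free-form map holds String values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: merges typed settings and free-form properties
// into a single producer configuration map.
final class ProducerPropertiesSketch {

    // Simplified stand-in for KafkaProducerProperties (no getters and setters).
    static final class Settings {
        String bootstrapServers;
        String keySerializer;
        String valueSerializer;
        String acks;
        Map<String, String> properties = new HashMap<>();
    }

    static Map<String, Object> toProducerConfigs(Settings s) {
        Map<String, Object> configs = new HashMap<>();
        // Copy the free-form entries first, so that the typed fields
        // below take precedence if the same key appears twice.
        if (s.properties != null) {
            configs.putAll(s.properties);
        }
        configs.put("bootstrap.servers", s.bootstrapServers);
        configs.put("key.serializer", s.keySerializer);
        configs.put("value.serializer", s.valueSerializer);
        configs.put("acks", s.acks);
        return configs;
    }
}
```

Values copied from the free-form map stay Strings (for example "120000"). The Kafka client parses String values for numeric settings, so no manual conversion is needed in this sketch.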

Step 3. Create a producerConfigs() method that returns a Map of producer configuration properties

Next, you can create a producerConfigs() method that returns a Map of producer configuration properties using the injected values from the previous step. The Map should contain the keys and values that are supported by the Kafka producer API.

For example, you can create the producerConfigs() method as follows:

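import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;

@Configuration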
@RefreshScope
public class KafkaProducerConfig {

  @Value("${spring.kafka.producer.bootstrap-servers}")
  private String bootstrapServers;

  @Value("${spring.kafka.producer.key-serializer}")
  private String keySerializer;

  @Value("${spring.kafka.producer.value-serializer}")
  private String valueSerializer;

  @Value("${spring.kafka.producer.acks}")
  private String acks;

  @Value("${spring.kafka.producer.properties.delivery.timeout.ms}")
  private Integer deliveryTimeout;

  @Value("${spring.kafka.producer.properties.linger.ms}")
  private Integer linger;

  @Value("${spring.kafka.producer.properties.request.timeout.ms}")
  private Integer requestTimeout;

  // Collects the injected values under the configuration keys defined by the Kafka producer API
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    props.put(ProducerConfig.ACKS_CONFIG, acks);
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeout);
    props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
    return props;
  }

  // ...
}
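
One detail worth checking when you set these values yourself: according to the Kafka documentation, delivery.timeout.ms should be greater than or equal to the sum of linger.ms and request.timeout.ms. The following plain-Java sketch checks that rule on a configuration map before the producer is created. The class and method names are hypothetical, and the sketch assumes that all three keys are present in the map.

```java
import java.util.Map;

// Hypothetical helper, not part of Kafka or Spring.
final class ProducerTimeoutCheck {

    // Reads a numeric setting that may be stored as a Number or as a String.
    static long asLong(Map<String, Object> props, String key) {
        Object value = props.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing setting: " + key);
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return Long.parseLong(value.toString().trim());
    }

    // Returns true when delivery.timeout.ms >= linger.ms + request.timeout.ms.
    static boolean hasConsistentTimeouts(Map<String, Object> props) {
        long delivery = asLong(props, "delivery.timeout.ms");
        long linger = asLong(props, "linger.ms");
        long request = asLong(props, "request.timeout.ms");
        return delivery >= linger + request;
    }
}
```

With the values used in this tutorial (120000, 0, and 30000), the check passes.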

Step 4. Create a producerFactory() method

The next step is to create a producerFactory() method that returns a ProducerFactory object using the producerConfigs() method.

The ProducerFactory is an interface that defines how to create a Kafka producer. Spring for Apache Kafka (the spring-kafka library) provides a default implementation called DefaultKafkaProducerFactory, which takes a Map of producer configuration properties as a constructor argument.

For example, you can create the producerFactory() method as follows:

@Configuration
@RefreshScope
public class KafkaProducerConfig {

  // ...

  @Bean
  public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  // ...
}

Notice that the producerFactory() method is annotated with @Bean. This is important because @Bean registers the returned ProducerFactory object in the Spring application context, where other components, such as the KafkaTemplate, can use it. Without the @Bean annotation, Spring would not manage the ProducerFactory object, and it could not be injected into other beans.

Step 5. Create a kafkaTemplate() method that returns a KafkaTemplate

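Finally, you can create a kafkaTemplate() method that returns a KafkaTemplate object using the producerFactory() method. The KafkaTemplate is a class that provides a high-level abstraction for sending messages to Kafka topics. It wraps the Kafka producer created by the ProducerFactory, so you do not have to create or close producer instances yourself. Serialization and partitioning are still performed by the underlying producer, according to the configuration properties you provided.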

For example, you can create the kafkaTemplate() method as follows:

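import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@RefreshScope
public class KafkaProducerConfig {

  // ...

  // Creates Kafka producers from the configuration map returned by producerConfigs()
  @Bean
  public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  // Sends messages using producers obtained from producerFactory()
  @Bean
  public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }
}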

Conclusion

I hope this tutorial is helpful to you.

To learn more about Apache Kafka, please check my other Apache Kafka tutorials for beginners. You can also find more resources and documentation on the official Kafka website.
