Kafka Producer in Spring Boot Microservice

In this tutorial, you will learn how to implement a Kafka Producer in a Spring Boot Microservice.

A Kafka Producer is an application that sends messages to a Kafka topic, where they can be consumed by other microservices. You will use the Spring for Apache Kafka dependency to configure and create a Kafka Producer in your Spring Boot application.

To learn more about Apache Kafka for Spring Boot Microservices, check out the Apache Kafka tutorials page.

To learn how to work with a Kafka Producer using the Kafka CLI, read the “Kafka CLI – Send/Produce message to a topic” tutorial.

If you are interested in video lessons then check my video course Apache Kafka for Event-Driven Spring Boot Microservices.

Step 1: Adding Spring for Apache Kafka Dependency

In this step, you will learn how to add the Spring for Apache Kafka dependency to your Spring Boot Microservice. If you need help creating a new Spring Boot Microservice, read the “Creating a simple Spring Boot project” tutorial.

The Spring for Apache Kafka dependency provides the core functionality for integrating Kafka with Spring, such as sending and receiving messages, configuring topics, and creating producers and consumers.

To add the Spring for Apache Kafka dependency, you need to edit the pom.xml file in your Spring Boot Microservice.

The Spring for Apache Kafka dependency has the following coordinates:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.1</version>
</dependency>

The coordinates consist of three parts: the groupId, the artifactId, and the version.

  • groupId – identifies the organization or group that created the dependency.
  • artifactId – identifies the name of the dependency.
  • version – specifies the release number of the dependency. In a Spring Boot application, you will most likely want to remove the version element so that your application uses the version managed by the Spring Boot release you use (this assumes your project inherits Spring Boot's dependency management). In that case, the version element is optional.

To add the dependency to your project, copy and paste the coordinates inside the <dependencies> tag in your pom.xml file. The <dependencies> tag contains all the dependencies that your project needs. Here is an example of what your pom.xml file might look like after adding the dependency:

<project ...>
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        ...
    </dependencies>
    ...
</project>

After you save the pom.xml file, your IDE (or the next Maven build) will download the dependency and its transitive dependencies, which are the libraries that the dependency itself depends on. If your IDE shows a Maven Dependencies folder in the project explorer, you can see the full list there.

Step 2: Kafka Producer Configuration Properties

In this section, you will learn how to configure a Kafka producer using the application.properties file in a Spring Boot microservice.

Firstly, let’s open the application.properties file. The initial configuration property we’ll add isn’t directly related to Kafka. It’s to set the port number for your Spring Boot application.

Configuring the Port Number

By default, a Spring Boot application runs on port 8080. If you don’t specify a port number, that’s where it will launch. However, if you attempt to run another instance of the microservice, it will fail because port 8080 is already in use. To avoid this, I’ll show you a neat trick:

Add the following line to your application.properties:

server.port=0

Setting the port number to zero lets your Spring Boot application start on a random port. This ensures that if you launch multiple instances of the same microservice on the same machine, their port numbers won’t conflict.
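
To see what port zero means at the operating system level, here is a minimal plain-Java sketch. It does not use Spring Boot at all: the class name RandomPortDemo and the method twoFreePorts are made up for this illustration, and the assumption is that the embedded web server asks the operating system for a free port in much the same way.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

class RandomPortDemo {

    // Opens two server sockets on port 0 at the same time, like two running
    // instances of a microservice, and returns the ports the OS assigned.
    static int[] twoFreePorts() {
        try (ServerSocket first = new ServerSocket(0);
             ServerSocket second = new ServerSocket(0)) {
            return new int[] {first.getLocalPort(), second.getLocalPort()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ports = twoFreePorts();
        // The actual numbers differ on every run.
        System.out.println("First instance port:  " + ports[0]);
        System.out.println("Second instance port: " + ports[1]);
    }
}
```

Because both sockets are open at once, the operating system has to hand out two different ports, which is why two instances started with server.port=0 do not collide.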

Bootstrap Servers

Now, let’s add our first Kafka-related property – the bootstrap-servers:

spring.kafka.producer.bootstrap-servers=localhost:9092

This property specifies the initial connection point for your Kafka producer to connect to the Kafka cluster. For a local development environment, localhost:9092 is usually sufficient. However, if your cluster has multiple brokers, it’s better to list at least two bootstrap servers. This way, if one server is down, the other can be used for the initial connection.

If you have an additional server, just append it after a comma:

spring.kafka.producer.bootstrap-servers=localhost:9092, localhost:9094

Key Serializer Configuration

The next property is the key-serializer:

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

Kafka messages are sent as key-value pairs. With the default partitioner, the key determines the topic partition where the message will be stored, so messages with the same key end up in the same partition. The key-serializer property specifies how to turn these keys into a byte array for transmission. In our case, we’re using Kafka’s StringSerializer for string keys.

Different data types require different serializers. To explore other serializers, look in the org.apache.kafka.common.serialization package of the Kafka client library.
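
The following plain-Java sketch illustrates the two ideas above: turning a string key into bytes, and mapping those bytes to a partition. It is only an illustration. The class and method names are hypothetical, and the hash used here (Arrays.hashCode) is a simplified stand-in, not the murmur2 hash that Kafka's default partitioner uses, so the partition numbers will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeySerializationSketch {

    // Conceptually what a string serializer does: encode the key as UTF-8 bytes.
    static byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Simplified stand-in for a partitioner: hash the key bytes, then map the
    // hash onto the available partitions.
    // ASSUMPTION: Kafka's default partitioner uses murmur2, not Arrays.hashCode.
    static int choosePartition(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        byte[] keyBytes = serializeKey("product-42");
        System.out.println("Key bytes: " + Arrays.toString(keyBytes));
        System.out.println("Partition: " + choosePartition(keyBytes, 3));
    }
}
```

The point of the sketch is that the same key always produces the same bytes and therefore the same partition, as long as the number of partitions stays the same.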

Value Serializer Configuration

Lastly, let’s configure the serializer for the message values:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

In this tutorial series, I will be publishing events in JSON format, so I chose JsonSerializer as the value-serializer.

Notice that this time the serializer class comes from a Spring package (org.springframework.kafka.support.serializer) rather than from Apache Kafka. This is because, at the time of working on this tutorial, Apache Kafka's serialization package does not provide a JsonSerializer class. The JsonSerializer I am using here was implemented by the Spring team specifically to support Apache Kafka.

And that’s it. This is the minimum set of configuration properties that you need to configure the Kafka producer in the Spring Boot Microservice. There are more configuration properties that are nice to have, but they are optional.

Step 3: Creating Event Class

Before publishing an event to a Kafka topic, I need to create a Java class for that event. This class defines the structure and content of the event object that will be sent to the Kafka topic.

Let’s assume that our Kafka producer needs to publish an event when it receives an HTTP request from a client application. The client application sends an HTTP request to create a new product, and the body of this request contains a JSON payload with the product details. In this section, you will create a new class called ProductCreatedEvent. This class represents an event that occurs when a new product is created in your Microservice.

Below is the source code for the ProductCreatedEvent class. Later in this tutorial, I will create a new instance of this class and pass it to my Kafka producer. The Kafka producer will serialize the object into a byte array and send it to the Kafka topic.

package com.appsdeveloperblog.estore.ws.core.events;

import java.math.BigDecimal;

public class ProductCreatedEvent {
    private String productId;
    private String title;
    private BigDecimal price;
    private Integer quantity;
 

    public ProductCreatedEvent() {

    }
    
    public ProductCreatedEvent(String productId, String title, BigDecimal price, Integer quantity) {
        this.productId = productId;
        this.title = title;
        this.price = price;
        this.quantity = quantity;
    }

    public String getProductId() {
        return productId;
    }

    public String getTitle() {
        return title;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public Integer getQuantity() {
        return quantity;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    public void setQuantity(Integer quantity) {
        this.quantity = quantity;
    }

}
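
To make the serialization step more concrete, here is a rough plain-Java sketch of the kind of JSON bytes an event like this could turn into. This is a hand-written approximation, not the actual JsonSerializer: the class EventJsonSketch and its toJson method are hypothetical, no special characters are escaped, and the real serializer may order the fields differently.

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;

class EventJsonSketch {

    // Hand-built approximation of a JSON document for a ProductCreatedEvent.
    // ASSUMPTION: illustrative only; a real JSON library also escapes special
    // characters and decides the field order itself.
    static String toJson(String productId, String title, BigDecimal price, Integer quantity) {
        return "{\"productId\":\"" + productId + "\","
                + "\"title\":\"" + title + "\","
                + "\"price\":" + price.toPlainString() + ","
                + "\"quantity\":" + quantity + "}";
    }

    public static void main(String[] args) {
        String json = toJson("p-1", "Phone", new BigDecimal("19.99"), 5);
        // The value serializer's job ends with a byte array like this one.
        byte[] payload = json.getBytes(StandardCharsets.UTF_8);
        System.out.println(json);
        System.out.println("Payload size in bytes: " + payload.length);
    }
}
```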

Step 4: Publish an Event to Kafka Topic

In this section, you will learn how to publish the ProductCreatedEvent to a Kafka topic synchronously and asynchronously.

Send Kafka Message Asynchronously

Let’s start with the asynchronous approach. In this section, you will learn how to publish an event to a Kafka topic asynchronously.

I will use the ProductServiceImpl class to encapsulate the business logic for creating and publishing a product.

package com.appsdeveloperblog.estore.ws.core.events;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@Service
public class ProductServiceImpl {

    // The logger is a class-level field; a field cannot be declared inside a method
    private static final Logger LOGGER = LoggerFactory.getLogger(ProductServiceImpl.class);

    private final KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;

    @Autowired
    public ProductServiceImpl(KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public String createProduct(CreateProductRestModel productRestModel) {

        // Generate a unique product ID
        String productId = UUID.randomUUID().toString();

        // TODO: Persist Product Details into a database before publishing an Event.

        // Create the ProductCreatedEvent
        ProductCreatedEvent productCreatedEvent = new ProductCreatedEvent(
                productId,
                productRestModel.getTitle(),
                productRestModel.getPrice(),
                productRestModel.getQuantity()
        );

        // Send the event asynchronously
        CompletableFuture<SendResult<String, ProductCreatedEvent>> future =
                kafkaTemplate.send("product-created-events-topic-name", productId, productCreatedEvent);

        // Handle the future result
        future.whenComplete((result, exception) -> {
            if (exception != null) {
                LOGGER.error("Failed to send message: " + exception.getMessage());
            } else {
                LOGGER.info("Message sent successfully: " + result.getRecordMetadata().toString());
            }
        });

        // Optionally, wait for the result synchronously
        // future.join();

        return productId;
    }

}

To send messages to a Kafka topic, I use a very special object called KafkaTemplate. Think of KafkaTemplate as a helper that simplifies sending messages to Kafka topics. It’s a part of the Spring Kafka project and comes with many features that make it work seamlessly with Spring applications.

Here’s how the KafkaTemplate is set up in your code:

private final KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;

The KafkaTemplate is generic, which means it can be configured to work with different types of keys and values. In this case, I’ve specified String for the key and ProductCreatedEvent for the value. This means that your Kafka messages will have a String key and a ProductCreatedEvent as the value.

Now, let’s discuss the send() method:

kafkaTemplate.send("product-created-events-topic-name", productId, productCreatedEvent);

This send() method is what actually publishes your event to the Kafka topic. It takes three parameters:

  1. The name of the Kafka topic – “product-created-events-topic-name”. In your case, the topic name can be different. To learn how to create a new topic, read the “Creating Kafka Topics” tutorial.
  2. The key of the message – productId, which is a unique string that identifies each product.
  3. The actual message – productCreatedEvent, which contains all the details about the product you wish to send.

The reason this code is asynchronous is that the send() method doesn’t wait for the message to be delivered before it allows your program to continue running. Instead, it sends the message to the topic and immediately returns control to your program. This is very useful because it means your application isn’t held up by the message-sending process, which might take some time, especially if the Kafka cluster is busy or has some issues.
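
The non-blocking behaviour can be reproduced with plain Java, without Kafka. In the sketch below, fakeSend is a hypothetical stand-in for kafkaTemplate.send(): it returns a CompletableFuture immediately and completes it only when a simulated broker acknowledgement arrives (modelled here with a CountDownLatch).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

class AsyncSendSketch {

    // Hypothetical stand-in for kafkaTemplate.send(): returns a future at once
    // and completes it only after the simulated broker has acknowledged.
    static CompletableFuture<String> fakeSend(String message, CountDownLatch brokerAck) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                brokerAck.await(); // simulate waiting for a slow broker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "acknowledged: " + message;
        });
    }

    public static void main(String[] args) {
        CountDownLatch brokerAck = new CountDownLatch(1);
        CompletableFuture<String> future = fakeSend("product-created", brokerAck);

        // The caller has control back although nothing has been acknowledged yet.
        System.out.println("Done right after send? " + future.isDone());

        brokerAck.countDown();             // the simulated broker answers
        System.out.println(future.join()); // join() is used only to print the demo result
    }
}
```

The calling thread reaches the isDone() check before the simulated acknowledgement exists, which is the same reason createProduct() can return the productId without waiting for Kafka.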

To handle the outcome of this asynchronous operation, you can use the returned CompletableFuture. This is a special type of object in Java that can hold the result of an asynchronous computation. You can add a callback to the CompletableFuture that will be called when the message is successfully sent or if there is an error:

future.whenComplete((result, exception) -> {
    if (exception != null) {
        LOGGER.error("Failed to send message: " + exception.getMessage());
    } else {
        LOGGER.info("Message sent successfully: " + result.getRecordMetadata().toString());
    }
});

In this code snippet, whenComplete is attached to the CompletableFuture. It accepts two parameters:

  • result, which holds information about the sent message if it was successful, and
  • exception, which holds the error if something went wrong.

Inside this callback, I check whether exception is not null, which indicates there was an error during the send operation. If it is null, the message was sent successfully. Either way, you can log the outcome accordingly.
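
Both branches of such a callback can be exercised with a plain CompletableFuture, which is handy when you want to see the behaviour without a running Kafka cluster. In this sketch the futures are completed by hand, and describeOutcome is a hypothetical helper that returns the log line instead of writing it to a logger; the metadata text is made up.

```java
import java.util.concurrent.CompletableFuture;

class SendCallbackSketch {

    // Same decision as in the callback above, but returning the text so that
    // it is easy to inspect.
    static String describeOutcome(String result, Throwable exception) {
        if (exception != null) {
            return "Failed to send message: " + exception.getMessage();
        }
        return "Message sent successfully: " + result;
    }

    public static void main(String[] args) {
        // Success: the callback receives a result and a null exception.
        CompletableFuture<String> success = new CompletableFuture<>();
        success.whenComplete((result, exception) ->
                System.out.println(describeOutcome(result, exception)));
        success.complete("partition 0, offset 7"); // hypothetical metadata text

        // Failure: the callback receives a null result and the exception.
        CompletableFuture<String> failure = new CompletableFuture<>();
        failure.whenComplete((result, exception) ->
                System.out.println(describeOutcome(result, exception)));
        failure.completeExceptionally(new IllegalStateException("broker unavailable"));
    }
}
```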

Send Kafka Message Synchronously

Now let’s see how we can publish an event synchronously.

Sending a Kafka message synchronously means making the code wait until the message has been delivered before continuing with the next line of code. Here’s how you achieve that:

@Override
public String createProduct(CreateProductRestModel productRestModel) throws Exception {
    
    String productId = UUID.randomUUID().toString();
    
    // TODO: Persist Product Details into database table before publishing an Event
    
    ProductCreatedEvent productCreatedEvent = new ProductCreatedEvent(productId,
            productRestModel.getTitle(), productRestModel.getPrice(), 
            productRestModel.getQuantity());
    
    LOGGER.info("Before publishing a ProductCreatedEvent");
    
    SendResult<String, ProductCreatedEvent> result = 
            kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent).get();
    
    LOGGER.info("Partition: " + result.getRecordMetadata().partition());
    LOGGER.info("Topic: " + result.getRecordMetadata().topic());
    LOGGER.info("Offset: " + result.getRecordMetadata().offset());
    
    LOGGER.info("***** Returning product id");
    
    return productId;
}

In the createProduct() method, after creating the productCreatedEvent, I use the kafkaTemplate to send the message.

The key part that makes the operation synchronous is the .get() call appended to send().

Normally, send() is asynchronous and returns a CompletableFuture object (in Spring for Apache Kafka 3.x), which you can use to register a callback that runs once the future is complete. However, when you call .get() on this CompletableFuture, you’re telling your code to “wait here” until the result of the future is available.

The .get() method is a blocking call. This means that your application will stop at this line and will not move to the next line until Kafka has responded with the result of the send operation. This could be a confirmation that the message was successfully sent, or an exception if something went wrong.

So, to summarize, to send a message synchronously in Kafka using KafkaTemplate in Spring Boot, you simply append .get() to the send() method. This will block your application’s flow until the send operation is complete, providing a simple way to handle the result immediately within the same thread of execution.
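
Because .get() blocks and reports failures through exceptions, it helps to see how the different outcomes surface. The plain-Java sketch below uses hand-made futures instead of a real KafkaTemplate. The helper waitForResult is hypothetical, and it uses the timed variant get(timeout, unit) so that the wait has an upper bound, which is a design choice of this sketch rather than something the tutorial code does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingSendSketch {

    // Blocks until the future completes or the timeout expires, and turns
    // each possible outcome into a readable message.
    static String waitForResult(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return "sent: " + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The original failure is wrapped; getCause() returns it.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("offset 3");
        System.out.println(waitForResult(ok, 1000));

        CompletableFuture<String> broken = new CompletableFuture<>();
        broken.completeExceptionally(new IllegalStateException("broker down"));
        System.out.println(waitForResult(broken, 1000));

        CompletableFuture<String> neverAnswers = new CompletableFuture<>();
        System.out.println(waitForResult(neverAnswers, 100));
    }
}
```

In the tutorial's createProduct() method the same exceptions are simply propagated to the caller through the throws Exception clause.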

Step 5: Reading Result Metadata

After publishing an event to a Kafka topic synchronously, you receive a SendResult object. This object contains valuable information about the operation, known as metadata. Let’s understand what this metadata represents and how you can read it.

Understanding the Metadata

In Kafka, when a message is published to a topic, it’s not just thrown into a digital void. It’s methodically stored in a partition within a topic at a specific position known as an offset. The SendResult object’s getRecordMetadata() method provides this exact information.

Reading Partition Information

LOGGER.info("Partition: " + result.getRecordMetadata().partition());

The partition is like a sub-container within a topic. Topics in Kafka are split into partitions to allow for scaling (more partitions mean more potential for parallel processing) and fault tolerance. When you log result.getRecordMetadata().partition(), you’re getting the partition number where your message was stored. It’s useful for understanding how Kafka is distributing your messages across the topic.

Reading the Topic Name

LOGGER.info("Topic: " + result.getRecordMetadata().topic());

This line confirms the topic to which the message was sent. It’s a straightforward piece of data but serves as a good check to ensure your message is going to the right place.

Locating the Offset

LOGGER.info("Offset: " + result.getRecordMetadata().offset());

The offset is a unique identifier for each message within a partition. You can think of it as the address of your message in Kafka’s storage. Logging the offset helps you track exactly where the message lands in the partition. It can be particularly useful for debugging purposes or for setting up systems that might need to read from a specific point in the topic.
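
The relationship between partitions and offsets can be pictured with a toy in-memory model. This is not how Kafka stores data internally; PartitionLogSketch and its methods are hypothetical names used only to show that every partition keeps its own sequence of offsets.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionLogSketch {

    // One append-only list per partition.
    private final List<List<String>> partitions = new ArrayList<>();

    PartitionLogSketch(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Appends a message to the end of one partition and returns its offset,
    // which is simply the message's position within that partition.
    long append(int partition, String message) {
        List<String> log = partitions.get(partition);
        log.add(message);
        return log.size() - 1;
    }

    // Reads a message back using the partition and offset it was stored at.
    String read(int partition, long offset) {
        return partitions.get(partition).get((int) offset);
    }

    public static void main(String[] args) {
        PartitionLogSketch topic = new PartitionLogSketch(2);
        long first = topic.append(0, "product A created");
        long second = topic.append(0, "product B created");
        long other = topic.append(1, "product C created");
        System.out.println("Offsets in partition 0: " + first + ", " + second);
        System.out.println("Offset in partition 1: " + other);
        System.out.println("Read back: " + topic.read(0, second));
    }
}
```

Note that an offset on its own does not identify a message within a topic; you need the partition number as well, which is why the metadata reports both.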

Conclusion

I hope you enjoyed this tutorial and found it useful.

If you want to learn more about Apache Kafka and how to use it in your microservices, you can check out the Apache Kafka tutorials page on this website. There you will find more tutorials and articles on various Kafka topics, such as creating Kafka consumers, using Kafka streams, configuring Kafka security, and more.