Configuring Kafka and Spring Boot

In this post we will see how to implement and configure Apache Kafka with Spring Boot.

Spring provides a simple way to integrate with Kafka: a template to easily send messages and annotations to receive them. We will use a Docker image with Kafka to test the application.

Kafka gives us the infrastructure to send messages using producers and to receive them with consumers. Messages are produced to and consumed from topics; a topic is the category of the messages sent through it.
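Before wiring in Kafka itself, the producer/topic/consumer relationship can be sketched with a plain-Java, in-memory analogy. This is only an illustration of the publish/subscribe idea; the class and method names here are made up and are not part of Kafka's API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// A toy "broker": each topic name maps to the list of its subscribers.
class InMemoryBroker {
    private final Map<String, List<Consumer<String>>> topics = new HashMap<>();

    // A consumer subscribes to a topic by name.
    void subscribe(String topic, Consumer<String> consumer) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(consumer);
    }

    // A producer sends a message to a topic; every subscriber receives it.
    void send(String topic, String message) {
        topics.getOrDefault(topic, List.of()).forEach(c -> c.accept(message));
    }
}

public class BrokerDemo {
    public static void main(String[] args) {
        InMemoryBroker broker = new InMemoryBroker();
        broker.subscribe("TOPIC-DEMO", m -> System.out.println("Consumed: " + m));
        broker.send("TOPIC-DEMO", "Hello kafka !! 0"); // prints "Consumed: Hello kafka !! 0"
    }
}
```

Kafka does the same conceptually, but over the network, with persistence, partitioning, and consumer groups on top.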


Dependencies for Kafka in Spring Boot

We will need the following dependencies in our project's build.gradle:

implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
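Spring Boot auto-configures the Kafka client from its application properties. A minimal application.yml for this demo might look like the following (the group-id is an arbitrary choice for this example, and localhost:9092 is where our Docker container will expose the broker):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092  # broker exposed by the Docker container
    consumer:
      group-id: demo-group             # assumption: any group id works for the demo
      auto-offset-reset: earliest      # read existing messages on first start
```

Spring Boot already defaults to localhost:9092, so this file mostly makes the configuration explicit.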

Spring Boot project with Kafka

We start by creating our Spring Boot application class.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoKafkaApplication {

   public static void main(String[] args) {
      SpringApplication.run(DemoKafkaApplication.class, args);
   }

}

Defining a producer to send messages to a Kafka topic

We will define a service that will be in charge of producing a message to a Kafka topic.

As we can see below, Spring provides us with a KafkaTemplate that we can inject into our services or components.

We just have to send the message using kafkaTemplate.send(topic, message), for example kafkaTemplate.send("topic", "message").

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        LOGGER.info("Producing message {}", message);
        this.kafkaTemplate.send("TOPIC-DEMO", message);
    }
}

Defining a consumer to receive messages from a Kafka topic

In this example we will also define a consumer that receives the messages from Kafka.

This would usually be in another microservice, but we will do it here in order to simplify the example.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "TOPIC-DEMO")
    public void consume(String message) {
        LOGGER.info("Consuming message {}", message);
    }
}

We will create a CommandLineRunner to perform our test.

Just to visualize how our demo application works, we will create a CommandLineRunner that sends ten messages, one every two seconds.

import com.gp.demokafka.producer.KafkaProducer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class CommandLineAppStartupRunner implements CommandLineRunner {

    private final KafkaProducer producer;

    public CommandLineAppStartupRunner(KafkaProducer producer) {
        this.producer = producer;
    }

    @Override
    public void run(String... args) throws Exception {
        for (int i = 0; i < 10; i++) {
            producer.sendMessage("Hello kafka !! " + i);
            Thread.sleep(2000);
        }
    }
}

Creating a Docker Compose file with Kafka

For our project we need Kafka in a docker container.

We will use the wurstmeister/kafka image, available on Docker Hub.

version: '3'

services:
  zookeeper:
    container_name: zookeeper_merge
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka_merge_net
  kafka:
    container_name: kafka_merge
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      ZK_SERVER: zookeeper:2181
      ADVERTISED_HOST: localhost
      CONSUMER_THREADS: 5
      BROKER_ID: 1
      ADVERTISED_PORT: 9092

    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - kafka_merge_net
 
networks:
  kafka_merge_net:
    driver: bridge
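Once the containers are running, we can do a quick sanity check from inside the Kafka container using the CLI tools that ship with Apache Kafka (the scripts are on the PATH in the wurstmeister/kafka image; exact locations may vary by image version):

```shell
# list the topics known to the broker
docker exec -it kafka_merge kafka-topics.sh --list --bootstrap-server localhost:9092

# tail the demo topic from the beginning to watch messages arrive
docker exec -it kafka_merge kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic TOPIC-DEMO --from-beginning
```

This is optional, but handy for confirming that the broker is reachable before starting the Spring Boot application.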

We start Kafka from the docker image

We must save the file as docker-compose.yml and then run, from the same directory:

docker-compose up


Once our Docker container with Kafka has started, we can test our service. We will see the loop we defined earlier produce a message every two seconds.

We start our project in Spring Boot

For this example I use Gradle, so I start the application with gradlew bootRun.

We see the output of our project in the console.


Source code of this example

As always, I leave the source code of this example so you can have it at hand.

https://github.com/gustavopeiretti/spring-boot-examples/tree/master/spring-boot-kafka

Hi! If you find my posts helpful, please support me by inviting me for a coffee :)
