In this post we will see how to implement and configure Apache Kafka with Spring Boot.
Spring offers a simple way to integrate with Kafka: a template to send messages and annotations to receive them. We will use a Docker image with Kafka to test the result.
Kafka gives us the infrastructure to send messages with producers and receive them with consumers. Messages are published to and read from topics. A topic is a named category that groups related messages.
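To make the idea of a topic concrete, here is a toy in-memory sketch in plain Java. It is not the Kafka API: `ToyBroker`, `subscribe`, and `publish` are hypothetical names made up for this illustration, and a real broker stores messages in a log that consumers poll. The only point it shows is that a message reaches just the consumers of the topic it was sent to.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only (hypothetical, not Kafka): routes each message
// to the listeners registered for its topic.
class ToyBroker {

    private final Map<String, List<Consumer<String>>> listenersByTopic = new HashMap<>();

    // Register a consumer callback for one topic.
    void subscribe(String topic, Consumer<String> listener) {
        listenersByTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(listener);
    }

    // Deliver the message to every listener of that topic and to nobody else.
    void publish(String topic, String message) {
        for (Consumer<String> listener : listenersByTopic.getOrDefault(topic, List.of())) {
            listener.accept(message);
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.subscribe("TOPIC-DEMO", m -> System.out.println("Consumed: " + m));
        broker.publish("TOPIC-DEMO", "Hello kafka !!"); // reaches the listener
        broker.publish("OTHER-TOPIC", "ignored");       // nobody is subscribed
    }
}
```

In the rest of the post, KafkaTemplate plays the role of `publish` and @KafkaListener plays the role of `subscribe`.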
Dependencies for Kafka in Spring Boot
We will need the following dependencies for our project (the last two are only needed for tests):
'org.springframework.boot:spring-boot-starter-web'
'org.springframework.kafka:spring-kafka'
'org.springframework.boot:spring-boot-starter-test'
'org.springframework.kafka:spring-kafka-test'
Spring Boot project with Kafka
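We will create our Spring Boot project:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoKafkaApplication.class, args);
    }
}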
Defining a producer to send messages to a Kafka topic
We will define a component in charge of producing messages to a Kafka topic.
As we can see below, Spring provides a KafkaTemplate that we can inject into our services or components.
We just have to send the message using kafkaTemplate.send(..):
kafkaTemplate.send("topic", "message");
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        LOGGER.info("Producing message {}", message);
        this.kafkaTemplate.send("TOPIC-DEMO", message);
    }
}
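Note that kafkaTemplate.send(..) is asynchronous: it returns a future instead of waiting for the broker. As a sketch of how the outcome could be inspected, the example below uses a plain `CompletableFuture` as a stand-in for the value returned by `send` (Spring Kafka 3.x returns a `CompletableFuture`; earlier versions return a `ListenableFuture` with a different callback API). `SendOutcome` and `describe` are hypothetical names, not part of Spring.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper: turns the (result, error) pair of a finished send into a log line.
class SendOutcome {

    static String describe(String message, Object result, Throwable error) {
        if (error != null) {
            return "Failed to send '" + message + "': " + error.getMessage();
        }
        return "Sent '" + message + "' (" + result + ")";
    }

    public static void main(String[] args) {
        // Stand-in for a send that the broker acknowledged.
        CompletableFuture<String> acknowledged = CompletableFuture.completedFuture("ack");
        acknowledged.whenComplete((result, error) ->
                System.out.println(describe("Hello kafka !! 0", result, error)));

        // Stand-in for a send that failed, for example because the broker is down.
        CompletableFuture<String> failed =
                CompletableFuture.failedFuture(new IllegalStateException("broker unavailable"));
        failed.whenComplete((result, error) ->
                System.out.println(describe("Hello kafka !! 1", result, error)));
    }
}
```

In the producer above, the same callback shape would attach as kafkaTemplate.send("TOPIC-DEMO", message).whenComplete((result, error) -> ...), assuming Spring Kafka 3.x.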
Defining a consumer to receive Kafka’s topic messages
In this example we will also define a consumer to receive the messages from the Kafka topic.
This would usually be in another microservice, but we will do it here in order to simplify the example.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    // The listener needs a consumer group id: set spring.kafka.consumer.group-id
    // in the application properties, or set the groupId attribute of @KafkaListener.
    @KafkaListener(topics = "TOPIC-DEMO")
    public void consume(String message) {
        LOGGER.info("Consuming Message {}", message);
    }
}
We will create a CommandLineRunner to perform our test.
Just to visualize how our demo application works, we will create a CommandLineRunner that sends ten messages in a loop, one every two seconds.
import com.gp.demokafka.producer.KafkaProducer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class CommandLineAppStartupRunner implements CommandLineRunner {

    private final KafkaProducer producer;

    public CommandLineAppStartupRunner(KafkaProducer producer) {
        this.producer = producer;
    }

    @Override
    public void run(String... args) throws Exception {
        for (int i = 0; i < 10; i++) {
            producer.sendMessage("Hello kafka !! " + i);
            Thread.sleep(2000);
        }
    }
}
Create a docker compose with Kafka
For our project we need Kafka running in a Docker container.
We will use the wurstmeister/kafka image from Docker Hub, together with wurstmeister/zookeeper:
version: '3'
services:
  zookeeper:
    container_name: zookeeper_merge
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka_merge_net
  kafka:
    container_name: kafka_merge
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      ZK_SERVER: zookeeper:2181
      ADVERTISED_HOST: localhost
      CONSUMER_THREADS: 5
      BROKER_ID: 1
      ADVERTISED_PORT: 9092
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - kafka_merge_net
networks:
  kafka_merge_net:
    driver: bridge
We start Kafka from the docker image
Save the file as docker-compose.yml and run the following command in the same directory:
docker-compose up
Once the Kafka container is up, we can test our service. The loop we defined earlier sends a message every two seconds, and the consumer logs each one as it arrives.
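If it is not obvious whether the container is ready, a quick check is to see if anything accepts connections on the published port. The sketch below uses only the JDK; `BrokerPortCheck` and `isPortOpen` are hypothetical names for this illustration. An open port only tells us that something is listening on 9092, not that Kafka has finished starting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // 9092 is the port published by the kafka service in docker-compose.yml.
        boolean open = isPortOpen("localhost", 9092, 2000);
        System.out.println(open
                ? "Port 9092 is accepting connections"
                : "Nothing is listening on port 9092 yet");
    }
}
```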
We start our project in Spring Boot
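For this example I use Gradle, so I start it with gradlew bootRun. Spring Boot connects to localhost:9092 by default, which matches the port published by the Kafka container.
The console shows the output of our project.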
Source code of this example
As always, I leave the source code of this example for you to have at hand.
https://github.com/gustavopeiretti/spring-boot-examples/tree/master/spring-boot-kafka