Veamos en este post como implementar y configurar Apache Kafka con SpringBoot.
Spring provee una forma simple de integrarse con Kafka para enviar mensajes. Mediante anotaciones podemos recibir los mensajes de Kafka y a través del uso de un template enviamos fácilmente los mensajes.
Kafka nos da la infraestructura para enviar mensajes utilizando producers y consumers que los receptan. Los mensajes que se producen y se consumen se envían a través de topics. Un tópico es la categoría del mensaje que se está enviando.
Dependencias para Kafka en Spring Boot
Necesitaremos las siguientes dependencias para nuestro proyecto
'org.springframework.boot:spring-boot-starter-web'
'org.springframework.kafka:spring-kafka'
'org.springframework.boot:spring-boot-starter-test'
'org.springframework.kafka:spring-kafka-test'
Proyecto de Spring Boot con Kafka
Crearemos nuestro proyecto de Spring Boot igual que cualquier otro.
@SpringBootApplication
public class DemoKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(DemoKafkaApplication.class, args);
}
}
Definiendo nuestro KafkaTemplate para enviar mensajes a un tópico de Kafka
Para enviar mensajes debemos definir KafkaTemplate.
Lo primero que debemos saber es qué tipo de mensajes queremos enviar.
Habitualmente el key es un String y el tipo de dato varía entre String u Objetos. Aquí, definimos mensajes de tipo String para el valor.
KafkaTemplate<String, String> kafkaTemplate;
@Configuration
public class KafkaStringConfig {
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean(name = "kafkaStringTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Definiendo un producer para enviar mensajes a un tópico de Kafka
Definiremos un productor que se encargara de producir un mensaje hacia un tópico de Kafka.
Como vimos antes en la configuración del productor, definimos un KafkaTemplate que podemos usar en nuestros servicios o componentes.
Solo tenemos que enviar el mensaje usando kafkaTemplate.send(..)
kafkaTemplate.send("topic", "message")
Mira que en este componente estamos usando el template que creamos en la config .
@Component
public class KafkaStringProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStringProducer.class);
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaStringProducer(@Qualifier("kafkaStringTemplate") KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
LOGGER.info("Producing message {}", message);
this.kafkaTemplate.send("TOPIC-DEMO", message);
}
}
Definiendo un consumer para recibir los mensajes de un tópico de Kafka
En este ejemplo crearemos también un consumer para recibir el mensaje de Kafka. Esto habitualmente estaría en otro microservicio, pero lo haremos aquí mismo a fin de simplificar el ejemplo.
En el consumer anotamos el método que queremos “escuche” el mensaje. Indicando el tópico que debe escuchar.
@KafkaListener(topics = "TOPIC-DEMO" , groupId = "group_id")
@Component
public class KafkaStringConsumer {
Logger logger = LoggerFactory.getLogger(KafkaStringConsumer.class);
@KafkaListener(topics = "TOPIC-DEMO" , groupId = "group_id")
public void consume(String message) {
logger.info("Consuming Message {}", message);
}
}
Crearemos un CommandLineRunner para realizar nuestra prueba
A efectos de visualizar el funcionamiento de nuestra aplicación crearemos un CommandLineRunner que enviará mensajes cada dos segundos en un loop.
@Component
public class CommandLineAppStartupRunner implements CommandLineRunner {
private final KafkaStringProducer stringProducer;
public CommandLineAppStartupRunner(KafkaStringProducer stringProducer) {
this.stringProducer = stringProducer;
}
@Override
public void run(String... args) throws Exception {
for (int i = 0; i < 10; i++) {
stringProducer.sendMessage("Hello kafka !! " + i);
Thread.sleep(2000);
}
}
}
Creamos un docker compose con Kafka
Para nuestro proyecto necesitamos Kafka funcionando. Asi que usaremos un contenedor docker con Kafka.
Usaremos una imagen de Kafka desde esta ruta de Docker Hub
version: '3'
services:
zookeeper:
container_name:
zookeeper_merge
image:
wurstmeister/zookeeper
ports:
- "2181:2181"
networks:
- kafka_merge_net
kafka:
container_name:
kafka_merge
image:
wurstmeister/kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
ZK_SERVER: zookeeper:2181
ADVERTISED_HOST: localhost
CONSUMER_THREADS: 5
BROKER_ID: 1
ADVERTISED_PORT: 9092
volumes:
- /var/run/docker.sock:/var/run/docker.sock
networks:
- kafka_merge_net
networks:
kafka_merge_net:
driver: bridge
Iniciamos Kafka desde la imagen de docker
Debemos guardar el file docker-compose.yml y luego ejecutar en la misma ubicación
docker-compose up
Una vez iniciado nuestro docker con Kafka podemos realizar la prueba de nuestro servicio. Vemos la ejecución del loop que definimos previamente cada dos segundos.
Arrancamos nuestro proyecto en Spring Boot
Para este ejemplo uso gradle, por lo que lo inicio gradlew bootRun
Vemos en la consola la salida de nuestro proyecto
Definiendo KafkaTemplate para enviar mensajes JSON
Dijimos antes que cuando definimos KafkaTemplate, indicamos el tipo de mensaje el el value .
KafkaTemplate<K, V>
Por lo que podemos también definir como tipo de mensaje una clase y que ese objeto viaje como un Json. En este ejemplo, queremos enviar un objeto de tipo User serializado en Json.
Aquí, en la config definimos JsonSerializer como Serializer del valor.
@Configuration
public class KafkaJsonConfig {
public ProducerFactory<String, User> producerFactoryJson() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean(name = "kafkaJsonTemplate")
public KafkaTemplate<String, User> kafkaTemplateJson() {
return new KafkaTemplate<>(producerFactoryJson());
}
}
Definiendo un producer y el consumer para mensajes Json
Observa que aquí usamos KafkaTemplate que definimos para que reciba una clase.
KafkaTemplate<String, User>
@Component
public class KafkaJsonProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJsonProducer.class);
private final KafkaTemplate<String, User> kafkaTemplate;
public KafkaJsonProducer(@Qualifier("kafkaJsonTemplate") KafkaTemplate<String, User> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(User user) {
LOGGER.info("Producing message {}", user);
this.kafkaTemplate.send("TOPIC-DEMO-JSON", user);
}
}
El consumer es igual que el primero que definimos. Solo que enviamos una clase User y a un tópico diferente.
@Component
public class KafkaJsonConsumer {
Logger logger = LoggerFactory.getLogger(KafkaJsonConsumer.class);
@KafkaListener(topics = "TOPIC-DEMO-JSON", groupId = "group_id")
public void consume(User message) {
logger.info("Consuming Message {}", message);
}
}
Probando el envío de Json a Kafka
Enviaremos a Kafka una instancia de User .
public class User {
private String name;
public User() {
}
public User(String name) {
this.name = name;
}
// set, get ..
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
'}';
}
}
@Component
public class CommandLineAppStartupRunner implements CommandLineRunner {
private final KafkaJsonProducer jsonProducer;
public CommandLineAppStartupRunner(KafkaJsonProducer jsonProducer) {
this.jsonProducer = jsonProducer;
}
@Override
public void run(String... args) throws Exception {
jsonProducer.sendMessage(new User("Larry"));
jsonProducer.sendMessage(new User("The Edge"));
jsonProducer.sendMessage(new User("Charly"));
}
}
Observamos el resultado en la consola.
Enviando un ACK Acknowledgment al recibir el mensaje en el consumer de Kafka
Si deseamos podemos establecer el auto commit como falso para los un consumer. Esto hará que debamos enviar un ack indicando que recibimos ok el mensaje en nuestro listener.
Debemos deshabilitar el auto-commit por configuración o por property. Ambas opciones son igual de válidas.
Por application.properties
spring.kafka.consumer.enable-auto-commit=false
Por configuración.
Cómo enviamos el ack en nuestro listener de Kafka
Teniendo ya la configuración para que no haga auto-commit. Indicamos el ack en el listener de este modo.
Conclusión
En este Post configuramos Spring Boot con Kafka. Para los objetos indicamos un Json serializer a fin de enviar el mensaje en formato Json. Enviamos mensajes mediante un consumer a un tópico y lo recibimos en un consumer mediante un listener. También respondimos un ack en nuestro consumer para avisar que recibimos ok el mensaje.
Código fuente de este ejemplo
Como siempre te dejo el código fuente de este ejemplo para que lo tengas a mano.
https://github.com/gustavopeiretti/spring-boot-examples/tree/master/spring-boot-kafka