Configurar Kafka en SpringBoot

Veamos en este post como implementar y configurar Apache Kafka con SpringBoot.

Spring provee una forma simple de integrarse con Kafka para enviar mensajes. Mediante anotaciones podemos recibir los mensajes de Kafka y a través del uso de un template enviamos fácilmente los mensajes.

Kafka nos da la infraestructura para enviar mensajes utilizando producers y consumers que los receptan. Los mensajes que se producen y se consumen se envían a través de topics. Un tópico es la categoría del mensaje que se está enviando.

Kafka

Dependencias para Kafka en Spring Boot

Necesitaremos las siguientes dependencias para nuestro proyecto

'org.springframework.boot:spring-boot-starter-web'
'org.springframework.kafka:spring-kafka'
'org.springframework.boot:spring-boot-starter-test'
'org.springframework.kafka:spring-kafka-test'

Proyecto de Spring Boot con Kafka

Crearemos nuestro proyecto de Spring Boot igual que cualquier otro.

@SpringBootApplication
public class DemoKafkaApplication {

   public static void main(String[] args) {
      SpringApplication.run(DemoKafkaApplication.class, args);
   }

}

Definiendo nuestro KafkaTemplate para enviar mensajes a un tópico de Kafka

Para enviar mensajes debemos definir KafkaTemplate.
Lo primero que debemos saber es qué tipo de mensajes queremos enviar.

Habitualmente el key es un String y el tipo de dato varía entre String u Objetos. Aquí, definimos mensajes de tipo String para el valor.

KafkaTemplate<String, String> kafkaTemplate;
@Configuration
public class KafkaStringConfig {

    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean(name = "kafkaStringTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Definiendo un producer para enviar mensajes a un tópico de Kafka

Definiremos un productor que se encargara de producir un mensaje hacia un tópico de Kafka.

Como vimos antes en la configuración del productor, definimos un KafkaTemplate que podemos usar en nuestros servicios o componentes.

Solo tenemos que enviar el mensaje usando kafkaTemplate.send(..)

kafkaTemplate.send("topic", "message")

Mira que en este componente estamos usando el template que creamos en la config .

@Component
public class KafkaStringProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStringProducer.class);

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaStringProducer(@Qualifier("kafkaStringTemplate") KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        LOGGER.info("Producing message {}", message);
        this.kafkaTemplate.send("TOPIC-DEMO", message);
    }

}

Definiendo un consumer para recibir los mensajes de un tópico de Kafka

En este ejemplo crearemos también un consumer para recibir el mensaje de Kafka. Esto habitualmente estaría en otro microservicio, pero lo haremos aquí mismo a fin de simplificar el ejemplo.

En el consumer anotamos el método que queremos “escuche” el mensaje. Indicando el tópico que debe escuchar.

@KafkaListener(topics = "TOPIC-DEMO" , groupId = "group_id")
@Component
public class KafkaStringConsumer {

    Logger logger = LoggerFactory.getLogger(KafkaStringConsumer.class);

    @KafkaListener(topics = "TOPIC-DEMO" , groupId = "group_id")
    public void consume(String message) {
        logger.info("Consuming Message {}", message);
    }

}

Crearemos un CommandLineRunner para realizar nuestra prueba

A efectos de visualizar el funcionamiento de nuestra aplicación crearemos un CommandLineRunner que enviará mensajes cada dos segundos en un loop.

@Component
public class CommandLineAppStartupRunner implements CommandLineRunner {

    private final KafkaStringProducer stringProducer;

    public CommandLineAppStartupRunner(KafkaStringProducer stringProducer) {
        this.stringProducer = stringProducer;
    }

    @Override
    public void run(String... args) throws Exception {
        for (int i = 0; i < 10; i++) {
            stringProducer.sendMessage("Hello kafka !! " + i);
            Thread.sleep(2000);
        }
    }
}

Creamos un docker compose con Kafka

Para nuestro proyecto necesitamos Kafka funcionando. Asi que usaremos un contenedor docker con Kafka.

Usaremos una imagen de Kafka desde esta ruta de Docker Hub

version: '3'

services:
  zookeeper:
    container_name: 
      zookeeper_merge
    image: 
      wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka_merge_net
  kafka:
    container_name: 
      kafka_merge
    image:
      wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      ZK_SERVER: zookeeper:2181
      ADVERTISED_HOST: localhost
      CONSUMER_THREADS: 5
      BROKER_ID: 1
      ADVERTISED_PORT: 9092

    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - kafka_merge_net
 
networks:
  kafka_merge_net:
    driver: bridge

Iniciamos Kafka desde la imagen de docker

Debemos guardar el file docker-compose.yml y luego ejecutar en la misma ubicación

docker-compose up

docker compose up

Una vez iniciado nuestro docker con Kafka podemos realizar la prueba de nuestro servicio. Vemos la ejecución del loop que definimos previamente cada dos segundos.

Arrancamos nuestro proyecto en Spring Boot

Para este ejemplo uso gradle, por lo que lo inicio gradlew bootRun

Vemos en la consola la salida de nuestro proyecto

spring boot

Definiendo KafkaTemplate para enviar mensajes JSON

Dijimos antes que cuando definimos KafkaTemplate, indicamos el tipo de mensaje el el value .

 KafkaTemplate<K, V>

Por lo que podemos también definir como tipo de mensaje una clase y que ese objeto viaje como un Json. En este ejemplo, queremos enviar un objeto de tipo User serializado en Json.

Kafka Serializer

Aquí, en la config definimos JsonSerializer como Serializer del valor.

@Configuration
public class KafkaJsonConfig {

    public ProducerFactory<String, User> producerFactoryJson() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean(name = "kafkaJsonTemplate")
    public KafkaTemplate<String, User> kafkaTemplateJson() {
        return new KafkaTemplate<>(producerFactoryJson());
    }

}

Definiendo un producer y el consumer para mensajes Json

Observa que aquí usamos KafkaTemplate que definimos para que reciba una clase.

KafkaTemplate<String, User>
@Component
public class KafkaJsonProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJsonProducer.class);

    private final KafkaTemplate<String, User> kafkaTemplate;
    
    public KafkaJsonProducer(@Qualifier("kafkaJsonTemplate") KafkaTemplate<String, User> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(User user) {
        LOGGER.info("Producing message {}", user);
        this.kafkaTemplate.send("TOPIC-DEMO-JSON", user);
    }

}

El consumer es igual que el primero que definimos. Solo que enviamos una clase User y a un tópico diferente.

@Component
public class KafkaJsonConsumer {

    Logger logger = LoggerFactory.getLogger(KafkaJsonConsumer.class);

    @KafkaListener(topics = "TOPIC-DEMO-JSON", groupId = "group_id")
    public void consume(User message) {
        logger.info("Consuming Message {}", message);
    }

}

Probando el envío de Json a Kafka

Enviaremos a Kafka una instancia de User .

public class User {

    private String name;

    public User() {
    }

    public User(String name) {
        this.name = name;
    }

    // set, get .. 

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                '}';
    }
}
@Component
public class CommandLineAppStartupRunner implements CommandLineRunner {

    private final KafkaJsonProducer jsonProducer;

    public CommandLineAppStartupRunner(KafkaJsonProducer jsonProducer) {
        this.jsonProducer = jsonProducer;
    }

    @Override
    public void run(String... args) throws Exception {
        jsonProducer.sendMessage(new User("Larry"));
        jsonProducer.sendMessage(new User("The Edge"));
        jsonProducer.sendMessage(new User("Charly"));
    }
}

Observamos el resultado en la consola.

Kafka Json Console

Enviando un ACK Acknowledgment al recibir el mensaje en el consumer de Kafka

Si deseamos podemos establecer el auto commit como falso para los un consumer. Esto hará que debamos enviar un ack indicando que recibimos ok el mensaje en nuestro listener.

Debemos deshabilitar el auto-commit por configuración o por property. Ambas opciones son igual de válidas.

Por application.properties

spring.kafka.consumer.enable-auto-commit=false

Por configuración.

Kafka autocommit false

Cómo enviamos el ack en nuestro listener de Kafka

Teniendo ya la configuración para que no haga auto-commit. Indicamos el ack en el listener de este modo.

Kafka ack

Conclusión

En este Post configuramos Spring Boot con Kafka. Para los objetos indicamos un Json serializer a fin de enviar el mensaje en formato Json. Enviamos mensajes mediante un consumer a un tópico y lo recibimos en un consumer mediante un listener. También respondimos un ack en nuestro consumer para avisar que recibimos ok el mensaje.

Código fuente de este ejemplo

Como siempre te dejo el código fuente de este ejemplo para que lo tengas a mano.

https://github.com/gustavopeiretti/spring-boot-examples/tree/master/spring-boot-kafka

Hi! If you find my posts helpful, please support me by inviting me for a coffee :)

Ver también