Let’s write a reactive Kafka producer/consumer

Posted by Xavier Bouclet on July 09, 2022 · 30 mins read

1. Purpose of this blog post

Last time we wrote a simple Kafka producer/consumer using Spring Boot. This time, we are gonna implement one using the Spring Boot reactive stack, WebFlux. It should take less than an hour.

2. Setup a Kafka environment

To set up a Kafka environment, you need to create a docker-compose.yml file.

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.1.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.1.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

  schema-registry:
    image: confluentinc/cp-schema-registry:7.1.2
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/kafka-connect-datagen:0.5.3-7.1.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.1.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.1.2
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.1.2
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.1.2
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

This docker-compose.yml can be found on Confluent's GitHub.

Next, we need to run the following command from the directory where the docker-compose.yml file is located.

docker compose up -d

And voilà, you should see something similar to:

[+] Running 8/8
 ⠿ Container zookeeper        Started                                         0.9s
 ⠿ Container broker           Started                                          1.6s
 ⠿ Container schema-registry  Started                                          2.4s
 ⠿ Container rest-proxy       Started                                          3.7s
 ⠿ Container connect          Started                                          3.7s
 ⠿ Container ksqldb-server    Started                                          4.7s
 ⠿ Container ksql-datagen     Started                                          5.7s
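
If you want to confirm that all the containers are actually up before moving on, you can list them along with their state:

docker compose ps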

3. Write a producer

To create the project, you can use the Initializr.

Choose the following options:

  • Artifact: kafka-webflux-producer (this updates the Name field as well)

  • Language: Kotlin

  • Dependencies:

    • Spring for Apache Kafka

    • Spring for Apache Kafka Streams

    • Spring Reactive Web

Let’s dive into the code. The pom.xml should contain the following dependencies:

<dependencies>
....
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor.kotlin</groupId>
        <artifactId>reactor-kotlin-extensions</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
....
</dependencies>

To allow us to use Kafka in a reactive way, we need to add this dependency:

<dependencies>
....
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
        <version>1.3.11</version>
    </dependency>
....
</dependencies>

Let’s look at the generated main class KafkaWebfluxProducerApplication.

@SpringBootApplication
class KafkaWebfluxProducerApplication

fun main(args: Array<String>) {
    runApplication<KafkaWebfluxProducerApplication>(*args)
}

We don’t need to modify it.

To send a message, we are gonna use ReactiveKafkaProducerTemplate, a template provided by Spring Kafka.

@Service
class TopicProducer(
    @Value("\${topic.name.producer}") private val topicName: String,
    val kafkaTemplate: ReactiveKafkaProducerTemplate<String, String>
) {

    companion object {
        private val LOGGER = LoggerFactory.getLogger(TopicProducer::class.java)
    }

    fun send(message: String): Mono<SenderResult<Void>> {
        LOGGER.info("Payload sent: {} to {}", message, topicName)
        return kafkaTemplate.send(topicName, message) (1)
    }
}
1 Sends the message to the topic configured by the property topic.name.producer.
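
The Mono<SenderResult<Void>> returned by send completes once the broker acknowledges the record. As a small sketch (the sendAndLog method is an illustrative addition, not part of the original service), you can inspect the record metadata on success:

fun sendAndLog(message: String): Mono<SenderResult<Void>> =
    kafkaTemplate.send(topicName, message)
        .doOnSuccess { result ->
            // The broker-assigned partition and offset are available in the RecordMetadata
            LOGGER.info("sent to partition={}, offset={}",
                result.recordMetadata().partition(), result.recordMetadata().offset())
        }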

Contrary to the standard KafkaTemplate, there is no auto-configuration available, so we need to write a bit of configuration to make the ReactiveKafkaProducerTemplate bean available.

@Configuration
class ReactiveKafkaProducerConfig {
    @Bean
    fun reactiveKafkaProducerTemplate(properties: KafkaProperties) =
        ReactiveKafkaProducerTemplate<String, String>(SenderOptions.create(properties.buildProducerProperties()))

}
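
If you ever need producer settings that application.yml does not expose, SenderOptions can be tuned before building the template. A minimal sketch, assuming you replace the bean above (the maxInFlight value of 256 is purely illustrative):

@Bean
fun reactiveKafkaProducerTemplate(properties: KafkaProperties): ReactiveKafkaProducerTemplate<String, String> {
    // maxInFlight caps how many sends reactor-kafka keeps in flight, applying backpressure upstream
    val senderOptions = SenderOptions.create<String, String>(properties.buildProducerProperties())
        .maxInFlight(256)
    return ReactiveKafkaProducerTemplate(senderOptions)
}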

When I develop a WebFlux application, I usually prefer to use RouterFunctions, so we are gonna add a router and a handler to our app. Let’s write our MessageHandler to interact with our producer TopicProducer.

@Component
class MessageHandler(private val producer: TopicProducer) {

    fun send(request: ServerRequest): Mono<ServerResponse> {
        val message = when {
            request.queryParam("message").isPresent -> {
                request.queryParam("message").get()
            }
            else -> "default message"
        }

        return producer.send(message).flatMap { ServerResponse.ok().build() }
    }
}
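
Since queryParam returns a java.util.Optional, the when block above can also be collapsed into a one-liner with equivalent behavior:

val message = request.queryParam("message").orElse("default message")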

Now, we need to add the router ApplicationRouter to be able to interact with our application.

@Configuration
class ApplicationRouter(val messageHandler: MessageHandler) {

    @Bean
    fun route() = router {
        "/kafka".nest {
            "/send".nest {
                GET("", messageHandler::send)
            }
        }
    }
}

Finally, we need to add a few properties. The Spring Initializr generated an application.properties file; I usually change that to application.yml in order to have a more readable properties file.

spring:
  kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

topic:
  name:
    producer: quickstart (1)

auto:
  create:
    topics:
      enable: true (2)
1 The producer sends the message received by the handler to the topic quickstart.
2 Should create the topic if needed. Note that topic auto-creation is ultimately governed by the broker-side auto.create.topics.enable setting.

If the topic isn’t created automatically, you can create it with the following command.

docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic quickstart
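
You can then verify that the topic exists by listing the topics on the broker:

docker exec broker kafka-topics --bootstrap-server broker:9092 --list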

Finally, to run your application, you can run the main class KafkaWebfluxProducerApplication or use the following command.

$ ./mvnw spring-boot:run

The console should show something similar to:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.7.1)

2022-07-09 15:24:43.713  INFO 38989 --- [           main] c.x.k.KafkaWebfluxProducerApplicationKt  : Starting KafkaWebfluxProducerApplicationKt using Java 17.0.3 on localhost with PID 38989 (/Users/xavierbouclet/Sources/kafka-demo/kafka-webflux-producer/target/classes started by xavierbouclet in /Users/xavierbouclet/Sources/kafka-demo)
2022-07-09 15:24:43.714  INFO 38989 --- [           main] c.x.k.KafkaWebfluxProducerApplicationKt  : No active profile set, falling back to 1 default profile: "default"
2022-07-09 15:24:44.233  INFO 38989 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2022-07-09 15:24:44.239  INFO 38989 --- [           main] c.x.k.KafkaWebfluxProducerApplicationKt  : Started KafkaWebfluxProducerApplicationKt in 0.641 seconds (JVM running for 0.827)

We can call our producer with curl or Postman. I personally prefer curl.

curl localhost:8080/kafka/send\?message=test1
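
Before the reactive consumer exists, you can check that the message actually landed on the topic with the console consumer shipped in the broker image (it keeps running until you hit Ctrl+C):

docker exec broker kafka-console-consumer --bootstrap-server broker:9092 --topic quickstart --from-beginning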

Now let’s write a reactive consumer to listen to our topic and see the messages sent.

4. Write a consumer

To create the reactive Kafka consumer, you can use the Initializr.

Choose the following options:

  • Artifact: kafka-webflux-consumer (this updates the Name field as well)

  • Language: Kotlin

  • Dependencies:

    • Spring for Apache Kafka

    • Spring for Apache Kafka Streams

    • Spring Reactive Web

Like the producer’s, the pom.xml should contain:

<dependencies>
....
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor.kotlin</groupId>
        <artifactId>reactor-kotlin-extensions</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
....
</dependencies>

To allow us to consume from Kafka in a reactive way, we need to add this dependency:

<dependencies>
....
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
        <version>1.3.11</version>
    </dependency>
....
</dependencies>

We don’t need to modify anything in the main class KafkaWebfluxConsumerApplication.

@SpringBootApplication
class KafkaWebfluxConsumerApplication

fun main(args: Array<String>) {
    runApplication<KafkaWebfluxConsumerApplication>(*args)
}

In our consumer, we need a TopicListener to listen to the Kafka topic "quickstart".

@Service
class TopicListener(
    private val reactiveKafkaConsumerTemplate: ReactiveKafkaConsumerTemplate<String, String>
) {

    companion object {
        private val LOGGER = LoggerFactory.getLogger(TopicListener::class.java)
    }

    fun consumeTopic(): Flux<String> {
        return reactiveKafkaConsumerTemplate
            .receiveAutoAck()
            .doOnNext {
                LOGGER.info(
                    "received key={}, value={} from topic={}, offset={}, partition={}, headers={}",
                    it.key(),
                    it.value(),
                    it.topic(),
                    it.offset(),
                    it.partition(),
                    it.headers()
                )
            }
            .map { it.value() }
            .doOnNext { LOGGER.info("successfully consumed {}={}", String::class.java.simpleName, it) }
            .doOnError { LOGGER.error("something bad happened while consuming : {}", it.message) }
    }

}
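
receiveAutoAck() acknowledges records automatically as each batch is emitted. If you would rather acknowledge only after your own processing succeeds, the template also exposes receive(), which hands you the offset to acknowledge explicitly. A minimal sketch alongside consumeTopic (the consumeWithManualAck method is an illustrative addition):

fun consumeWithManualAck(): Flux<String> {
    return reactiveKafkaConsumerTemplate
        .receive()
        .doOnNext { record ->
            LOGGER.info("processing value={}", record.value())
            // Acknowledge only once the record has been handled
            record.receiverOffset().acknowledge()
        }
        .map { it.value() }
}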

The TopicListener leverages ReactiveKafkaConsumerTemplate, but in order to use it, we need to configure it.

@Configuration
class ReactiveKafkaConsumerConfig {

    @Bean
    fun kafkaReceiverOptions(@Value(value = "\${topic.name.consumer}") topic: String, kafkaProperties: KafkaProperties): ReceiverOptions<String, String> {
        val basicReceiverOptions: ReceiverOptions<String, String> = ReceiverOptions.create(kafkaProperties.buildConsumerProperties())
        return basicReceiverOptions.subscription(Collections.singletonList(topic))
    }

    @Bean
    fun reactiveKafkaConsumerTemplate(kafkaReceiverOptions: ReceiverOptions<String, String>) =
        ReactiveKafkaConsumerTemplate(kafkaReceiverOptions)

}
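
Note that ReceiverOptions is immutable: each call like subscription returns a new instance, so further tuning chains naturally. A sketch of overriding a single consumer property programmatically (ConsumerConfig comes from org.apache.kafka.clients.consumer; the max.poll.records value is an illustrative assumption):

ReceiverOptions.create<String, String>(kafkaProperties.buildConsumerProperties())
    // Illustrative override: cap the number of records returned per poll
    .consumerProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100)
    .subscription(Collections.singletonList(topic))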

Our application needs a component to start listening to our Kafka topic quickstart, so we need to add the KafkaListener.

@Component
class KafkaListener(val topicListener: TopicListener) {

    @EventListener(ContextRefreshedEvent::class)
    fun contextRefreshedEvent() {
        topicListener.consumeTopic().subscribe()
    }
}
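
One thing to keep in mind: if the Flux errors (for example the broker goes away for good), the subscription above simply terminates. A hedged sketch of making the listener more resilient (the infinite retry and 5-second delay are illustrative choices; Retry comes from reactor.util.retry, Duration from java.time) is to replace the event handler with:

@EventListener(ContextRefreshedEvent::class)
fun contextRefreshedEvent() {
    topicListener.consumeTopic()
        // Resubscribe on error, waiting 5 seconds between attempts
        .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5)))
        .subscribe()
}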

And now the application.yml.

spring:
    kafka:
        consumer:
            auto-offset-reset: earliest
            bootstrap-servers: 127.0.0.1:9092
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            group-id: group_id (1)
server:
    port: 8081 (2)

topic:
    name:
        consumer: quickstart (3)
1 The Kafka consumer group-id.
2 The Spring Boot port needs to be changed to avoid a conflict with the producer.
3 The topic our consumer listens to.

Finally, to run your application, you can run the main class KafkaWebfluxConsumerApplication or use the following command.

$ ./mvnw spring-boot:run

The console should show something similar to:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.7.1)

2022-07-09 15:37:48.429  INFO 40117 --- [           main] c.x.k.KafkaWebfluxConsumerApplicationKt  : Starting KafkaWebfluxConsumerApplicationKt using Java 17.0.3 on localhost with PID 40117 (/Users/xavierbouclet/Sources/kafka-demo/kafka-webflux-consumer/target/classes started by xavierbouclet in /Users/xavierbouclet/Sources/kafka-demo)
2022-07-09 15:37:48.430  INFO 40117 --- [           main] c.x.k.KafkaWebfluxConsumerApplicationKt  : No active profile set, falling back to 1 default profile: "default"
2022-07-09 15:37:49.618  INFO 40117 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8081
2022-07-09 15:37:49.642  INFO 40117 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [127.0.0.1:9092]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-group_id-1
	client.rack =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = group_id
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
	sasl.mechanism = GSSAPI
	sasl.oauthbearer.clock.skew.seconds = 30
	sasl.oauthbearer.expected.audience = null
	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
	sasl.oauthbearer.jwks.endpoint.url = null
	sasl.oauthbearer.scope.claim.name = scope
	sasl.oauthbearer.sub.claim.name = sub
	sasl.oauthbearer.token.endpoint.url = null
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 45000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2022-07-09 15:37:49.683  INFO 40117 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.1
2022-07-09 15:37:49.683  INFO 40117 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 97671528ba54a138
2022-07-09 15:37:49.683  INFO 40117 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1657395469682
2022-07-09 15:37:49.701  INFO 40117 --- [afka-group_id-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group_id-1, groupId=group_id] Subscribed to topic(s): quickstart
2022-07-09 15:37:49.715  INFO 40117 --- [           main] c.x.k.KafkaWebfluxConsumerApplicationKt  : Started KafkaWebfluxConsumerApplicationKt in 1.507 seconds (JVM running for 1.836)
2022-07-09 15:37:49.860  INFO 40117 --- [afka-group_id-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group_id-1, groupId=group_id] Resetting the last seen epoch of partition quickstart-0 to 0 since the associated topicId changed from null to q5xnD2sbSAewfdlj6w4QCQ
2022-07-09 15:37:49.861  INFO 40117 --- [afka-group_id-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group_id-1, groupId=group_id] Cluster ID: _JHkNBaBRNG5evp-o1-x_A
2022-07-09 15:37:49.861  INFO 40117 --- [afka-group_id-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
2022-07-09 15:37:49.862  INFO 40117 --- [afka-group_id-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group
2022-07-09 15:37:49.868  INFO 40117 --- [afka-group_id-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Request joining group due to: need to re-join with the given member-id
2022-07-09 15:37:49.868  INFO 40117 --- [afka-group_id-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group

5. Play with the producer and the consumer

Our producer and our consumer are independent, so we can start the producer KafkaWebfluxProducerApplication while leaving the consumer stopped.

Let’s send a few curl commands (or use Postman).

curl localhost:8080/kafka/send\?message=test1

curl localhost:8080/kafka/send\?message=test2

curl localhost:8080/kafka/send\?message=test3

curl localhost:8080/kafka/send\?message=test4

curl localhost:8080/kafka/send\?message=test5

curl localhost:8080/kafka/send\?message=test6

Now let’s start our consumer KafkaWebfluxConsumerApplication.

You should see something similar to the following output:

2022-07-09 15:41:04.779  INFO 40415 --- [afka-group_id-1] c.x.kafkaconsumer.TopicListener          : received key=null, value=test1 from topic=quickstart, offset=68, partition=0, headers=RecordHeaders(headers = [], isReadOnly = false)
2022-07-09 15:41:04.780  INFO 40415 --- [afka-group_id-1] c.x.kafkaconsumer.TopicListener          : successfully consumed String=test1
2022-07-09 15:41:04.780  INFO 40415 --- [afka-group_id-1] c.x.kafkaconsumer.TopicListener          : received key=null, value=test2 from topic=quickstart, offset=69, partition=0, headers=RecordHeaders(headers = [], isReadOnly = false)
2022-07-09 15:41:04.780  INFO 40415 --- [afka-group_id-1] c.x.kafkaconsumer.TopicListener          : successfully consumed String=test2
2022-07-09 15:41:04.780  INFO 40415 --- [afka-group_id-1] c.x.kafkaconsumer.TopicListener          : received key=null, value=test3 from topic=quickstart, offset=70, partition=0, headers=RecordHeaders(headers = [], isReadOnly = false)
2022-07-09 15:41:04.780  INFO 40415 --- [afka-group_id-1] c.x.kafkaconsumer.TopicListener          : successfully consumed String=test3
2022-07-09 15:41:04.780  INFO 40415 --- [afka-group_id-1] c.x.kafkaconsumer.TopicListener          : received key=null, value=test4 from topic=quickstart, offset=71, partition=0, headers=RecordHeaders(headers = [], isReadOnly = false)
2022-07-09 15:41:04.780  INFO 40415 --- [afka-group_id-1] c.x.kafkaconsumer.TopicListener          : successfully consumed String=test4
2022-07-09 15:41:04.780  INFO 40415 --- [afka-group_id-1] c.x.kafkaconsumer.TopicListener          : received key=null, value=test5 from topic=quickstart, offset=72, partition=0, headers=RecordHeaders(headers = [], isReadOnly = false)
2022-07-09 15:41:04.780  INFO 40415 --- [afka-group_id-1] c.x.kafkaconsumer.TopicListener          : successfully consumed String=test5
2022-07-09 15:41:04.780  INFO 40415 --- [afka-group_id-1] c.x.kafkaconsumer.TopicListener          : received key=null, value=test6 from topic=quickstart, offset=73, partition=0, headers=RecordHeaders(headers = [], isReadOnly = false)
2022-07-09 15:41:04.780  INFO 40415 --- [afka-group_id-1] c.x.kafkaconsumer.TopicListener          : successfully consumed String=test6

6. Conclusion

We have now seen how to implement a reactive producer/consumer for Kafka. You can find the code on my GitHub repository kafka-demo.