Let’s write a simple Kafka producer/consumer

Posted by Xavier Bouclet on July 07, 2022 · 26 mins read

1. Purpose of this blog post

We are going to implement a producer and a consumer for Kafka. It should take less than an hour.

2. Setup a Kafka environment

To set up a Kafka environment, you need to create a docker-compose.yml file.

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.1.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.1.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

  schema-registry:
    image: confluentinc/cp-schema-registry:7.1.2
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/kafka-connect-datagen:0.5.3-7.1.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.1.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.1.2
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.1.2
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.1.2
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

This docker-compose.yml can be found on Confluent’s GitHub.

Next, we need to run the following command where the docker-compose.yml file is located.

docker compose up -d

And voilà, you should see something similar to :

[+] Running 8/8
 ⠿ Container zookeeper        Started                                         0.9s
 ⠿ Container broker           Started                                          1.6s
 ⠿ Container schema-registry  Started                                          2.4s
 ⠿ Container rest-proxy       Started                                          3.7s
 ⠿ Container connect          Started                                          3.7s
 ⠿ Container ksqldb-server    Started                                          4.7s
 ⠿ Container ksql-datagen     Started                                          5.7s
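
You can also verify that every service is up and running with:

docker compose ps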

3. Write a producer

To create the project, you can use the Initializr.

Choose the following options :

  • Artifact : kafka-producer (this updates the Name field as well)

  • Language : Kotlin

  • Dependencies :

    • Spring for Apache Kafka

    • Spring for Apache Kafka Streams

    • Spring Web

Let’s dive into the code. The pom.xml should contain the following dependencies :

<dependencies>
....
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
....
</dependencies>

Let’s look at the generated main class KafkaProducerApplication and add some configuration.

@EnableKafka (1)
@SpringBootApplication
class KafkaProducerApplication

fun main(args: Array<String>) {
    runApplication<KafkaProducerApplication>(*args)
}
1 Allows Spring Boot to trigger the auto-configuration needed to communicate with Kafka.

To send a message, we are going to use a bean provided by Spring, KafkaTemplate.

@Service
class TopicProducer(@Value("\${topic.name.producer}")
    private val topicName: String,
    val kafkaTemplate: KafkaTemplate<String, String>) {

    companion object {
        private val LOGGER = LoggerFactory.getLogger(TopicProducer::class.java)
    }

    fun send(message: String) {
        LOGGER.info("Payload sent: {}", message)
        kafkaTemplate.send(topicName, message) (1)
    }
}
1 Send the message to the topic corresponding to the property topic.name.producer.
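
Note that kafkaTemplate.send() is asynchronous. If you want a delivery confirmation, you can attach a callback to the returned future. A minimal sketch, assuming Spring Kafka 2.x (the version pulled in by Spring Boot 2.7), where send() still returns a ListenableFuture; sendWithCallback is a hypothetical variant, not part of the original service:

    // Hypothetical variant of send() that logs the delivery result.
    fun sendWithCallback(message: String) {
        kafkaTemplate.send(topicName, message).addCallback(
            { result ->
                LOGGER.info("Delivered to partition {} at offset {}",
                    result?.recordMetadata?.partition(),
                    result?.recordMetadata?.offset())
            },
            { ex -> LOGGER.error("Delivery failed", ex) }
        )
    }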

Let’s write a controller KafkaController to interact with our producer TopicProducer.

@RestController
@RequestMapping(value = ["kafka"])
class KafkaController(val topicProducer: TopicProducer) {
    @GetMapping(value = ["send"])
    fun send(@RequestParam(name = "message") message: String?) {
        topicProducer.send(message ?: "Test message sent to topic")
    }
}

Finally, we need to add a few properties. The Spring Initializr generates an application.properties file; I usually replace it with an application.yml to get a more readable configuration file.

spring:
  kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

topic:
  name:
    producer: quickstart (1)

auto:
  create:
    topics:
      enable: true (2)
1 The producer sends the messages received by the controller to the topic quickstart.
2 Should create the topic if it does not already exist.
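
For reference, the same configuration in the generated application.properties format would look like this (an equivalent sketch):

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
topic.name.producer=quickstart
auto.create.topics.enable=true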

If the topic isn’t created, you can use the following command.

docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic quickstart
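
To double-check that the topic exists, you can list the topics on the broker:

docker exec broker kafka-topics --bootstrap-server broker:9092 --list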

Finally, to run your application you can use the main class KafkaProducerApplication or use the command.

$ ./mvnw spring-boot:run

The console should show something similar to :

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.7.1)

2022-07-07 21:06:04.968  INFO 13187 --- [           main] c.x.k.KafkaProducerApplicationKt         : Starting KafkaProducerApplicationKt using Java 17.0.3 on host with PID 13187 (/Users/xavierbouclet/Sources/kafka-demo/kafka-producer/target/classes started by xavierbouclet in /Users/xavierbouclet/Sources/kafka-demo)
2022-07-07 21:06:04.969  INFO 13187 --- [           main] c.x.k.KafkaProducerApplicationKt         : No active profile set, falling back to 1 default profile: "default"
2022-07-07 21:06:05.503  INFO 13187 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)
2022-07-07 21:06:05.508  INFO 13187 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2022-07-07 21:06:05.508  INFO 13187 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.64]
2022-07-07 21:06:05.551  INFO 13187 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2022-07-07 21:06:05.551  INFO 13187 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 542 ms
2022-07-07 21:06:05.766  INFO 13187 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2022-07-07 21:06:05.773  INFO 13187 --- [           main] c.x.k.KafkaProducerApplicationKt         : Started KafkaProducerApplicationKt in 0.991 seconds (JVM running for 1.311)

We can now interact with our producer with curl or Postman. I personally prefer curl.

curl localhost:8080/kafka/send
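
The controller also accepts an optional message request parameter, so you can send your own payload :

curl localhost:8080/kafka/send\?message=hello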

Now let’s write a consumer to listen to our topic and see the messages sent.

4. Write a consumer

To create the consumer, you can use the Initializr.

Choose the following options :

  • Artifact : kafka-consumer (this updates the Name field as well)

  • Language : Kotlin

  • Dependencies :

    • Spring for Apache Kafka

    • Spring for Apache Kafka Streams

    • Spring Web

Like the producer, the pom.xml should contain :

<dependencies>
....
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
....
</dependencies>

As for the producer, we need to add the @EnableKafka annotation to tell Spring Boot to create all the configuration needed to interact with Kafka.

@EnableKafka
@SpringBootApplication
class KafkaConsumerApplication

fun main(args: Array<String>) {
    runApplication<KafkaConsumerApplication>(*args)
}

In our consumer, we need a TopicListener to listen to the Kafka topic "quickstart".

@Service
class TopicListener(@Value("\${topic.name.consumer}")
    private val topicName: String) {

    companion object {
        private val LOGGER = LoggerFactory.getLogger(TopicListener::class.java)
    }

    @KafkaListener(topics = ["\${topic.name.consumer}"], groupId = "group_id")
    fun consume(payload: ConsumerRecord<String?, String?>) {
        LOGGER.info("Topic: {}", topicName)
        LOGGER.info("key: {}", payload.key())
        LOGGER.info("Headers: {}", payload.headers())
        LOGGER.info("Partion: {}", payload.partition())
        LOGGER.info("Order: {}", payload.value())
    }
}
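
As a side note, if you only need the message body, @KafkaListener can also bind the payload directly instead of the full ConsumerRecord. A minimal sketch of that alternative signature (consumePayloadOnly is a hypothetical name):

@KafkaListener(topics = ["\${topic.name.consumer}"], groupId = "group_id")
fun consumePayloadOnly(message: String) {
    // Spring converts the record value with the configured deserializer
    LOGGER.info("Payload: {}", message)
}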

And now the application.yml.

spring:
    kafka:
        consumer:
            auto-offset-reset: earliest
            bootstrap-servers: 127.0.0.1:9092
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server:
    port: 8081 (1)

topic:
    name:
        consumer: quickstart (2)
1 The Spring Boot port needs to be changed to avoid a conflict with the producer (note that our docker-compose.yml also publishes 8081 for the schema-registry, so pick another free port if you run into a conflict).
2 The topic our consumer listens to.

Finally, to run your application you can use the main class KafkaConsumerApplication or use the command.

$ ./mvnw spring-boot:run

The console should show something similar to :

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.7.1)

2022-07-07 21:17:23.516  INFO 14157 --- [           main] c.x.k.KafkaConsumerApplicationKt         : Starting KafkaConsumerApplicationKt using Java 17.0.3 on ip-10-0-0-192.ca-central-1.compute.internal with PID 14157 (/Users/xavierbouclet/Sources/kafka-demo/kafka-consumer/target/classes started by xavierbouclet in /Users/xavierbouclet/Sources/kafka-demo)
2022-07-07 21:17:23.517  INFO 14157 --- [           main] c.x.k.KafkaConsumerApplicationKt         : No active profile set, falling back to 1 default profile: "default"
2022-07-07 21:17:23.917  INFO 14157 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8081 (http)
2022-07-07 21:17:23.923  INFO 14157 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2022-07-07 21:17:23.923  INFO 14157 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.64]
2022-07-07 21:17:23.964  INFO 14157 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2022-07-07 21:17:23.964  INFO 14157 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 425 ms
2022-07-07 21:17:24.169  INFO 14157 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [127.0.0.1:9092]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-group_id-1
	client.rack =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = group_id
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
	sasl.mechanism = GSSAPI
	sasl.oauthbearer.clock.skew.seconds = 30
	sasl.oauthbearer.expected.audience = null
	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
	sasl.oauthbearer.jwks.endpoint.url = null
	sasl.oauthbearer.scope.claim.name = scope
	sasl.oauthbearer.sub.claim.name = sub
	sasl.oauthbearer.token.endpoint.url = null
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 45000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2022-07-07 21:17:24.206  INFO 14157 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.1
2022-07-07 21:17:24.206  INFO 14157 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 97671528ba54a138
2022-07-07 21:17:24.206  INFO 14157 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1657243044205
2022-07-07 21:17:24.207  INFO 14157 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group_id-1, groupId=group_id] Subscribed to topic(s): quickstart
2022-07-07 21:17:24.224  INFO 14157 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8081 (http) with context path ''
2022-07-07 21:17:24.230  INFO 14157 --- [           main] c.x.k.KafkaConsumerApplicationKt         : Started KafkaConsumerApplicationKt in 0.875 seconds (JVM running for 1.199)

5. Play with the producer and the consumer

Our producer and our consumer are independent, so we can start our producer KafkaProducerApplication while the consumer is stopped.

Let’s send a few curl commands (or use Postman).

curl localhost:8080/kafka/send\?message=test1

curl localhost:8080/kafka/send\?message=test2

curl localhost:8080/kafka/send\?message=test3

curl localhost:8080/kafka/send\?message=test4

curl localhost:8080/kafka/send\?message=test5

curl localhost:8080/kafka/send\?message=test6
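
Before starting the consumer application, you can verify that the messages actually reached the broker with the console consumer bundled in the broker image:

docker exec broker kafka-console-consumer --bootstrap-server broker:9092 --topic quickstart --from-beginning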

Now let’s start our consumer KafkaConsumerApplication.

You should see something similar to the following output :

2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : key: null
2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Headers: RecordHeaders(headers = [], isReadOnly = false)
2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Partition: 0
2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Payload: test1
2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Topic: quickstart
2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : key: null
2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Headers: RecordHeaders(headers = [], isReadOnly = false)
2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Partition: 0
2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Payload: test2
2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Topic: quickstart
2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : key: null
2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Headers: RecordHeaders(headers = [], isReadOnly = false)
2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Partition: 0
2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Payload: test3
2022-07-07 21:24:25.876  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Topic: quickstart
2022-07-07 21:24:25.877  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : key: null
2022-07-07 21:24:25.877  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Headers: RecordHeaders(headers = [], isReadOnly = false)
2022-07-07 21:24:25.877  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Partition: 0
2022-07-07 21:24:25.877  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Payload: test4
2022-07-07 21:24:25.877  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Topic: quickstart
2022-07-07 21:24:25.877  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : key: null
2022-07-07 21:24:25.877  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Headers: RecordHeaders(headers = [], isReadOnly = false)
2022-07-07 21:24:25.877  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Partition: 0
2022-07-07 21:24:25.877  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Payload: test5
2022-07-07 21:24:25.877  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Topic: quickstart
2022-07-07 21:24:25.877  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : key: null
2022-07-07 21:24:25.877  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Headers: RecordHeaders(headers = [], isReadOnly = false)
2022-07-07 21:24:25.877  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Partition: 0
2022-07-07 21:24:25.877  INFO 14738 --- [ntainer#0-0-C-1] c.x.kafkaconsumer.TopicListener          : Payload: test6

6. Conclusion

We have now seen how to implement a simple producer/consumer for Kafka. You can find the code in my GitHub repository kafka-demo.
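
Once you are done experimenting, you can tear down the whole Kafka environment from the directory containing the docker-compose.yml:

docker compose down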