Json Serialization with Kafka

Posted by Xavier Bouclet on July 17, 2022 · 21 mins read

1. Purpose of this blog post

Last time we wrote a reactive Kafka producer/consumer using Spring Boot with a simple String serializer. This time, we are going to use a JSON serializer with the Spring Boot reactive stack, WebFlux. If you have never implemented a producer/consumer, I advise you to read my last two posts first.

2. Setup a Kafka environment

To set up a Kafka environment, you need to create a docker-compose.yml file.

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.1.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.1.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

  schema-registry:
    image: confluentinc/cp-schema-registry:7.1.2
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/kafka-connect-datagen:0.5.3-7.1.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.1.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.1.2
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.1.2
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.1.2
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

This docker-compose.yml can be found on the Confluent GitHub. Note that only the zookeeper and broker services are strictly needed for this post; the rest of the stack is optional.

Next, we need to run the following command from the directory where the docker-compose.yml file is located.

docker compose up -d

And voilà, you should see something similar to:

[+] Running 8/8
 ⠿ Container zookeeper        Started                                         0.9s
 ⠿ Container broker           Started                                          1.6s
 ⠿ Container schema-registry  Started                                          2.4s
 ⠿ Container rest-proxy       Started                                          3.7s
 ⠿ Container connect          Started                                          3.7s
 ⠿ Container ksqldb-server    Started                                          4.7s
 ⠿ Container ksql-datagen     Started                                          5.7s
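
Optionally, you can check that the broker is up by listing its topics from inside the container (a quick sanity check; the kafka-topics tool ships with the cp-kafka image):

docker compose exec broker kafka-topics --bootstrap-server broker:29092 --list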

3. Kafka Serializers/Deserializers

When we use a JVM-based language with Kafka, we have a few serializers/deserializers available in the org.apache.kafka:kafka-clients library. They are located in the org.apache.kafka.common.serialization package.

This is the list of types available for serialization/deserialization within the Kafka client.

  • ByteArray

  • ByteBuffer

  • Bytes

  • Double

  • Float

  • Integer

  • List

  • Long

  • Short

  • String

  • UUID

  • Void

Just add Serializer or Deserializer after the type.

Example for the String type:

  • org.apache.kafka.common.serialization.StringSerializer

  • org.apache.kafka.common.serialization.StringDeserializer
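
To see these in action, here is a minimal plain (non-reactive) Kotlin producer wired with the StringSerializer; the topic name quickstart is just an example:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

fun main() {
    // Both the key and the value go through the StringSerializer.
    val props = mapOf<String, Any>(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
    )
    KafkaProducer<String, String>(props).use { producer ->
        producer.send(ProducerRecord("quickstart", "key", "hello")).get()
    }
}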

Fortunately for us, Spring Boot (through Spring Kafka) already provides a JSON serializer/deserializer to use in Kafka messages:

  • org.springframework.kafka.support.serializer.JsonSerializer

  • org.springframework.kafka.support.serializer.JsonDeserializer

The important thing is that they implement the Kafka interfaces for serialization/deserialization:

  • org.apache.kafka.common.serialization.Serializer

  • org.apache.kafka.common.serialization.Deserializer

If you want to implement your own serializer/deserializer, you can implement these same interfaces. That will be the subject of a future blog post.
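
As a quick preview, a minimal JSON serializer built on that interface could look like the following sketch (Spring's JsonSerializer does essentially this, plus type headers and configuration options):

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.serialization.Serializer

// Minimal sketch: serialize any object to its Jackson JSON bytes.
class SimpleJsonSerializer<T>(private val mapper: ObjectMapper = ObjectMapper()) : Serializer<T> {
    override fun serialize(topic: String?, data: T?): ByteArray? =
        data?.let { mapper.writeValueAsBytes(it) }
}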

4. Serialization in the producer

In our last post we used the following properties to configure the producer:

spring:
  kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

To use the JsonSerializer, we need to change the value-serializer to the following:

      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

To serialize an object, we need to create a type more complex than a String. For example, we can write a class Message.

data class Message(val id: UUID?=null,
                   val message: String="")
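
On the wire, the JsonSerializer writes the Jackson representation of the object. A quick way to preview what will be sent (assuming jackson-module-kotlin, which Spring Boot pulls in for Kotlin projects):

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import java.util.UUID

fun main() {
    // Prints {"id":"<the generated uuid>","message":"GG"}
    println(jacksonObjectMapper().writeValueAsString(Message(UUID.randomUUID(), "GG")))
}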

Our existing ReactiveKafkaProducerTemplate uses String, so we need to add one that uses Message. We modify our ReactiveKafkaProducerConfig to declare a template that takes a Message.

@Configuration
class ReactiveKafkaProducerConfig {

...
    @Bean
    fun reactiveKafkaProducerJsonTemplate(properties: KafkaProperties) =
        ReactiveKafkaProducerTemplate<String, Message>(SenderOptions.create(properties.buildProducerProperties())) (1)
}
1 ReactiveKafkaProducerTemplate<String, Message> takes a Message as its value.

We can modify our TopicProducer to add our reactiveKafkaProducerJsonTemplate.

@Service
class TopicProducer(
    @Value("\${topic.name.producer}") private val topicName: String,
    val reactiveKafkaProducerTemplate: ReactiveKafkaProducerTemplate<String, String>,
    val reactiveKafkaProducerJsonTemplate: ReactiveKafkaProducerTemplate<String, Message>
) {

...

    fun send(message: Message): Mono<SenderResult<Void>> {
        LOGGER.info("Payload sent: {} to {}", message, topicName)
        return reactiveKafkaProducerJsonTemplate.send(topicName, message) (1)
    }
}
1 We can pass the message directly as a parameter of the function.

To interact with the TopicProducer, we can reuse our MessageHandler and pass it the JSON received by our ApplicationRouter directly.

@Component
class MessageHandler(private val producer: TopicProducer) {
...
    fun sendJson(request: ServerRequest): Mono<ServerResponse> {
        return request.bodyToMono(Message::class.java)
            .flatMap { producer.send(it) }
            .flatMap {
                ServerResponse
                    .status(HttpStatus.CREATED)
                    .contentType(MediaType.APPLICATION_JSON)
                    .build()
            }
    }
}

We need to add the route to send the JSON: /kafka/json.

@Configuration
class ApplicationRouter(val messageHandler: MessageHandler) {

    @Bean
    fun route() = router {
        "/kafka".nest {
            "/send".nest {
                GET("", messageHandler::send)
            }
            "/json".nest {
                POST("", messageHandler::sendJson)
            }
        }
    }
}

You can run our producer using the command:

$ ./mvnw spring-boot:run

And then use curl to send messages to our producer.

curl -X POST -H "Content-Type: application/json" -d '{"id" : "ca0a7833-f0b8-4a54-a824-04e5f1dcae7e", "message":"GG"}' localhost:8080/kafka/json

You should see the following in the producer logs.

2022-07-17 21:13:20.703 DEBUG 87709 --- [ctor-http-nio-4] o.s.w.r.f.s.s.RouterFunctionMapping      : [4b0e2a93-3] Mapped to org.springframework.web.reactive.function.server.RouterFunctionDsl$POST$2@743e66f7
2022-07-17 21:13:20.703 DEBUG 87709 --- [ctor-http-nio-4] reactor.netty.channel.FluxReceive        : [4b0e2a93-1, L:/[0:0:0:0:0:0:0:1]:8080 - R:/[0:0:0:0:0:0:0:1]:62666] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
2022-07-17 21:13:20.705 DEBUG 87709 --- [ctor-http-nio-4] o.s.http.codec.json.Jackson2JsonDecoder  : [4b0e2a93-3] Decoded [Message(id=ca0a7833-f0b8-4a54-a824-04e5f1dcae7e, message=GG)]
2022-07-17 21:13:20.705  INFO 87709 --- [ctor-http-nio-4] c.xavierbouclet.kafkademo.TopicProducer  : Payload sent: Message(id=ca0a7833-f0b8-4a54-a824-04e5f1dcae7e, message=GG) to quickstart
2022-07-17 21:13:20.708 DEBUG 87709 --- [ctor-http-nio-4] o.s.w.s.adapter.HttpWebHandlerAdapter    : [4b0e2a93-3] Completed 201 CREATED

Now that we have serialization working in the producer, we can deal with the deserialization in the consumer.

5. Deserialization in the consumer

We can reuse the consumer developed in the last blog post as a starting point.

spring:
    kafka:
        consumer:
            auto-offset-reset: earliest
            bootstrap-servers: 127.0.0.1:9092
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            group-id: group_id

To use the JsonDeserializer, we need to change the value-deserializer to the following:

      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

Unlike on the producer side, that's not the only change we need to make. The JsonDeserializer has to know which class to instantiate: by default it reads type headers written by the JsonSerializer, but those reference the producer-side class name, which does not exist in the consumer's packages. So we disable the type headers and configure a default type instead.

spring:
    kafka:
        consumer:
            auto-offset-reset: earliest
            bootstrap-servers: 127.0.0.1:9092
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
            group-id: group_id
            properties:
                spring:
                    json:
                        use:
                            type:
                                headers: false
                        value:
                            default:
                                type: com.xavierbouclet.kafkaconsumer.Message (1)
        properties:
            spring:
                json:
                    trusted:
                        packages: '*' (2)
1 The corresponding class Message.
2 To indicate that we trust all packages, allowing our class Message to be deserialized.
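
Spring Boot's relaxed binding flattens the nested keys, so the same configuration can also be written with flat property keys (an equivalent form, not an additional requirement):

spring:
    kafka:
        consumer:
            properties:
                spring.json.use.type.headers: false
                spring.json.value.default.type: com.xavierbouclet.kafkaconsumer.Message
        properties:
            spring.json.trusted.packages: '*'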

We need to write the class com.xavierbouclet.kafkaconsumer.Message.

package com.xavierbouclet.kafkaconsumer (1)

import java.util.*

data class Message(val id: UUID?=null, val message: String="")
1 The package needs to be the one defined in the application.yml.

To read a message as a Message object, we need to add a corresponding ReactiveKafkaConsumerTemplate.

@Configuration
class ReactiveKafkaConsumerConfig {

    @Bean
    fun kafkaReceiverOptions(@Value(value = "\${topic.name.consumer}") topic: String, kafkaProperties: KafkaProperties): ReceiverOptions<String, Message> {
        val basicReceiverOptions: ReceiverOptions<String, Message> = ReceiverOptions.create(kafkaProperties.buildConsumerProperties()) (1)
        return basicReceiverOptions.subscription(Collections.singletonList(topic))
    }

    @Bean
    fun reactiveKafkaConsumerTemplate(kafkaReceiverOptions: ReceiverOptions<String, Message>) =     (1)
        ReactiveKafkaConsumerTemplate(kafkaReceiverOptions)

}
1 In our case, only the ReceiverOptions<String, Message> has changed.

The TopicListener takes the ReactiveKafkaConsumerTemplate<String, Message> as a parameter.

@Service
class TopicListener(
    private val reactiveKafkaConsumerTemplate: ReactiveKafkaConsumerTemplate<String, Message>
) {

    companion object {
        private val LOGGER = LoggerFactory.getLogger(TopicListener::class.java)
    }

    fun consumeTopic(): Flux<Message> {
        return reactiveKafkaConsumerTemplate
            .receiveAutoAck()
            .doOnNext {
                LOGGER.info(
                    "received key={}, value={} from topic={}, offset={}, partition={}, headers={}",
                    it.key(),
                    it.value(),
                    it.topic(),
                    it.offset(),
                    it.partition(),
                    it.headers()
                )
            }
            .map { it.value() }
            .doOnNext { LOGGER.info("successfully consumed {}={}", Message::class.java.simpleName, it) }
            .doOnError { LOGGER.error("something bad happened while consuming : {}", it.message) }
    }

}
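
Note that nothing consumes the Flux yet: for records to actually flow, something must subscribe to consumeTopic(). A minimal way to do that at startup (a sketch; the wiring from the previous post may differ) is a CommandLineRunner:

import org.springframework.boot.CommandLineRunner
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class ConsumerRunner(private val topicListener: TopicListener) {

    // Subscribe at application startup so the reactive pipeline starts polling.
    @Bean
    fun run() = CommandLineRunner {
        topicListener.consumeTopic().subscribe()
    }
}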

You can run the producer and the consumer using the command:

$ ./mvnw spring-boot:run

And send data to the producer using a curl command.

$ curl -X POST -H "Content-Type: application/json" -d '{"id" : "58e5c738-6829-4609-8c6f-bebd1cfd2f97", "message":"GG"}' localhost:8080/kafka/json

You should see something similar to the following:

2022-07-17 22:43:14.424  INFO 91446 --- [afka-group_id-1] c.x.kafkaconsumer.TopicListener          : received key=null, value=Message(id=58e5c738-6829-4609-8c6f-bebd1cfd2f97, message=GG) from topic=quickstart, offset=17, partition=0, headers=RecordHeaders(headers = [], isReadOnly = false)
2022-07-17 22:43:14.424  INFO 91446 --- [afka-group_id-1] c.x.kafkaconsumer.TopicListener          : successfully consumed Message=Message(id=58e5c738-6829-4609-8c6f-bebd1cfd2f97, message=GG)

6. Conclusion

We have now seen how to implement JSON serialization/deserialization for our Kafka producer/consumer using Spring Boot. You can find the code in my GitHub repository kafka-demo.