To create the project, you can use the Initializr.
Choose the following options :
Let’s dive in the code.
The pom.xml should contain the following dependencies :
<dependencies>
....
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
....
<dependencies>
Let’s see the main generated main class KafkaProducerApplication and add some configuration.
@EnableKafka (1)
@SpringBootApplication
class KafkaProducerApplication
fun main(args: Array<String>) {
runApplication<KafkaProducerApplication>(*args)
}
1 |
Allow Spring Boot to trigger the autoconfiguration to communicate with Kafka. |
To send a message, we are gonna use a bean provided by Spring KafkaTemplate.
@Service
class TopicProducer(@Value("\${topic.name.producer}")
private val topicName: String,
val kafkaTemplate: KafkaTemplate<String, String>) {
companion object {
private val LOGGER = LoggerFactory.getLogger(TopicProducer::class.java)
}
fun send( message:String) {
LOGGER.info("Payload sent: {}", message)
kafkaTemplate.send(topicName, message) (1)
}
}
1 |
Send the message to the topic corresponding to the property topic.name.producer. |
Let’s write a controller KafkaController to interact with our producer TopicProducer.
@RestController
@RequestMapping(value = ["kafka"])
class KafkaController(val topicProducer: TopicProducer) {
@GetMapping(value = ["send"])
fun send(@RequestParam(name = "message") message: String?) {
topicProducer.send(message ?: "Test message sent to topic")
}
}
Finally, we need to add a few properties.
The Spring Initializr generated an application.properties file.
I usually change that for application.yml in order to have more readable properties file.
spring:
kafka:
producer:
bootstrap-servers: 127.0.0.1:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
topic:
name:
producer: quickstart (1)
auto:
create:
topics:
enable: true (2)
1 |
The producer send message received by the controller to the topic quickstart. |
2 |
Should create the topic needed |
If the topic isn’t created, you can use the following command.
docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic quickstart
Finally, to run your application you can use the main class KafkaProducerApplication or use the command.
The console should show something similar to :
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.7.1)
2022-07-07 21:06:04.968 INFO 13187 --- [ main] c.x.k.KafkaProducerApplicationKt : Starting KafkaProducerApplicationKt using Java 17.0.3 on host with PID 13187 (/Users/xavierbouclet/Sources/kafka-demo/kafka-producer/target/classes started by xavierbouclet in /Users/xavierbouclet/Sources/kafka-demo)
2022-07-07 21:06:04.969 INFO 13187 --- [ main] c.x.k.KafkaProducerApplicationKt : No active profile set, falling back to 1 default profile: "default"
2022-07-07 21:06:05.503 INFO 13187 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2022-07-07 21:06:05.508 INFO 13187 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2022-07-07 21:06:05.508 INFO 13187 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.64]
2022-07-07 21:06:05.551 INFO 13187 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2022-07-07 21:06:05.551 INFO 13187 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 542 ms
2022-07-07 21:06:05.766 INFO 13187 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2022-07-07 21:06:05.773 INFO 13187 --- [ main] c.x.k.KafkaProducerApplicationKt : Started KafkaProducerApplicationKt in 0.991 seconds (JVM running for 1.311)
We can now interact with our producer with curl or postman.
I personally prefer to use curl.
curl localhost:8080/kafka/send
Now let’s write a consumer to listen our topic and see the messages sent.