Marcos Maia

Simplest Spring Kafka Producer and Consumer - Kotlin version

Let's build and run the simplest example of a Kotlin Kafka consumer and producer using spring-kafka. If you need help with Kafka, Spring Boot, or Docker, which are all used in this article, or want to check out the sample application from this post, see the References section below. For quick access to the source code you can just: git clone git@github.com:stockgeeks/spirng-kafka-kotlin.git

These are the steps we're going to follow in this post:

  1. Create a spring-boot Kotlin application, built with Java 11 and Gradle or Maven.
  2. Create a Spring Kafka Kotlin Consumer.
  3. Run local Kafka and Zookeeper using docker and docker-compose.
  4. Produce some messages from the command line console-producer and check the consumer log.
  5. Create a Spring Kafka Kotlin Producer.
  6. Add some custom configuration.
  7. Build an endpoint that we can pass in a message to be produced to Kafka.
  8. Build and run the application with Maven or Gradle.
  9. Send a message with curl and check the console messages to follow the message from the endpoint -> producer -> kafka -> consumer.

Create a Spring Boot Kotlin Application

Navigate to the Spring Initializr and create the project, picking Gradle or Maven as the build system, Kotlin as the language, and Java 11 under Options; make sure to add the spring-kafka dependency. If you have never used the Spring Initializr, check this other post, which covers it using Java (https://dev.to/thegroo/spring-boot-crash-course-21nm).
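If you prefer to add the dependency by hand instead, the relevant entries look roughly like this (a sketch for a Gradle Kotlin DSL build; the Maven coordinate is the same org.springframework.kafka:spring-kafka, and versions are managed by the Spring Boot plugin's dependency management):

```kotlin
// build.gradle.kts — dependencies block (sketch)
dependencies {
    // web starter for the REST endpoint we'll build later
    implementation("org.springframework.boot:spring-boot-starter-web")
    // spring-kafka brings in KafkaTemplate and @KafkaListener support
    implementation("org.springframework.kafka:spring-kafka")
}
```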

Create a Kotlin Spring Kafka Consumer

Let's now write the simplest possible Kotlin Kafka consumer with spring-kafka using spring-boot default configurations.

Create a class called KotlinConsumer:

import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class KotlinConsumer {
  private val logger = LoggerFactory.getLogger(javaClass)
  @KafkaListener(topics = ["simple-message-topic"], groupId = "simple-kotlin-consumer")
  fun processMessage(message: String) {
    logger.info("got message: {}", message)
  }
}


That's it, that's all it takes. Because we're relying on the Spring Boot default configurations, the key and value are both configured to be String by default.
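For reference, the defaults we're leaning on are roughly equivalent to spelling out these properties explicitly (a sketch; these are standard spring.kafka.* property names, and you only need them if you want to override something):

```
# application.properties — the Spring Boot defaults this consumer relies on
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```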

Start Kafka and Zookeeper

As we've seen in detail in this other article, we're going to use docker-compose to run a local Kafka for development. If you didn't check out the source code, create a docker-compose.yml file in the root of your project and add the content from the link. Let's start the Kafka and Zookeeper containers:
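If you'd rather not fetch the compose file from the repository right away, a minimal sketch looks like this. Note this is a simplified assumption, not the project's exact file: the image names are guessed from the /opt/kafka/bin scripts path used later in this article (which matches the wurstmeister images), and the repository's docker-compose.yml, with its dual-listener setup, remains the authoritative version:

```yaml
# docker-compose.yml — minimal local Kafka + Zookeeper (sketch)
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka       # lets us later run: docker exec -it kafka /bin/bash
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://${DOCKER_HOST_IP:-kafka}:9092
```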

docker-compose up -d

Make sure the containers are running:

docker ps

You should see Kafka and Zookeeper running:

(screenshot: Kafka and Zookeeper containers listed by docker ps)

Run the app

Let's now compile and run the application. If you need more detailed instructions, check this post. Run the following commands to build and run the application:

using gradle(w):

gradle build && gradle bootRun

OR if you're using mvn(w):

mvn clean package && mvn spring-boot:run


The optional variants ending in w (mvnw or gradlew) apply if you don't have Maven or Gradle installed and prefer to use the wrappers, which are created by default by the start.spring.io site.

The application will start and you will see on the standard output the consumer configuration, the Kafka version being used, and a message like Started SpringKafkaApplication in x seconds.

(screenshot: application startup logs)

Make sure to keep the application running; don't close the terminal window where it's running. Let's now produce a few messages with the Kafka console producer and see our consumer processing the messages and logging them out.

Produce message using the Kafka console producer

Open a new terminal and enter the Kafka running container so we can use the console producer:

docker exec -it kafka /bin/bash


Once inside the container, cd /opt/kafka/bin; the command-line scripts for Kafka in this specific image are located in that folder. If you're using a different Docker image, those scripts might be in some other location.

Run the console producer which will enable you to send messages to Kafka:

./kafka-console-producer.sh --broker-list localhost:9092 --topic simple-message-topic

(screenshot: kafka-console-producer prompt)

The console will now block; write your message and hit enter, and each time you do, one message is produced to simple-message-topic. Try sending a few messages and watch the shell where your Spring Boot application is running: the consumer processes the messages and prints them to the standard output.

(screenshot: console producer messages arriving at the Spring Kafka consumer)

Write a simple Spring Kotlin Producer

Time to create our spring-kafka producer. Create a class called KotlinProducer; we will again use the defaults for the producer as we did for the consumer.

import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class KotlinProducer(private val kafkaTemplate: KafkaTemplate<String, String>) {

  fun send(message: String) {
    kafkaTemplate.send("simple-message-topic", message)
  }

}

That's it; that's all it takes, as again we're using the Spring Boot defaults, which assume String for the message value. We're omitting the key since it's not mandatory; we will discuss how the key is used by Kafka in future articles.

Write an endpoint

Let's now create a simple endpoint that receives a text message and publishes it to Kafka. For the simplicity of the example, we always return 200 OK for now.

import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/api")
class MessageApi(private val kotlinProducer: KotlinProducer) {
  @PostMapping("/message")
  fun publish(@RequestBody message: String) {
    kotlinProducer.send(message)
  }
}

Again, a simple piece of code relying on Spring Boot defaults and its annotations. We inject the producer and call it directly, passing in any message sent via the POST call.

Build and run the application

using gradle:

gradle build && gradle bootRun

using maven:

mvn clean package && mvn spring-boot:run

There's a good chance you will get an error when running the application on your development machine now. This happens because your application runs on your normal host network, while Kafka and Zookeeper run inside the Docker network.

There are a few ways to solve this; the best is to pass your development machine's hostname to docker-compose when starting the containers. If you open the docker-compose file from this project, the KAFKA_ADVERTISED_LISTENERS entry has a LISTENER_DOCKER_EXTERNAL listener like ${DOCKER_HOST_IP:-kafka}:9092, which tells Compose to use the passed-in hostname, or kafka by default. Check the comments in the compose file to find out how to fix it, and see the References section below for more details.
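The ${DOCKER_HOST_IP:-kafka} syntax in the compose file is ordinary shell-style default expansion, which you can try directly in a terminal:

```shell
# docker-compose substitutes ${DOCKER_HOST_IP:-kafka} with shell-style
# default expansion: the variable's value if it's set, otherwise the
# literal fallback "kafka".
DOCKER_HOST_IP=$(hostname)
echo "${DOCKER_HOST_IP:-kafka}"   # prints your machine's hostname

unset DOCKER_HOST_IP
echo "${DOCKER_HOST_IP:-kafka}"   # prints the fallback: kafka
```

With that in mind, starting the containers as DOCKER_HOST_IP=$(hostname) docker-compose up -d makes Kafka advertise a listener your host machine can resolve (assuming your hostname resolves locally; the variable name comes from this project's compose file).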

Send some messages with curl

Now make sure to watch the application terminal and in another terminal window let's use curl to send some messages:

curl -X POST http://localhost:8080/api/message -d "yet more fun" -H "Content-Type: text/plain"

You should see the response in the same terminal where curl was executed. Also check the consumer processing the message and printing it in the terminal where the application is running.

(screenshot: curl request and the consumer logging the message)

Done

This is it; you're done. You have now created the simplest possible Spring Boot application that produces and consumes messages from Kafka using Kotlin. The reason it looks so simple is that we're relying on the Spring Boot and spring-kafka default configurations, as mentioned before.

If you want to know more about how Spring Boot or Kafka works, take a look at the links in the next section, where you'll find some references with further details.

If you have problems connecting your Kafka client from your local development machine to the broker running in Docker or Kubernetes, check the section Connecting a Kafka client at the end of this other post, where you'll find details on how to fix it and links explaining why it happens. Have fun.

We're going to cover testing your consumer and producers in another post. Happy Coding.

References

Source code with the application created in this post.

To set up your environment with java, maven, docker and docker-compose, please check how to set up your environment for the example tutorials.

If you need some quick introduction to Kafka: Kafka - Crash Course

For some insights on how to use docker-compose for local development, please check this post: One to run them all where you will also learn some useful Kafka commands.

If you're new to Spring Boot, please take a look at Spring Boot - Crash Course

Docker compose environment variables to understand the configuration for the Kafka Advertise Listener.

Top comments (2)

Kenny Digital

Marcos, great article! I think there's a typo in the docker-compose.yaml. Build failed with topic simple-message. Should be simple-message-topic. Right?

Marcos Maia

Sorry, took me so long to see this. Yes, I just fixed. Thanks.