Step 1: Create a New Spring Boot Starter Project
creating a new Spring Boot Starter Project using STS. While configuring the project, select Spring Web, Spring for Apache Kafka, and Spring Boot DevTools as dependencies
Step 2: Enable Kafka in the Main Class
To integrate Apache Kafka with Spring Boot,
package com.dev.spring.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
@SpringBootApplication
@EnableKafka
public class SpringBoot2ApacheKafkaTestApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBoot2ApacheKafkaTestApplication.class, args);
}
}
Step 3: Create a Custom MessageRepository Class
Next, create a MessageRepository class to store incoming messages.
package com.dev.spring.kafka.message.repository;
import java.util.ArrayList;
import java.util.List;
import org.springframework.stereotype.Component;
@Component
public class MessageRepository {
private List<String> list = new ArrayList<>();
public void addMessage(String message) {
list.add(message);
}
public String getAllMessages() {
return list.toString();
}
}
Step 4: Create a MessageProducer Class
Create a MessageProducer class to send messages to the Kafka topic.
package com.dev.spring.kafka.sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {
private Logger log = LoggerFactory.getLogger(MessageProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${myapp.kafka.topic}")
private String topic;
public void sendMessage(String message) {
log.info("MESSAGE SENT FROM PRODUCER END -> " + message);
kafkaTemplate.send(topic, message);
}
}
Step 5: Create a MessageConsumer Class
Now, create a MessageConsumer class to consume messages from the Kafka topic.
package com.dev.spring.kafka.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.dev.spring.kafka.message.repository.MessageRepository;
@Component
public class MessageConsumer {
private Logger log = LoggerFactory.getLogger(MessageConsumer.class);
@Autowired
private MessageRepository messageRepo;
@KafkaListener(topics = "${myapp.kafka.topic}", groupId = "xyz")
public void consume(String message) {
log.info("MESSAGE RECEIVED AT CONSUMER END -> " + message);
messageRepo.addMessage(message);
}
}
Step 6: Create a KafkaRestController Class
Finally, create a KafkaRestController class to handle REST requests for sending and retrieving messages.
package com.dev.spring.kafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.dev.spring.kafka.message.repository.MessageRepository;
import com.dev.spring.kafka.sender.MessageProducer;
@RestController
public class KafkaRestController {
@Autowired
private MessageProducer producer;
@Autowired
private MessageRepository messageRepo;
// Send message to Kafka
@GetMapping("/send")
public String sendMsg(@RequestParam("msg") String message) {
producer.sendMessage(message);
return "'" + message + "' sent successfully!";
}
// Read all messages
@GetMapping("/getAll")
public String getAllMessages() {
return messageRepo.getAllMessages();
}
}
Step 7: Create the application.yml File
Lastly, configure your application by creating an application.yml
server:
port: 9090
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
bootstrap-servers: localhost:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
myapp:
kafka:
topic: myKafkaTest
Top comments (0)