Event-Driven Architecture is a pattern in which decoupled services communicate by asynchronously producing and consuming events.
For example, consider a unified communications platform that manages millions of users and conversations. Conversations might arrive from a multitude of sources and need to be processed in many different ways.
Event-Driven Architecture enables us to distribute the processing of these events across services that might:
- Transfer information to long-term storage, like a data lake
- Perform administrative tasks, like stopping spammers in real time
Events can come from many different channels. In this guide, you will build a Spring Boot application, using Kotlin, that can receive events from the Vonage Communication APIs and store them in Apache Kafka.
This guide assumes that you have some experience with Spring Boot and the Vonage Communication APIs.
An example of the code can be found on GitHub: cr0wst/vonage-events-kafka
What are the Vonage Communication APIs?
The Vonage Communication APIs enable developers to create things like in-app messaging, voice chat, and integration with third-party chat services.
Many of these actions generate Events, which can be handled by your application via a pre-configured Webhook.
I have created a Demo Application that can generate a random conversation to produce events.
What is Kafka?
Apache Kafka is a horizontally scalable, fault-tolerant tool for building real-time data pipelines and streaming applications.
Kafka allows you to publish and subscribe to topics, which serve as a category for records to be published. A topic can have multiple publishers and consumers. Unlike traditional message brokers, it can store historical data indefinitely.
You can read more about Kafka on the Confluent Docs.
This guide will cover the following concepts:
- Connecting a Spring Boot Application to Kafka.
- Creating a controller to serve as a Webhook for incoming Vonage Communication events.
- Pipelining these events into a Kafka topic for later processing.
Download the Demo Application
The Pre-Setup instructions will guide you through creating a Vonage Application. You can also follow the official documentation for Creating an Application.
Most of this guide assumes that you are already receiving webhook events from Vonage, but you aren't currently sending them to Kafka.
If you don't have an application, I have created a Demo Application, which simulates multiple conversations using the Vonage Conversation API.
Initialize a New Spring Boot Application
Use your IDE or the Spring Initializr to create a new Spring Boot application with the Spring for Apache Kafka and Spring Reactive Web dependencies.
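If you would rather add the dependencies to an existing project by hand, the equivalent pom.xml entries should look roughly like this (the versions are managed by the Spring Boot parent):

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>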
Create a Webhook
The Vonage Communication APIs send event messages to a pre-configured webhook.
Configure the Event URL
The Event URL can be defined using the Vonage Dashboard.
This has to be a publicly accessible URL. You can use a tool like ngrok to safely tunnel traffic to your application. Check out this post on Working with Webhooks using Ngrok.
I'm going to start a tunnel using Ngrok to listen on port 8080:
ngrok http 8080 -subdomain=cr0wst
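With this tunnel running, requests to https://cr0wst.ngrok.io (or whichever subdomain you chose) are forwarded to port 8080 on your machine, so the Event URL to configure in the dashboard would be https://cr0wst.ngrok.io/webhooks/events, matching the route created in the next section.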
Create a Controller
Create a new class called EventWebhookController and add the following code:
package com.smcrow.demo

import org.slf4j.LoggerFactory
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController

@RestController
class EventWebhookController {
    @PostMapping("/webhooks/events")
    fun incomingEvents(@RequestBody body: String) {
        // Handle the Event Here
        log.info(body)
    }

    companion object {
        private val log = LoggerFactory.getLogger(EventWebhookController::class.java)
    }
}
This code registers a route at /webhooks/events that accepts POST requests and logs the request body.
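Before wiring up Vonage, you can sanity-check the route with a quick curl call; the payload below is made up, since any request body will simply be logged:

curl -X POST http://localhost:8080/webhooks/events \
  -H "Content-Type: application/json" \
  -d '{"type":"text","body":{"text":"hello"}}'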
If you were to start both applications, you would see a stream of Vonage events:
Start the Webhook Application:
./mvnw spring-boot:run
Now, start the demo application:
cd ~/Code/vonage-conversation-demo
./mvnw spring-boot:run
The top terminal shows the events from Vonage, and the bottom is the application generating random conversations and messages.
Now that your application is receiving events from Vonage, it's time to set up Kafka.
Setting up Kafka
If you don't already have access to a Kafka cluster, there are a few options. The simplest is to sign up for a managed service like Confluent Cloud. I will be using Confluent Cloud and the Confluent Cloud CLI for the remainder of this guide.
Create a Cluster
Create a new cluster on Confluent Cloud with the following command:
ccloud kafka cluster create demo --cloud aws --region us-east-2
The output includes an ID and an Endpoint; record both for later. For this guide, I will be using the ID lkc-03wkq and the endpoint pkc-ep9mm.us-east-2.aws.confluent.cloud:9092.
Create a Topic
Create the vonage.webhook.events topic inside your Kafka cluster for storing the Vonage webhook events:
ccloud kafka topic create --cluster lkc-03wkq --partitions 1 vonage.webhook.events
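You should be able to verify that the topic was created by listing the topics on the cluster:

ccloud kafka topic list --cluster lkc-03wkq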
Connecting a Spring Boot Application to Kafka
To connect your Spring Boot application to Confluent Cloud, you'll need to create an API key and secret. Run the following command, replacing the resource with your cluster ID from the previous step:
ccloud api-key create --resource=lkc-03wkq
You will also need your bootstrap server address, which is the endpoint from a previous step.
Using the bootstrap server address, API key, and secret, add the following properties to application.properties, replacing the placeholder values:
# Kafka
spring.kafka.properties.ssl.endpoint.identification.algorithm=https
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.bootstrap-servers={{ BOOTSTRAP_SERVER_ADDRESS }}
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{{ CLUSTER_API_KEY }}" password="{{ CLUSTER_API_SECRET }}";
spring.kafka.properties.security.protocol=SASL_SSL
# Producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
For example:
# Kafka
spring.kafka.properties.ssl.endpoint.identification.algorithm=https
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.bootstrap-servers=pkc-ep9mm.us-east-2.aws.confluent.cloud:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="my-api-key" password="my-api-secret";
spring.kafka.properties.security.protocol=SASL_SSL
# Producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
Produce Messages from Spring Boot
The Spring Kafka project uses a KafkaTemplate object to produce messages. KafkaTemplate takes two type parameters: one for the Kafka record key and one for the value.
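For example, a KafkaTemplate<String, String> can send a record with or without a key. Records that share a key always land on the same partition, which matters once a topic has more than one partition; the conversationId key below is purely for illustration:

// No key: Kafka picks the partition
kafkaTemplate.send("vonage.webhook.events", body)

// With a key: records for the same conversation stay on the same partition
kafkaTemplate.send("vonage.webhook.events", conversationId, body)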
Update the EventWebhookController to inject a KafkaTemplate<String, String> as a dependency:
package com.smcrow.demo

import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController

@RestController
class EventWebhookController(private val kafkaTemplate: KafkaTemplate<String, String>) {
    @PostMapping("/webhooks/events")
    fun incomingEvents(@RequestBody body: String) {
        // Handle the Event Here
        log.info(body)
    }

    companion object {
        private val log = LoggerFactory.getLogger(EventWebhookController::class.java)
    }
}
Now, update the incomingEvents method to send the event to Kafka:
package com.smcrow.demo

import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController

@RestController
class EventWebhookController(private val kafkaTemplate: KafkaTemplate<String, String>) {
    @PostMapping("/webhooks/events")
    fun incomingEvents(@RequestBody body: String) {
        // Publish the raw event payload to the Kafka topic
        kafkaTemplate.send("vonage.webhook.events", body)
    }

    companion object {
        private val log = LoggerFactory.getLogger(EventWebhookController::class.java)
    }
}
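One thing to keep in mind: send is asynchronous and returns a future rather than blocking until the broker acknowledges the record. If you want visibility into delivery failures, you can attach a callback. A minimal sketch, assuming Spring Kafka 3.x where send returns a CompletableFuture (older versions return a ListenableFuture with a similar callback API):

kafkaTemplate.send("vonage.webhook.events", body).whenComplete { result, ex ->
    if (ex != null) {
        log.error("Failed to publish event", ex)
    } else {
        log.debug("Published to partition {} at offset {}",
            result.recordMetadata.partition(), result.recordMetadata.offset())
    }
}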
Consuming Kafka Messages Using the CLI
You can use the Confluent Cloud CLI to consume events on the vonage.webhook.events topic.
First, configure the CLI to use your API key from before, replacing my-api-key with your API key and lkc-03wkq with your cluster ID from the previous step:
ccloud api-key use my-api-key --resource lkc-03wkq
Now, start consuming messages:
ccloud kafka topic consume --cluster lkc-03wkq -b vonage.webhook.events
Remember that Kafka is a commit log. The -b flag starts consumption from the beginning of the topic, so you'll also get a backfill of any messages published while you weren't connected.
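The CLI is convenient for spot-checking, but a downstream service would typically consume from code. Here is a minimal sketch of what that could look like in the same Spring Boot stack; the class name and group id are made up, and it assumes the Spring Boot defaults of StringDeserializer for both keys and values:

package com.smcrow.demo

import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class EventConsumer {
    // Invoked for every record published to the topic
    @KafkaListener(topics = ["vonage.webhook.events"], groupId = "event-printer")
    fun onEvent(body: String) {
        log.info("Consumed event: {}", body)
    }

    companion object {
        private val log = LoggerFactory.getLogger(EventConsumer::class.java)
    }
}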
Start Everything
First, start the Kafka Consumer if you haven't already:
ccloud kafka topic consume --cluster lkc-03wkq -b vonage.webhook.events
Now start the Webhook Application:
cd ~/Code/vonage-events-kafka-demo
./mvnw spring-boot:run
Finally, start the conversation simulator application:
cd ~/Code/vonage-conversation-demo
./mvnw spring-boot:run
At this point, all three pieces are running: the Kafka consumer, the webhook application, and the conversation simulator.
Conclusion
In this guide, you created a Spring Boot Application that can listen to Vonage Events and publish them to Kafka.
An example of the code can be found on GitHub: cr0wst/vonage-events-kafka
In another guide, I'll show you how to consume these events and store them in a data warehouse like Snowflake.