Kafka?
To understand the purpose of Kafka, you need to experiment with it in various contexts and reason about what it gives you. This small project is a brief overview of one of Kafka's use cases: streaming messages to drive multiple service-agnostic applications. A producer that streams messages through Kafka does not have to specify where those messages should land; it only writes to a topic. Similarly, a consumer does not need to know who sent a message unless the sender includes that information explicitly.
Kafka persists the messages in a stream and can replicate them across brokers, which helps protect us from inconsistent data, data loss, or duplication when a system fails. It is much more powerful when used across systems and applications that are built on different tech stacks, architectures, and purposes and are distributed over the network. To read more about it check out this eBook.
To get a basic understanding of what Kafka is and how it operates, we will create two different applications and see how they can communicate without making any HTTP requests to each other.
Event-driven services?
A service is a logical unit within a larger application, responsible for executing specific tasks based on given inputs. Each service can communicate with other services using either synchronous communication methods (such as HTTP/REST, gRPC, and GraphQL) or asynchronous communication methods (such as webhooks, message queues, and event streaming). In this article, we will focus on using an event-streaming communication pattern to coordinate various services and perform the necessary operations to achieve the desired outcomes.
Event-driven services are those that communicate with each other in response to specific events being triggered. This communication is independent of the sources generating the events and the entities reacting to them.
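As a rough illustration of that independence, here is a minimal in-process sketch that uses only Rust's standard library. A channel stands in for the event stream; the `StockChanged` type and both function names are hypothetical and are not part of the project built later in this article.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical event type used only for this illustration.
#[derive(Debug, Clone, PartialEq)]
struct StockChanged {
    product_id: String,
    units_left: u16,
}

/// Publishes events without knowing who will read them.
fn publish_all(tx: mpsc::Sender<StockChanged>, events: Vec<StockChanged>) {
    for event in events {
        // send only fails when the receiving side has been dropped
        tx.send(event).expect("receiver dropped");
    }
    // tx is dropped here, which closes the channel
}

/// Reacts to events without knowing who produced them.
fn consume_all(rx: mpsc::Receiver<StockChanged>) -> Vec<String> {
    rx.iter()
        .map(|e| format!("shipment check for {} ({} units left)", e.product_id, e.units_left))
        .collect()
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // The consumer runs on its own thread and only sees the channel.
    let consumer = thread::spawn(move || consume_all(rx));
    publish_all(
        tx,
        vec![
            StockChanged { product_id: "shirt".to_string(), units_left: 3 },
            StockChanged { product_id: "shoes".to_string(), units_left: 0 },
        ],
    );
    for line in consumer.join().expect("consumer panicked") {
        println!("{line}");
    }
}
```

Neither function refers to the other; the channel is the only thing they share. Kafka plays that role across processes and machines, with topics in place of the channel.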
Overview
To simulate a real-life example, we will build an inventory service using Rust and Actix. It receives an HTTP request and checks whether the product is available in the inventory in the requested number of units. If it is, the service deducts that many units from the product's stock, and once the update has succeeded a message is produced using Kafka.
To store the products and their available units, we will use a database called SurrealDB. I chose SurrealDB for a specific reason, which we will explore later in this article.
Once the inventory has produced a message, we need a consumer that connects to the Kafka broker and consumes it. For this, we will create a shipment service using Go to simulate the shipping process that starts when products are released from the inventory. To keep this project short and concise, we are not going to build the whole shipment system.
Prerequisites
Basic understanding of programming and a huge amount of curiosity to know the whys and hows. Nothing else.
If you want to refer to the source code while reading this article then here is the repository
Get started
Docker compose
We will pull the Docker images of Kafka, ZooKeeper, and SurrealDB to keep the setup process light and easy.
version: "3.8"
services:
  surrealdb:
    image: surrealdb/surrealdb:latest
    container_name: surrealdb
    command: start --auth --user root --pass root file:/container-dir/dev.db
    ports:
      - "8000:8000"
    volumes:
      - surrealdb-data:/container-dir
    user: root
    environment:
      - SURREALDB_ENV_USER=root
      - SURREALDB_ENV_PASS=root
    networks:
      - net
  zookeeper-1:
    container_name: zookeeper-1
    image: zookeeper
    restart: always
    ports:
      - 2181:2181
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
    volumes:
      - ./config/zookeeper-1/zookeeper.properties:/kafka/config/zookeeper.properties
  kafka-1:
    container_name: kafka-1
    image: bitnami/kafka
    restart: always
    depends_on:
      - zookeeper-1
    ports:
      - 9092:9092
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper-1:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CREATE_TOPICS=stock_update:1:3
    healthcheck:
      # The check runs inside the kafka-1 container, so the broker is reached on localhost
      test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"]
      interval: 5s
      timeout: 10s
      retries: 5
networks:
  net:
    name: "net"
    driver: bridge
volumes:
  surrealdb-data:
Run the compose file with this command from the same directory:
docker compose up -d
The Docker engine will start three containers, one for each image, and you will have Kafka on port 9092 and SurrealDB on port 8000 running on your local machine.
Inventory service
Create a new Rust binary application; you can name it inventory_service. Add these dependencies to your Cargo.toml file:
actix-web = "4"
dotenv = "0.15.0"
futures-util = "0.3"
rdkafka = { version = "0.36", features = ["ssl-vendored"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
surrealdb = "1.5.1"
tokio = { version = "1", features = ["full", "macros", "rt-multi-thread"] }
To download and compile them, run this command:
cargo build
Create a new .env file in the same path, and add these variables:
SURREAL_URL=127.0.0.1:8000
SURREAL_DB=ecommerce
SURREAL_NS=foo
SURREAL_USER=root
SURREAL_PW=root
KAFKA_BROKER=localhost:9092
KAFKA_TOPIC=stock_update
If you changed any of these values in the compose file, make sure to update them in your .env file as well.
Now, in the project root, create a file named init.surql. It will hold all the queries that set up the inventory table and the inventory_stock_events table and seed some dummy products. Check this file here to get the queries you need to add. Once that is done, run this command to import and execute all the statements with the SurrealDB CLI:
surreal import --conn http://localhost:8000 --user root --pass root --ns foo --db ecommerce ./init.surql
One important note: we have created the inventory_stock_events table, which records an event whenever the units of any product change. This logging happens automatically, so we don't have to trigger it manually.
Now let's create the schema of the tables inside the src/ directory. Add this schema to your schema.rs file:
use serde::{Deserialize, Serialize};
use surrealdb::sql::{Datetime, Id};

#[allow(dead_code)]
#[derive(Debug, Deserialize, Serialize)]
pub struct ProductThing {
    id: Id,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct Product {
    pub id: ProductThing,
    pub name: String,
    pub price: u16,
    pub units: u16,
}

#[derive(Debug, Deserialize)]
pub struct UpdateProductStock {
    pub units: u16,
}

#[allow(dead_code)]
#[derive(Debug, Deserialize, Serialize)]
pub struct EventThing {
    id: Id,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct StockEvent {
    pub id: EventThing,
    pub time: Datetime,
    pub action: String,
    pub product: ProductThing,
    pub before_update: u16,
    pub after_update: u16,
}
Create a kafka.rs file and add the Kafka producer function. It can be used with different topics and brokers:
use rdkafka::{producer::FutureProducer, ClientConfig};

pub async fn producer(brokers: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .set("allow.auto.create.topics", "true")
        .create()
        .expect("Producer creation error")
}
Now that we have the base setup, let's create the handler functions in main.rs:
- Get inventory products handler
This handler function returns the list of products and their attributes.
async fn get_inventory_products(state: web::Data<State>) -> impl Responder {
    let db = &state.db;
    let products: Vec<Product> = match db.select("inventory").await {
        Ok(val) => val,
        Err(e) => {
            dbg!(e);
            return HttpResponse::InternalServerError().body("Server problems!!");
        }
    };
    HttpResponse::Ok().json(products)
}
- Update stock handler
This handler function takes the product_id from the URL path and the units from the JSON payload. It checks whether a product with this product_id exists and has at least that many units in stock. If so, it deducts the units from the stock. In every case it responds with an appropriate message and status code.
async fn update_stock(
    product_id: web::Path<String>,
    state: web::Data<State>,
    payload: web::Json<UpdateProductStock>,
) -> impl Responder {
    if product_id.is_empty() {
        return HttpResponse::BadRequest().body("Invalid Product Id");
    }
    let db = &state.db;
    let mut available_units: u16 = 0;
    // Step 1: read the current stock, but only if it covers the requested units.
    // Note: product_id is interpolated into the query as-is, so it is not escaped.
    if let Ok(mut query_product) = db
        .query(format!(
            "SELECT units FROM inventory:{} WHERE units>={}",
            product_id, payload.units
        ))
        .await
    {
        if let Ok(Value::Array(arr)) = query_product.take(0) {
            if !arr.is_empty() {
                if let Value::Object(obj) = &arr[0] {
                    if let Some(Value::Number(units)) = obj.get("units") {
                        available_units = units.to_usize() as u16;
                    }
                }
            } else {
                // An empty result means the product is missing or the stock is too low.
                return HttpResponse::NotFound().body("Product not found or insufficient units");
            }
        } else {
            return HttpResponse::InternalServerError().body("Unexpected query response format");
        }
    } else {
        return HttpResponse::InternalServerError().body("Server Error");
    }
    // Step 2: write the reduced stock. The WHERE clause in step 1 is what keeps
    // available_units - payload.units from going below zero.
    if let Ok(mut update_product) = db
        .query(format!(
            "UPDATE inventory:{} SET units={}",
            product_id,
            available_units - payload.units,
        ))
        .await
    {
        if let Ok(Value::Array(arr)) = update_product.take(0) {
            if !arr.is_empty() {
                HttpResponse::Ok().body("Product stock updated")
            } else {
                HttpResponse::NotFound().body("Product not found or insufficient units")
            }
        } else {
            HttpResponse::InternalServerError().body("Unexpected query response format")
        }
    } else {
        HttpResponse::InternalServerError().body("Server Error")
    }
}
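The subtraction in the second query relies on the first query's WHERE clause to guarantee that enough units are available. Below is a minimal sketch of making that guard explicit on the Rust side. The helper name `remaining_units` is hypothetical and does not appear in the original source.

```rust
/// Returns the units left after taking `requested` from `available`,
/// or `None` when the stock cannot cover the request.
/// Hypothetical helper for illustration; not part of the article's code.
fn remaining_units(available: u16, requested: u16) -> Option<u16> {
    // checked_sub yields None instead of panicking or wrapping on underflow.
    available.checked_sub(requested)
}

fn main() {
    for (available, requested) in [(5u16, 2u16), (2, 2), (2, 5)] {
        match remaining_units(available, requested) {
            Some(left) => println!("take {requested} of {available}: {left} left"),
            None => println!("take {requested} of {available}: insufficient units"),
        }
    }
}
```

In the handler, the `None` case would map to the same "insufficient units" response. This only guards the arithmetic: the read and the write are still two separate queries, so it does not by itself protect against two concurrent requests for the same product.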
- Stream stock change event handler
async fn stream_stock_changes(
    db: &Surreal<Client>,
    stock_producer: &FutureProducer,
) -> surrealdb::Result<()> {
    let kafka_topic = env::var("KAFKA_TOPIC").expect("KAFKA_TOPIC must be set.");
    // Subscribe to a live query: every change to the table arrives as a notification.
    if let Ok(mut stream) = db.select("inventory_stock_events").live().await {
        while let Some(result) = stream.next().await {
            let res: Result<Notification<StockEvent>> = result;
            let data = &res.unwrap().data;
            stock_producer
                .send(
                    FutureRecord::to(&kafka_topic)
                        // The payload is the text "Message " followed by the event as JSON.
                        .payload(&format!(
                            "Message {}",
                            &serde_json::to_string(data).unwrap()
                        ))
                        // Constant key: every message is sent with the same key.
                        .key(&format!("Key {}", 1))
                        .headers(OwnedHeaders::new().insert(Header {
                            key: "header_key",
                            value: Some("header_value"),
                        })),
                    // Do not wait if the producer queue is full.
                    Duration::from_secs(0),
                )
                .await
                .expect("FAILED TO PRODUCE THE MESSAGE");
        }
    } else {
        println!("Failed to stream")
    }
    Ok(())
}
In this handler function, we leverage SurrealDB's live query on a select statement. With a live query we can capture real-time data changes in the inventory_stock_events table without polling or triggering anything manually.
Here's how it operates:
When a user changes the stock units of a valid product in the inventory table, the change is logged in the inventory_stock_events table. Once that record has been written, the live query delivers a notification to the application, which then produces a message describing the change and sends it to the Kafka broker.
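The chain described above (stock change, event row, live-query notification, Kafka message) boils down to a forwarding loop. The sketch below models it with the standard library only: an `mpsc` channel stands in for the SurrealDB live query, and a hypothetical `Publisher` trait stands in for the Kafka producer. All names are assumptions made for illustration. It also keys each message by product rather than by a constant, which is one possible choice and not what the handler above does.

```rust
use std::sync::mpsc;

/// Simplified stand-in for the article's StockEvent (illustrative only).
#[derive(Debug)]
struct StockEvent {
    product: String,
    before_update: u16,
    after_update: u16,
}

/// Hypothetical abstraction over "something that can send a keyed message".
trait Publisher {
    fn publish(&mut self, key: &str, payload: &str);
}

/// Test double that records messages instead of talking to a broker.
struct InMemoryPublisher {
    sent: Vec<(String, String)>,
}

impl Publisher for InMemoryPublisher {
    fn publish(&mut self, key: &str, payload: &str) {
        self.sent.push((key.to_string(), payload.to_string()));
    }
}

/// Forwards every event from the feed to the publisher until the feed closes.
/// Returns the number of forwarded events.
fn forward_events<P: Publisher>(feed: mpsc::Receiver<StockEvent>, publisher: &mut P) -> usize {
    let mut forwarded = 0;
    for event in feed {
        // Hand-built JSON keeps the sketch dependency-free and does no escaping;
        // real code would serialize with serde_json as the handler above does.
        let payload = format!(
            "{{\"product\":\"{}\",\"before_update\":{},\"after_update\":{}}}",
            event.product, event.before_update, event.after_update
        );
        publisher.publish(&event.product, &payload);
        forwarded += 1;
    }
    forwarded
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(StockEvent {
        product: "inventory:shirt".to_string(),
        before_update: 5,
        after_update: 3,
    })
    .expect("receiver dropped");
    drop(tx); // closing the sender ends the loop in forward_events

    let mut publisher = InMemoryPublisher { sent: Vec::new() };
    let count = forward_events(rx, &mut publisher);
    println!("forwarded {count} event(s): {:?}", publisher.sent);
}
```

Keeping the publisher behind a trait makes the forwarding logic testable without a running broker.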
Using all the handler functions we can update the main function to this:
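// The imports, the State struct, and the #[actix_web::main] attribute below are
// inferred from the snippets in this article; the linked main.rs is the reference.
mod kafka;
mod schema;

use std::{env, time::Duration};

use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use dotenv::dotenv;
use futures_util::StreamExt;
use kafka::producer;
use rdkafka::{
    message::{Header, OwnedHeaders},
    producer::{FutureProducer, FutureRecord},
};
use schema::{Product, StockEvent, UpdateProductStock};
use surrealdb::{
    engine::remote::ws::{Client, Ws},
    opt::auth::Root,
    sql::Value,
    Notification, Result, Surreal,
};
use tokio::task;

// Shared application state handed to every handler
struct State {
    db: Surreal<Client>,
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    dotenv().ok();
    // LOAD ENV VARS
    let surreal_url = env::var("SURREAL_URL").expect("SURREAL_URL must be set.");
    let surreal_ns = env::var("SURREAL_NS").expect("SURREAL_NS must be set.");
    let surreal_db = env::var("SURREAL_DB").expect("SURREAL_DB must be set.");
    let surreal_user = env::var("SURREAL_USER").expect("SURREAL_USER must be set.");
    let surreal_password = env::var("SURREAL_PW").expect("SURREAL_PW must be set.");
    let kafka_broker = env::var("KAFKA_BROKER").expect("KAFKA_BROKER must be set.");
    // INIT DATABASE
    let db = Surreal::new::<Ws>(&surreal_url)
        .await
        .expect("Failed to connect to the Surreal client");
    db.signin(Root {
        username: &surreal_user,
        password: &surreal_password,
    })
    .await
    .expect("Failed to authenticate");
    db.use_ns(surreal_ns)
        .use_db(surreal_db)
        .await
        .expect("Failed to access the Database");
    let db_clone = db.clone();
    // CREATE KAFKA PRODUCER
    let stock_producer = producer(&kafka_broker).await;
    // SPAWN A BACKGROUND TASK THAT FORWARDS STOCK EVENTS TO KAFKA
    task::spawn(async move {
        stream_stock_changes(&db_clone, &stock_producer)
            .await
            .expect("failed to stream");
    });
    // EXECUTE SERVER
    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(State { db: db.to_owned() }))
            .service(
                web::scope("/inventory")
                    .service(web::resource("").route(web::get().to(get_inventory_products)))
                    .service(
                        web::scope("/{product_id}")
                            .service(web::resource("").route(web::patch().to(update_stock))),
                    ),
            )
    })
    .bind(("127.0.0.1", 3000))?
    .run()
    .await
}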
Finally, your main.rs file should look like the one here.
Shipping service
This service does not do much: it just connects to the Kafka broker and consumes the messages. I have kept this application as simple as possible for this article, but in real projects you would do much more with the messages it receives.
Create a directory named shipment_service and, inside it, initialize a new Go module:
go mod init shipment_service
Install packages:
go get -u github.com/segmentio/kafka-go
go get -u github.com/joho/godotenv
Create a .env file:
KAFKA_BROKER=localhost:9092
KAFKA_TOPIC=stock_update
KAFKA_GROUP_ID=shipment_service_group
POSTGRES_USER=yourusername
POSTGRES_PASSWORD=yourpassword
POSTGRES_DB=yourdatabase
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
If you have changed any of these values, make sure to update them in your .env file.
Create a main.go file and add the following code:
package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"strings"
	"syscall"

	"github.com/joho/godotenv"
	"github.com/segmentio/kafka-go"
)

type OriginalMessage struct {
	ID           IDWrapper `json:"id"`
	Time         string    `json:"time"`
	Action       string    `json:"action"`
	Product      IDWrapper `json:"product"`
	BeforeUpdate int       `json:"before_update"`
	AfterUpdate  int       `json:"after_update"`
}

type IDWrapper struct {
	ID IDStringWrapper `json:"id"`
}

type IDStringWrapper struct {
	String string `json:"String"`
}

type TransformedMessage struct {
	ID           string `json:"id"`
	ProductID    string `json:"productId"`
	BeforeUpdate int    `json:"before_update"`
	AfterUpdate  int    `json:"after_update"`
	Action       string `json:"action"`
	Time         string `json:"time"`
}

func main() {
	// Load environment variables
	err := godotenv.Load()
	if err != nil {
		log.Fatalf("Error loading .env file: %v", err)
	}
	// Kafka configuration
	kafkaBroker := os.Getenv("KAFKA_BROKER")
	kafkaTopic := os.Getenv("KAFKA_TOPIC")
	kafkaGroupID := os.Getenv("KAFKA_GROUP_ID")
	// Set up Kafka reader
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{kafkaBroker},
		GroupID:  kafkaGroupID,
		Topic:    kafkaTopic,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
	// Create a channel to handle OS signals
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, os.Interrupt, syscall.SIGTERM)
	// Start consuming messages
	go func() {
		for {
			m, err := reader.FetchMessage(context.Background())
			if err != nil {
				log.Printf("Error fetching message: %v", err)
				continue
			}
			rawMessage := string(m.Value)
			jsonMessage := strings.TrimPrefix(rawMessage, "Message ")
			var originalMessage OriginalMessage
			if err := json.Unmarshal([]byte(jsonMessage), &originalMessage); err != nil {
				log.Printf("Error unmarshaling message: %v", err)
				continue
			}
			transformedMessage := transformMessage(originalMessage)
			transformedMessageJSON, err := json.Marshal(transformedMessage)
			if err != nil {
				log.Printf("Error marshaling transformed message: %v", err)
				continue
			}
			log.Printf("Incoming message: %s", transformedMessageJSON)
			// Commit the message to mark it as processed
			if err := reader.CommitMessages(context.Background(), m); err != nil {
				log.Printf("Error committing message: %v", err)
			}
		}
	}()
	// Wait for a termination signal
	<-sigchan
	log.Println("Shutting down...")
	reader.Close()
}

func transformMessage(orig OriginalMessage) TransformedMessage {
	return TransformedMessage{
		ID:           orig.ID.ID.String,
		ProductID:    orig.Product.ID.String,
		BeforeUpdate: orig.BeforeUpdate,
		AfterUpdate:  orig.AfterUpdate,
		Action:       orig.Action,
		Time:         orig.Time,
	}
}
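The consumer depends on one detail of the producer: the payload is the literal prefix `Message ` followed by JSON. The same rule, sketched in Rust (the language of the producer side) with a hypothetical function name, could look like this:

```rust
/// Returns the JSON part of a payload produced as "Message <json>".
/// A payload without the prefix is returned unchanged, mirroring the
/// behaviour of Go's strings.TrimPrefix used by the consumer.
/// Hypothetical helper for illustration; not part of the article's code.
fn payload_json(raw: &str) -> &str {
    raw.strip_prefix("Message ").unwrap_or(raw)
}

fn main() {
    let raw = "Message {\"action\":\"UPDATE\"}";
    println!("{}", payload_json(raw));
}
```

Sending the bare JSON from the producer would make this step unnecessary on both sides; the prefix is kept here only because the handler in the inventory service adds it.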
Running the services
Go inside the inventory_service directory and run the binary:
cargo run
Open another terminal window, go inside the shipment_service directory, and run the binary:
go run main.go
Execution
Now, in a new terminal window, execute this curl request, replacing {product_id} with the ID of one of the seeded products:
curl --location --globoff --request PATCH 'http://localhost:3000/inventory/{product_id}' \
  --header 'Content-Type: application/json' \
  --data '{
    "units": 2
  }'
You will notice messages being printed in the terminal window where your Go binary (the shipment service) is running. It consumes the stock changes in real time without any HTTP communication between the services.
Final Output
Conclusion
I hope you enjoyed reading this article and learned something new. Though I have tried to explain one of the use cases of Kafka and some features of SurrealDB, there is much more that can be achieved, and highly efficient applications can be built with tools and technologies like these, more than can be covered in a single article.
Signing Off!!