Hi, I'm Ricardo Medeiros, .NET back-end developer @vaivoa, and today I'm going to walk you through using ksqlDB to query messages produced to Kafka by a .NET/C# producer. For this example, I will deploy my environment as containers, described in a Docker Compose file, to make my results easy to reproduce.
The source code used in this example is available here.
Services
First, let's talk about the Docker Compose environment services. The file is available here.
.NET API Producer
Automatically generated .NET API with Docker Compose service:
ksqldbdemo:
container_name: ksqldbdemo
image: ${DOCKER_REGISTRY-}ksqldbdemo
build:
context: .
dockerfile: Dockerfile
This producer service needs the Dockerfile generated by the .NET tooling, shown below:
FROM mcr.microsoft.com/dotnet/aspnet:5.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443
FROM mcr.microsoft.com/dotnet/sdk:5.0 AS build
WORKDIR /src
COPY ["ksqlDBDemo.csproj", "."]
RUN dotnet restore "ksqlDBDemo.csproj"
COPY . .
WORKDIR "/src/"
RUN dotnet build "ksqlDBDemo.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "ksqlDBDemo.csproj" -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "ksqlDBDemo.dll"]
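The producer code itself lives in the repository; as a rough sketch of what publishing one of these events could look like with the Confluent.Kafka client, something like the following might be used. The `UserCreated` shape and the topic name `demo-user` are assumptions that match the stream definitions created later in this tutorial, and `broker:29092` is the listener the compose network exposes to the other services:

```csharp
using System;
using System.Text.Json;
using Confluent.Kafka;

// Illustrative sketch only, not the repository's actual producer code.
// "broker:29092" matches the PLAINTEXT listener advertised in the compose file.
var config = new ProducerConfig { BootstrapServers = "broker:29092" };

using var producer = new ProducerBuilder<Null, string>(config).Build();

// Hypothetical event shape, consistent with the stream_user schema used later.
var user = new UserCreated("alice", Guid.NewGuid().ToString());

// Serialize the event as JSON and publish it to the demo-user topic.
await producer.ProduceAsync(
    "demo-user",
    new Message<Null, string> { Value = JsonSerializer.Serialize(user) });

// Make sure buffered messages are delivered before the process exits.
producer.Flush(TimeSpan.FromSeconds(5));

record UserCreated(string Name, string Id);
```

This sketch needs a running broker to execute, so treat it as a shape reference rather than a drop-in program.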
ZooKeeper
Although no longer required since Kafka 2.8, ZooKeeper coordinates Kafka tasks: electing controllers, tracking cluster membership, storing topic configuration, and more. This tutorial uses the Confluent Inc. ZooKeeper image, following the reference material. ZooKeeper makes Kafka more reliable, but adds complexity to the system.
zookeeper:
image: confluentinc/cp-zookeeper:7.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
Kafka
Kafka is an event streaming platform capable of handling trillions of events a day. It is based on the abstraction of a distributed commit log. Initially developed at LinkedIn in 2011 as a message queue, it has evolved into a full-fledged event streaming platform. Listed as broker in the services, it is the core of this tutorial. Its configuration can be tricky, but the setup below worked well in this scenario.
broker:
image: confluentinc/cp-kafka:7.0.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
ksqlDB
ksqlDB is a database built for distributed stream processing applications. Designed to work seamlessly with Kafka, it consists of a server that runs outside of Kafka, exposing a REST API, and a CLI application that can run separately; both are used in this tutorial.
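As a quick illustration of the REST side, once the server described below is up, a statement can be posted to its /ksql endpoint (the port matches the KSQL_LISTENERS setting in the service definition; the statement shown is just an example):

```shell
# Ask the ksqlDB server to list registered streams via its REST API.
curl -s -X POST http://localhost:8088/ksql \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  -d '{ "ksql": "SHOW STREAMS;", "streamsProperties": {} }'
```

This requires the ksqlDB server container to be running and reachable on localhost:8088.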
ksqlDB Server
This example uses the Confluent Inc. image of the ksqlDB server, once more due to its widespread usage.
ksqldb-server:
image: confluentinc/ksqldb-server:0.22.0
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: broker:29092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
ksqlDB CLI
The same goes for the ksqlDB CLI service, which also uses the Confluent Inc. image.
ksqldb-cli:
image: confluentinc/ksqldb-cli:0.22.0
container_name: ksqldb-cli
depends_on:
- broker
- ksqldb-server
entrypoint: /bin/sh
tty: true
Kafdrop
Kafdrop is a web UI for viewing Kafka topics and browsing consumer groups. It makes Kafka more accessible.
kafdrop:
container_name: kafdrop
image: obsidiandynamics/kafdrop:latest
depends_on:
- broker
ports:
- 19000:9000
environment:
KAFKA_BROKERCONNECT: broker:29092
Tutorial
Now for the moment you've been waiting for: let's make it work!
Environment
For this tutorial, you'll need Docker Desktop installed, either on a Linux distribution or on Windows with WSL, plus Git.
Cloning the project
A Visual Studio project is available here; it has Docker support and already deploys all the services needed for this demo from the IDE. However, you will be fine if you don't want to or can't use Visual Studio. Just clone it by running the following command in the terminal and directory of your preference:
$ git clone https://github.com/jjackbauer/ksqlDBDemo.git
Use the following command to move to the project folder:
$ cd ksqlDBDemo
Then, in the project folder, which contains the docker-compose.yml, run the following command to deploy the services:
$ docker compose up -d
After this command, make sure that all services are running. Sometimes a service fails on first start; restarting it is usually enough. To check that everything is running, you can view the services in Docker Desktop, as shown below:
Or you can execute the following command:
$ docker ps
Which should output something like this:
CONTAINER ID   IMAGE                               COMMAND                  CREATED       STATUS       PORTS                                          NAMES
b42ce9954fd9   ksqldbdemo_ksqldbdemo               "dotnet ksqlDBDemo.d…"   2 hours ago   Up 2 hours   0.0.0.0:9009->80/tcp, 0.0.0.0:52351->443/tcp   ksqldbdemo
0a0186712553   confluentinc/ksqldb-cli:0.22.0      "/bin/sh"                2 hours ago   Up 2 hours                                                  ksqldb-cli
76519de6946e   obsidiandynamics/kafdrop:latest     "/kafdrop.sh"            2 hours ago   Up 2 hours   0.0.0.0:19000->9000/tcp                        kafdrop
11c3a306ee01   confluentinc/ksqldb-server:0.22.0   "/usr/bin/docker/run"    2 hours ago   Up 2 hours   0.0.0.0:8088->8088/tcp                         ksqldb-server
07cef9d69267   confluentinc/cp-kafka:7.0.0         "/etc/confluent/dock…"   2 hours ago   Up 2 hours   9092/tcp, 0.0.0.0:29092->29092/tcp             broker
3fa1b9a60954   confluentinc/cp-zookeeper:7.0.0     "/etc/confluent/dock…"   2 hours ago   Up 2 hours   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp     zookeeper
WEB API
Now, with all services up and running, we can access the Web API's Swagger UI to populate our Kafka topics. The code is very simple and is available in the repository.
The Web API's Swagger UI is deployed at http://localhost:9009/swagger/index.html. As shown in the image below, it has two endpoints that create events which could come from independent microservices: one creates a userName in the system, and the other takes an Id and generates a three-digit code.
Then you can create a user with the user name of your choice, as shown:
And it will have an assigned unique Id, as demonstrated:
Now, you can get a three digit code for your user Id as displayed:
And a random code is generated for the selected Id, as we can observe in the image that follows:
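Under the hood, these two endpoints publish JSON events to the demo-user and demo-code topics. The field names below are inferred from the stream definitions created later in this tutorial, and the values are purely illustrative. A demo-user event would look roughly like:

```json
{ "Name": "alice", "Id": "<generated-guid>" }
```

and a demo-code event like:

```json
{ "Id": "<generated-guid>", "code": 123 }
```

The shared Id field is what lets ksqlDB join the two event types later.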
Kafdrop
We can use the Kafdrop UI to check that everything is okay. Kafdrop is deployed at http://localhost:19000/.
There, you will find all the available brokers and topics. It should look like this:
KSQL CLI
After all that, you'll be able to create your streams of data and query them using ksqlDB. In your terminal of choice, run:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
Creating streams
You are now in the ksql CLI and are free to create your streams and queries. First, let's create a stream for each of our topics:
CREATE STREAM stream_user (Name VARCHAR, Id VARCHAR)
WITH (kafka_topic='demo-user', value_format='json', partitions=1);
CREATE STREAM stream_code (Id VARCHAR, code INT)
WITH (kafka_topic='demo-code', value_format='json', partitions=1);
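If you want to confirm the streams were registered correctly, the ksqlDB CLI supports introspection statements such as:

```sql
SHOW STREAMS;
DESCRIBE stream_user;
```

DESCRIBE prints each column and its type, which is a quick way to catch schema typos before writing joins.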
Create a materialized view
You can join the user data with the most recent randomized code. To achieve this, you must create a materialized view table that joins both streams, as seen in the ksqlDB script that follows:
CREATE TABLE currentCodeView AS
    SELECT user.Name,
           LATEST_BY_OFFSET(code.code) AS CurrentCode
    FROM stream_code code INNER JOIN stream_user user
      WITHIN 7 DAYS ON code.Id = user.Id
    GROUP BY user.Name
    EMIT CHANGES;
Making a push query
After that, we can query this materialized view:
SELECT * FROM currentCodeView
EMIT CHANGES;
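Besides push queries, ksqlDB tables also support pull queries, which return the current state once and then terminate instead of streaming changes. Since the table above is grouped by the user name, that column acts as its key, so (assuming a user named alice was created earlier) a point lookup would be:

```sql
SELECT * FROM currentCodeView WHERE Name = 'alice';
```

This is the typical request/response pattern for serving the materialized state to an application.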
This push query keeps running until you hit Ctrl+C to cancel it.
Conclusions
This tutorial demonstrates that, in a Kafka + ksqlDB environment, you can run SQL queries and even join data coming from different events, which is one of the main complexities involved in microservice systems. That is exactly what ksqlDB addresses by enabling SQL operations over Kafka topics.
It's my goal to explore the possibilities this ecosystem allows, and I hope to bring more knowledge on this topic in future articles. For any suggestions, comments, or corrections, feel free to reach out to me on LinkedIn.
References
ksqlDB Quickstart
ksqlDB Overview
Kafka .NET Client
ksqlDB Documentation - Data Types Overview
KSQL and ksqlDB
Welcome to Apache ZooKeeper
What is ZooKeeper & How Does it Support Kafka?
What is Apache Kafka®?
ksqlDB - The database purpose-built for stream processing applications
An overview of ksqlDB
CREATE TABLE AS SELECT
How to join a stream and a stream
Time and Windows in ksqlDB Queries
Time operations