Data streaming is a continuous flow of data from one end to another. Typically, a producer sends event information to consumers via a message queue/broker. In this post, we are going to build a data streaming setup using .NET 6 applications as the producer and consumer, with Kafka acting as the message broker.
Before creating the producer and consumer, we need a Kafka broker. I'm using Docker and docker-compose to run Kafka on my local machine.
Here is the docker-compose.yml file.
version: "3"
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
networks:
- kafka_net
kafka:
image: wurstmeister/kafka
depends_on:
- zookeeper
restart: on-failure
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
networks:
- kafka_net
networks:
kafka_net:
driver: "bridge"
Running this file with 'docker-compose up -d' will create the Kafka and ZooKeeper services on my local host.
This post is not about Docker, so I'm moving on to the actual part.
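To confirm that both containers came up, you can list the running services (the container names come from the compose file above):

docker-compose ps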
Kafka Producer
The Kafka producer is a .NET 6 console application using C# 10.
We need the NuGet package Confluent.Kafka to use Kafka from a .NET client application, so we install it via the NuGet package manager.
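If you prefer the command line, the same package can be added with the dotnet CLI:

dotnet add package Confluent.Kafka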
As one of the improvements in C# 10, we can declare 'global using' directives that apply throughout the application. We now create a global.cs file with the default usings we need.
global using System.Text;
global using static System.Console;
global using System.Text.Json;
global using Confluent.Kafka;
Next, we are going to create a Message class. This is the data class that will be serialized and published to Kafka in this example. It is a simple class containing 3 public properties: Id, Data and Timestamp.
namespace KafkaProducer;

public class Message
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string? Data { get; set; }
    public DateTime Timestamp { get; set; } = DateTime.UtcNow;

    public override string ToString()
    {
        return JsonSerializer.Serialize(this);
    }
}
The Id is an auto-generated GUID, and the Timestamp defaults to the current UTC time when the Message object is created.
Now, we are going to create a custom serializer class. We need it because the Kafka .NET client only ships built-in serializers/deserializers for a handful of predefined data types, which are:
- Null
- Ignore
- Int32
- Int64
- Utf8
- Single
- Double
- ByteArray
However, if we need to publish/consume data of any other type, we need to tell Kafka which serializer/deserializer object to use.
namespace KafkaProducer
{
    public class CustomValueSerializer<T> : ISerializer<T>
    {
        public byte[] Serialize(T data, SerializationContext context)
        {
            return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(data, typeof(T)));
        }
    }
}
As you can see, CustomValueSerializer<T> implements the ISerializer<T> interface from the Kafka client, which has a single method called "Serialize". Kafka will use it while publishing the message.
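For contrast, publishing one of the built-in types needs no custom serializer at all. A minimal sketch, assuming a broker on localhost:9092 and a hypothetical string_logs topic:

// Plain strings are covered by the built-in Serializers.Utf8,
// so no custom ISerializer<T> implementation is required.
using var stringProducer = new ProducerBuilder<Null, string>(
        new ProducerConfig { BootstrapServers = "localhost:9092" })
    .SetValueSerializer(Serializers.Utf8) // optional: string resolves to Utf8 automatically
    .Build();

await stringProducer.ProduceAsync("string_logs", new Message<Null, string> { Value = "hello" });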
Now, we are going to create a generic producer class which publishes messages to Kafka asynchronously.
namespace KafkaProducer;

public class Producer<T>
{
    readonly string? _host;
    readonly int _port;
    readonly string? _topic;

    public Producer()
    {
        _host = "localhost";
        _port = 9092;
        _topic = "producer_logs";
    }

    ProducerConfig GetProducerConfig()
    {
        return new ProducerConfig
        {
            BootstrapServers = $"{_host}:{_port}"
        };
    }

    public async Task ProduceAsync(T data)
    {
        using (var producer = new ProducerBuilder<Null, T>(GetProducerConfig())
            .SetValueSerializer(new CustomValueSerializer<T>())
            .Build())
        {
            await producer.ProduceAsync(_topic, new Message<Null, T> { Value = data });
            WriteLine($"{data} published");
        }
    }
}
This class has a public method "ProduceAsync" which takes the message and publishes it to Kafka. Notice that while creating the ProducerBuilder object, we tell Kafka that the value serializer for this producer is our CustomValueSerializer<T>, via the .SetValueSerializer(new CustomValueSerializer<T>()) call.
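ProduceAsync also returns a DeliveryResult, and broker failures surface as a ProduceException. A sketch of how the call inside ProduceAsync could be hardened (the try/catch is my addition, not part of the original class):

try
{
    // DeliveryResult tells us which topic/partition/offset the message landed on.
    var result = await producer.ProduceAsync(_topic, new Message<Null, T> { Value = data });
    WriteLine($"{data} published to {result.TopicPartitionOffset}");
}
catch (ProduceException<Null, T> ex)
{
    // Error.Reason carries the client/broker failure description.
    WriteLine($"Publish failed: {ex.Error.Reason}");
}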
Now, from Program.cs, we create an instance of the Producer class and invoke the ProduceAsync method to publish our data to Kafka.
namespace KafkaProducer;

public class Program
{
    public static async Task Main(string[] args)
    {
        var producer = new Producer<Message>();
        for (int i = 0; i < 25; i++)
        {
            await producer.ProduceAsync(new Message
            {
                Data = $"Pushing Data {i} !!",
            });
            await Task.Delay(1000);
        }
        WriteLine("Publish Success!");
        ReadKey();
    }
}
If you run this, the application will publish 25 messages to Kafka, one per second.
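The console output will look roughly like this (the Ids and Timestamps will differ on every run):

{"Id":"0f3c...","Data":"Pushing Data 0 !!","Timestamp":"2022-06-01T10:00:00.000Z"} published
{"Id":"9a7d...","Data":"Pushing Data 1 !!","Timestamp":"2022-06-01T10:00:01.000Z"} published
...
Publish Success!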
Kafka Consumer
The Kafka consumer is again a .NET 6 console application using C# 10. We need the same NuGet package Confluent.Kafka here too.
After installing it, we create a global.cs file with the same global usings as in the producer.
Once the global file is added, we create a Message class mirroring the one on the producer side.
namespace KafkaConsumer;

public class Message
{
    public string? Id { get; set; }
    public string? Data { get; set; }
    public DateTime Timestamp { get; set; }

    public override string ToString()
    {
        return JsonSerializer.Serialize(this);
    }
}
Next, we are going to create a custom deserializer class that Kafka will use to deserialize the consumed messages.
namespace KafkaConsumer
{
    public class CustomValueDeserializer<T> : IDeserializer<T>
    {
        public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        {
            // isNull is true when the message carries no value (a tombstone).
            if (isNull)
            {
                return default!;
            }
            return JsonSerializer.Deserialize<T>(data)!;
        }
    }
}
Now, we are going to create a generic Consumer class to start consuming from the Kafka topic.
namespace KafkaConsumer;

public class Consumer<T>
{
    readonly string? _host;
    readonly int _port;
    readonly string? _topic;

    public Consumer()
    {
        _host = "localhost";
        _port = 9092;
        _topic = "producer_logs";
    }

    ConsumerConfig GetConsumerConfig()
    {
        return new ConsumerConfig
        {
            BootstrapServers = $"{_host}:{_port}",
            GroupId = "foo",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
    }

    public async Task ConsumeAsync()
    {
        using (var consumer = new ConsumerBuilder<Ignore, T>(GetConsumerConfig())
            .SetValueDeserializer(new CustomValueDeserializer<T>())
            .Build())
        {
            consumer.Subscribe(_topic);
            WriteLine($"Subscribed to {_topic}");
            await Task.Run(() =>
            {
                while (true)
                {
                    var consumeResult = consumer.Consume(default(CancellationToken));
                    if (consumeResult?.Message?.Value is T result)
                    {
                        WriteLine($"Data Received - {result}");
                    }
                }
            });
            consumer.Close();
        }
    }
}
As you can see, this class contains a method "ConsumeAsync" which creates a consumer object and starts consuming from Kafka using the "consumer.Consume()" method. Consume() is not an async method; if you call it on the main thread, it blocks until a message is received, which is why I run the loop in a separate task. Once a message is received, we print it.
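Because the loop runs forever, the consumer.Close() call above is never actually reached. A common refinement is to drive the loop with a real CancellationToken so Ctrl+C shuts the consumer down cleanly; a sketch of that variant (the token wiring is my addition, not part of the original class):

var cts = new CancellationTokenSource();
CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; // Ctrl+C requests shutdown

try
{
    while (!cts.IsCancellationRequested)
    {
        // Consume throws OperationCanceledException once the token is cancelled.
        var consumeResult = consumer.Consume(cts.Token);
        WriteLine($"Data Received - {consumeResult.Message.Value}");
    }
}
catch (OperationCanceledException)
{
    // Expected during shutdown.
}
finally
{
    consumer.Close(); // leave the consumer group cleanly
}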
Now the main program looks like this:
namespace KafkaConsumer;

public class Program
{
    public static async Task Main(string[] args)
    {
        var consumer = new Consumer<Message>();
        await consumer.ConsumeAsync();
    }
}
When you run both the producer and the consumer, you can watch the messages published by the producer being received and printed by the consumer in near real time.
The original source is available in my GitHub repo.