C# Channels
The .NET Core 3.0 has brought a new namespace System.Threading.Channels
which contains a synchronization data structure crucial for concurrent programming. In this article I will briefly explain what concurrency is, how is it connected to channels
, and why is concurrency in many cases an easy way to parallel computation. It is a programing paradigm that does not necessary need mutexes, semaphores and other synchronization primitives, while it does not inherently suffer from deadlocks and race conditions. Plus, it is simple to test and reason about, because the building blocks are simple single thread oriented units.
What is concurrency?
Concurrency is a style of programming popularized mainly by Go language and its creator Rob Pike. Concurrent program means that It parts (classes/functions/methods) of the program can run with no order, independently. In our case we create classes that do some work and connect them together by a thread-safe pipeline for your data called channel
. Therefore we can safely send the data between the classes and we do not have to worry about making the data structures thread-safe. The safety is guaranteed by the channels
.
Concurrency does not necessary mean parallel computation, you can run a concurrent program only in one thread. But since the parts of your program are independent and can be run in any order, it is easy to transform any concurrent program to a parallel one.
Let us have some analogy for channels. Imagine a little unorthodox bakery, where three workers work. Each of them is specialized in one task.
-
WorkerOne
can prepare dough, -
WorkerTwo
can bake bread, -
WorkerThree
can work at the checkout.
They decided to have two baskets, BasketOne
is for the dough and BasketTwo
is for the bread and they agreed to this algorithm of work.
- When
WorkerOne
is done, he puts dough into theBasketOne
, -
WorkerTwo
will take dough from the basket whenever he can and starts working, - When
WorkerTwo
is done, he puts the bread into theBasketTwo
, -
WorkerThree
will take the bread from the basket and puts it on the counter.
The baskets here represent channels, the product is safely stored in a basket until it is needed. All workers are synchronized by the baskets, if a basket is full, the worker has to wait until it is empty again. When the basket is empty and the worker has nothing to do, he must wait until the basket is filled again. But otherwise, they can work independently. And if they find out, that WorkerTwo
is a bottleneck for their production, they can always hire a worker with the same specialization and just like that they made the process of baking faster - no other change is necessary, they just need one more worker.
Code Example
Imagine you want to program a concurrent prime sieve (inspired by Go channel tutorial), to demonstrate how you can utilize channels in your C# program. First we need to identify our independent parts of the program. Let us create two classes
- Generator - generates natural numbers from 2,
- Filter - checks if a number is divisible by a prime number.
Generator class could look like this:
public class Generator
{
public readonly ChannelReader<int> Reader;
//from here other classes will get a next natural number
public Generator()
{
var channel = Channel.CreateBounded<int>(1);
//we create a channel with capacity 1
Reader = channel.Reader;
_ = Generate(channel.Writer); //runs task Generate.
}
private static async Task Generate(ChannelWriter<int> writer)
{
var i = 2;
while (true)
{
await writer.WriteAsync(i).ConfigureAwait(false);
//puts a natural number into the channel, waits if the channel is full
i++;
}
}
}
When a instance of Generator gets initialized, it will run a task on background, which
- puts a natural number i into the channel,
- increases i by one,
- waits until the channel is empty again.
Filter on the other hand could look like this:
public class Filter
{
public readonly ChannelReader<int> Reader;
//from here will other filters get a next potential prime number
public Filter(ChannelReader<int> input, int primeNumber)
{
var channel = Channel.CreateBounded<int>(1);
//we create a channel with capacity 1
Reader = channel.Reader;
_ = FilterLoop(input, channel.Writer, primeNumber); //runs task FilterLoop
}
private static async Task FilterLoop(ChannelReader<int> reader, ChannelWriter<int> writer, int primeNumber)
{
await foreach (var i in reader.ReadAllAsync().ConfigureAwait(false))
//reads a value if available, otherwise waits
{
if (i % primeNumber != 0) //checks if i is divisible by primeNumber
{
await writer.WriteAsync(i).ConfigureAwait(false);
//puts non-divisible number into the writer
}
}
}
}
When an instance of filter is created, the FilterLoop task will do this:
- reads a number from the ChannelReader,
- tries to divide it by a prime number,
- if it is divisible it waits for the next number,
- if it is not divisible, it will write it into the output channel,
- waits for the next number.
The main method then can look like this:
class Program
{
static async Task Main(string[] args)
{
var generator = new Generator();
var primeReader = generator.Reader;
for (int i = 0; i < 10; i++) //the loop will print the first ten prime numbers
{
var prime = await primeReader.ReadAsync().ConfigureAwait(false);
Console.WriteLine($"Prime number: {prime}");
var filter = new Filter(primeReader, prime);
primeReader = filter.Reader;
}
}
}
In the main loop we connect the ChannelReader
from Generator
instance with the first Filter
instance. Then we take the reader from the last filter and connect it to the new instance of Filter
. So we end up with a chain of channels, where the Generator generates natural numbers and each filter tests if a number can be prime. Here is a simple diagram to demonstrate the workflow:
First i
that passes all filters is the next prime number. Therefore we can print it and use it as a argument for a new Filter
instance. Each instance can run independently. If there is something in the input, it will process it, otherwise it will wait in background not blocking any thread.
Unbounded variant
In the example above we have worked with bounded channels, but C# also offers a unbounded variant. The API is the same, but the ChannelWriter
will not wait in WriteAsync
function - because the channel is never full. This can lead to an exception when the memory is full. I would recommend to use this channel when you know that output of generator (the part that uses ChannelWriter) is somehow limited and we are sure that
- we have enough memory,
- we don't need to limit generator's throughput.
Consider what would happen if we used unbounded Channel
in Generator
class. Since the generator class can generate a new number wihout any waiting, the default C# TaskScheduler
has no reason to run the Generate function asynchronously, therefore the Channel
would be filled by integers until the memory is full and a memory exception is thrown. As you can see using unbounded channels can be potentionally dangerous if we are not careful enough. For that reason I recommend use bounded channels by default and switch to the unbounded ones only when necessary. Since the in the unbounded variant there is no waiting on write, there is no need for TaskScheduler
to intervene, which could result in faster computation.
Why C# even needs channels?
You could argue that we do not need channels at all. In the example we could just use TPL in combination with async/await and build an asynchronous prime sieve that would be doing the exact same thing. Of course you avoid channels and use other tools C# provides to build parallel programs. But from my experience it seems that in some cases it is easier to reason, test and scale concurrent programs.
In the current project I am working on, we have noticed that using channels simplifies parallelism, because we only need to create and test few simple classes and then connect them via channels. With almost no effort we end with correct parallel programs. We do not have to be afraid of deadlock, race conditions, we do not have to think how to make our clasess thread safe. Other big advantage is if any class in the concurrent program seems to be a bottleneck. For example if we notice our ChannelReader
is most of the time full, we can always spawn another instance of it and connect it to the ChannelReader
and with almost no effort we have made our application faster.
If the channel data structure interests you, I recommend to read through Microsoft documentation and look at Rob Pike's presentations Concurrency is not parallelism and Go Concurrency patterns. Eventhought the presentations are aimed for Golang the same principles apply to C#.
Top comments (0)