Disclaimer: coroutines are still a new topic to me, so many of the implementation ideas presented in this post may not be the best use of them. You have been warned :)
Pipes and Filters
Pipes and Filters is a very common Integration Pattern. Entire frameworks such as the awesome Spring Integration have been built around it.
It's a simple concept: pipes connect endpoints, much like the pipe in a unix command such as ps | grep. This model allows endpoints to be independent of each other.
If you look into how Spring Integration achieves this, you will see it does so through a Channel abstraction.
Well, it just so happens that Kotlin coroutines also have a Channel implementation, used to allow communication across coroutines.
So I was wondering if we could use it to create a very simple pipes and filters flow without leveraging any other external dependency.
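To make the Channel idea concrete before we build on it, here is a minimal producer/consumer sketch of my own (the function names are just for illustration), using kotlinx-coroutines:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// A producer coroutine sends three numbers and closes the channel;
// the consumer's for-loop suspends while waiting and ends on close.
fun channelDemo(): List<Int> = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (i in 1..3) channel.send(i)
        channel.close()
    }
    val received = mutableListOf<Int>()
    for (x in channel) received += x
    received
}

fun main() {
    println(channelDemo()) // [1, 2, 3]
}
```

Because the default Channel is a rendezvous channel, each send suspends until the consumer receives, so the two coroutines naturally hand messages off one at a time.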
The idea
Write something that allows leveraging channels as pipes to connect coroutine functions.
Leverage coroutine suspend features for concurrent execution, and sometimes parallel execution with withContext too :)
Write a simple DSL(ish) like
source via fn via fn2 into fn4
It's not in the scope of this blog post to attempt to replicate all the features of a complete integration framework: no support for fan-out via a publisher channel, for example, and no error handling or ack support.
How
So before diving into some code, I must confess that so far I have always been using GlobalScope.launch as the only way to launch my coroutines. And then I read the blog posts below. If anything useful comes out of my blog today, it's those links:
We are starting with the assumption that the source will always be a Channel<T>, just to simplify things.
We have two pipes (via and into) and two top-level endpoints: ChannelSink<T> and ChannelProcessor<T, R>.
Let's look into our Sink first
class ChannelSink<T>(private val channel: Channel<T>, block: (T) -> Unit) : CoroutineScope {
    private val job: Job = Job()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job

    init {
        launch {
            for (message in channel) {
                block(message)
            }
        }
    }
}
Not too much going on here: our Sink reads messages from a channel (in a non-blocking way, using the launch builder) and then invokes the block function that does the sink's work (invoke a REST endpoint, call a database, etc.).
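As a quick sanity check, here is a self-contained sketch of my own that drives the Sink; the CountDownLatch and the helper name runSinkDemo are my additions, only there to make the demo deterministic:

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// The ChannelSink from above, reproduced so this sketch compiles on its own.
class ChannelSink<T>(private val channel: Channel<T>, block: (T) -> Unit) : CoroutineScope {
    private val job: Job = Job()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job

    init {
        launch { for (message in channel) block(message) }
    }
}

// Drives the sink with three messages and waits until all are processed.
fun runSinkDemo(): List<Int> {
    val results = ConcurrentLinkedQueue<Int>()
    val latch = CountDownLatch(3)
    runBlocking {
        val source = Channel<Int>()
        ChannelSink(source) { results.add(it); latch.countDown() }
        for (i in 1..3) source.send(i)
        source.close()
    }
    latch.await() // wait until the sink has processed all three messages
    return results.toList()
}

fun main() {
    println(runSinkDemo()) // [1, 2, 3]
}
```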
Our processor is a map-style function (T) -> (R):
class ChannelProcessor<T, R>(private val inputChannel: Channel<T>, block: (T) -> (R)) : CoroutineScope {
    private val job = Job()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job

    val outputChannel = Channel<R>()

    init {
        launch {
            for (message in inputChannel) {
                outputChannel.send(block(message))
            }
        }
    }
}
Very similar to our Sink, but now we create an outputChannel to which we write the transformed messages produced by block(message).
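And here is a small sketch of my own exercising the processor on its own. Note one caveat: the processor never closes its outputChannel when the input ends (a limitation of this simple design), so the demo receives exactly as many messages as it sent to avoid hanging:

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// The ChannelProcessor from above, reproduced so this sketch compiles on its own.
class ChannelProcessor<T, R>(private val inputChannel: Channel<T>, block: (T) -> (R)) : CoroutineScope {
    private val job = Job()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job

    val outputChannel = Channel<R>()

    init {
        launch {
            for (message in inputChannel) {
                outputChannel.send(block(message))
            }
        }
    }
}

fun runProcessorDemo(): List<Int> = runBlocking {
    val input = Channel<Int>()
    val processor = ChannelProcessor(input) { it * 2 }
    launch {
        for (i in 1..3) input.send(i)
        input.close()
    }
    // outputChannel is never closed, so receive a fixed number of messages.
    val out = mutableListOf<Int>()
    repeat(3) { out += processor.outputChannel.receive() }
    out
}

fun main() {
    println(runProcessorDemo()) // [2, 4, 6]
}
```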
And that's all the types we need. Now we need some plumbing code around our pipes [No pun intended]
Plumbing code (literally)
Ok, so our "DSL" should connect the functions using channels as pipes. At the very beginning we said we want our sources to be Channels, so let's write two possible scenarios:
Source -> Processor
Source -> Sink
//Source -> Sink
infix fun <T> Channel<T>.into(block: (T) -> Unit) : ChannelSink<T> {
    return ChannelSink(this, block)
}

//Source -> Processor
infix fun <T, R> Channel<T>.via(block: (T) -> R) : ChannelProcessor<T, R> {
    return ChannelProcessor(this, block)
}
Our via/into functions pass the receiver (the Channel) to the constructor of the ChannelSink/ChannelProcessor.
To daisy chain our processors now we need two extra functions:
infix fun <T, R> ChannelProcessor<T, R>.into(block: (R) -> Unit) : ChannelSink<R> {
    return ChannelSink(this.outputChannel, block)
}

infix fun <T, R, S> ChannelProcessor<T, R>.via(block: (R) -> S) : ChannelProcessor<R, S> {
    return ChannelProcessor(this.outputChannel, block)
}
Note that we daisy chain the channels by getting the outputChannel of a Processor and passing it as the inputChannel of the next Processor or the Sink.
Running it
First let's define some functions
//sink function
val fn: (Double) -> Unit = { i ->
    log("Received $i")
}

//first processor
val tn: (Int) -> (Int) = { i ->
    i * 2
}

//second processor
val tn2: (Int) -> Double = { i -> i * 3.0 }
Now let's get our producer channel ready
val size = 100
val source = Channel<Int>()
launch {
    with(source) {
        for (i in 0..size) {
            send(i)
        }
        close()
    }
}
And finally connect the pipes:
source via tn via tn2 into fn
And that's pretty much it, we can now use our channels with a pipes and filters syntax.
WAIT A MINUTE
Kotlin has support for collection streaming, so why not just create a collection or a Sequence and use the map and collect functions?
Looking at this code, you are absolutely right to spot that, and I'm truly sorry if I made you read all of this so far, but I wanted to make a point :)
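For comparison, the same Source -> Processor -> Processor -> Sink shape written as a plain Sequence pipeline looks like this (my own sketch; everything runs sequentially on one thread):

```kotlin
fun sequencePipeline(range: IntRange): List<Double> =
    range.asSequence()
        .map { it * 2 }     // tn
        .map { it * 3.0 }   // tn2
        .toList()           // the "sink" just collects the results here

fun main() {
    println(sequencePipeline(0..3)) // [0.0, 6.0, 12.0, 18.0]
}
```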
Say we change our Processor and Sink code a little bit:
class ChannelProcessor<T, R>(private val inputChannel: Channel<T>, block: (T) -> (R)) : CoroutineScope {
    private val job = Job()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job

    val outputChannel = Channel<R>()

    init {
        launch {
            for (message in inputChannel) {
                launch { outputChannel.send(block(message)) }
            }
        }
    }
}
class ChannelSink<T>(private val channel: Channel<T>, block: (T) -> Unit) : CoroutineScope {
    private val job: Job = Job()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job

    init {
        launch {
            for (message in channel) {
                launch {
                    block(message)
                }
            }
        }
    }
}
Note that now we launch a child coroutine for every function call, which enables all the steps to run concurrently. Kotlin currently does not offer a standard way to run streams in parallel (unless you rely on Java's parallel streams). With this last change, the code runs concurrently; note that this means the ordering of messages is no longer guaranteed.
To test this just try the updated code with this new scenario:
@ExperimentalCoroutinesApi
@Test
fun testAsyncProduce() {
    val size = 100
    val latch = CountDownLatch(size)
    runBlocking<Unit> {
        val source = Channel<Int>()
        launch {
            with(source) {
                for (i in 0..size) {
                    send(i)
                }
                close()
            }
        }
        val fn: (Double) -> Unit = { i ->
            log("Received $i")
            latch.countDown()
        }
        val tn: (Int) -> (Int) = { i ->
            runBlocking {
                if (i % 2 == 0) {
                    delay(10)
                }
                i * 2
            }
        }
        val tn2: (Int) -> Double = { i -> i * 3.0 }
        source via tn via tn2 into fn
    }
    latch.await()
}
In this test, one of our functions causes a delay on even numbers, but that does not cause a sequential pause in the execution.
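As an aside, if you want the concurrency but still need results in input order, a common coroutine pattern (my own sketch, not part of the flow code above) is async/awaitAll, since awaitAll preserves the order of the Deferreds:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Runs the transformation concurrently but returns results in input order.
fun parallelMap(xs: List<Int>): List<Int> = runBlocking {
    xs.map { x ->
        async(Dispatchers.Default) {
            if (x % 2 == 0) delay(10) // simulate uneven work, as in the test above
            x * 2
        }
    }.awaitAll()
}

fun main() {
    println(parallelMap(listOf(1, 2, 3, 4))) // [2, 4, 6, 8]
}
```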
Those were just some thoughts I had while writing some code this week that had to deal with a simple pipeline and a sink invoking a REST service. I put some of this into practice, but it's not to be taken too seriously.
Happy coding