DEV Community

Vinicius Carvalho

Pipes and Filters in Kotlin

Disclaimer: coroutines are still a new topic to me, and many of the implementation ideas presented in this post may not be the best use of them. You have been warned :)

Pipes and Filters

Pipes and Filters is a very common Integration Pattern. Entire frameworks such as the awesome Spring Integration have been built around it.

It's a simple concept: pipes connect endpoints, much like the pipe in a Unix command such as ps | grep. This model keeps endpoints independent of each other.

If you look into how Spring Integration achieves this, you'll see it uses a Channel abstraction.

Well, it so happens that Kotlin coroutines also have a Channel implementation that allows communication between coroutines.
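For readers new to channels, here is a minimal sketch of the basic behaviour using only kotlinx.coroutines (channelDemo is a hypothetical name): one coroutine sends values and then closes the channel, while another iterates over it, and the for loop ends once the channel is closed and drained.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Two coroutines talking through a Channel: one sends, the other receives.
fun channelDemo(): List<Int> = runBlocking {
    val channel = Channel<Int>()            // default capacity is rendezvous
    launch {
        for (i in 1..3) channel.send(i)    // suspends until the receiver takes the value
        channel.close()                     // signals "no more elements"
    }
    val received = mutableListOf<Int>()
    for (value in channel) {                // loop ends once the channel is closed and drained
        received += value
    }
    received
}
```

Calling channelDemo() returns [1, 2, 3]; without the close() call, the receiving loop would suspend forever waiting for a fourth element.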

So I was wondering whether we could use it to build a very simple pipes and filters flow without any other external dependency.

The idea

  • Write something that allows leveraging channels as pipes to connect coroutine functions.

  • Leverage coroutine suspend features for concurrent execution, and sometimes for parallel execution with withContext too :)

  • Write a simple DSL(ish) syntax like source via fn via fn2 into fn4

It's not in the scope of this blog post to replicate all the features of a complete integration framework. There is no support for fan-out via a publisher channel, for example, and no error handling or ack support.

How

So before diving into some code, I must confess that until recently I had always used GlobalScope.launch as the only way to launch my coroutines. Then I read the blog posts below. If anything useful comes out of this post today, it's those links:

We start with the assumption that the source is always a Channel<T>; this is just to keep things simple.

We have two pipes (via and into) and two top-level endpoints: ChannelSink<T> and ChannelProcessor<T, R>.

Let's look into our Sink first

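import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

class ChannelSink<T>(private val channel: Channel<T>, block: (T) -> Unit) : CoroutineScope {
    // the sink owns its own Job instead of launching in GlobalScope
    private val job: Job = Job()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job
    init {
        // consume messages in a coroutine so the constructor returns immediately
        launch {
            for (message in channel) {
                block(message)
            }
        }
    }
}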

Not much going on here: our Sink reads messages from a channel without blocking the caller (the launch builder runs the loop in a coroutine), then invokes block on each message. That block is where the actual sink work happens (invoking a REST endpoint, calling a database, etc.).

Our processor is a map-style function (T) -> R:

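import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

class ChannelProcessor<T, R>(private val inputChannel: Channel<T>, block: (T) -> R) : CoroutineScope {
    private val job = Job()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job
    val outputChannel = Channel<R>()
    init {
        launch {
            for (message in inputChannel) {
                outputChannel.send(block(message))
            }
            // close the output once the input is drained, so the next stage's for loop can end
            outputChannel.close()
        }
    }
}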

Very similar to our Sink, but now we create an outputChannel and write each message to it after transforming it with block(message).
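As an aside, kotlinx.coroutines also has a produce builder that wraps this launch-and-send pattern in a function returning a ReceiveChannel, and it closes that channel automatically when its block completes. That matters because a downstream for loop only ends once its input is closed. A sketch of the processor as a plain function, assuming the hypothetical names mapStage and mapStageDemo (produce is marked @ExperimentalCoroutinesApi, hence the opt-in):

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: a map-style processor built on the produce builder.
// produce closes the returned channel automatically once the block completes.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T, R> CoroutineScope.mapStage(input: ReceiveChannel<T>, block: (T) -> R): ReceiveChannel<R> =
    produce {
        for (message in input) send(block(message))
    }

@OptIn(ExperimentalCoroutinesApi::class)
fun mapStageDemo(): List<Int> = runBlocking {
    val source = produce<Int> { for (i in 0 until 5) send(i) }   // closed when done
    mapStage(source) { it * 2 }.toList()                          // drains until closed
}
```

Note that produce returns a ReceiveChannel rather than a Channel, so this variant would not plug directly into via/into functions declared on Channel<T>.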

And that's all the types we need. Now we need some plumbing code around our pipes [No pun intended]

Plumbing code (literally)

OK, so our "DSL" should connect the functions using channels as pipes. At the very beginning we said we want our sources to be Channels, so let's cover two possible scenarios:

Source -> Processor
Source -> Sink

import kotlinx.coroutines.channels.Channel

//Source -> Sink
infix fun <T> Channel<T>.into(block: (T) -> Unit): ChannelSink<T> {
    return ChannelSink(this, block)
}
//Source -> Processor
infix fun <T, R> Channel<T>.via(block: (T) -> R): ChannelProcessor<T, R> {
    return ChannelProcessor(this, block)
}

Our via/into functions pass the receiver (the Channel) to the ChannelSink/ChannelProcessor constructor.

To daisy-chain our processors, we now need two extra functions:

infix fun <T, R> ChannelProcessor<T, R>.into(block: (R) -> Unit) : ChannelSink<R> {
    return ChannelSink(this.outputChannel, block)
}
infix fun <T, R, S> ChannelProcessor<T, R>.via( block: (R) -> S) : ChannelProcessor<R, S> {
    return ChannelProcessor(this.outputChannel, block)
}

Note that we daisy chain the channels by getting the outputChannel of a Processor and passing it as the inputChannel of the next Processor or the Sink.
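Infix calls with the same precedence associate to the left, so source via tn via tn2 into fn is parsed as ((source via tn) via tn2) into fn: each call returns the type that the next infix function is declared on. The sketch below shows the same typing trick without coroutines, on an eager list-based stand-in (Stage and stageDemo are hypothetical names, not part of the pipeline above):

```kotlin
// Hypothetical, eager list-based analogue of the via/into DSL,
// used only to show how the chained infix calls type-check.
class Stage<T>(val values: List<T>)

// "source" step: starts the chain from a plain List
infix fun <T, R> List<T>.via(block: (T) -> R): Stage<R> = Stage(map(block))

// "processor" step: chains on the previous stage's output
infix fun <T, R> Stage<T>.via(block: (T) -> R): Stage<R> = Stage(values.map(block))

// "sink" step: consumes the final values
infix fun <T> Stage<T>.into(block: (T) -> Unit) = values.forEach(block)

fun stageDemo(): List<Double> {
    val received = mutableListOf<Double>()
    val tn: (Int) -> Int = { it * 2 }
    val tn2: (Int) -> Double = { it * 3.0 }
    // Parsed as ((listOf(1, 2, 3) via tn) via tn2) into { ... }
    listOf(1, 2, 3) via tn via tn2 into { received += it }
    return received
}
```

stageDemo() returns [6.0, 12.0, 18.0]: the List overload of via starts the chain, and every later call resolves against the Stage returned by the previous one.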

Running it

First let's define some functions

//sink function
// log(...) is the author's logging helper; it is not shown in the post
val fn: (Double) -> Unit = { i ->
    log("Received $i")
}
//first processor
val tn: (Int) -> Int = { i ->
    i * 2
}
//second processor
val tn2: (Int) -> Double = { i -> i * 3.0 }

Now let's get our producer channel ready

// runs inside a CoroutineScope such as runBlocking; size is the number of messages to emit
val source = Channel<Int>()
launch {
    with(source) {
        for (i in 0 until size) {
            send(i)
        }
        close()
    }
}

And finally connect the pipes:

source via tn via tn2 into fn

And that's pretty much it, we can now use our channels with a pipes and filters syntax.
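To see all the pieces run together, here is a self-contained sketch of the sequential version. Two details are assumptions added for the sketch rather than part of the code above: each processor closes its outputChannel once its input is drained (otherwise the sink's for loop never ends), and ChannelSink exposes a join() so the caller can wait for the last message. runPipeline is a hypothetical name.

```kotlin
import java.util.Collections
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class ChannelSink<T>(private val channel: Channel<T>, block: (T) -> Unit) : CoroutineScope {
    private val job = Job()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job
    // Assumption: keep a handle on the consuming coroutine so callers can wait for it
    private val worker = launch {
        for (message in channel) block(message)
    }
    suspend fun join() = worker.join()
}

class ChannelProcessor<T, R>(private val inputChannel: Channel<T>, block: (T) -> R) : CoroutineScope {
    private val job = Job()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job
    val outputChannel = Channel<R>()
    init {
        launch {
            for (message in inputChannel) outputChannel.send(block(message))
            // Assumption: close the output when the input is drained so the next stage can finish
            outputChannel.close()
        }
    }
}

infix fun <T> Channel<T>.into(block: (T) -> Unit): ChannelSink<T> = ChannelSink(this, block)
infix fun <T, R> Channel<T>.via(block: (T) -> R): ChannelProcessor<T, R> = ChannelProcessor(this, block)
infix fun <T, R> ChannelProcessor<T, R>.into(block: (R) -> Unit): ChannelSink<R> = ChannelSink(outputChannel, block)
infix fun <T, R, S> ChannelProcessor<T, R>.via(block: (R) -> S): ChannelProcessor<R, S> = ChannelProcessor(outputChannel, block)

// Runs source via tn via tn2 into a collecting sink and returns what the sink received.
fun runPipeline(size: Int): List<Double> = runBlocking {
    val received = Collections.synchronizedList(mutableListOf<Double>())
    val source = Channel<Int>()
    launch {
        for (i in 0 until size) source.send(i)
        source.close()
    }
    val tn: (Int) -> Int = { i -> i * 2 }
    val tn2: (Int) -> Double = { i -> i * 3.0 }
    val sink = source via tn via tn2 into { i -> received.add(i) }
    sink.join()            // wait until the sink has consumed every message
    received.toList()
}
```

Because each stage handles one message at a time and the channels are rendezvous channels, this version keeps the original order: runPipeline(4) gives [0.0, 6.0, 12.0, 18.0].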

WAIT A MINUTE

Kotlin supports streaming over collections, so why not just create a collection or a Sequence and use functions such as map and forEach?
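For comparison, here is a sketch of that Sequence-based alternative, with the same shapes as fn, tn and tn2 (sequencePipeline is a hypothetical name). Everything runs lazily but strictly sequentially on the calling thread:

```kotlin
// The same transformation chain with a lazy Sequence: no coroutines, no channels.
fun sequencePipeline(size: Int): List<Double> {
    val tn: (Int) -> Int = { i -> i * 2 }
    val tn2: (Int) -> Double = { i -> i * 3.0 }
    val received = mutableListOf<Double>()
    val fn: (Double) -> Unit = { i -> received += i }   // stands in for the sink
    (0 until size).asSequence()
        .map(tn)
        .map(tn2)
        .forEach(fn)
    return received
}
```

sequencePipeline(4) returns [0.0, 6.0, 12.0, 18.0], the same result as the channel version, with far less machinery.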

If you spotted that while reading this code, you are absolutely right, and I'm truly sorry for making you read all that so far, but I wanted to make a point :)

Say we change our Processor and Sink code a little bit:

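import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

class ChannelProcessor<T, R>(private val inputChannel: Channel<T>, block: (T) -> R) : CoroutineScope {
    private val job = Job()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job
    val outputChannel = Channel<R>()
    init {
        launch {
            // coroutineScope suspends until every child launched inside it has finished
            coroutineScope {
                for (message in inputChannel) {
                    // one child coroutine per message, so messages are processed concurrently
                    launch { outputChannel.send(block(message)) }
                }
            }
            // every send has completed, so the output can be closed safely
            outputChannel.close()
        }
    }
}

class ChannelSink<T>(private val channel: Channel<T>, block: (T) -> Unit) : CoroutineScope {
    private val job: Job = Job()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job
    init {
        launch {
            for (message in channel) {
                // one child coroutine per message, so a slow call does not hold up the rest
                launch {
                    block(message)
                }
            }
        }
    }
}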

Note that we now launch a child coroutine for every function call, which lets all the steps run in parallel.

Kotlin currently does not offer a standard way to run collection streams in parallel (unless you rely on Java's parallel streams). With this last change the code runs concurrently; note that this means message ordering is no longer preserved.
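The reordering is easy to reproduce in isolation. The sketch below (completionOrder is a hypothetical name) launches one child coroutine per message, as the concurrent Processor and Sink do, and makes even numbers slow with delay. On runBlocking's single-threaded event loop, the odd numbers reliably finish first:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// One child coroutine per message: a slow message no longer holds back
// the ones behind it, so completion order differs from arrival order.
fun completionOrder(messages: List<Int>): List<Int> = runBlocking {
    val completed = mutableListOf<Int>()   // safe: all children share runBlocking's thread
    coroutineScope {
        for (m in messages) {
            launch {
                if (m % 2 == 0) delay(10)  // simulate slow work on even numbers
                completed += m
            }
        }
    }                                      // coroutineScope waits for every child
    completed
}
```

For the messages 0 to 5, the first three results are 1, 3 and 5; all six messages are still processed, just not in the order they arrived. coroutineScope is what makes the function wait for every child before returning the list.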

To test this, just try the updated code with this new scenario:

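import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertTrue
import org.junit.Test

// the enclosing class name is a placeholder; the post only shows the test method
class PipelineTest {
    @ExperimentalCoroutinesApi
    @Test
    fun testAsyncProduce() {
        val size = 100
        val latch = CountDownLatch(size)
        runBlocking<Unit> {

            val source = Channel<Int>()
            launch {
                with(source) {
                    // 0 until size emits exactly `size` messages, matching the latch count
                    for (i in 0 until size) {
                        send(i)
                    }
                    close()
                }
            }
            // log(...) is the author's logging helper; it is not shown in the post
            val fn: (Double) -> Unit = { i ->
                log("Received $i")
                latch.countDown()
            }
            val tn: (Int) -> Int = { i ->
                // runBlocking lets this non-suspending function call delay()
                runBlocking {
                    if (i % 2 == 0) {
                        delay(10)
                    }
                    i * 2
                }
            }
            val tn2: (Int) -> Double = { i -> i * 3.0 }

            source via tn via tn2 into fn
        }
        // fail instead of hanging forever if some messages never reach the sink
        assertTrue(latch.await(10, TimeUnit.SECONDS))
    }
}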

In this test, one of our functions delays even numbers, but that does not pause the whole pipeline: the other messages keep flowing while the even ones wait.

Those were just some thoughts I had while writing some code this week that had to deal with a simple pipeline and a sink invoking a REST service. I put some of this into practice, but it's not to be taken too seriously.

Happy coding
