DEV Community

Aniket Bhoite

Kotlin Flow: Simple yet Powerful Implementation

Ever since Flow became stable in kotlinx.coroutines 1.3.0, it has been an important and widely discussed topic in the Kotlin community. But why?

Flow is a simple yet powerful implementation for asynchronous programming in Kotlin.

Conceptually, a flow is like a suspend function that can return multiple results instead of one. It lets developers write structured-concurrent, asynchronous, non-blocking code in a sequential style. But what do all these technical terms mean to a new developer? They are hard to grasp all at once, and it took me some time to understand them.

The reason Flow can do this without being too expensive is that its internal implementation is very simple. At its core, Flow is nothing but two interfaces whose functions call each other. Let me explain, but first, forget for the moment everything you know about coroutines, suspend functions, and the rest.

Let's start.

To understand how Flow works internally, we only need to understand two main concepts.

  • Interface and Anonymous Implementation of an Interface

  • Functional interface, or a Single Abstract Method (SAM) interface in Kotlin

Let’s create two Flow interfaces.

public interface Flow {
    public fun collect(collector: FlowCollector)
}

public interface FlowCollector {
    public fun emit(value: String)
}



val outputStream = object : Flow {
    override fun collect(collector: FlowCollector) {

        collector.emit("first")
        collector.emit("second")

    }
}

outputStream.collect(
    object : FlowCollector {
        override fun emit(value: String) {
            println(value)

            println(value.uppercase())
        }

    }
)


/*

OUTPUT:

first
FIRST
second
SECOND

 */


P.S.: For simplicity, we only consider flows that emit String values for now.

That's it! This is Flow in its simplest, bare-bones form, and we have only used the first concept so far. As you can see, it is nothing but one function (collect) calling another one (emit) multiple times.

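Let's make this a little prettier by using a functional interface.

An interface with only one abstract method is called a functional interface, or a Single Abstract Method (SAM) interface. A functional interface can have multiple non-abstract members but only one abstract member. In Kotlin it is declared with the fun modifier, which lets us pass a lambda wherever the interface is expected instead of writing an anonymous object.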

public interface Flow {
    public fun collect(collector: FlowCollector)
}

public fun interface FlowCollector {
    public fun emit(value: String)
}


val outputStream = object : Flow {
    override fun collect(collector: FlowCollector) {

        collector.emit("first")
        collector.emit("second")

    }
}

outputStream.collect { value ->
    println(value)

    println(value.uppercase())
}

// SAME OUTPUT
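
As a minimal sketch of the "multiple non-abstract members" point, the variant below adds a default method next to the single abstract one. The emitAll helper and the shout function are hypothetical names introduced only for this illustration; they are not part of the article's interfaces or of kotlinx.coroutines.

```kotlin
fun interface FlowCollector {
    // The single abstract method: this is what a lambda is converted into.
    fun emit(value: String)

    // A non-abstract member (hypothetical helper). It does not break SAM
    // conversion because it already has a body.
    fun emitAll(vararg values: String) {
        for (value in values) emit(value)
    }
}

// Collects the upper-cased form of every word through a lambda-based collector.
fun shout(vararg words: String): List<String> {
    val seen = mutableListOf<String>()
    // SAM constructor: the lambda becomes the implementation of emit().
    val collector = FlowCollector { value -> seen += value.uppercase() }
    collector.emitAll(*words)
    return seen
}

fun main() {
    println(shout("first", "second"))
}
```

The lambda only ever supplies emit; emitAll is inherited as-is, which is why the interface still counts as functional.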

It's starting to look familiar now. The anonymous implementation of the Flow interface is still boilerplate, though. We can hide it behind a top-level function that takes a lambda with a FlowCollector receiver and returns the anonymous Flow implementation.

fun flow(block: FlowCollector.() -> Unit): Flow = object : Flow {
    override fun collect(collector: FlowCollector) {
        collector.block()
    }
}

public interface Flow {
    public fun collect(collector: FlowCollector)
}

public fun interface FlowCollector {
    public fun emit(value: String)
}


val outputStream = flow {

    emit("first")
    emit("second")

}


outputStream.collect { value ->
    println(value)

    println(value.uppercase())
}

//SAME OUTPUT
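
The flow function above takes a lambda with receiver (FlowCollector.() -> Unit), which is why emit can be called inside the block without naming the collector. A small sketch of the difference, using hypothetical helper names (runPlain, runWithReceiver, receiverDemo) that exist only for this illustration:

```kotlin
fun interface FlowCollector {
    fun emit(value: String)
}

// Plain function type: the collector arrives as an ordinary argument
// and has to be named on every call.
fun runPlain(collector: FlowCollector, block: (FlowCollector) -> Unit) {
    block(collector)
}

// Function type with receiver: inside the block, `this` is the collector,
// so emit(...) can be called directly.
fun runWithReceiver(collector: FlowCollector, block: FlowCollector.() -> Unit) {
    collector.block()
}

fun receiverDemo(): List<String> {
    val seen = mutableListOf<String>()
    val collector = FlowCollector { value -> seen += value }

    runPlain(collector) { c -> c.emit("plain") }
    runWithReceiver(collector) { emit("receiver") }

    return seen
}

fun main() {
    println(receiverDemo())
}
```

Both helpers do the same work; the receiver form is what makes the builder block read like a small DSL.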

This is the familiar-looking flow API we use every day, and this is roughly what happens under the hood. It doesn't yet appear to be as feature-rich as claimed above, though. In theory, any programming language can do this, so what makes Kotlin's version special? The answer is Kotlin's "magic" function modifier: suspend.

public interface Flow {
    public suspend fun collect(collector: FlowCollector)
}

public fun interface FlowCollector {
    public suspend fun emit(value: String)
}

Making functions suspendable means they can do much more than ordinary functions. It also ensures that collect and emit can only be called from other suspend functions or from coroutines started by a coroutine builder. Try to remember what coroutines can do: Flow inherits all of those features. For example:

  1. Structured Concurrency: By default, the flow builder block runs in the same coroutine as the collect call that triggers it, so it shares the collector's scope. When that scope is cancelled, both the builder and the collector blocks stop executing, saving precious CPU resources. There is no need to close the stream manually.

  2. Asynchronous, non-blocking code: We can decide which code runs on which Dispatcher. This makes it very useful for running I/O operations on background threads without blocking the original/main thread.

  3. Sequential & Readable: A suspend function waits for the results of the suspend functions it calls, in order. While waiting, it stays suspended without blocking the thread.

  4. Back Pressure: The flow builder and collector blocks run in the same coroutine, so while the collector block is processing a value, the builder block is suspended. This prevents it from producing values that the collector block may never process. This comes with Flow by default; libraries built without suspension typically need dedicated mechanisms to achieve it.

And many more features like this.
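
The back-pressure point can be sketched with nothing but the Kotlin standard library. This is an illustration under assumptions, not how kotlinx.coroutines runs flows: it reuses the simplified String-only interfaces from this article, starts the coroutine with startCoroutine, and simulates a slow collector by parking its continuation in a variable. The names pending and backPressureLog are made up for this sketch.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

interface Flow {
    suspend fun collect(collector: FlowCollector)
}

fun interface FlowCollector {
    suspend fun emit(value: String)
}

fun flow(block: suspend FlowCollector.() -> Unit): Flow = object : Flow {
    override suspend fun collect(collector: FlowCollector) = collector.block()
}

// Records the order in which the builder and the collector make progress.
fun backPressureLog(): List<String> {
    val log = mutableListOf<String>()
    // Continuation of the collector while it is "busy" (hypothetical helper state).
    var pending: Continuation<Unit>? = null

    val source = flow {
        log += "produce first"
        emit("first")
        log += "produce second"
        emit("second")
        log += "builder done"
    }

    val body: suspend () -> Unit = {
        source.collect { value ->
            log += "got $value"
            // Simulate slow processing: suspend until someone resumes us.
            suspendCoroutine<Unit> { cont -> pending = cont }
        }
    }

    // Runs the coroutine on the current thread until its first suspension.
    body.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { result -> result.getOrThrow() })

    // Each resume() lets the collector finish one value; only then does the
    // builder continue to its next emit.
    while (true) {
        val cont = pending ?: break
        pending = null
        log += "collector ready"
        cont.resume(Unit)
    }
    return log
}

fun main() {
    backPressureLog().forEach(::println)
}
```

In the resulting log, "produce second" appears only after the first "collector ready" entry: the builder cannot run ahead of the collector because emit does not return until the collector's lambda has finished.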

One of the flagship features of Flow is its operator functions. Are they complex to understand? No, because a flow is nothing more than simple functions calling each other. So, if we write an extension function on Flow that returns a new flow, we can theoretically accomplish anything. Let's look at the onStart and onCompletion operator functions.

fun Flow.onStart(
    action: FlowCollector.() -> Unit
) = flow {
    action()
    collect(this)
}

fun Flow.onCompletion(
    action: FlowCollector.() -> Unit
) = flow {
    collect(this)
    action()
}


......

outputStream
    .onStart {
        println("onStart")
    }
    .onCompletion {
        println("onComplete")
    }
    .collect { value ->
        println(value)

        println(value.uppercase())
    }


/*

OUTPUT:

onStart
first
FIRST
second
SECOND
onComplete

 */

PS: This is a highly simplified explanation. There are many complex processes happening behind the scenes, but the main logic remains unchanged.

So, as you can see, we have the ability to create any type of flow operator we can imagine using extension functions. Feel free to go crazy!
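
To make that concrete, here is a hedged sketch of two more operators written the same way. The map, filter and toList functions below are simplified stand-ins built on this article's String-only, non-suspending interfaces; they are not the kotlinx.coroutines operators of the same names.

```kotlin
interface Flow {
    fun collect(collector: FlowCollector)
}

fun interface FlowCollector {
    fun emit(value: String)
}

fun flow(block: FlowCollector.() -> Unit): Flow = object : Flow {
    override fun collect(collector: FlowCollector) = collector.block()
}

// Transforms every upstream value before passing it downstream.
fun Flow.map(transform: (String) -> String): Flow = flow {
    val downstream = this
    this@map.collect { value -> downstream.emit(transform(value)) }
}

// Forwards only the upstream values that satisfy the predicate.
fun Flow.filter(predicate: (String) -> Boolean): Flow = flow {
    val downstream = this
    this@filter.collect { value ->
        if (predicate(value)) downstream.emit(value)
    }
}

// Terminal helper: collects everything into a list.
fun Flow.toList(): List<String> {
    val result = mutableListOf<String>()
    collect { value -> result += value }
    return result
}

fun main() {
    val outputStream = flow {
        emit("first")
        emit("second")
        emit("third")
    }
    println(outputStream.filter { it != "second" }.map { it.uppercase() }.toList())
}
```

Each operator wraps the upstream flow in a new one and only starts collecting upstream when it is collected itself, which is the same pattern onStart and onCompletion use above.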

For the curious, these are the Flow interfaces with a generic type:

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}

public fun interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}
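
A short sketch of what the type parameters buy us. To stay runnable without a coroutine library, suspend is dropped here (an assumption of this sketch, unlike the interfaces above), and map, toList and genericDemo are illustrative names rather than the library's implementations.

```kotlin
interface Flow<out T> {
    fun collect(collector: FlowCollector<T>)
}

fun interface FlowCollector<in T> {
    fun emit(value: T)
}

fun <T> flow(block: FlowCollector<T>.() -> Unit): Flow<T> = object : Flow<T> {
    override fun collect(collector: FlowCollector<T>) = collector.block()
}

// With generics, an operator can change the element type from T to R.
fun <T, R> Flow<T>.map(transform: (T) -> R): Flow<R> = flow {
    val downstream = this
    this@map.collect { value -> downstream.emit(transform(value)) }
}

fun <T> Flow<T>.toList(): List<T> {
    val result = mutableListOf<T>()
    collect { value -> result += value }
    return result
}

fun genericDemo(): List<String> {
    val numbers: Flow<Int> = flow {
        emit(1)
        emit(2)
        emit(3)
    }
    // `out T` makes Flow covariant: a Flow<Int> can be used as a Flow<Number>.
    val widened: Flow<Number> = numbers
    return widened.map { n -> "#$n" }.toList()
}

fun main() {
    println(genericDemo())
}
```

The out and in modifiers mirror the direction of data: a flow only produces T values, while a collector only consumes them.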

I hope you now understand why Flow is a simple yet powerful API.

Top comments (1)

Vadym Yaroshchuk • Edited

It would be great to mention that the main idea of flows is that they're cold: execution stays suspended until the consumer is ready to consume (basically, this is achieved through the nature of Kotlin coroutines). Sequence<T>, for example, has a somewhat similar idea, although it is, of course, only about collections.

So it's a bit wrong to talk about Flow without coroutines as context. Basically, coroutines simplify our job by generating the next callback for us through the suspend keyword. Without it, we would have to write something like this:

interface FlowCollector<T> {
    fun emit(value: T, next: (() -> T)? = null)
}

fun <T> flow(block: FlowCollector<T>.() -> Unit) {...}

fun main() {
    val flow = flow {
         emit(1) {
             emit(2) {...}
         }
    }
}

This awkward shape is what Kotlin coroutines solve, using Continuation<in T>, startCoroutine, other coroutine primitives, and the Kotlin compiler.

I recommend my article explaining Sequences to understand more of the logic behind coroutines.