DEV Community

Bugfender
Bugfender

Posted on • Originally published at bugfender.com on

Kotlin Flow Tutorial: Build Reactive and Scalable Applications

Efficient handling of asynchronous data streams is an important tool of modern application development. Kotlin Flows, part of the Kotlin Coroutines library, provide a flexible and elegant solution for working with such data streams. Kotlin Flows are part of Kotlin Coroutines – unlike traditional callbacks or RxJava handling, which can be clunkier and may not directly integrate with your existing code structure easily.

Kotlin Flow is necessary for dealing with processes where data changes over time, such as UI events and network responses from the database, or updates in real-time. By using Coroutines and Functional programming, developers can build reactive applications quickly and with great performance quality.

Here we will deep-dive into Kotlin Flows, with a special focus on the advanced stuff beyond basic usage. We will cover:

Flow Builders – Custom flow builders for complex cases – Flow Builder, Processors

Functional operators : How to use complex operators for manipulating data

State Management : Using StateFlow & SharedFlow For UI States and Events(MVVM)

Error Handling : How to build resilient flows with more sophisticated error-handling techniques

Concurrency & Parallel Processing : Efficient processing for multiple data streams at the same time.

Combining, Merging Flows – Combining multiple flows into cohesive data pipelines.

Test Flows : How to test flows efficiently in unit tests

Performance optimization : Techniques to profile and make flow performance optimal.

After completing this tutorial, you will have gained an in-depth knowledge of how to use advanced Kotlin flow concepts, and an understanding of why they should be used while developing reactive, scalable and maintainable app.

Deep Dive into Flow Builders

Kotlin Flow comes with several builders to work on different types of data streams, like Realtime sensor data processing.

A Flow Builder in Kotlin is a construct that allows you to create a stream of data dynamically, defining how the data should be emitted and handled. Builders such as flow, flowOf, and asFlow help developers generate flows for various use cases, from emitting simple static values to streaming real-time dynamic data. These builders are foundational tools in Kotlin Coroutines, enabling reactive programming patterns in your applications.

Revisiting Basic Flow Builders

flow : The flow builder is a basic way to create a flow. It allows multiple values to be emitted serially.

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun simpleFlowEx(): Flow<Int> = flow {
    for (i in 1..3) {
        emit(i)
    }
}

Enter fullscreen mode Exit fullscreen mode

flowOf : The flowOf builder helps with creating a flow from a pre-defined set of data.

import kotlinx.coroutines.flow.flowOf

val flowOfNumbers = flowOf(1, 2, 3, 4, 5)

Enter fullscreen mode Exit fullscreen mode

asFlow : The asFlow helps with converting various collections, sequences, or other iterable components into flows.

import kotlinx.coroutines.flow.asFlow

val lists = listOf(1, 2, 3)
val flowFromLists = lists.asFlow()

Enter fullscreen mode Exit fullscreen mode

These basic builder tools are advanced and powerful when it comes to creating simple flows. For real-world applications we mostly deal with more complex situations.

Creating Custom Flow Builders for Complex Scenarios

These builders can be used for encapsulating your custom complex logic to build flows, making the flow reusable and testable. In this blog post, we will see how to write a Flow using Streams – for example, in cases where you have to stream real-time sensor data.

Imagine you have to write a flow that reads data in real-time from a sensor. By wrapping sensor data access into a custom Flow builder, it will be efficient to handle the data received from these sensors on a regular interval.

Creating custom Flow builder for real-time sensor data:

Define Data Model

Before creating custom flows, it’s essential to define the structure of the data that will be emitted. A data model ensures consistency and clarity, especially in scenarios where the emitted data has multiple attributes (e.g., a timestamp and a value). For instance, in our example of real-time sensor data, defining a structured data model like SensorDataM ensures that every emitted value follows a predictable format, making it easier to process downstream.

data class SensorDataM(val timestamp: Long, val value: Double)
Enter fullscreen mode Exit fullscreen mode

Create a Flow Builder Function

This will simulate the real-time data emitting from a sensor. It will emit SensorDataM at a regular interval.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlin.random.Random

fun sensorDataFlow(intervalMillis: Long): Flow<SensorData> = flow {
    while (true) {
        val sensorData = SensorData(
            timestamp = System.currentTimeMillis(),
            value = Random.nextDouble(0.0, 100.0)
        )
        emit(sensorData)
        delay(intervalMillis)
    }
}
Enter fullscreen mode Exit fullscreen mode

Collecting the Flow

Let’s now collect the flow to process real-time sensor data.

import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val sensorFlow = sensorDataFlow(1000L) // Emit data every second
    sensorFlow.collect { data ->
        println("Received sensor data: $data")
    }
}

Enter fullscreen mode Exit fullscreen mode

Here sensorDataFlow is a function that creates a flow that emits SensorDataM objects every second. This flow can then be real-time processed, such as with monitoring systems, IoT devices or any scenario of continuously-generated (even by outside factors) data that should respond asynchronously.

Advanced Operators and Transformations

Kotlin Flows provide ways to combine, transform and manage data streams. In the next section, we will delve more deeply into these operators, and understand what to do when dealing with slow Collectors by using Conflate. We will also walk through an example of a data pipeline where multiple transformations are done.

Intermediate Operator Exploration

map : This operator changes every value emitted by the flow.

import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking
// Main function to use .map example
fun main() = runBlocking {
    val flow = flowOf(1, 2, 3)
    flow.map { it * 2 }
        .collect { println(it) } // Outputs: 2, 4, 6
}

Enter fullscreen mode Exit fullscreen mode

filter : Returns flow that emits only those values for which condition is true.

import kotlinx.coroutines.flow.filter
//Filter example based on condition
fun main() = runBlocking {
    val flow = flowOf(1, 2, 3)
    flow.filter { it % 2 == 0 }
        .collect { println(it) } // Outputs: 2
}

Enter fullscreen mode Exit fullscreen mode

transform : With the help of this operator you can emit multiple values, and do side effects.

import kotlinx.coroutines.flow.transform
// emit the multiple values
fun main() = runBlocking {
    val flow = flowOf(1, 2, 3)
    flow.transform { value ->
        emit(value * 2)
        emit(value * 3)
    }.collect { println(it) } 
    // Outputs: 2, 3, 4, 6, 6, 9
}

Enter fullscreen mode Exit fullscreen mode

flatMapConcat : Maps each value to a new flow and emits them one-by-one.

import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flow

fun main() = runBlocking {
    val flow = flowOf(1, 2, 3)
    flow.flatMapConcat { value ->
        flow { emit(value * 2) }
    }.collect { println(it) } 
    // Outputs: 2, 4, 6
}

Enter fullscreen mode Exit fullscreen mode

flatMapMerge : Maps each value to a new flow and merges the emissions into results concurrently.

import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow

fun main() = runBlocking {
    val flow = flowOf(1, 2, 3)
    flow.flatMapMerge { value ->
        flow { emit(value * 2) }
    }.collect { println(it) } // Outputs: 2, 4, 6 (order may vary)
}

Enter fullscreen mode Exit fullscreen mode

flatMapLatest : For each value, it will map the single item emitted to a Flow and “listen” only for emissions coming from the last mapped flow.

import kotlinx.coroutines.flow.flatMapLatest

fun main() = runBlocking {
    val flow = flowOf(1, 2, 3)
    flow.flatMapLatest { value ->
        flow { emit(value * 2) }
    }.collect { println(it) } // Outputs: 6 (only the latest emission)
}

Enter fullscreen mode Exit fullscreen mode

Managing State with StateFlow and SharedFlow

To manage state and events in your applications, the Kotlin Flows comes with specialised tooling which are StateFlow and SharedFlow. If you know when to use them and their differences, it leads to the creation of more robust apps that run faster.

StateFlow & SharedFlow Difference

StateFlow

i) StateFlow can be considered as a state-holder, to which one or many observers may listen for its changes. It always has the current state and its updates to obsolete state changes.

ii) It is ideal for representing UI state in Android apps as it always keeps the latest value, and updates all observers.

iii) You must consider it as a live data holder, which you can easily manage and visualize.

SharedFlow

i) SharedFlow is more like an event broadcaster. It doesn’t maintain a state but it has the ability to post values to multiple listeners.

ii) It makes events or actions you need shared across different parts of your application (like a notification system) really easy!

iii) Think of it as a radio broadcast that can be tuned into by many devices receiving the same event.

StateFlow to Manage UI State in Android

For responsive Android apps, it is essential to manage UI state efficiently. StateFlow helps you observe it easily with the type safe states.

Step 1: Add Dependencies: You need to include its dependencies in your build.gradle file:

//Import dependencies in Gradle file
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.5.2"

Enter fullscreen mode Exit fullscreen mode

Step 2: Define the ViewModel: We will create a ViewModel which holds and exposes the UI state via StateFlow.

import androidx.lifecycle.ViewModel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update

data class UiState(val message: String = "")
// View Model class.
class ViewModel : ViewModel() {
    private val _uiState = MutableStateFlow(UiState())
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()
//Update the message
    fun updateMessageFunction(newMessage: String) {
        _uiState.update { currentState ->
            currentState.copy(message = newMessage)
        }
    }
}

Enter fullscreen mode Exit fullscreen mode

Step 3: Observe the StateFlow in the UI: Then, take that StateFlow in your UI to respond on state changes.

import android.os.Bundle
import androidx.activity.ComponentActivity
import androidx.activity.compose.setContent
import androidx.compose.material3.Text
import androidx.compose.runtime.Composable
import androidx.compose.runtime.collectAsState
import androidx.compose.runtime.getValue
import androidx.lifecycle.viewmodel.compose.viewModel
// Define Main activity class. 
class MainActivity : ComponentActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContent {
            val viewModel: MyViewModel = viewModel()
            val uiState by viewModel.uiState.collectAsState()

            AppScreen(uiState = uiState, onButtonClick = {
                viewModel.updateMessage("Hello, Bugfender!")
            })
        }
    }
}

@Composable
fun AppScreen(uiState: UiState, onButtonClick: () -> Unit) {
    // Your UI components here
    Text(text = uiState.message)
    // Add a button to trigger onButtonClick
}

Enter fullscreen mode Exit fullscreen mode

Error Handling & Flow Resilience

Handling errors gracefully and ensuring your data streams are resilient is crucial in any robust application. Kotlin Flows provide several advanced techniques for error handling and recovery, such as catch, retry, and retryWhen. Additionally, you can implement fallback mechanisms and alternative flows to maintain reliability, even in the face of errors.

Let’s explore these techniques and see how they can be applied, including a practical example of retrying network requests with exponential backoff.

It is a typical way to handle errors gracefully and make our data streams resilient in any robust application. The Kotlin Flows offer quite a few powerful mechanisms to distinguish between errors and provide recovery options in the form of catch, retry,and retryWhen. You can also define fallback and alternative flows to provide the desired reliability, even in situations where errors occur. Well, let us delve into these strategies and how they can be used with an example of a re-try network request using exponential backoff.

Seven Advanced error handling techniques

catch : The catch operator gives you the tools to deal with exceptions within your flow. This is where you can either emit different values or just re-throw the exception.

import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val flow = flow {
        emit(1)
        throw RuntimeException("Error occurred")
        emit(2)
    }.catch { e ->
        println("Caught exception: $e")
        emit(-1)
    }

    flow.collect { value ->
        println(value) // Outputs: 1, -1
    }
}

Enter fullscreen mode Exit fullscreen mode

retry : The retry also tries to resubscribe the preorder after an Exception in amount of times.

import kotlinx.coroutines.flow.retry

fun main() = runBlocking {
    val flow = flow {
        emit(1)
        throw RuntimeException("Error occurred")
    }.retry(3) {
        println("Retrying due to: $it")
        true // Return true to retry, false to stop retrying
    }

    flow.catch { e ->
        println("Caught exception after retries: $e")
    }.collect { value ->
        println(value) // Outputs: 1, 1, 1, 1
    }
}

Enter fullscreen mode Exit fullscreen mode

retryWhen : It also provides more control to you by ‘when’ predicate as condition, or delay in retry.

import kotlinx.coroutines.flow.retryWhen

fun main() = runBlocking {
    val flow = flow {
        emit(1)
        throw RuntimeException("Error occurred")
    }.retryWhen { cause, attempt ->
        println("Retrying due to: $cause, attempt: $attempt")
        attempt < 3 // Retry up to 3 times
    }

    flow.catch { e ->
        println("Caught exception after retries: $e")
    }.collect { value ->
        println(value) // Outputs: 1, 1, 1, 1
    }
}

Enter fullscreen mode Exit fullscreen mode

Testing Flows

It is important to test flows, as you want your asynchronous data streams to be reliable and correct. It has some tools like TestCoroutineScope and the Turbine library to make testing flows easier. In this part, we will show how to test flows, as well introduce state management testing strategies using TestCoroutineScope and Turbine, with an example of a Unit Testing for flow-based data repository.

Unit Test Strategies for Flow testing

Testing Flow Emissions: Check if a flow successfully emits the provided sequence of values. Capture Emitted Values Using Test Collectors

Testing Flow Transformations: Ensure stream transformations (map, filter etc.) give expected results. Compose transformations in a chain and test for the final output.

Testing Error Handling: Try and simulate errors, validate that the flow took care of them correctly. Testing retry mechanisms and fallback logic.

Testing with Delays: Manage flows with delays and time-based emissions. You can leverage this flow API to manage and validate emissions over simulated time using virtual times.

Flow Testing with TestCoroutineScope and Turbine

TestCoroutineScope : The same set of operations would be possible for any method using TestCoroutineScope, a specialized coroutine scope suitable only for testing, as it allows you to control how coroutines are executing and manipulate the time in a virtual way.

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.TestCoroutineDispatcher
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest

@ExperimentalCoroutinesApi
val testDispatcher = TestCoroutineDispatcher()
@ExperimentalCoroutinesApi
val testScope = TestCoroutineScope(testDispatcher)

Enter fullscreen mode Exit fullscreen mode

Turbine Library : Flows Testing is a small library for testing flows. Test Extension function to collect and assert flow emissions. Add dependency to build.gradle:

testImplementation "app.cash.turbine:turbine:0.5.0"

Enter fullscreen mode Exit fullscreen mode

Performance Optimization

It is essential to efficiently optimize the performance of Kotlin Flows to develop responsive and fast applications. Optimization should involve profiling the flows, eliminating common pitfalls and anti-patterns as well as best practices to improve performance. Let us now discuss each of these aspects, culminating in the optimization of a flow-based, data-processing pipeline by using an example.

Profiling & Optimizing Flow Performance

  1. Profiling the Flows : We need to use the official tools and third party libraries offered by Kotlin to profile the flows. We can track the execution times, as well as memory and CPU consumption. With Profiling tools like Android Studio Profiler, Kotlinx.coroutines Debugging, and tracing methods may be used for this purpose.
  2. Optimizing Flow Operators : Operators like map, collect, and filter are used sparingly. Expensive operations should not be performed within flow operators. Use a buffer while dealing with slow collectors to de-link them from the flow stream to prevent slowness for the remaining operators.
  3. Concurrency in Flows : Use the flowOn to change the flow execution context. For more parallel executions of independent operations of the flow: use flatMapMerge or flatMapConcat
  4. Handling Backpressure : For handling backpressure for an emitting flow that produces data faster than the collecting process consumes, there are buffer, conflate, and debounce. Buffer adds a buffer used as a leaky bucket. conflate will only keep the most recent value; the others will be lost. debounce adds a delay between the emissions.

Conclusion

In this article, we took a deep-dive into the advanced functionalities of Kotlin Flows dealing with asynchronous data streams effectively and efficiently.

A quick overview of key points we’ve gone over:

We went over our basic flow builders like flow, flowOf and asFlow. Specific use case – one of them was real-time sensor data that we have in-built user flow for.

We looked at operators like map, filter, transform, flatMapConcat,flatMapMerge and flatMapLatest. StateFlow vs SharedFlow In Android applications we did it with StateFlow for UI state management. We have talked about fancy error handling such as catch,retry and retryWhen.

We also go over the testing of Flows. We leveraged a TestCoroutineScope and the Turbine library to test flows.

In summary, Kotlin Flows provide a flexible and powerful structure to deal with streams of asynchronous data. Learning the advanced features highlighted in this article, you can make your applications more reactive, robust and maintainable. Begin with the simplest of things; profile your flows and incrementally build in more complex transformations. Keep practicing with Kotlin Flows and you will get much better at using them to the max. Happy coding!

Top comments (0)