DEV Community

Kevin Galligan for Touchlab

Posted on • Updated on

Practical Kotlin Native Concurrency Part 3

Parts 1 and 2 set the conceptual stage for what we're going to do in Part 3: look at some actual concurrency libraries. We have one more conceptual tool to cover, then we'll go over 2 options for using coroutines in a production app, and discuss Reaktive: a reactive streams implementation.

Atomics

KN provides a set of Atomic classes that allow you to have mutable state within a frozen object. In general, it's a good idea to minimize Atomic usage in lieu of "better" architecture, but they can be very useful in cases. Also, when first exposed to KN's state architecture, it can take some time to rethink how architecture should be laid out, and Atomics can help bridge that gap.

We will be using KN's Atomic primitives directly, but in practice, you'll likely use a library. We publish Stately, but also Atomic-fu has recently added native support. On native they're similar, but on the jvm Atomic-fu has some optimizations and restrictions. You should make an informed decision on which to choose:

GitHub logo touchlab / Stately

Kotlin Multiplatform State Library


GitHub logo Kotlin / kotlinx.atomicfu

The idiomatic way to use atomic operations in Kotlin

Let's take a look at Atomics in action. We'll be using KN's atomics directly, but either of the two libraries above have equivalent definitions and use KN's atomics under the hood.

First, let's revert back to the master branch.

git checkout .
git checkout master

Atomic Primitives

KN provides a set of Atomic classes for integral values: AtomicInt and AtomicLong. Those are conceptually simple to use. Run atomicIntDemo()

fun atomicIntDemo(){
    val dc = AtomicDataCounter()
    dc.freeze()
    dc.ai.increment()
    println("dc $dc")
}

class AtomicDataCounter {
    val ai = AtomicInt(3)
}

AtomicDataCounter has an AtomicInt property. We freeze our instance of AtomicDataCounter, but can still change the AtomicInt value. You can do this from any thread. Run atomicThreadsDemo()

fun atomicThreadsDemo() {
    val dc = AtomicDataCounter()
    background {
        dc.ai.increment()
        println("dc ${dc.ai.value}, is frozen ${dc.isFrozen}")
    }
}

AtomicReference is more generally applicable, as it allows you to have an object reference that can be changed in a frozen object. The object held in the AtomicReference must itself be frozen.

fun atomicRefDemo() {
    val dr = AtomicDataReference().freeze()
    dr.ar.value = SomeData("Hello ๐ŸŽธ", 43).freeze()
    println("dr ${dr.ar.value}")
}

class AtomicDataReference {
    val ar = AtomicReference(SomeData("Hello ๐Ÿš€", 32).freeze())
}

If you have global objects that need to have mutable state, this is a straightforward way to manage that state. Global objects visible to multiple threads need to be frozen, so atomics are necessary if you need to change anything. However, there are some things to keep in mind.

1) Atomic ref performance is obviously not going to be the same as regular mutable state. Freezing isn't free either. Heavy atomic use may not be a great idea if performance is a consideration.

2) AtomicReference currently should be cleared out if it's not a "forever" object. It is possible to leak memory otherwise. Not always, though. This is something we have on the list for a future blog post. However, the situation may be changing anyway, so this may not be an issue in the near future.

Coroutines

The current release version of kotlinx.coroutines operates in a single thread only. Jetbrains has done such a good job associating "concurrency" with "coroutines" that some people in the community confuse the situation to think KN doesn't support concurrency at all.

Fortunately, that situation is changing. There is a preview branch of kotlinx.coroutines that is functional, and we'll be walking through that. Because it's a preview, including it in a production app may be difficult because of library dependencies, not to mention potential bugs. In the interim there is CoroutineWorker from Autodesk, which provides a source-compatible (ish) implementation for common cases.

kotlinx.coroutines Multithreaded Preview

GitHub logo Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines

Since this branch has not been officially released, you'll need to build kotlinx.coroutines locally.

Good News! There's another prebuilt preview release! You don't need to build locally, at least for now. Just update your sample code...

Our handy sample app should be updated as well. Again, clear out local changes, and update, but this time we'll also need to switch branches.

git checkout .
git pull
git fetch
git checkout mt_coroutines

This will check out the remote mt_coroutines branch. We've moved the coroutines sample to a separate branch because of the local kotlinx.coroutines dependency. It would fail if you hadn't done the local build.

Anyway, now we're ready to get started.

Just an FYI. This post is not a generalized intro to coroutines. We're assuming you understand the basics. We're going to discuss them in the context of native and native's state model only.

Overview

Some things to know specific to coroutines on native:

1) A coroutine is always bound to a single thread. The JVM may use a thread pool for processing. Not so in native.

2) You create a new threaded coroutine with newSingleThreadContext. Each is backed by it's own Worker (and therefore, it's own thread).

3) Dispatchers.Default is defined, and is a single Worker coroutine.

4) Dispatchers.Main is defined for Apple targets. The situation is more complex for Windows/Linux.

See the kotlin-native-sharing.md for more detail.

You switch threads by switching the Dispatcher. In our examples we'll do this with withContext.

Something very important to understand:

If you are switching threads, all data captured will be frozen, and any return value will be frozen.

Let's look at a basic example. In SampleMacos.kt, find 5) Coroutines. Run basicBackgroundCoroutine().

fun basicBackgroundCoroutine() = runBlocking {
    println("(Coroutine) Is main thread $isMainThread")
    withContext(Dispatchers.Default) {
        println("(Coroutine) Is main thread $isMainThread")
    }
}

We create a coroutine context with runBlocking, then switch to a different thread with withContext(Dispatchers.Default). On the console we should see:

(Coroutine) Is main thread true
(Coroutine) Is main thread false

Easy!

Coroutines aren't magic. They follow the same state rules in native as everybody else. As mentioned, any captured state is frozen before it changes threads.

Run captureStateCoroutine()

fun captureStateCoroutine() = runBlocking {
    val sd = SomeData("Hello ๐Ÿฅถ", 67)
    println("(Coroutine) Am I frozen? ${sd.isFrozen}")
    withContext(Dispatchers.Default) {
        println("(Coroutine) Am I frozen now? ${sd.isFrozen}")
    }
}

This is very similar to captureState() we ran in Part 2. When sd moves threads, it's frozen. We should see:

(Coroutine) Am I frozen? false
(Coroutine) Am I frozen now? true

Coroutines follow the same rules, and also have the same potential issues. Run captureTooMuchCoroutine()

fun captureTooMuchCoroutine() = runBlocking {
    val model = CountingModelCoroutine()
    model.increment()
    println("I have ${model.count}")

    model.increment()
    println("I have ${model.count}") //We won't get here
}

class CountingModelCoroutine {
    var count = 0

    suspend fun increment() {
        count++
        withContext(Dispatchers.Default) {
            saveToDb(count)
        }
    }

    private fun saveToDb(arg: Int) {
        //Do some db stuff
    }
}

Just like in captureTooMuch() from Part 2, the whole model gets frozen. How do we fix it? Surprise! Same solution. Run captureArgsCoroutine():

fun captureArgsCoroutine() = runBlocking {
    val model = CountingModelSaferCoroutine()
    model.increment()
    println("I have ${model.count}")

    model.increment()
    println("I have ${model.count}")
}

class CountingModelSaferCoroutine {
    var count = 0

    suspend fun increment() {
        count++
        saveToDb(count)
    }

    private suspend fun saveToDb(arg: Int) = withContext(Dispatchers.Default) {
        println("Doing db stuff with $arg, in main $isMainThread")
    }
}

Return Values

Unlike our simple background method, coroutines can return a method to your suspended caller. As mentioned, if the value being returned is crossing a thread, it'll be frozen. Run returnDataCoroutine():

fun returnDataCoroutine() = runBlocking {
    val sd = SomeData("Hello ๐Ÿถ", 67)

    val result = makeData(sd)

    println("result: $result, is frozen ${result.isFrozen}")
}

private suspend fun makeData(sdIn: SomeData) = withContext(Dispatchers.Default) {
    SomeData("Hello again ๐Ÿถ", sdIn.i + 55)
}

You should see:

result: SomeData(s=Hello again ๐Ÿถ, i=122), is frozen true

When makeData is called, the main thread suspends while makeData runs in the background. When complete, the result is frozen, passed back to the caller and returned.

As mentioned, multithreaded kotlinx.coroutines is in preview, which makes it's use in production apps somewhat restricted today. However, if you want to play with things a bit more, and see Flow in action, check out the mt_coroutines branch from:

GitHub logo touchlab / DroidconKotlin

Kotlin Multiplatfom app for Droidcon Events

CoroutineWorker

CoroutineWorker from Autodesk is a library intended to let you use coroutines in KN across threads with syntax and semantics similar to what you'll use when kotlinx.coroutines with multiple threads is live.

GitHub logo Autodesk / coroutineworker

Kotlin Coroutine-based workers for native

Once again, let's revert and switch branches:

git checkout .
git pull
git fetch
git checkout coroutine_worker

You'll notice a very similar set of functions. In fact, everything we described above works basically the same. State captured is frozen, state returned is frozen. You call with withContext, although the dispatcher you pass is only used on the jvm.

Find 5) Coroutine Worker and run those methods in succession to verify they act the same.

For example, run captureStateCoroutine(). It is very similar to what we did with the MT coroutines preview, and behaves in a similar way.

fun captureStateCoroutine() = runBlocking {
    val sd = SomeData("Hello ๐Ÿฅถ", 67)
    println("(Coroutine) Am I frozen? ${sd.isFrozen}")
    withContext(Dispatchers.Default) {
        println("(Coroutine) Am I frozen now? ${sd.isFrozen}")
    }
}
(Coroutine) Am I frozen? false
(Coroutine) Am I frozen now? true

We know of multiple teams using Coroutine Worker in production right now, and if the concurrency provided is sufficient, it's a great way to be source-ready for the full kotlinx.coroutines release.

Reaktive

Reaktive from Badoo is a reactive streams implementation for KMP, which includes KN support. If you are more of an RX fan vs Coroutines (if you get into that kind of debate), this is something you'll want to take a look at. We haven't done much since it was in preview, but will be taking a deeper look as part of our concurrency library review.

Just FYI, there's currently no macos build for Reaktive, so our sample needs to be run through Xcode.

GitHub logo badoo / Reaktive

Kotlin multi-platform implementation of Reactive Extensions

Let's reset our sample again:

git checkout .
git pull
git fetch
git checkout reaktive

The code should look reasonably familiar to somebody with RxJava experience. Let's do a basic sample that loads data in an I/O thread and returns it to the UI.

fun basicObservable(){
    observable<SomeData> { emitter ->
        println("From io thread, is main thread? ${isMainThread}")
        emitter.onNext(SomeData("arst", 43))
    }
        .subscribeOn(ioScheduler)
        .observeOn(mainScheduler)
        .threadLocal()
        .doOnBeforeNext { values += it } // Callback is not frozen, we can updated the mutable list
        .doOnBeforeFinally { isFinished = true } // Callback is not frozen, we can change the flag
        .subscribe {
            println("In main thread $isMainThread, is data frozen ${it.isFrozen}")
        }
}

We emit an instance of SomeData in the I/O thread and it winds up in the subscribe block. Data that chains through threads is frozen, as by now you'd probably expect.

To see this in action, uncomment basicObservable(), but to run it we need to do something different that before. Refresh gradle, and run the build task. This will take some time to run. After that, open iosApp/iosApp.xcworkspace in Xcode. Make sure it's iosApp.xcworkspace and not iosApp.xcodeproj. Once that is open, pick a simulator and run the project.

You should see the following output:

From io thread, is main thread? false
In main thread true, is data frozen true

Like with other libraries, you should be careful with state capture. Reaktive lives in the same state universe as everything else. Captured state will be frozen. Run willFreeze():


fun willFreeze(){
    val sd = SomeData("arst", 43)
    println("Frozen here? ${sd.isFrozen}")
    observable<SomeData> { emitter ->
        println("How about here? ${sd.isFrozen}")
        emitter.onNext(sd)
    }
        .subscribeOn(ioScheduler)
        .observeOn(mainScheduler)
        .threadLocal()
        .doOnBeforeNext { values += it } // Callback is not frozen, we can updated the mutable list
        .doOnBeforeFinally { isFinished = true } // Callback is not frozen, we can change the flag
        .subscribe {
            println("Obviously frozen here? ${sd.isFrozen}")
        }
}

On the output you'll see:

Frozen here? false
How about here? true
Obviously frozen here? true

We're not going to go super deep on Reaktive in this post, but it's definitely an interesting project for concurrency.

What's next?

This series is trying to avoid going too deep on the details and just explain enough for you to get started. If you want to go deeper and really understand what's happening, here are some other resources.

I gave a talk this year at Kotlinconf. It covers a lot of the same content, but also goes into worker and some info on architectural thinking.

Nikolay gave a talk at last year's Kotlinconf that goes into much deeper detail.

I wrote an earlier post series on the same topic. It is getting a bit old at this point, but it also explains deeper level topics.

Finally, the KN repo has some reference docs available. CONCURRENCY.md and IMMUTABILITY.md.

As mentioned in Part 1, this series of docs is part of our KMP evaluation kit. The goal of the starter kit is to kickstart your team's KMP evaluation. You can sign up for access to that here: https://go.touchlab.co/kampdevto

Top comments (1)

Collapse
 
nikolamilovic profile image
Nikola Milovic

Hey Kevin, keep these going, this has been the most informative article Ive read on the topic. Just one question, why is the state frozen in Reaktive WillFrezee method in the observable block?