Touchlab

Working with Kotlin Coroutines and RxSwift

Russell Wolf (russhwolf) ・8 min read

A recent client engagement involved interop between Kotlin coroutines in shared code, and RxSwift on the iOS side. We did some work to ensure that this could be done in a way that is type-safe and thread-safe. Whether or not you use RxSwift, hopefully this can provide some useful patterns for interop code.

Coroutines are a Kotlin language feature that allows asynchronous code to be written in a way that looks like synchronous code, avoiding the nesting that often comes with callback-based APIs. They're available on all Kotlin platforms, but have some limitations on the native side. The release version of native coroutines is limited to single-threaded use-cases, though there are experimental multithreaded releases available that carry some risk of memory leaks. And while coroutines are a fully-supported language feature of Kotlin, they don't translate to Objective-C and Swift.
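To make the "looks like synchronous code" point concrete, here is a minimal sketch that uses only the Kotlin standard library. The fetchNumber() callback API and the awaitNumber() and runSuspending() helpers are hypothetical names invented for this illustration, and the callback fires synchronously so the example stays self-contained.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-based API, invented for this illustration.
// It invokes the callback immediately with input + 1.
fun fetchNumber(input: Int, callback: (Int) -> Unit) = callback(input + 1)

// Callback style: each dependent step nests one level deeper.
fun nestedCallbacks(onResult: (Int) -> Unit) {
    fetchNumber(0) { first ->
        fetchNumber(first) { second ->
            fetchNumber(second) { third -> onResult(third) }
        }
    }
}

// Bridge the callback API into a suspend function.
suspend fun awaitNumber(input: Int): Int =
    suspendCoroutine { continuation ->
        fetchNumber(input) { result -> continuation.resume(result) }
    }

// Coroutine style: the same three steps read top to bottom.
suspend fun sequentialSteps(): Int {
    val first = awaitNumber(0)
    val second = awaitNumber(first)
    return awaitNumber(second)
}

// Starts a coroutine with the standard library only. This works here because
// fetchNumber() completes synchronously; real code would normally use a
// CoroutineScope from kotlinx.coroutines instead.
fun <T> runSuspending(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "coroutine did not complete synchronously" }.getOrThrow()
}

fun main() {
    nestedCallbacks { println("callbacks: $it") }                  // callbacks: 3
    println("coroutine: ${runSuspending { sequentialSteps() }}")   // coroutine: 3
}
```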

RxSwift is the Swift implementation of ReactiveX (Reactive Extensions). It's one way to handle asynchronous code in Swift, and has many operators for combining and transforming event streams. Though some of the names are different, much of the API will feel familiar to Kotlin developers who have experience with RxJava.

Since Coroutines will almost always be present in shared code, and RxSwift is a common option on the iOS side, hopefully the motivation to communicate between them is clear. So let’s start writing some code.

Common Repository

We’ll work with a dummy repository class that looks like this, defined in src/commonMain:

class ThingRepository {
    suspend fun getThing(succeed: Boolean): Thing {
        delay(100)
        if (succeed) {
            return Thing(0)
        } else {
            error("oh no!")
        }
    }

    fun getThingStream(count: Int, succeed: Boolean): Flow<Thing> = flow {
        repeat(count) {
            delay(100)
            emit(Thing(it))
        }
        if (!succeed) error("oops!")
    }
}

From the outside this looks roughly like a real repository might, but with inputs that let us control the output a bit more directly for demonstration purposes. It gives us the ability to test success and error cases for both single-event suspend functions, and multiple-event Flows. The internal delays also give us a chance to cancel things in-flight so we can verify that works as expected.
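Before moving to Swift, it can help to see how the repository behaves when called from plain Kotlin. The sketch below repeats the repository so that it is self-contained, and assumes Thing is a data class with a single count property (inferred from the printed output later in this post, not shown in the original code). It exercises the success case, the error case, and an in-flight cancellation.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Assumed shape of Thing; the post only shows its printed form.
data class Thing(val count: Int)

class ThingRepository {
    suspend fun getThing(succeed: Boolean): Thing {
        delay(100)
        if (succeed) {
            return Thing(0)
        } else {
            error("oh no!")
        }
    }

    fun getThingStream(count: Int, succeed: Boolean): Flow<Thing> = flow {
        repeat(count) {
            delay(100)
            emit(Thing(it))
        }
        if (!succeed) error("oops!")
    }
}

fun main() = runBlocking {
    val repository = ThingRepository()

    // Success cases for the suspend function and the Flow.
    println(repository.getThing(succeed = true))
    println(repository.getThingStream(count = 3, succeed = true).toList())

    // Error case: error() throws an IllegalStateException with the message.
    val failure = runCatching { repository.getThing(succeed = false) }
    println("failed with: ${failure.exceptionOrNull()?.message}")

    // Cancel a stream in flight; the internal delays leave time to do so.
    val received = mutableListOf<Thing>()
    val job = launch {
        repository.getThingStream(count = 10, succeed = true).toList(received)
    }
    delay(250)
    job.cancelAndJoin()
    println("received ${received.size} of 10 items before cancellation")
}
```

The exact number of items received before cancellation depends on timing, so the sketch only reports it rather than asserting a value.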

Of course, if we try to consume this repository from Swift code, we run into some immediate problems. Suspend functions are not visible at all to Swift*, and while the Flow type is visible, most APIs you might call on it are not. Further, Flow's generic type is not visible to Swift, due to limitations in how generics are translated from Kotlin, through Objective-C, and then to Swift. Generic arguments only make it from Kotlin to Swift when defined on classes, and Flow is an interface.

*This will be changing somewhat in Kotlin 1.4, where suspend functions will generate a callback function visible to Objective-C/Swift. However, this uses only the language-level suspend function support, and none of the extra functionality from the kotlinx.coroutines library, so it doesn't help with cancellation or the structured concurrency model. We want to be able to control cancellation and threading via CoroutineScopes, so the new stuff in 1.4 won't help here.

Suspend and Flow wrappers

So, let's add some wrapper types that are easier to interact with from Swift. These are iOS-specific, so we can put them in the iosMain source-set. First, SuspendWrapper:

class SuspendWrapper<T : Any>(private val suspender: suspend () -> T) {
    fun subscribe(
        scope: CoroutineScope,
        onSuccess: (item: T) -> Unit,
        onThrow: (error: Throwable) -> Unit
    ): Job = scope.launch {
        try {
            onSuccess(suspender())
        } catch (error: Throwable) {
            onThrow(error)
        }
    }
}

A few things to highlight here:

  • The subscribe function is a stand-in for invoking the lambda since that's impossible from Swift, but it also adds some callbacks. This lets us easily hook into the different Rx event callbacks. If we care about other parts of the Rx lifecycle like subscription and disposal we could add other lambdas here and call them at the appropriate time.
  • There's a scope parameter, which I left in to keep this architecture-agnostic. You could alternatively make the scope internal to the wrapper, if you want to assert that you always call this suspend function in a specific scope. This would be less flexible but would avoid some work on the Swift side.
  • The generic parameter has an Any upper-bound. Without this, all Swift references to T would be nullable, even if a nonnull type were used for T. If you need to work with nullable types, I recommend having a separate NullableSuspendWrapper which differs only in the generic upper-bound.
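As a rough sketch of the nullable variant mentioned in the last point, the class below mirrors the SuspendWrapper described above and changes only the bound on T. The small main() function is just an illustration that runs on any platform with kotlinx.coroutines; it is not part of the wrapper itself.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Same structure as SuspendWrapper, but T has no Any upper bound
// (equivalent to T : Any?), so the wrapped function may return null.
class NullableSuspendWrapper<T>(private val suspender: suspend () -> T) {
    fun subscribe(
        scope: CoroutineScope,
        onSuccess: (item: T) -> Unit,
        onThrow: (error: Throwable) -> Unit
    ): Job = scope.launch {
        try {
            onSuccess(suspender())
        } catch (error: Throwable) {
            onThrow(error)
        }
    }
}

fun main() = runBlocking {
    // A suspend lambda that legitimately produces null.
    val wrapper = NullableSuspendWrapper<String?> { null }
    val job = wrapper.subscribe(
        scope = this,
        onSuccess = { item -> println("success: $item") },   // success: null
        onThrow = { error -> println("error: ${error.message}") }
    )
    job.join()
}
```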

But there's still an issue. There's a good chance we'll want to talk to these wrappers in a multithreaded setting. In the default, single-threaded coroutines version, this would require the success and throw callbacks to switch back to the original thread before running, which is a heavy limitation. Things are easier if we use the multithreaded coroutines branch, but we still need to do some work to make this thread-safe for Kotlin/Native.

class SuspendWrapper<T : Any>(private val suspender: suspend () -> T) {
    init {
        freeze()
    }

    fun subscribe(
        scope: CoroutineScope,
        onSuccess: (item: T) -> Unit,
        onThrow: (error: Throwable) -> Unit
    ): Job = scope.launch {
        try {
            onSuccess(suspender().freeze())
        } catch (error: Throwable) {
            onThrow(error.freeze())
        }
    }.freeze()
}

We've added freeze() calls in three key places:

  • On initialization, so the SuspendWrapper itself can cross threads
  • On the arguments of each callback, so that emissions or errors can be passed to other threads.
  • On the Job returned by the subscribe() call, so that it can be cancelled from any thread.
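For readers who haven't run into freeze() before, here is a minimal sketch of what it does. It only compiles for Kotlin/Native targets, and it describes the strict memory model this post is written against, where an object must be frozen (made deeply immutable) before it can be shared across threads. Newer Kotlin versions deprecate these APIs. The Counter class is a hypothetical example type.

```kotlin
import kotlin.native.concurrent.InvalidMutabilityException
import kotlin.native.concurrent.freeze
import kotlin.native.concurrent.isFrozen

// Hypothetical mutable class used only for this illustration.
class Counter(var value: Int)

fun main() {
    val counter = Counter(0)
    counter.value = 1                      // allowed: not frozen yet

    counter.freeze()                       // freezes the object and everything it references
    println("frozen: ${counter.isFrozen}")

    try {
        counter.value = 2                  // under the strict model, mutating a frozen object throws
    } catch (e: InvalidMutabilityException) {
        println("mutation rejected")
    }
}
```

This is why the wrappers freeze themselves, their callback arguments, and the returned Job: each of those may end up being touched from a thread other than the one that created it.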

The Flow equivalent is this:

class FlowWrapper<T : Any>(private val flow: Flow<T>) {
    init {
        freeze()
    }

    fun subscribe(
        scope: CoroutineScope,
        onEach: (item: T) -> Unit,
        onComplete: () -> Unit,
        onThrow: (error: Throwable) -> Unit
    ): Job = flow
        .onEach { onEach(it.freeze()) }
        .catch { onThrow(it.freeze()) }
        .onCompletion { onComplete() }
        .launchIn(scope)
        .freeze()
}

This is similar to the structure of SuspendWrapper, except that it wraps a Flow instead of a suspend function. An important but subtle detail is that the catch{} call must come before onCompletion{}, or else the RxSwift stream will end before the error is emitted.
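The effect of the operator order can be checked in plain Kotlin without any Swift involved. The sketch below records events into a list instead of calling Rx callbacks; failingFlow() and the two helper functions are hypothetical names used only for this demonstration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Emits one item and then fails, like getThingStream(count = 1, succeed = false).
fun failingFlow(): Flow<Int> = flow {
    emit(0)
    error("oops!")
}

// catch{} before onCompletion{}: the error is recorded first.
suspend fun catchThenComplete(): List<String> {
    val events = mutableListOf<String>()
    failingFlow()
        .onEach { events += "next $it" }
        .catch { events += "error ${it.message}" }
        .onCompletion { events += "complete" }
        .collect()
    return events
}

// onCompletion{} before catch{}: completion is recorded before the error.
suspend fun completeThenCatch(): List<String> {
    val events = mutableListOf<String>()
    failingFlow()
        .onEach { events += "next $it" }
        .onCompletion { events += "complete" }
        .catch { events += "error ${it.message}" }
        .collect()
    return events
}

fun main() = runBlocking {
    println(catchThenComplete())   // [next 0, error oops!, complete]
    println(completeThenCatch())   // [next 0, complete, error oops!]
}
```

Note that with the first ordering a completion callback still follows the error. As far as I understand the Rx contract, that is harmless, because events arriving after a terminal error are ignored.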

iOS Repository Wrapper

With these function-level wrappers in place, we'll add an iOS wrapper around our ThingRepository that makes use of them:

class ThingRepositoryIos(private val repository: ThingRepository) {
    val scope: CoroutineScope = object : CoroutineScope {
        override val coroutineContext: CoroutineContext
            get() = SupervisorJob() + Dispatchers.Default
    }

    init {
        freeze()
    }

    fun getThingWrapper(succeed: Boolean) = 
        SuspendWrapper { repository.getThing(succeed) }
    fun getThingStreamWrapper(count: Int, succeed: Boolean) =
        FlowWrapper(repository.getThingStream(count, succeed))
}

This piece is the unfortunate boilerplate we need in this scheme. While most of the other bridge code just has to be written once, each repository method must be explicitly wrapped into something visible from Swift, and we wrap the entire repository so we have a place to put them. At scale it would probably be preferable to codegen some version of this, but for something small it's pretty straightforward to write by hand.

Note that we freeze this class to ensure it can safely be touched from a background thread, since it likely will be.

We also declare a scope here. This will be passed as a parameter to the subscribe() calls on SuspendWrapper and FlowWrapper. In this example it's using Dispatchers.Default, which requires using the multithreaded coroutines branch.

Connecting Wrappers to Rx

Armed with our iOS-compatible repository, it's time to move over into Swift. My natural inclination as a Kotlin developer is to add extension functions to SuspendWrapper and FlowWrapper which connect the relevant Rx piping to the callback lambdas inside the subscribe() function. Unfortunately, the following declaration isn't valid here:

extension SuspendWrapper {
    func toSingle() -> Single<T> {
        ...
    }
}

This leads to an error stating "Extension of a generic Objective-C class cannot access the class's generic parameters at runtime". I don't have a deep understanding here, but essentially due to the different generic models between Swift and Objective-C, the generic parameter on SuspendWrapper is erased at runtime and so the extension can't be resolved. Luckily, we can still define top-level functions:

func createSingle<T>(
    scope: Kotlinx_coroutines_coreCoroutineScope,
    suspendWrapper: SuspendWrapper<T>
) -> Single<T> {
    return Single<T>.create { single in
        let job: Kotlinx_coroutines_coreJob = suspendWrapper.subscribe(
            scope: scope,
            onSuccess: { item in single(.success(item)) },
            onThrow: { error in single(.error(KotlinError(error))) }
        )
        return Disposables.create { job.cancel(cause: nil) }
    }
}

func createObservable<T>(
    scope: Kotlinx_coroutines_coreCoroutineScope,
    flowWrapper: FlowWrapper<T>
) -> Observable<T> {
    return Observable<T>.create { observer in
        let job: Kotlinx_coroutines_coreJob = flowWrapper.subscribe(
            scope: scope,
            onEach: { item in observer.on(.next(item)) },
            onComplete: { observer.on(.completed) },
            onThrow: { error in observer.on(.error(KotlinError(error))) }
        )
        return Disposables.create { job.cancel(cause: nil) }
    }
}

These functions take in a CoroutineScope and a SuspendWrapper or FlowWrapper, and create the associated Rx type, Single or Observable. The callback lambdas passed to the wrappers forward each coroutine result, error, or completion to the corresponding Rx event, and the Rx disposable is linked to the coroutine Job so that disposal on the Rx side cancels the coroutine as well.

By the way, a note on errors

You might note the KotlinError class referenced here. This is a simple wrapper to pass Kotlin exceptions into the Rx error stream, and forward the error message.

class KotlinError: LocalizedError {
    let throwable: KotlinThrowable
    init(_ throwable: KotlinThrowable) {
        self.throwable = throwable
    }
    var errorDescription: String? {
        get { throwable.message }
    }
}

This lets us read the Kotlin error message from Swift via the localizedDescription property on the error, without needing to check its type. This helps avoid extra Kotlin-specific error-handling in Swift.

Usage From Elsewhere in iOS App

The rest of the Swift code can now talk to these observables, like so:

let disposable = createObservable(
    scope: repository.scope,
    flowWrapper: repository.getThingStreamWrapper(count: 3, succeed: true)
).subscribe(
    onNext: { thing in NSLog("next: \(thing)") },
    onError: { error in NSLog("error: \(error.localizedDescription)") },
    onCompleted: { NSLog("complete") },
    onDisposed: { NSLog("disposed") }
)

This will print:

next: Thing(count=0)
next: Thing(count=1)
next: Thing(count=2)
complete
disposed

So there you have it: type-safe and thread-safe communication from Swift with coroutine APIs defined in our shared Kotlin, via RxSwift.

Next Steps

I didn't emphasize it very heavily above, but only the Swift side of this setup has any knowledge of RxSwift. So if you want to do something similar with some other reactive API (I hear Combine is interesting), you could probably do so without needing to change anything on the Kotlin side.

If you'd like to explore further, you can find the code at the following repo:

This includes a few extensions on what was presented here:

  1. Both nullable and nonnull versions of all Kotlin wrapper types and Swift wrapper functions
  2. Swift unit tests to verify everything is working
  3. An extra jobCallback parameter in createSingle() and its siblings so that the Swift tests can pull the internally-created Job out and verify it gets cancelled when expected.

Hopefully this tour of RxSwift and Coroutines interop was interesting and helpful to you. And if you'd like to go deeper, you can always reach out to Touchlab!
