Filip Egeric
Structured Concurrency in Practice: CoroutineScope vs StructuredTaskScope [Part 4]

This post series assumes familiarity with Kotlin, Java, and Spring Boot.

No AI was used during the writing of this post series.

Coroutines are an all-in feature

In Part 3 we finally got to a fully working solution, covering all the edge cases. However, we went to some trouble to get there and we had to be extra careful not to miss some of those edge cases.

Some of this trouble came from mixing blocking and non-blocking code. Coroutines are by their nature non-blocking, and they are invasive: they tend to spread through the codebase. You mark one function with suspend, then you have to mark all the callers of that function with suspend, then all the callers of those functions, and it propagates all the way up.

This is why coroutines are an all-in feature. They mostly make sense if the entire codebase is non-blocking and is using coroutines.
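Here is a minimal sketch of that propagation, using only the Kotlin standard library. The function names are made up for illustration, and runNow is a simplified stand-in for runBlocking that is valid only because nothing in this example ever really suspends.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical leaf function: once it is marked with suspend...
suspend fun fetchReviews(id: Long): List<String> = listOf("review for book $id")

// ...its caller has to be marked with suspend too, or it will not compile...
suspend fun loadBook(id: Long): String = "book $id, ${fetchReviews(id).size} review(s)"

// ...and so on up the call chain, until some entry point bridges the two worlds.
// Simplified stand-in for runBlocking: it fails if the block actually suspends.
fun <T> runNow(block: suspend () -> T): T {
    var result: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result = it })
    return checkNotNull(result) { "block suspended; a real bridge such as runBlocking is needed" }
        .getOrThrow()
}

fun main() {
    println(runNow { loadBook(3) }) // book 3, 1 review(s)
}
```

Removing suspend from loadBook while it still calls fetchReviews is a compile error, which is exactly what pushes the modifier up through the callers.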

All we wanted was 2 calls in parallel

In Part 1, all we wanted to achieve was to execute 2 functions concurrently instead of sequentially. Since we are using Kotlin, coroutines were a natural solution, but it proved to be more complex than we initially expected. Luckily, we have an alternative.

StructuredTaskScope

StructuredTaskScope is a class from the JDK whose main purpose is to enable structured concurrency in Java. Before this, achieving structured concurrency in Java was nearly impossible, unless we wanted to use a reactive framework, which gets far more complicated and frustrating than using Kotlin coroutines.

Kotlin has full interoperability with Java, so we should have no problem using this API from Kotlin.

Seeing it in action

After replacing coroutines with the new API, here's what we get:

@RestController
@RequestMapping("/books")
class BooksController(
    private val booksRepository: BooksRepository,
    private val reviewsService: ReviewsService,
) {
    private val log = LoggerFactory.getLogger(BooksController::class.java)

    @GetMapping("/{id}")
    fun fetchBook(@PathVariable id: Long): Book = taskScope {
        log.info("[START] $id")

        val book = fork { findBook(id) }
        val reviews = fork { fetchReviewsFor(id) }

        join()

        book.get()
            .copy(reviews = reviews.get())
            .also { log.info("[SUCCESS] Returning ${it.id}") }
    }

    private fun findBook(id: Long) =
        booksRepository.findById(id) ?: throw BookNotFoundException(id)

    private fun fetchReviewsFor(id: Long) =
        reviewsService.fetchReviewsFor(id)
}

It looks very similar to what we had before. Here's what changed:

  1. runBlocking was replaced by taskScope
  2. async was replaced by fork
  3. We have to call the join method after we fork all the subtasks
  4. findBook and fetchReviewsFor are no longer suspending functions and are no longer wrapped in runInterruptible

And if we call the endpoint, it runs in about 1s and produces the following logs:

16:46:59.955 [tomcat-handler-0] [requestId=ipr] INFO  c.b.b.BooksController - [START] 3
16:46:59.956 [virtual-57] [] INFO  c.b.r.ReviewsService - [START] Fetching reviews for book with id 3
16:46:59.957 [virtual-56] [] INFO  c.b.b.BooksRepository - [START] Fetching book with id 3
16:47:00.963 [virtual-57] [] INFO  c.b.r.ReviewsService - [DONE] Fetched reviews for book with id 3
16:47:01.011 [virtual-56] [] INFO  c.b.b.BooksRepository - [DONE] Fetched book with id 3
16:47:01.011 [tomcat-handler-0] [requestId=ipr] INFO  c.b.b.BooksController - [SUCCESS] Returning 3

We see that requestId again disappears in some logs, but we'll deal with that later. What's important for now is that we've covered the happy path.

Sad path

Changing application.properties

books-repository.delay-in-seconds=3
reviews-service.delay-in-seconds=1
reviews-service.fail=true

And then calling the endpoint produces the following logs:

16:51:56.180 [tomcat-handler-0] [requestId=peg] INFO  c.b.b.BooksController - [START] 3
16:51:56.181 [virtual-57] [] INFO  c.b.r.ReviewsService - [START] Fetching reviews for book with id 3
16:51:56.182 [virtual-56] [] INFO  c.b.b.BooksRepository - [START] Fetching book with id 3
16:51:57.209 [tomcat-handler-0] [requestId=peg] ERROR o.a.c.c.C.[.[.[.[dispatcherServlet] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: java.util.concurrent.StructuredTaskScope$FailedException: java.lang.RuntimeException: Something went wrong when fetching reviews] with root cause
java.lang.RuntimeException: Something went wrong when fetching reviews...

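The request fails after about 1s, as soon as the reviews call throws, instead of waiting 3s for the repository call. This means that the cancellation also works as expected.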
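Cancellation on the JVM builds on thread interruption: the thread of an unfinished subtask gets interrupted, and a blocking call such as Thread.sleep reacts by throwing InterruptedException. Below is a minimal sketch of just that mechanism with a plain thread. It is not how StructuredTaskScope is implemented, and slowFetchCancelled is a made-up name.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

// Hypothetical helper: starts a slow blocking task, interrupts it, and
// reports whether the task noticed the interruption.
fun slowFetchCancelled(): Boolean {
    var interrupted = false
    val started = CountDownLatch(1)
    val slow = thread {
        started.countDown()
        try {
            Thread.sleep(3_000) // stands in for a slow blocking call
        } catch (e: InterruptedException) {
            interrupted = true // the blocking call was aborted early
        }
    }
    started.await()  // make sure the task is running
    slow.interrupt() // roughly what happens to an unfinished subtask on cancellation
    slow.join()      // returns almost immediately instead of after 3s
    return interrupted
}

fun main() {
    println(slowFetchCancelled()) // true
}
```

This only works because the task is blocking in an interruptible call. Code that swallows InterruptedException or spins without checking the interrupt flag would keep running.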

That was easy, right?

Well, not really.
taskScope doesn't really exist in the JDK or in the Kotlin standard library.

We had to jump through some hoops to make it work smoothly.

Hoops

If we used the API as it is provided in the JDK, our code would look like this:

@GetMapping("/{id}")
fun fetchBook(@PathVariable id: Long): Book = StructuredTaskScope.open<Any>().use {
    log.info("[START] $id")

    val book = it.fork(Callable { findBook(id) })
    val reviews = it.fork(Callable { fetchReviewsFor(id) })

    it.join()

    book.get()
        .copy(reviews = reviews.get())
        .also { log.info("[SUCCESS] Returning ${it.id}") }
}

Several problems with this:

  1. StructuredTaskScope.open<Any>().use { is too long
  2. We have to use Any because the type parameter for open represents the result type of subtasks and our 2 subtasks have different result types. One returns a Book and the other returns List<Review>.
  3. We cannot call fork like this: it.fork { findBook(id) }. This is because in Java, the fork method has two overloads, one taking a Callable and the other taking a Runnable, and Kotlin defaults to Runnable, which has void as its result type. That's why we have to be explicit about Callable. There is even an open issue on YouTrack about this: Prioritize StructuredTaskScope.fork(Callable) over StructuredTaskScope.fork(Runnable)
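The same Callable/Runnable overload pair exists on ExecutorService.submit, so the effect of being explicit can be tried without StructuredTaskScope. A small sketch, where submitBothWays and compute are made-up names:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

fun compute(): Int = 42

// Submits the same work through both overloads and returns what each Future yields.
fun submitBothWays(): Pair<Any?, Int> {
    val executor = Executors.newSingleThreadExecutor()
    try {
        // Runnable overload: the task runs, but its result is discarded.
        val viaRunnable: Any? = executor.submit(Runnable { compute() }).get()
        // Callable overload: the result type is preserved.
        val viaCallable: Int = executor.submit(Callable { compute() }).get()
        return viaRunnable to viaCallable
    } finally {
        executor.shutdown()
    }
}

fun main() {
    println(submitBothWays()) // (null, 42)
}
```

Writing Callable { ... } is a SAM constructor call, which leaves the compiler only one overload to pick.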

Kotlin flexibility at its best

To be able to use taskScope { we need only a few lines of helper/wrapper code:

class TaskScope<T, R>(private val scope: StructuredTaskScope<T, R>) : AutoCloseable by scope {
    fun <A : T> fork(fn: () -> A): StructuredTaskScope.Subtask<A> = scope.fork(Callable { fn() })
    fun join(): R = scope.join()
}

inline fun <T, R> taskScope(block: TaskScope<T, Void>.() -> R): R =
    TaskScope(StructuredTaskScope.open<T>()).use { it.block() }

This solves all of the above-mentioned problems:

  1. taskScope { is much shorter than StructuredTaskScope.open<Any>().use {
  2. We don't have to use Any explicitly
  3. We don't have to specify Callable, we simply call it like this: fork { ... }
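The helper leans on two Kotlin features: class delegation (AutoCloseable by scope) and a lambda with receiver (TaskScope<T, Void>.() -> R). Here is the same pattern in isolation, as a minimal sketch that needs no preview APIs. Connection, Session, and session are made-up names, not part of the project.

```kotlin
// Hypothetical resource that only remembers whether it was closed.
class Connection : AutoCloseable {
    var closed = false
        private set

    override fun close() {
        closed = true
    }
}

// Class delegation: close() is forwarded to the wrapped connection,
// just like TaskScope forwards it to the wrapped scope.
class Session(private val connection: Connection) : AutoCloseable by connection {
    fun query(sql: String): String = "result of '$sql'"
}

// Lambda with receiver: inside the block, `this` is the Session,
// so its methods can be called without a qualifier.
inline fun <R> session(connection: Connection, block: Session.() -> R): R =
    Session(connection).use { it.block() }

fun main() {
    val connection = Connection()
    val result = session(connection) { query("select 1") }
    println(result)            // result of 'select 1'
    println(connection.closed) // true
}
```

The use call guarantees that close() runs even if the block throws, which is the same guarantee taskScope gives for the underlying scope.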

What about that lost requestId?

Looking back at the logs:

17:15:46.571 [tomcat-handler-0] [requestId=uey] INFO  c.b.b.BooksController - [START] 3
17:15:46.573 [virtual-57] [] INFO  c.b.r.ReviewsService - [START] Fetching reviews for book with id 3
17:15:46.573 [virtual-56] [] INFO  c.b.b.BooksRepository - [START] Fetching book with id 3
17:15:47.575 [virtual-57] [] INFO  c.b.r.ReviewsService - [DONE] Fetched reviews for book with id 3
17:15:47.611 [virtual-56] [] INFO  c.b.b.BooksRepository - [DONE] Fetched book with id 3
17:15:47.611 [tomcat-handler-0] [requestId=uey] INFO  c.b.b.BooksController - [SUCCESS] Returning 3

We see that requestId is lost in the middle.
This is because MDC is using ThreadLocal under the hood and ThreadLocal values are not propagated when fork is called.
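The effect is easy to reproduce with a plain ThreadLocal and a plain thread. This is only a sketch of the underlying behaviour, with made-up names, not the MDC implementation itself:

```kotlin
import kotlin.concurrent.thread

// Hypothetical stand-in for the value MDC keeps per thread.
val requestId = ThreadLocal<String?>()

// Reads the thread-local from a freshly started thread.
fun readFromNewThread(): String? {
    var seen: String? = "unset"
    thread { seen = requestId.get() }.join()
    return seen
}

fun main() {
    requestId.set("uey")
    println(requestId.get())     // uey
    println(readFromNewThread()) // null
}
```

Every thread gets its own slot, so a value set on the request thread is simply not there on the thread that runs the subtask.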

Since we already created a wrapper class TaskScope, adding this propagation shouldn't be too difficult:

class TaskScope<T, R>(private val scope: StructuredTaskScope<T, R>) : AutoCloseable by scope {
    fun <A : T> fork(fn: () -> A): StructuredTaskScope.Subtask<A> {
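        // Capture the MDC on the forking thread; the copy can be null when nothing was set
        val mdcMap = MDC.getCopyOfContextMap()
        return scope.fork(Callable {
            // Restore it on the subtask's thread before running the task
            if (mdcMap != null) MDC.setContextMap(mdcMap)
            fn()
        })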
    }
    fun join(): R = scope.join()
}

This solves our problem:

17:26:42.703 [tomcat-handler-0] [requestId=lef] INFO  c.b.b.BooksController - [START] 3
17:26:42.704 [virtual-57] [requestId=lef] INFO  c.b.r.ReviewsService - [START] Fetching reviews for book with id 3
17:26:42.705 [virtual-56] [requestId=lef] INFO  c.b.b.BooksRepository - [START] Fetching book with id 3
17:26:43.711 [virtual-57] [requestId=lef] INFO  c.b.r.ReviewsService - [DONE] Fetched reviews for book with id 3
17:26:43.759 [virtual-56] [requestId=lef] INFO  c.b.b.BooksRepository - [DONE] Fetched book with id 3
17:26:43.760 [tomcat-handler-0] [requestId=lef] INFO  c.b.b.BooksController - [SUCCESS] Returning 3

But this feels a bit hacky. A more idiomatic way to achieve the same thing would be to use ScopedValue, but that's a story for another time.

CoroutineScope vs StructuredTaskScope: Which one to choose?

Find out in the next part...
