This post series assumes familiarity with Kotlin, Java, and Spring Boot.
No AI was used during the writing of this post series.
Coroutines are an all-in feature
In Part 3 we finally got to a fully working solution, covering all the edge cases. However, we went to some trouble to get there and we had to be extra careful not to miss some of those edge cases.
Some of this trouble is due to the fact we were mixing blocking and non-blocking code. Coroutines are by their nature non-blocking and they are invasive. This means that they tend to spread through the codebase. You mark one function with suspend and then you have to mark all the callers of that function with suspend, and then all the callers of all those functions, and it propagates all the way.
This is why coroutines are an all-in feature. They mostly make sense if the entire codebase is non-blocking and is using coroutines.
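To make the propagation concrete, here is a minimal sketch that uses only the Kotlin standard library. The functions fetchReviews and fetchBook are hypothetical stand-ins, not the ones from this series.

```kotlin
// Hypothetical functions, for illustration only.
suspend fun fetchReviews(id: Long): List<String> = listOf("Great read")

// fetchBook calls a suspend function, so it has to be marked suspend too.
// Dropping the modifier here is rejected by the compiler.
suspend fun fetchBook(id: Long): String =
    "book $id (${fetchReviews(id).size} review(s))"

// The same applies to every caller up the chain, including the entry point,
// unless some caller bridges into blocking code (for example with runBlocking).
suspend fun main() {
    println(fetchBook(3))
}
```

Every layer between the entry point and the lowest suspending call ends up carrying the modifier, which is the invasiveness described above.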
All we wanted was 2 calls in parallel
In Part 1, all we wanted to achieve was to execute 2 functions concurrently instead of sequentially. Since we are using Kotlin, coroutines were a natural solution, but it proved to be more complex than we initially expected. Luckily, we have an alternative.
StructuredTaskScope
StructuredTaskScope is a class from the JDK whose main purpose is to enable structured concurrency in Java. Before this, achieving structured concurrency in Java was nearly impossible, unless we wanted to use a reactive framework, which gets far more complicated and frustrating than using Kotlin coroutines.
Kotlin has full interoperability with Java, so we should have no problem using this API from Kotlin.
Seeing it in action
After replacing coroutines with the new API, here's what we get:
@RestController
@RequestMapping("/books")
class BooksController(
    private val booksRepository: BooksRepository,
    private val reviewsService: ReviewsService,
) {
    private val log = LoggerFactory.getLogger(BooksController::class.java)

    @GetMapping("/{id}")
    fun fetchBook(@PathVariable id: Long): Book = taskScope {
        log.info("[START] $id")
        val book = fork { findBook(id) }
        val reviews = fork { fetchReviewsFor(id) }
        join()
        book.get()
            .copy(reviews = reviews.get())
            .also { log.info("[SUCCESS] Returning ${it.id}") }
    }

    private fun findBook(id: Long) =
        booksRepository.findById(id) ?: throw BookNotFoundException(id)

    private fun fetchReviewsFor(id: Long) =
        reviewsService.fetchReviewsFor(id)
}
It looks very similar to what we had before. Here's what changed:
- runBlocking was replaced by taskScope
- async was replaced by fork
- We have to call the join method after we fork all the subtasks
- findBook and fetchReviewsFor are no longer suspending and wrapped in runInterruptible

And if we call the endpoint, it runs in about 1s and produces the following logs:
16:46:59.955 [tomcat-handler-0] [requestId=ipr] INFO c.b.b.BooksController - [START] 3
16:46:59.956 [virtual-57] [] INFO c.b.r.ReviewsService - [START] Fetching reviews for book with id 3
16:46:59.957 [virtual-56] [] INFO c.b.b.BooksRepository - [START] Fetching book with id 3
16:47:00.963 [virtual-57] [] INFO c.b.r.ReviewsService - [DONE] Fetched reviews for book with id 3
16:47:01.011 [virtual-56] [] INFO c.b.b.BooksRepository - [DONE] Fetched book with id 3
16:47:01.011 [tomcat-handler-0] [requestId=ipr] INFO c.b.b.BooksController - [SUCCESS] Returning 3
We see that requestId again disappears in some logs, but we'll deal with that later. What's important for now is that we've covered the happy path.
Sad path
Let's change application.properties so that the reviews service fails after 1s while the book lookup takes 3s:
books-repository.delay-in-seconds=3
reviews-service.delay-in-seconds=1
reviews-service.fail=true
Calling the endpoint then produces the following logs:
16:51:56.180 [tomcat-handler-0] [requestId=peg] INFO c.b.b.BooksController - [START] 3
16:51:56.181 [virtual-57] [] INFO c.b.r.ReviewsService - [START] Fetching reviews for book with id 3
16:51:56.182 [virtual-56] [] INFO c.b.b.BooksRepository - [START] Fetching book with id 3
16:51:57.209 [tomcat-handler-0] [requestId=peg] ERROR o.a.c.c.C.[.[.[.[dispatcherServlet] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: java.util.concurrent.StructuredTaskScope$FailedException: java.lang.RuntimeException: Something went wrong when fetching reviews] with root cause
java.lang.RuntimeException: Something went wrong when fetching reviews...
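The request fails about 1s after it started, not after 3s, and BooksRepository never logs [DONE]. This means that the cancellation also works as expected: as soon as fetching reviews failed, the still-running book lookup was cancelled.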
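For blocking code, cancellation in the JDK is delivered as a thread interrupt. The sketch below shows only that mechanism, using a plain thread instead of StructuredTaskScope. The function name and the 3s sleep are illustrative assumptions that stand in for the slow book lookup.

```kotlin
import kotlin.concurrent.thread

// Hypothetical stand-in for a slow, blocking lookup that gets cancelled.
fun sleepThenInterrupt(): String {
    var outcome = "not started"
    val worker = thread {
        outcome = try {
            Thread.sleep(3_000) // pretend this is the 3s book lookup
            "finished"
        } catch (e: InterruptedException) {
            "interrupted"       // the blocking call is cut short
        }
    }
    Thread.sleep(100)   // give the worker a moment to start
    worker.interrupt()  // what a cancelling scope does to unfinished subtasks
    worker.join()
    return outcome
}

fun main() {
    println(sleepThenInterrupt())
}
```

This is also why findBook and fetchReviewsFor need no special treatment here: any blocking call that responds to interruption stops early on its own.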
That was easy, right?
Well, not really.
taskScope doesn't really exist in the JDK or in the Kotlin standard library.
We had to jump through some hoops to make it work smoothly.
Hoops
If we used the API as it is provided in the JDK, our code would look like this:
@GetMapping("/{id}")
fun fetchBook(@PathVariable id: Long): Book = StructuredTaskScope.open<Any>().use {
    log.info("[START] $id")
    val book = it.fork(Callable { findBook(id) })
    val reviews = it.fork(Callable { fetchReviewsFor(id) })
    it.join()
    book.get()
        .copy(reviews = reviews.get())
        .also { log.info("[SUCCESS] Returning ${it.id}") }
}
Several problems with this:
- StructuredTaskScope.open<Any>().use { is too long
- We have to use Any because the type parameter for open represents the result type of subtasks and our 2 subtasks have different result types. One returns a Book and the other returns List<Review>.
- We cannot call fork like this: it.fork { findBook(id) }. This is because in Java, the fork method has two overloads, one taking a Callable and the other taking a Runnable, and in Kotlin the lambda defaults to Runnable, which has void as a result type. That's why we have to be explicit about Callable. There is even an open issue on YouTrack about this: Prioritize StructuredTaskScope.fork(Callable) over StructuredTaskScope.fork(Runnable)
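The same pair of overloads (one taking Callable, one taking Runnable) exists on ExecutorService.submit, so the explicit Callable form can be tried without StructuredTaskScope. This is only a sketch of the workaround. The function name and the "book 3" value are made up for illustration.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical helper showing the explicit SAM constructor.
fun submitExplicitCallable(): String {
    val executor = Executors.newSingleThreadExecutor()
    try {
        // Callable { ... } pins the overload, so the result type is kept:
        // the returned future carries a String rather than no value at all.
        val future = executor.submit(Callable { "book 3" })
        return future.get()
    } finally {
        executor.shutdown()
    }
}

fun main() {
    println(submitExplicitCallable())
}
```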
Kotlin flexibility at its best
To be able to use taskScope { we need only a few lines of helper/wrapper code:
class TaskScope<T, R>(private val scope: StructuredTaskScope<T, R>) : AutoCloseable by scope {
    fun <A : T> fork(fn: () -> A): StructuredTaskScope.Subtask<A> = scope.fork(Callable { fn() })
    fun join(): R = scope.join()
}

inline fun <T, R> taskScope(block: TaskScope<T, Void>.() -> R): R =
    TaskScope(StructuredTaskScope.open<T>()).use { it.block() }

This solves all of the above-mentioned problems:
- taskScope { is much shorter than StructuredTaskScope.open<Any>().use {
- We don't have to use Any explicitly
- We don't have to specify Callable, we simply call it like this: fork { ... }
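The wrapper relies on two Kotlin features: a lambda with receiver (so fork can be called without a qualifier) and use (so the scope is always closed). The sketch below shows the same shape on top of a plain ExecutorService. Pool and pool are hypothetical names, and unlike StructuredTaskScope this stand-in does not cancel sibling tasks when one of them fails.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future

// Hypothetical stand-in for TaskScope, built on a stable JDK API.
class Pool(private val executor: ExecutorService) : AutoCloseable {
    // Takes a Kotlin lambda and picks the Callable overload for the caller.
    fun <A> fork(fn: () -> A): Future<A> = executor.submit(Callable { fn() })

    override fun close() {
        executor.shutdown()
    }
}

// The block runs with Pool as its receiver, so fork { ... } needs no prefix,
// and use { } closes the pool even if the block throws.
inline fun <R> pool(block: Pool.() -> R): R =
    Pool(Executors.newFixedThreadPool(2)).use { it.block() }

fun main() {
    val summary = pool {
        val title = fork { "Book 3" }
        val reviewCount = fork { 2 }
        "${title.get()} has ${reviewCount.get()} reviews"
    }
    println(summary)
}
```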
What about that lost requestId?
Looking back at the logs:
17:15:46.571 [tomcat-handler-0] [requestId=uey] INFO c.b.b.BooksController - [START] 3
17:15:46.573 [virtual-57] [] INFO c.b.r.ReviewsService - [START] Fetching reviews for book with id 3
17:15:46.573 [virtual-56] [] INFO c.b.b.BooksRepository - [START] Fetching book with id 3
17:15:47.575 [virtual-57] [] INFO c.b.r.ReviewsService - [DONE] Fetched reviews for book with id 3
17:15:47.611 [virtual-56] [] INFO c.b.b.BooksRepository - [DONE] Fetched book with id 3
17:15:47.611 [tomcat-handler-0] [requestId=uey] INFO c.b.b.BooksController - [SUCCESS] Returning 3
We see that requestId is lost in the middle.
This is because MDC is using ThreadLocal under the hood and ThreadLocal values are not propagated when fork is called.
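The effect can be reproduced without MDC or StructuredTaskScope. In this sketch a plain ThreadLocal plays the role of the MDC entry. The names requestId and valueSeenByNewThread are illustrative assumptions.

```kotlin
import kotlin.concurrent.thread

// Hypothetical stand-in for the requestId entry stored in MDC.
val requestId = ThreadLocal<String?>()

// Starts a new thread and reports what that thread reads from the ThreadLocal.
fun valueSeenByNewThread(): String? {
    var seen: String? = "unset"
    thread { seen = requestId.get() }.join()
    return seen
}

fun main() {
    requestId.set("uey")
    println("caller sees ${requestId.get()}, new thread sees ${valueSeenByNewThread()}")
}
```

The value set on the calling thread is simply not visible on the new thread, which matches the empty brackets in the logs above.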
Since we already created a wrapper class TaskScope, adding this propagation shouldn't be too difficult:
class TaskScope<T, R>(private val scope: StructuredTaskScope<T, R>) : AutoCloseable by scope {
    fun <A : T> fork(fn: () -> A): StructuredTaskScope.Subtask<A> {
        val mdcMap = MDC.getCopyOfContextMap()
        return scope.fork(Callable {
            MDC.setContextMap(mdcMap)
            fn()
        })
    }

    fun join(): R = scope.join()
}
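The capture-then-restore pattern used in fork can be sketched in isolation like this. A plain ThreadLocal stands in for MDC, and requestId and propagating are hypothetical names. The cleanup in finally is an addition that is not part of the wrapper above. It matters when worker threads are reused, as in the pool below, and less so when every fork runs on a fresh virtual thread, as the logs in this post suggest.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for the requestId entry stored in MDC.
val requestId = ThreadLocal<String?>()

// Wraps fn so that it runs with the caller's requestId.
fun <A> propagating(fn: () -> A): Callable<A> {
    val captured = requestId.get()   // read on the calling thread
    return Callable {
        requestId.set(captured)      // restore on the worker thread
        try {
            fn()
        } finally {
            requestId.remove()       // do not leak into the next task on this thread
        }
    }
}

fun main() {
    requestId.set("lef")
    val executor = Executors.newSingleThreadExecutor()
    try {
        val seen = executor.submit(propagating { requestId.get() }).get()
        println(seen)
    } finally {
        executor.shutdown()
    }
}
```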
This solves our problem:
17:26:42.703 [tomcat-handler-0] [requestId=lef] INFO c.b.b.BooksController - [START] 3
17:26:42.704 [virtual-57] [requestId=lef] INFO c.b.r.ReviewsService - [START] Fetching reviews for book with id 3
17:26:42.705 [virtual-56] [requestId=lef] INFO c.b.b.BooksRepository - [START] Fetching book with id 3
17:26:43.711 [virtual-57] [requestId=lef] INFO c.b.r.ReviewsService - [DONE] Fetched reviews for book with id 3
17:26:43.759 [virtual-56] [requestId=lef] INFO c.b.b.BooksRepository - [DONE] Fetched book with id 3
17:26:43.760 [tomcat-handler-0] [requestId=lef] INFO c.b.b.BooksController - [SUCCESS] Returning 3
But this feels a bit hacky. A more idiomatic way to achieve the same thing would be to use ScopedValue, but that's a story for another time.
CoroutineScope vs StructuredTaskScope: Which one to choose?
Find out in the next part...