1. Introduction
A coroutine is a concept that has been around since at least 1958. It was introduced by Melvin Conway, and it essentially means that we can create several units of work on one thread and let them take turns executing. Coroutines can be paused and resumed like a Thread, but the most important difference is that while a Thread creates its own context, coroutines use a Thread context and can run concurrently alongside others. This is not technically viewed as running in parallel. Instead, the runs are interleaved. Coroutines can also switch context while running and use another Thread context. Because each coroutine gives up control voluntarily when it suspends, this is known as cooperative multitasking. Although this is a very old concept, it is currently very relevant to how we do reactive programming. The reason is that, by allowing coroutines to share contexts and switch from one to another, the system can make better use of its threads while running. This in turn means more efficient use of memory and CPU.
Teams from Kotlinx have been busy re-introducing this paradigm for about 4 years now.
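To make cooperative multitasking a bit more concrete, here is a minimal sketch, assuming the kotlinx-coroutines-core library is available. The interleave function and the Step class are names made up for this illustration and are not part of the project. Two coroutines share the single thread owned by runBlocking and hand it over to each other with yield:

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical record of one unit of work and the thread it ran on.
data class Step(val label: String, val thread: Thread)

// Runs two coroutines on the thread that calls runBlocking and records every step.
fun interleave(): List<Step> {
    val steps = mutableListOf<Step>()
    runBlocking {
        for (name in listOf("A", "B")) {
            launch {
                repeat(2) { index ->
                    steps += Step("$name$index", Thread.currentThread())
                    // Voluntarily give the thread to the other coroutine.
                    yield()
                }
            }
        }
    }
    return steps
}

fun main() {
    val steps = interleave()
    println(steps.map { it.label })
    println("threads used: ${steps.map { it.thread }.distinct().size}")
}
```

No new thread is created here. Both coroutines borrow the same one and take turns on it.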
Before jumping into the example as we normally do, I’d like to have a discussion about the concepts we are going to have a look at. The way we build an application has to consider concepts like scalability, resiliency, reactivity, capacity, and performance, just to name a few.
Coroutines are quite fantastic, but they aren’t the holy grail for everything. So when we decide to use them, we need to be sure that we know what we are talking about.
In the project that we are going to see, I tried my best not to reinvent the wheel, but I also had to make sure that there was enough logic to explain the important concepts of using coroutines in a real-world project. For now, let’s briefly analyze what we want to investigate and what to expect.
2. One solution
When thinking about scalability, we want to make sure that our application can follow the elasticity of our system and take advantage of any extension of our original resources. We should be able to increase memory and CPU without having to make changes to our application. Otherwise, our application is considered rigid and quite difficult to maintain. A container allows us to define how many resources we give to an application.
Nowadays, we use containers for this, plus a few application tricks to maximize the efficiency of the use of our resources. Reactive programming helps a lot with this, and we’ll see how that, combined with coroutines, can achieve even better results.
For an application like the one we are going to see, we need to provide a way to scale up the traffic we can support. There is only so much traffic we can support per container. One of the most common solutions on the market for scaling up our systems is to use Load Balancers. The great thing about them is that not only can they distribute the load, but if one of the instances of our application fails, they can switch immediately to the next available instance. By doing this, we greatly reduce the probability that our application becomes unavailable, up to the point of it being insignificant. In this way, we get close to a highly available application. These are usually referred to as High Availability Load Balancers.
Another thing we face immediately after thinking about this is that replication limits us if we want to use a cache. Introducing a cache means that we introduce state, and if we introduce state, all our replicas need to share that same state. This is where distributed cache systems like Hazelcast and Redis come into play. If we create one instance of a cache per machine and allow it to join a cluster composed of all cache instances, every update to that state is shared across all instances. Just like with load balancers, we can fine-tune Distributed Cache Systems to our everyday needs.
In the same way that we have application load balancers, we can also have database load balancers. In this case, we can design a cluster of read-only databases and a cluster of write-only databases. If we achieve a complete separation of write and read operations, it becomes much easier to load-balance the requests to the database and make sure we avoid inconsistencies. We can achieve a CQRS-like (Command Query Responsibility Segregation) implementation.
In our case, we’ll also see that we need high availability at some point. Although reactive programming with coroutines and R2DBC reactive repositories does help make our application more responsive, and in that way available to new requests much more rapidly, it is not, as mentioned before, the way to solve all issues. If we receive a massive number of requests, a reactive system will still face capacity issues. Namely, the number of ongoing processes can rapidly spike, and so in some cases we want to make sure that we do not process anything right away. These are the cases where we just want to register a request and let the system deal with it in the background. We are still making our application reactive, but the user will get a response different from a typical ok. Instead, the user gets a response that the request is ongoing.
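The idea of registering a request and answering before it is processed can be sketched as follows. This is only an illustration under stated assumptions: it uses an in-memory kotlinx-coroutines Channel in place of a real stream, and VoteRequest, registerVote and processAll are hypothetical names that do not exist in the project.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical request type, for illustration only.
data class VoteRequest(val userId: String, val candidateId: String)

// Registers the request and answers immediately; no processing happens here.
suspend fun registerVote(queue: Channel<VoteRequest>, request: VoteRequest): String {
    queue.send(request)
    return "ACCEPTED"
}

// Drains the queue until it is closed and returns everything it processed.
suspend fun processAll(queue: Channel<VoteRequest>): List<VoteRequest> {
    val processed = mutableListOf<VoteRequest>()
    for (request in queue) processed += request
    return processed
}

fun main() = runBlocking<Unit> {
    val queue = Channel<VoteRequest>(Channel.UNLIMITED)
    // The worker runs in the background while requests keep being accepted.
    val worker = async { processAll(queue) }
    println(registerVote(queue, VoteRequest("user-1", "artist-7")))
    println(registerVote(queue, VoteRequest("user-2", "artist-7")))
    queue.close()
    println("processed ${worker.await().size} votes")
}
```

The caller only learns that the vote was accepted. Whether and when it is counted is decided later by the background worker.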
3. The Case Explained
Before we jump into the diagram, let’s first have a look at our goal. We want to create a voting system to support the VMAs (MTV’s Video Music Awards). Any sort of VMAs. Awards like this one are always different, and every year categories may change. The year 2020 saw, for example, awards related to an epidemic for the first time. We want to be able to make the award categories as dynamic as possible. For the MVP (Minimum Viable Product) or POC (Proof of Concept), whatever we can make ASAP, we want voters to be able to vote for their favorite artist or song in distinct categories. Our further requirements are:
- The votes should be counted as they come, and they should be registered per userId on a table.
- Votes can only be registered once per user.
- The exception handling, in this case, is not important as long as related exceptions appear in the logs.
- Artists and Songs may get an extra vote, but this vote will only be given to the artist within the category as a whole regardless of the user giving the vote.
- The extra vote will be given according to the Optimistic Locking algorithm. In practical terms, to get the extra vote, a coroutine will read the data from the database, add one vote and try to save it. If a save from another user has occurred in between, then an exception should be thrown.
- At the end of the voting process or even during the process, the total amount of votes should be counted as the sum of the number of rows in the voting table and the successful extra votes given.
- The winner should be easy to check on a results page.
- In order for the PO (Product Owner) to understand the background, a front-end page should be built to give the feeling of what would happen per user.
- Security is not necessary for the demo.
- High Availability is required for the demo. No vote should be lost except the unsuccessful extra votes.
- The whole system should support high rates of voting. The final product is set to work worldwide.
Some of these requirements may be difficult to understand or read at first, just like any first list of requirements we get, but it is important to have this context. It is on the basis of this particular voting system and the way it works that we’ll implement our solution.
4. The first sketch and diagram
In order to understand what we are going to implement, let’s first have a look at the following diagram and let that sink in for a few minutes. Have a look in detail over everything that’s described and sketched. We’ll then section each architectural piece and dive into it from left to right.
From left to right, we can see that we need 4 sections for our application. The groups I’ve created are based on 4 elements necessary for our system.
As mentioned above, we need an application and a front-end facing application. We achieve this in our example using an Angular application and two reactive services. NGINX serves the Angular application and load-balances requests to both reactive application replicas. Further on, we will see an extra element called Hazelcast, which is a very convenient and fast way to create a distributed cache cluster. Together, these comprise the Application section.
To register our votes, but not process them, we need some sort of streaming framework. I have chosen, in this case, to use Kafka. Not only that, but we’ll also use the Avro serializer. We’ll see further on how it is used and how it can help when we want to serialize objects through the stream. This is the Streaming section.
In the Listeners section, we will use two Spring Boot reactive services. These reactive processes do not work as a cluster. Instead, they are just two consumers of the streams. It helps to have two. This way we can process the requests on the streams faster, and it provides a fail-safe mechanism.
Finally, to provide the Database Cluster section, and purely because we want to run all of this locally, we’ll make use of Patroni, which allows our databases, on startup, to join two PostgreSQL clusters via Etcd.
5. Focus on the Sections
As we have seen above, we have different sections, each with a detailed implementation. With the overview, we have seen all the pieces involved in making this architecture. Now we’ll look at the implementations in detail. We’ll see the code, how it’s implemented, and what it provides in a nutshell. All the players of this project are available via port mapping to the outside, but we are only interested in sending requests via port 8080, which then get rerouted by NGINX to the adequate services.
It is also very important to understand that although this article covers a lot of technologies, the point is to show some applications using reactive programming with coroutines. Hence, it is also very important to mention the libraries we are going to use.
For coroutines, we are using:
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-reactor</artifactId>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-core</artifactId>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-core-jvm</artifactId>
</dependency>
For Kafka, we are using:
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
</dependency>
For the Avro serializer, we are using:
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
</dependency>
For Hazelcast, we are using:
<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast</artifactId>
</dependency>
Beyond these, there are all the libraries needed to run Spring Boot, repositories, the REST methods and services, and others. These are all standard, and they are outside the scope of this article.
5.1. Application Section
In the application section, we are going to see a front-end application in Angular. This application is served directly by NGINX. Further, we have NGINX configured to load balance between two other Spring Boot processes. This is done by configuring NGINX in the following way:
server {
    listen 8080;
    listen [::]:8080;
    root /usr/share/nginx/html;
    server_name _;
    location / {
        root /usr/share/nginx/html;
        try_files $uri $uri/ /index.html?$args;
    }
    location /result {
        root /usr/share/nginx/html;
        try_files $uri $uri/ /index.html?$args;
    }
    location /api/vma/ {
        proxy_pass http://apps;
    }
    location /api/vma/broker {
        proxy_pass http://apps;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $http_host;
        proxy_set_header Access-Control-Allow-Origin 192.168.0.120;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-NginX-Proxy true;
    }
}
In this configuration we see a few important settings. All root traffic is served from the /usr/share/nginx/html folder. That’s where all the code of our front-end application resides. For all API routes, we use a proxy_pass, which is responsible for diverting the traffic to http://apps. This is not really a plain URL. Instead, it is recognized by NGINX as an upstream. We have two rerouting paths. One reroutes /api/vma and the other /api/vma/broker. The first redirects traffic to one of the instances for the REST service calls. The latter reroutes inbound and outbound traffic to the instances’ websockets. This is how we keep the application constantly updated in the front-end. The apps upstream is declared as follows:
http {
    upstream apps {
        server 192.168.0.21:8081;
        server 192.168.0.22:8082;
    }
    ...
}
Now that we understand how load-balancing works at the application level with NGINX, let’s have a look at how coroutines actually work. Before continuing though, it is probably a good idea to review what coroutines really are and what they are not. Coroutines aren’t threads. They use and can share thread contexts.
The diagram above isn’t directly related to the code. What I’m trying to show is that coroutines work by using a coroutine context, which includes the dispatcher that decides on which thread they run. We can start a coroutine programmatically with launch and choose the context it runs in through the first argument. Frequently we want to decide which thread, or pool of threads, does the work. We can also run a block of code in a different context with, for example, withContext. Contexts are immutable though. This means that we can’t change the context the coroutine was originally created in. We can make it switch context, but the coroutine still depends on, and is cancellable from, the original context it was created from.
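A small sketch may help here. It assumes only kotlinx-coroutines-core, and currentThreadName and loadOnIo are hypothetical helper names invented for this illustration. It starts a coroutine with launch on one dispatcher and runs one block on another with withContext:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns the name of the thread the caller is currently running on.
fun currentThreadName(): String = Thread.currentThread().name

// Runs the block on the IO dispatcher; the caller continues in its own context afterwards.
suspend fun loadOnIo(): String = withContext(Dispatchers.IO) {
    "loaded on ${currentThreadName()}"
}

fun main() = runBlocking<Unit> {
    // The first argument of launch selects the context the coroutine starts in.
    val job = launch(Dispatchers.Default) {
        println("started on ${currentThreadName()}")
        println(loadOnIo())
        println("resumed on ${currentThreadName()}")
    }
    job.join()
    println("completed: ${job.isCompleted}")
}
```

The thread names printed depend on the machine and the run, so none are shown here. The Job returned by launch stays the handle for the whole coroutine, including the part that runs inside withContext.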
Our application has a lot of REST methods implemented in a reactive way. Voting is done using POST requests, which then make sure that the votes find their way into the Kafka streams. We also have a websocket broker which, as mentioned before, makes sure our application gets updated in real time. Explaining all of these methods and the complete logic behind the implemented voting system isn’t the goal of this article. We want to understand how coroutines work, so let’s get started.
A good example to start with is the method getCurrentVma in the RegistryController class:
@GetMapping("/current")
fun getCurrentVma(@CookieValue("votingId") votingKey: String?): Flow<CategoryDto> =
    categoryService.findAll(votingKey)
In this example we are using the coroutines Flow class. This Flow is an abstraction that implements a flowing stream of CategoryDto. This is a Data Transfer Object (DTO), which represents the categories of the award. It contains the name of the category, the list of candidates, the winner after voting is complete, and whether a certain user has already voted in it. In any case, it provides a continuous flow. Values are delivered to whoever collects the flow by emitting them one at a time. If we look at the original source code, we see:
interface FlowCollector<in T> {
    suspend fun emit(value: T)
}
What is important to realize here is that emit is a suspend function. The suspend modifier means that the function can only be called from a coroutine or from another suspend function, and that it can pause and resume without blocking the thread it runs on. By default, it runs in the context of the coroutine that calls it. In our case, we make these coroutines in the flow run unconfined in the main Thread context.
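As a minimal sketch of how a Flow is produced with emit and then consumed, assuming only kotlinx-coroutines-core: CategoryName and categories are hypothetical, simplified stand-ins for the project’s CategoryDto and service method.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical, simplified stand-in for the article's CategoryDto.
data class CategoryName(val name: String)

// A cold stream: the builder block only runs once somebody collects the flow.
fun categories(): Flow<CategoryName> = flow {
    // emit is a suspend function, so the producer can pause here between values.
    emit(CategoryName("Best Artist"))
    emit(CategoryName("Best Song"))
}

fun main() = runBlocking<Unit> {
    // toList is a terminal operator: it collects the flow and suspends until it completes.
    val names = categories().map { it.name.uppercase() }.toList()
    println(names)
}
```

Note that categories itself is not a suspend function. Only the collection of its values needs a coroutine.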
A good example to see a coroutine in action is located in the PortController class. This is the getWelcomeMessage function:
@GetMapping
suspend fun getWelcomeMessage() =
    "Welcome to the VMA Voting System Test App! This one is running on port $port"
We simply return a String. I created this method simply to verify the load-balancing functionality of NGINX.
In this application, as mentioned in the introduction, we need to cache the progress of the voting process. Caching in replicated instances must also be replicated. One instance must know what the other has been given and vice versa. The easiest way to do this is to use a Distributed Cache System with auto-discovery. In our example we are using Hazelcast for that. We use the embedded Java implementation, where we don’t even need to start a separate container. For this, we create a HazelcastInstance bean on both instances:
@Bean
fun hazelcastInstance(): HazelcastInstance {
    val worldConfig = Config()
    worldConfig.clusterName = "vma-world"
    return Hazelcast.newHazelcastInstance(worldConfig)
}
And then we can use it as in the following example:
private val cache: MutableMap<String, VotingStatus> = hazelcastInstance.getMap("vma-cache")
Finally, we send the data to Kafka via the following method:
suspend fun publishArtistVote(key: String, artistVotingDto: ArtistVotingDto): Mono<Void> {
    val producerRecord: ProducerRecord<String, ArtistVotingDto> =
        ProducerRecord(kafkaConfigProperties.createArtistVoteRequestTopic, key, artistVotingDto)
    return voteArtistRequestRequestKafkaSender.createOutbound()
        .send(Mono.just(producerRecord))
        .then()
        .doOnSuccess { logger.info("Vote Created with id $key") }
}
This is also a suspend method. Even when just placing objects in the Kafka stream, we want this to be done in a reactive way. Maybe we won’t see a noticeable difference with a few requests, but when making many requests at the same time, we should see more requests being handled at the same time than otherwise. In the streams we will be using the ArtistVotingDto and SongVotingDto types. We are also using a Mono as a return value. We are using the reactor-kafka library for all Kafka interactions.
All methods of this project are either suspend functions or they return a Flow object. Both help the application stay available as much as possible and continue to receive requests as they come. Our application becomes non-blocking, and the use of the load-balancer gives us great fault tolerance and very good resilience. The load balancer is decoupled from the application. Should there be concerns about the availability of the load-balancer itself, we can manage that by introducing backup load-balancers to the system. Since caching is also distributed, all instances would have to go down for the cache to be lost. This means that, in the application section, we have fault tolerance to load-balancer failure, application failure, and cache failure.
5.2. Streams Section
In this part of our application we are using 3 different players. One is Zookeeper on port 2181, another is the Schema Registry for our Avro serializable objects on port 8088, and finally there is the Kafka broker or Kafka Stream on port 29092. We create two topics upon the start of the listeners, with the designations create-artist-vote-request-topic and create-song-vote-request-topic. One receives the votes for an artist and the other the votes for a song. The Avro Serializer isn’t a requirement per se, but I found it interesting to include it. An example of an Avro object can be found in the common module, in the following class:
data class ArtistVotingDto(
    val userId: String,
    val idC: String,
    val idA: String
) : IndexedRecord {
    override fun getSchema(): Schema =
        SchemaBuilder.record("ArtistVotingDto")
            .namespace("org.jesperancinha.vma.common.dto")
            .fields()
            .requiredString("userId")
            .requiredString("idC")
            .requiredString("idA")
            .endRecord()

    override fun put(i: Int, v: Any?) {
        println("$i + $v")
    }

    override fun get(i: Int) = when (i) {
        0 -> userId
        1 -> idC
        2 -> idA
        else -> ""
    }
}
This is the way we can create a simple Avro object. What is good about this, if anything, at first glance, is that we don’t really need to share the types running in the streams. The parameters are indexed and sent that way through the stream. On the receiving end, we’ll see that we are not really casting. We end up just converting the values given. This is analogous to disassembling a piece of furniture, shipping it in trucks, and reassembling it at the destination. The structure is not maintained, but the values are. We can restructure the metadata and the data at the end. The schema registry service is needed to keep the topic schemas, which guarantee consistency in the messages sent.
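The disassemble-and-reassemble idea can be sketched in plain Kotlin. This sketch deliberately does not use the Avro API: IndexedValues, ArtistVote, StoredVote and reassemble are hypothetical names that only mimic the positional access an indexed record offers.

```kotlin
// Hypothetical minimal stand-in for an indexed record; this is not the Avro API.
interface IndexedValues {
    fun get(i: Int): Any?
}

// Sender-side type: exposes its fields by position only.
data class ArtistVote(val userId: String, val idC: String, val idA: String) : IndexedValues {
    override fun get(i: Int): Any? = when (i) {
        0 -> userId
        1 -> idC
        2 -> idA
        else -> null
    }
}

// Receiver-side type with different names but the same field order.
data class StoredVote(val voter: String, val categoryId: String, val artistId: String)

// Rebuilds a receiver-side object from positions, without casting to the sender's type.
fun reassemble(record: IndexedValues): StoredVote = StoredVote(
    voter = record.get(0).toString(),
    categoryId = record.get(1).toString(),
    artistId = record.get(2).toString()
)

fun main() {
    val sent = ArtistVote(userId = "user-1", idC = "category-9", idA = "artist-7")
    println(reassemble(sent))
}
```

The receiver never needs the ArtistVote class itself. It only has to agree with the sender on the order and meaning of the positions, which is the role the schema plays.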
In the diagram we see shadowed Kafka brokers. Locally, Kafka consumes a lot of resources, so I used just one single Kafka broker, but we could and should use more in production.
5.3. Listeners Section
The listeners section is composed of two Spring Boot services. As mentioned before, they are consumers of the Kafka streams, and by default they belong to the same consumer group. They run on ports 9001 and 9002. This is not a necessary requirement. I used these ports to perform a few tests. Once the data is consumed, it is recreated and persisted to the database. This is where the votes get registered.
Before jumping into how the listener data models work, we should first have a look at how the database cluster is implemented. For this cluster I’ve used 5 important players. We need an HAProxy on port 5000, an Etcd server on port 2379, and Patroni startup mode applied to three PostgreSQL database containers on port 5432. The Etcd server works as the cluster manager. HAProxy has no knowledge of this. The databases connect to Etcd via Patroni, and the connections to them via HAProxy get managed by Etcd. If you prefer, Etcd is a kind of orchestrator of the databases, masked by Patroni.
In the Listener implementation we create a listener:
@Bean
private fun votingRequestListener(): Disposable {
    return KafkaReceiver.create(receiverOptions)
        .receive()
        .concatMap { record ->
            createVoteRequestHandler
                .handleCreateVoteRequest(record.value())
                .then(record.receiverOffset().commit())
                .doOnError {
                    logger.error(
                        "Error while creating Vote",
                        it
                    )
                }
        }
        .subscribe()
}
For this specific case we are not using a suspend function. We are simply creating a Bean which will run in the background. Since we are using the reactor-kafka library, we need to make sure that our event handlers return Flux-related reactive objects:
fun handleCreateVoteRequest(request: Record): Mono<Job> {
    return mono {
        request.schema.name.let { name ->
            CoroutineScope(IO).launch {
                when (name) {
                    "ArtistVotingDto" -> {
                        val vote = VoteCategoryArtist(
                            userId = request.get(0).toString(),
                            idC = request.get(1).toString(),
                            idA = request.get(2).toString()
                        )
                        votingCategoryArtistRepository.save(vote)
                        val category = categoryArtistRepository.findByCategoryIdAndArtistId(vote.idC, vote.idA)
                        categoryArtistRepository.save(
                            category.copy(
                                votes = category.votes + 1,
                                updates = category.updates + 1
                            )
                        )
                    }
                    else -> {
                        val vote = VoteCategorySong(
                            userId = request.get(0).toString(),
                            idC = request.get(1).toString(),
                            idS = request.get(2).toString()
                        )
                        votingCategorySongRepository.save(vote)
                        val category = categorySongRepository.findByCategoryIdAndSongId(vote.idC, vote.idS)
                        categorySongRepository.save(
                            category.copy(
                                votes = category.votes + 1,
                                updates = category.updates + 1
                            )
                        )
                    }
                }
            }
        }
    }.doOnError { logger.error("Exception while trying to create a new user", it) }
}
We do not, in principle, need to launch a coroutine in the IO thread context in this example. I added it as an example of how we can launch coroutines in a context of our choosing. In this case, we are already running our code within a reactive context in a publisher. In our case, this publisher is Mono<Job>. Job is the object created during launch. The following is an example of how we restructure the metadata and the data upon receiving the event:
val vote = VoteCategoryArtist(
    userId = request.get(0).toString(),
    idC = request.get(1).toString(),
    idA = request.get(2).toString()
)
votingCategoryArtistRepository.save(vote)
After receiving the event we also try to give an extra vote:
val category = categoryArtistRepository.findByCategoryIdAndArtistId(vote.idC, vote.idA)
categoryArtistRepository.save(
    category.copy(
        votes = category.votes + 1,
        updates = category.updates + 1
    )
)
The reason why this is a try is that the table entity is implemented the following way:
@Table
data class CategoryArtist(
    @field: Id
    val idCA: String = UUID.randomUUID().toString(),
    val idC: String? = null,
    val idA: String? = null,
    val updates: Int = -1,
    val votes: Long = 0,
    val voteCount: Long = 0,
    @field: Version
    val version: Long? = null,
) : Persistable<String> {
    override fun getId(): String = idCA
    override fun isNew(): Boolean = updates < 0
}
When doing this, we make sure that an update that happens to a particular row between the time we read the data and the time we write it back to the database does not go unnoticed: the write is rejected with an exception. This is guaranteed solely by the use of the @field: Version annotation. Doing so guarantees that the mechanism I’ve mentioned above, optimistic locking, is active.
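To illustrate the version check behind optimistic locking, here is a minimal in-memory sketch. It is not how Spring Data implements it. Row, InMemoryTable and StaleVersionException are hypothetical names, and the demo is single-threaded on purpose so that only the version comparison is shown.

```kotlin
// Hypothetical in-memory row and store, used only to illustrate the version check.
data class Row(val id: String, val votes: Long, val version: Long)

class StaleVersionException(id: String) : RuntimeException("Version does not match for row $id")

class InMemoryTable {
    private val rows = mutableMapOf<String, Row>()

    fun insert(id: String) {
        rows[id] = Row(id, votes = 0, version = 0)
    }

    fun find(id: String): Row = rows.getValue(id)

    // Writes only if the stored version is still the one the caller read.
    fun save(updated: Row): Row {
        val current = rows.getValue(updated.id)
        if (current.version != updated.version) throw StaleVersionException(updated.id)
        val stored = updated.copy(version = updated.version + 1)
        rows[updated.id] = stored
        return stored
    }
}

fun main() {
    val table = InMemoryTable()
    table.insert("artist-7")

    // Two writers read the same version of the row.
    val first = table.find("artist-7")
    val second = table.find("artist-7")

    table.save(first.copy(votes = first.votes + 1))
    try {
        // The second writer still holds the old version, so its extra vote is rejected.
        table.save(second.copy(votes = second.votes + 1))
    } catch (e: StaleVersionException) {
        println("extra vote lost: ${e.message}")
    }
    println(table.find("artist-7"))
}
```

No lock is held between reading and writing. A conflict is only detected at save time, which is why an extra vote can be lost under contention.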
Listeners implemented this way are reactive, and in our case we are also making use of coroutines and R2DBC. In other words, this means a tremendous amount of decoupling: all moving parts of our system are quite independent of each other and don’t block each other at all.
5.4. Database Cluster Section
This project wouldn’t be interesting without this final part. This is where we interact with the database. Consulting data from the database before, during, and after voting is extremely important. Systems must be resilient enough to support voting checks and allow users to follow how the voting is going. In a real case we don’t really get to see ongoing voting. Normally a periodic update happens, and we are actually reading the vote aggregation from the website. For the sake of our exercise, let’s imagine that we really want to check the overloaded database sometimes. And we want to make this available not only to ourselves but to all the users worldwide who are using the application at the same time. For this, we need a couple of methods. We’ll just examine one that returns a Flow and one that uses suspend, and see how they work in the different layers of the MVC Design Pattern.
We have a GET method in the code which gives us all the data needed to populate the VMA voting page, located in the RegistryController:
@GetMapping("/current")
fun getCurrentVma(@CookieValue("votingId") votingKey: String?): Flow<CategoryDto> =
    categoryService.findAll(votingKey)
This method returns a Flow of CategoryDto. The code for this is:
data class CategoryDto(
    val id: String? = null,
    val category: String,
    val type: CategoryType?,
    val capacity: Int,
    val artists: List<ArtistDto> = emptyList(),
    val songs: List<SongDto> = emptyList(),
    val voted: Boolean
)
In this data transfer object, as in many others, we can find a lot of conversion extension functions:
fun CategoryDto.toData(): Category = Category(
    name = this.category,
    capacity = this.capacity,
    type = this.type
)

val CategoryDto.toNewData: Category
    get() = Category(
        name = this.category,
        capacity = this.capacity,
        updates = 0,
        type = this.type
    )

fun Category.toDto(): CategoryDto = CategoryDto(
    id = this.id,
    category = this.name,
    capacity = this.capacity,
    type = this.type,
    voted = false
)

fun Category.toDtoWithArtistsAndVote(artists: List<ArtistDto>, voted: Boolean): CategoryDto = CategoryDto(
    id = this.id,
    category = this.name,
    capacity = this.capacity,
    type = this.type,
    artists = artists,
    voted = voted
)

fun Category.toDtoWithSongsAndVote(songs: List<SongDto>, voted: Boolean): CategoryDto = CategoryDto(
    id = this.id,
    category = this.name,
    capacity = this.capacity,
    type = this.type,
    songs = songs,
    voted = voted
)

fun Category.toDtoWithArtists(artists: List<ArtistDto>): CategoryDto = CategoryDto(
    id = this.id,
    category = this.name,
    capacity = this.capacity,
    type = this.type,
    artists = artists,
    voted = false
)

fun Category.toDtoWithSongs(songs: List<SongDto>): CategoryDto = CategoryDto(
    id = this.id,
    category = this.name,
    capacity = this.capacity,
    type = this.type,
    songs = songs,
    voted = false
)
A conversion is not strictly needed for our example, but it is important to keep in mind that in production environments, the data shown doesn’t necessarily match the domain model in the database. We may also find systems that obfuscate Ids. As a general good practice, it is always a good idea not to mix the data model with the presentation layer.
Following the lead of the controller method, we find its implementation in the CategoryService class, located in the Services.kt file:
fun findAll(votingKey: String?): Flow<CategoryDto> {
    return categoryRepository.findAll().map { category ->
        when (category.type) {
            ARTIST -> category.toDtoWithArtistsAndVote(
                artistService.findAll(
                    categoryArtistRepository.findByCategoryId(category.id).map { e -> e.idA }.filterNotNull()
                        .toList()
                ).toList(), votingKey?.let { cache[votingKey]?.votedOff?.contains(category.id) } ?: false
            )
            else -> category.toDtoWithSongsAndVote(
                songService.findAll(
                    categorySongRepository.findByCategoryId(category.id).map { e -> e.idS }.filterNotNull().toList()
                ).toList(), votingKey?.let { cache[votingKey]?.votedOff?.contains(category.id) } ?: false
            )
        }
    }
}
The implementation itself is a bit complicated, because it contains the business logic associated with the state managed by Hazelcast, as mentioned before. What’s important to note here is that we are using two repositories and a few extension functions. We are going to focus on the CategoryArtistRepository:
@Repository
interface CategoryArtistRepository : CoroutineCrudRepository<CategoryArtist, String> {
    @Query("Select * from category_artist ca where ca.id_c=:idc")
    fun findByCategoryId(@Param("idc") categoryId: String): Flow<CategoryArtist>

    @Query("Select * from category_artist ca where ca.id_c=:idc and ca.id_a=:ida")
    suspend fun findByCategoryIdAndArtistId(
        @Param("idc") categoryId: String,
        @Param("ida") artistId: String
    ): CategoryArtist
}
We can see two declared abstractions which give us the results of two native queries. For this article, the queries themselves are not important. Their results are. We talked a bit about emitters and what they mean for the Flow object. The findByCategoryId function returns a Flow of CategoryArtist. Since this is a result set of many results, it does not make sense to make this function suspend, because the access to each element is done in a suspended way when the Flow is collected. This means that the access is done within coroutines. The findByCategoryIdAndArtistId function only returns (we hope) one result. Returning one result implies that we should use the keyword suspend. As mentioned before, this keyword makes our function run within a coroutine, thereby complying with reactive programming requirements.
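The difference between the two return styles can be sketched without a database. The following is an assumption-laden illustration: Entry and InMemoryEntries are hypothetical stand-ins for the entity and the R2DBC repository, and the delay only simulates a database round trip.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical entity and in-memory repository; not the article's R2DBC repository.
data class Entry(val categoryId: String, val artistId: String, val votes: Long)

class InMemoryEntries(private val entries: List<Entry>) {
    // Many results: a plain function returning a Flow; suspension happens when it is collected.
    fun findByCategoryId(categoryId: String): Flow<Entry> =
        entries.asFlow().filter { it.categoryId == categoryId }

    // One result: a suspend function that returns the value itself.
    suspend fun findByCategoryIdAndArtistId(categoryId: String, artistId: String): Entry {
        delay(10) // stands in for a database round trip
        return entries.first { it.categoryId == categoryId && it.artistId == artistId }
    }
}

fun main() = runBlocking<Unit> {
    val repository = InMemoryEntries(
        listOf(Entry("c1", "a1", 3), Entry("c1", "a2", 5), Entry("c2", "a1", 1))
    )
    println(repository.findByCategoryId("c1").toList())
    println(repository.findByCategoryIdAndArtistId("c1", "a2").votes)
}
```

Calling findByCategoryId does no work by itself. The elements are only produced once toList, a suspend function, collects the Flow.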
Finally, we should also have a quick look at the method responsible for getting the vote count for a particular category and artist. It is located in the VotingController:
@GetMapping("/artist/{idc}/{ida}")
suspend fun getArtistVotingResults(
    @PathVariable idc: String,
    @PathVariable ida: String
) = votingService.getArtistVotingResults(idc, ida)
This then leads to its implementation in the Services.kt file, in the VotingService class:
suspend fun getArtistVotingResults(idc: String, ida: String): Long =
    categoryArtistRepository.findByCategoryIdAndArtistId(idc, ida).votes
This is the method we evaluated before. This concludes our lightning run through our project. The following is a complete overview of the whole project when the voting starts:
6. Running the demo
In order to run the demo, please make sure you have Docker and JDK 17 installed. You can then run this command from the root:
make docker-clean-build-start
Wait a couple of minutes, maybe 10 minutes just to be sure. Then you can run the locust tests:
make locust
In the meantime you can jump to the voting page on http://localhost:8080. There you can choose to vote. The results are seen live on: http://localhost:8080/result
Keep checking the locust process. Once it’s done, the voting is still not done. You have to let the process run for a bit. Alternatively, you can just run:
make count-votes
If you get an exception, it is most likely caused by optimistic locking. This is done on purpose so that you can see it in action. Once the voting has completely stopped, you should be able to run the command without errors.
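For readers unfamiliar with the mechanism, the following is a minimal sketch of the idea behind optimistic locking. It is not the Spring Data R2DBC implementation used in the project: every row carries a version, and an update is only applied if the version the writer read is still the current one. The names VersionedRow, InMemoryTable and StaleVersionException are hypothetical.

```kotlin
// Hypothetical names throughout; this only illustrates the version check.
data class VersionedRow(val id: String, val votes: Long, val version: Long)

class StaleVersionException(id: String) :
    RuntimeException("Version does not match for row with Id [$id]")

class InMemoryTable {
    private val rows = mutableMapOf<String, VersionedRow>()

    @Synchronized
    fun insert(id: String, votes: Long) {
        rows[id] = VersionedRow(id, votes, version = 0)
    }

    @Synchronized
    fun read(id: String): VersionedRow = rows.getValue(id)

    // Applies the update only if nobody changed the row since `updated` was read.
    @Synchronized
    fun update(updated: VersionedRow): VersionedRow {
        val current = rows.getValue(updated.id)
        if (current.version != updated.version) throw StaleVersionException(updated.id)
        val saved = updated.copy(version = updated.version + 1)
        rows[updated.id] = saved
        return saved
    }
}

fun main() {
    val table = InMemoryTable()
    table.insert("row-1", votes = 0)

    // Two writers read the same version of the row.
    val readByFirst = table.read("row-1")
    val readBySecond = table.read("row-1")

    // The first write succeeds and bumps the version.
    table.update(readByFirst.copy(votes = readByFirst.votes + 1))

    // The second write still carries the old version, so it is rejected.
    try {
        table.update(readBySecond.copy(votes = readBySecond.votes + 1))
    } catch (e: StaleVersionException) {
        println("Rejected: ${e.message}")
    }
    println(table.read("row-1"))
}
```

While votes are still being written, a concurrent update can hit exactly this kind of version mismatch, which would explain why the error disappears once the voting has stopped.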
The important logs are those from the application and from the listeners. There we should find the exceptions from the optimistic locking implementation and the startup of Hazelcast:
docker logs jofisaes_vma_listener_1
docker logs jofisaes_vma_backend_img_1
Something like this exception should be seen on the backend:
2021-11-17 16:55:22.810 ERROR 8 --- [io-8081-exec-10] o.a.c.c.C.[.[.[.[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] threw exception
org.springframework.dao.OptimisticLockingFailureException: Failed to update table [category_artist]. Version does not match for row with Id [9cf34b5a-37f9-444c-8518-b778d2fa4a3b].
at org.springframework.data.r2dbc.core.R2dbcEntityTemplate.lambda$doUpdate$14(R2dbcEntityTemplate.java:704) ~[spring-data-r2dbc-1.3.6.jar!/:1.3.6]
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:103) ~[reactor-core-3.4.11.jar!/:3.4.11]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.11.jar!/:3.4.11]
at reactor.core.publisher.MonoUsingWhen$MonoUsingWhenSubscriber.deferredComplete(MonoUsingWhen.java:278) ~[reactor-core-3.4.11.jar!/:3.4.11]
And on the listener something like this:
Exception in thread "DefaultDispatcher-worker-3" org.springframework.dao.OptimisticLockingFailureException: Failed to update table [category_song]. Version does not match for row with Id [8eb3cdde-0771-49b3-a031-9b011cb48ef0].
at org.springframework.data.r2dbc.core.R2dbcEntityTemplate.lambda$doUpdate$14(R2dbcEntityTemplate.java:704)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:103)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoUsingWhen$MonoUsingWhenSubscriber.deferredComplete(MonoUsingWhen.java:278)
The full stack trace is of no interest for this article, but you’ll be able to see that it is quite extensive.
I’ve created a video to walk through the steps to start the demo. Please have a look at it to see exactly how it works. Since it’s recorded, there should be no known “Demo effects”:
7. Conclusion
I hope that with this very long and extensive article and project I was able to share some knowledge that you find interesting and can use in your everyday life as a software engineer. My exploration of coroutines has shown me some very important things. All things considered, coroutines are a relatively new subject if we consider that they are only now finding their way back into our everyday work. And this is for a very good reason. The observable pattern can be quite difficult and cumbersome to implement. Spring WebFlux does allow for reactive programming implementations, but it also makes implementation a bit more difficult. Coroutines cover that gap. We can think of this as using suspend instead of Mono<T>. I do, at least. We can also think of it as using Flow<T> instead of Flux<T>. There doesn’t seem to be much of a difference between programming with flows and programming with fluxes; from a programmatic point of view they are quite similar. Not using Mono<T> makes me think of past implementations where I had no other choice but to use the zip function of WebFlux to build a non-blocking process that would join the results of different queries.
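As an illustration of that last point, here is a minimal sketch of how two independent queries can be joined with coroutines, where Reactor-based code would typically reach for Mono.zip. It assumes kotlinx-coroutines-core is on the classpath; countArtistVotes and countSongVotes are hypothetical stand-ins for real queries and are not part of the project.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for two independent non-blocking queries.
suspend fun countArtistVotes(): Long {
    delay(100) // simulates a non-blocking database call
    return 40
}

suspend fun countSongVotes(): Long {
    delay(100) // simulates a non-blocking database call
    return 2
}

// Starts both queries concurrently and suspends until both results are available.
suspend fun countAllVotes(): Long = coroutineScope {
    val artistVotes = async { countArtistVotes() }
    val songVotes = async { countSongVotes() }
    artistVotes.await() + songVotes.await()
}

fun main() = runBlocking {
    // Inside runBlocking, both async coroutines share the calling thread
    // and interleave at their suspension points.
    println(countAllVotes()) // prints 42
}
```

The joining logic reads like ordinary sequential code: the two await calls play the role that the combinator function plays in a zip.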
But the most important part of all of this is actually how coroutines work. By allowing many of them to run independently on the thread we prefer, we also allow multiple different computations to make progress concurrently within the same Thread. This concept makes reactive programming easier and consequently more fun.
This article does not claim that reactive coroutines are better than WebFlux; that was not its purpose. There is, however, the obvious conclusion on a programmatic level that it is just easier to build projects that way. It is also part of the reason I added reactor-kafka to the mix. Another question we can ask is: which one of them is more reactive? They both couple really well with R2DBC implementations and, apart from coroutines and their syntax, they can be tested separately. But this is maybe something to explore in another article.
I have placed all the source code of this application on GitHub.
I hope that you have enjoyed this article as much as I enjoyed writing it. I tried to keep it small and concise, and I left many small details out.
Thanks in advance for reading!