I work at Infobip on a product for marketing automation, aka customer engagement. In a nutshell it looks like this: you define a so-called campaign with elements like "Send SMS", "Call API" or "Wait for incoming SMS", and then various actions happen when people interact with it.
The data structure we use is a graph of nodes with transitions between them, essentially a state machine. On top of that we have the code that executes it, running over separate RabbitMQ queues for each type of node.
Everything went smoothly until the product was relaunched, with more features added, as a dedicated customer engagement solution. Big clients started to bring in more traffic, and every other night we'd get an alert that somebody had launched a multi-million-audience campaign in Brazil, all the queues were full, and nothing else was getting processed.
All the clients would get delays in their traffic processing and they'd start calling our support. After that, we'd usually spend the night panicking and trying to fix it.
What is RabbitMQ?
Now, before I start boasting about how my team and I saved the world and solved all those problems (spoiler: we didn't, yet), let's take a step back and look at some basics of what RabbitMQ is and which concepts it uses.
The official documentation pretty much nails it, but I'll still mention a few concepts.
RabbitMQ is a message broker. You push messages in from one end, it stores them and pushes them to whoever expressed a desire to receive them on the other end. Unlike Kafka, it's a "smart" broker: while you read from a Queue, you push to an Exchange, which may or may not have some logic deciding where to route your messages.
In a simple case you push to a direct exchange with routing key `abc` and it gets delivered to a queue named `abc`.
Another common use case is dead lettering: if the broker failed to deliver a message, it puts it on a dead-letter exchange, which in our case is configured to route it to a dead-letter queue. So if a message on queue `abc` wasn't delivered, it won't block the others; instead it will lie still on some `deadletter-abc`, waiting for devs to figure out what went wrong.
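In Spring AMQP terms, that setup could be declared roughly like this (a sketch; the exchange and queue names are just the ones from the example):

```kotlin
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.DirectExchange
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.QueueBuilder

// The dead-letter exchange and the queue that collects failed messages.
val deadLetterExchange = DirectExchange("deadletter")
val deadLetterQueue = Queue("deadletter-abc")
val deadLetterBinding = BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("abc")

// The main queue: messages the broker couldn't deliver (rejected, expired, ...)
// get re-routed through "deadletter" to "deadletter-abc".
val mainQueue = QueueBuilder.durable("abc")
    .withArgument("x-dead-letter-exchange", "deadletter")
    .withArgument("x-dead-letter-routing-key", "abc")
    .build()
```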
How do you consume those messages? Your app opens a TCP connection and, inside it, a lot of other, let's say virtual, connections called Channels. Each channel gets a tag which allows you to identify whatever Rabbit pushes towards you (yes, Rabbit pushes; it's not you pulling).
… or so we thought until all the mess I'm going to describe next hit us.
A Big Fish problem
To dismantle this puzzle, let's start with the basics. The easiest way for a Java developer to do something meaningful with RabbitMQ is to use the official Java RabbitMQ client library.
Here's roughly how it would look:
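Something along these lines (a minimal sketch; the host and queue name are just placeholders):

```kotlin
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.DeliverCallback

fun main() {
    val factory = ConnectionFactory().apply { host = "localhost" }
    val connection = factory.newConnection()
    val channel = connection.createChannel()

    // durable, non-exclusive, non-auto-delete queue
    channel.queueDeclare("change-events", true, false, false, null)

    val onMessage = DeliverCallback { _, delivery ->
        println("Received: ${String(delivery.body)}")
        // manual ack once the work is done
        channel.basicAck(delivery.envelope.deliveryTag, false)
    }
    val onCancel = CancelCallback { }

    // autoAck = false: the broker keeps the message until we ack it
    channel.basicConsume("change-events", false, onMessage, onCancel)
}
```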
On the inside, the library executes all your consumers, in batches of work, on a single thread pool per connection. Since most of the time you'd have just one cached connection, it's not the best idea to do blocking work in the consumer.
It'd be nicer to have another thread pool which we control and to put all our blocking jobs there… And that's exactly what Spring AMQP does. That, plus all the Spring voodoo-magic wrapping you and I are so used to: metrics, transactions, you name it.
Meet `SimpleMessageListenerContainer` (SMLC for short), a pillar of Spring AMQP. Inside it there is a thread loop plus a bunch of consumers implementing the plain RabbitMQ interface we've seen above.
We use the Kotlin and Spring stack, so producing a message looks like this:
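(A sketch of the producing side; the exchange, routing key and event class here are illustrative, not our real ones.)

```kotlin
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.stereotype.Component

data class ChangeEvent(val clientId: String, val payload: String)

@Component
class ChangeEventProducer(private val rabbitTemplate: RabbitTemplate) {

    fun send(event: ChangeEvent) {
        // relies on a message converter (e.g. Jackson) being configured on the template
        rabbitTemplate.convertAndSend("change-events-exchange", "change-events", event)
    }
}
```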
…while consuming is as simple as this:
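(Again just a sketch, reusing the illustrative `ChangeEvent` class from above.)

```kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class ChangeEventConsumer {

    @RabbitListener(queues = ["change-events"])
    fun process(event: ChangeEvent) {
        // business logic for a single message goes here
        println("Processing event for client ${event.clientId}")
    }
}
```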
How to set your priorities, and what do BigBoss Corporation and croissants have to do with it?
So here comes the juicy part.
Let's imagine we'd like to process traffic on top of a Rabbit queue for two clients: the international BigBoss Corporation and MollyPies, a chain of local bakery shops.
To do this we'll define a "change-events" queue to handle events. BigBoss Corp immediately sends a campaign to 10 million recipients. MollyPies sends a message to 1000 recipients so they know that fresh croissants are ready for pickup.
However, Molly's customers will get their croissants cold, because the 1000 messages will get lost among the 10 million sent by BigBoss.
We head to the Internet and learn that in similar systems like Kafka you can have multiple partitions and route your messages to them based on a key. Rabbit doesn't have the same mechanics, but it can be emulated.
We define "change-events-1" and "change-events-2" in the hope that if BigBoss's and Molly's messages are on separate queues, everything will go as planned.
That's the approach we had. Oh how wrong we were.
Yes, RabbitMQ does push messages in round-robin fashion. But it does so not by queues, but by channels. So in our example we had 100 channels (aka 100 threads), each subscribed to every queue for a given type of element. We did so because you can't write `@RabbitListener` 100 times with just a different queue number at the end. Rabbit would then push to each of those threads in round-robin, but from all the queues they subscribed to, in no particular order. Therefore when somebody overloads `change-events-1`, all threads start working hard to consume and process its messages and don't care about the others.
After some head scratching we realized that the gotcha is in the threads, not the queues. If we make a separate thread pool for each queue, everything works as expected:
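Something like this, where each annotation gets its own listener container (queue names from the example; the concurrency values are made up):

```kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class PartitionedConsumer {

    // One @RabbitListener per queue, so each queue gets its own container and threads.
    @RabbitListener(queues = ["change-events-1"], concurrency = "50")
    fun processPartition1(event: ChangeEvent) = handle(event)

    @RabbitListener(queues = ["change-events-2"], concurrency = "50")
    fun processPartition2(event: ChangeEvent) = handle(event)

    private fun handle(event: ChangeEvent) {
        // shared business logic
    }
}
```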
Here two SMLC containers are created, one per annotation. Each has its own thread pool to process messages, and since resources are now equal, Molly is not in trouble.
Yet we have not two but thousands of customers. It wouldn't be very efficient to create a separate thread pool for each of them.
Can we do better?
As long as our consuming speed is higher than the producing speed, we can get away with the very first option: just one plain queue. What we are actually trying to fight is BigBoss sending 10 million messages at once, when normally we'd send, say, 10k in the same time period.
If we could somehow throttle this batch - everything would be fine.
RabbitMQ supports priority queues. The idea is to approximate the consumption speed per client and then, on the producer side, lower the priority for those who send too fast.
Roughly it'd look like this:
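Something along these lines (a very rough sketch: the rate tracking, the window and the priority values are simplified illustrations):

```kotlin
import org.springframework.amqp.core.QueueBuilder
import org.springframework.amqp.rabbit.core.RabbitTemplate
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// The queue has to be declared with a maximum priority to act as a priority queue.
val changeEventsQueue = QueueBuilder.durable("change-events")
    .withArgument("x-max-priority", 2)
    .build()

class ThrottlingProducer(private val rabbitTemplate: RabbitTemplate) {

    // messages sent per client in the current window (window reset not shown)
    private val sentInWindow = ConcurrentHashMap<String, AtomicLong>()
    private val averageRatePerWindow = 10_000L

    fun send(event: ChangeEvent) {
        val sent = sentInWindow.computeIfAbsent(event.clientId) { AtomicLong() }.incrementAndGet()

        rabbitTemplate.convertAndSend("change-events-exchange", "change-events", event) { message ->
            // whoever sends faster than the average gets a lower priority,
            // so their backlog waits behind everybody else's messages
            message.messageProperties.priority = if (sent > averageRatePerWindow) 0 else 1
            message
        }
    }
}
```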
With this approach BigBoss at first produces as fast as it can. Over time its speed surpasses the average, and its messages sit in the queue with lower priority. Given that our consuming speed is sufficient, the consumers will eventually process them.
One transaction to rule them all
Initially we configured transaction management like this:
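Roughly like this (a sketch of a transacted setup, not our exact configuration):

```kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class RabbitTransactionConfig {

    @Bean
    fun rabbitTransactionManager(connectionFactory: ConnectionFactory) =
        RabbitTransactionManager(connectionFactory)

    @Bean
    fun rabbitListenerContainerFactory(
        connectionFactory: ConnectionFactory,
        rabbitTransactionManager: RabbitTransactionManager
    ) = SimpleRabbitListenerContainerFactory().apply {
        setConnectionFactory(connectionFactory)
        // the listener channel takes part in the transaction, so acks roll back with it
        setChannelTransacted(true)
        setTransactionManager(rabbitTransactionManager)
    }
}
```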
Some of our consumer logic was more time-critical than the rest. Since we already used Spring transactional support for our data access, we added a transaction timeout like this:
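(The timeout value here is just an example.)

```kotlin
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional

@Service
class TimeCriticalHandler {

    @Transactional(timeout = 5) // seconds
    fun process(event: ChangeEvent) {
        // time-critical data access here
    }
}
```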
The first thing we discovered: only a few transaction managers support timeouts, and `RabbitTransactionManager` is not one of them.
Still, even the JPA one didn't apply our timeout setting. After debugging, it became evident that SMLC uses the wrong transaction definition. One more hack we added to our forked version was to find the proper `@Transactional` via reflection and read the timeout value from it.
However, that was not the end of the transactions pain. We had code like this in one of the consumers:
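Something of this shape (reconstructed for illustration; only `e.condition` is the real detail, the repository and event fields are made up):

```kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional

// Simplified event and repository just to show the shape of the problem.
data class ChangeEvent(val condition: Boolean)

interface EventRepository {
    fun save(e: ChangeEvent)
}

@Component
class RarelyWritingConsumer(private val repository: EventRepository) {

    @RabbitListener(queues = ["change-events"])
    @Transactional
    fun process(e: ChangeEvent) {
        // only ~5% of the events actually touch the database...
        if (e.condition) {
            repository.save(e)
        }
        // ...but @Transactional starts a transaction (and grabs a pooled connection)
        // for every single message, whether the condition holds or not
    }
}
```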
`e.condition` would evaluate to true in ~5% of cases. However, it turned out that such code would always create a database transaction upon entering the process method and therefore would (totally unnecessarily) request a connection. Since we had the connection pool configured with a relatively small size and didn't expect much use of it, it came as a surprise when this method got stuck due to connection pool exhaustion.
In the end we weighed the pros and cons and decided to turn off RabbitMQ transactions completely.
Retries
Once upon a time an unoptimized SQL query caused our DB to throw lock exceptions in about 10% of the executions of a certain query. That query was executed by one of the consumers. It was actually not a big deal, because we had retries in place:
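(A sketch of that kind of setup; the attempt count and backoff values are illustrative.)

```kotlin
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class RetryConfig {

    @Bean
    fun retryContainerFactory(connectionFactory: ConnectionFactory) =
        SimpleRabbitListenerContainerFactory().apply {
            setConnectionFactory(connectionFactory)
            setAdviceChain(
                RetryInterceptorBuilder.stateless()
                    .maxAttempts(5)
                    // initial interval, multiplier, max interval (ms)
                    .backOffOptions(1_000, 2.0, 15_000)
                    .recoverer(RejectAndDontRequeueRecoverer())
                    .build()
            )
        }
}
```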
Yet, when looking at the latency graph, one thing bothered us: p99 was a suspiciously round number, something like 45s. It turns out Spring Retry support for Spring AMQP is pretty much what it is in other cases: `Thread.sleep(backOffPeriod)`.
When you have a limited number of threads to process something and part of them are doing, well, just nothing, it's annoying. So we turned to another popular approach, using so-called "topic" exchanges and the TTL mechanism of RabbitMQ itself. You declare a bunch of queues named `retry.xxxx`, where xxxx are the intervals of your exponential backoff. Each of them has that interval as its TTL and the default exchange (the one where you normally produce) as its dead letter.
Then you configure `StatefulRetryOperationsInterceptor` with a `MessageRecoverer` which sends the failed message to the proper `retry.xxxx` queue based on the retry attempt number. Hence the message sits on the broker instead of blocking an application thread.
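Put together, it could look roughly like this (the intervals, the header name and the exchange names are illustrative; the recoverer would be plugged into the retry interceptor mentioned above):

```kotlin
import org.springframework.amqp.core.Message
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.QueueBuilder
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.amqp.rabbit.retry.MessageRecoverer

val retryIntervals = listOf(1_000L, 5_000L, 30_000L)

// retry.1000, retry.5000, retry.30000: each queue expires messages after its interval
// and dead-letters them back to the exchange we normally produce to.
// (Declared as beans / via RabbitAdmin in a real setup.)
val retryQueues: List<Queue> = retryIntervals.map { interval ->
    QueueBuilder.durable("retry.$interval")
        .withArgument("x-message-ttl", interval)
        .withArgument("x-dead-letter-exchange", "change-events-exchange")
        .withArgument("x-dead-letter-routing-key", "change-events")
        .build()
}

// On failure, republish to the retry queue matching the attempt number;
// the TTL + dead-letter setup brings the message back later without holding a thread.
class RetryQueueRecoverer(private val rabbitTemplate: RabbitTemplate) : MessageRecoverer {

    override fun recover(message: Message, cause: Throwable) {
        val headers = message.messageProperties.headers
        val attempt = (headers["x-retry-attempt"] as? Int ?: 0) + 1
        headers["x-retry-attempt"] = attempt

        // first failure -> retry.1000, second -> retry.5000, capped at the last interval
        val interval = retryIntervals[minOf(attempt, retryIntervals.size) - 1]
        // publishing via the default (nameless) exchange routes by queue name
        rabbitTemplate.send("", "retry.$interval", message)
    }
}
```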
A lesson learned - RabbitMQ works
Despite all the quirks we found, one thing remains true for us: RabbitMQ itself worked brilliantly under rather heavy load, it sustained network partitions, and it didn't lose data in the queues when we configured it to do so.
We managed to implement a fair-queuing solution on top of it and at the same time keep our business logic clean. Congrats to us!