Releasing a new version of a product has always been a critical moment for engineering teams as well as for operations teams, mainly because it's usually managed as a big-bang switchover. During this operation, if something goes wrong (a migration procedure that doesn't run as expected, or a system that doesn't behave as it should, whether the issue is functional or performance-related), it puts a lot of pressure on the teams because it has a direct impact on the business.
Canary release, sometimes called friends and family, is an increasingly common practice for rolling out a new version of a piece of software. It consists of keeping both versions running side by side and gradually increasing the share of traffic handled by the new version during the sunset phase of the old one. It then becomes much easier to check that no major bug affects the new version, or to verify that it can handle real-life traffic. Long story short, whatever problem is encountered, the chances that it impacts the business drop drastically.
The term canary release comes from the days of underground coal mining: if the level of carbon monoxide in the air rose, miners could die silently, as the gas is odorless. To cover that risk, they worked with a canary in a cage; if the canary died, it meant the gas had reached a dangerous level, as this bird is even more sensitive to it than humans are.
For a REST API or a web application, the usual way to implement this is a reverse proxy that balances traffic across the two versions. But in a data streaming context, how do you implement it with data processors while keeping all the guarantees Kafka offers? This is the challenge I faced, and this article shares my proposal for tackling it.
Let's set some basic requirements
First of all, it shouldn't have any impact on the design or the code of either the producer or the consumer. Then, the producer shouldn't be aware that the traffic is split across two different processors serving the same purpose, which is a common principle for loosely coupled applications. Finally, it must guarantee that every record is processed, and processed by only one consumer.
Scenario
Let's consider that you want to introduce a new version that will, at first, handle only 10% of the traffic. The usual verifications are applied: no excessive consumer lag, no functional regressions or bugs. Then you gradually increase the share of traffic and, at the final stage, switch completely to the new version, allowing you to dispose of the old one.
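Consumer lag, for instance, can be watched by describing the consumer group with the standard kafka-consumer-groups tool (kafka-consumer-groups.sh in the Apache distribution). A minimal sketch, assuming a broker at localhost:9092, a group named new-service, and a threshold of 1000 records (all hypothetical):

```bash
# Report partitions whose lag exceeds the threshold for the canary's consumer group
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group new-service |
  awk '$6 ~ /^[0-9]+$/ && $6 > 1000 {print "excessive lag on", $2, "partition", $3, ":", $6}'
```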
Using ksqlDB as a router
That's the basic idea. As the traffic needs to be split in two based on a ratio, we can leverage the RANDOM function, which returns a value between 0.0 and 1.0.
-- Source stream declared on the original input topic
create or replace stream original
(original_key <key_type> key)
with (...);

-- Routes roughly 90% of the records to the topic consumed by the legacy version
create or replace stream legacy_version
with (...) as
select * from original where random() > 0.1;

-- Routes the remaining ~10% to the topic consumed by the new version
create or replace stream new_version
with (...) as
select * from original where random() <= 0.1;
Please note that using Schema Registry allows for more concise stream definitions. And since a persistent query writes to one and only one topic, you need two persistent queries.
Not so fast
This way, the share of traffic processed by each version complies with the defined ratio. However, because the random function is evaluated independently in each query, there is neither a guarantee that every record is processed nor that each one is processed only once. To work around that, we need to set the assigned rate in stone for each record, and only then apply the routing:
-- Source stream declared on the original input topic
create or replace stream original
(original_key <key_type> key)
with (...);

-- Persist the drawn rate with each record so both routing queries see the same value
create or replace stream original_rated
with (...) as
select *, random() as rate from original;

-- Route on the persisted rate: each record matches exactly one predicate
create or replace stream legacy_version
with (...) as
select * from original_rated where rate > 0.1;

create or replace stream new_version
with (...) as
select * from original_rated where rate <= 0.1;
To start the canary release transition, the former version is stopped, the queries above are started, and then the two versions of the service are started, each reconfigured to consume its assigned topic.
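For reference, those statements don't have to be typed in the ksqlDB CLI; they can be submitted to the server's /ksql REST endpoint, which is what any automation will end up doing. A minimal sketch, assuming a ksqlDB server at http://localhost:8088 (the statement body is abbreviated as in the examples above):

```bash
# Submit a routing statement to the ksqlDB server over HTTP
curl -s -X POST http://localhost:8088/ksql \
  -H 'Content-Type: application/vnd.ksql.v1+json' \
  -d '{"ksql": "create or replace stream new_version with(...) as select * from original_rated where rate <= 0.1;", "streamsProperties": {}}'
```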
Looks good, but that's not enough.
The default ksqlDB behavior when running a query is to read the topic from the latest offset. That means the sequence above leads to data loss, as messages will almost certainly be pushed to the topic while transitioning from the normal setup to the canary release. Consequently, the rating query must start consuming the input topic from the last offsets committed by the legacy service before it stopped. Unfortunately, ksqlDB doesn't offer the capability to define starting offsets for a given query; the only options are the values accepted by the query configuration parameter auto.offset.reset: earliest or latest. But the game is not over yet, as the language gives access to the offset and partition of each record through the ROWOFFSET and ROWPARTITION pseudo columns.
So the procedure requires additional steps: after the former version is stopped, collect, for each partition, the offset committed by the legacy consumer group, and build the rating query accordingly so that it only consumes unprocessed messages. As an example:
SET 'auto.offset.reset'='earliest';

create or replace stream original (...);

-- The offsets below are the last offsets processed by the legacy consumer group,
-- collected per partition before it was stopped
create or replace stream original_rated with(...) as
select *, random() as rate from original
where
(ROWPARTITION=0 and ROWOFFSET > 165) OR
(ROWPARTITION=3 and ROWOFFSET > 176) OR
(ROWPARTITION=1 and ROWOFFSET > 149) OR
(ROWPARTITION=5 and ROWOFFSET > 151) OR
(ROWPARTITION=2 and ROWOFFSET > 152) OR
(ROWPARTITION=4 and ROWOFFSET > 167) ;
It does the trick, but it comes with a drawback: the rating query has to stream the entire content of the input topic, reading every record only to filter out the already-processed ones. If the amount of data stored in the partitions is huge, this can take a non-negligible amount of time.
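For completeness, the per-partition offsets used in that predicate can be collected from the legacy consumer group; a sketch, assuming a broker at localhost:9092 and a group named legacy-service (both hypothetical), and keeping in mind that the committed offset is the next offset to read, hence the - 1:

```bash
# Print one predicate line per partition from the committed offsets of the
# legacy consumer group (column 3 = partition, column 4 = committed offset).
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group legacy-service |
  awk '$4 ~ /^[0-9]+$/ {printf "(ROWPARTITION=%s and ROWOFFSET > %d) OR\n", $3, $4 - 1}'
# Paste the output into the WHERE clause of the rating query, dropping the last OR.
```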
Way to go!
Now both versions can be started. Please note that none of these operations has any impact on the data producer, and there's no impact on the applications either; the only requirement is that the topic to consume can be provided as a configuration parameter.
Then, over time, the routing ratio can be revised by running the following sequence (sketched right after this list):
- Pause the rating query (don't drop it, otherwise its offsets will be lost)
- Update the ratio in the two routing queries
- Resume the rating query
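As a sketch of that sequence through the ksqlDB REST API, assuming a server at http://localhost:8088, a ksqlDB version that supports PAUSE/RESUME on persistent queries, and a hypothetical query ID (the real one is given by SHOW QUERIES;):

```bash
KSQL=http://localhost:8088/ksql
CT='Content-Type: application/vnd.ksql.v1+json'
run() { curl -s -X POST "$KSQL" -H "$CT" -d "{\"ksql\": \"$1\"}"; }

# 1. Pause the rating query (pausing keeps its committed offsets, unlike dropping it)
run "PAUSE CSAS_ORIGINAL_RATED_1;"

# 2. Re-create the two routing queries with the new ratio (here 25%)
run "create or replace stream legacy_version with(...) as select * from original_rated where rate > 0.25;"
run "create or replace stream new_version with(...) as select * from original_rated where rate <= 0.25;"

# 3. Resume the rating query
run "RESUME CSAS_ORIGINAL_RATED_1;"
```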
Final stage: promote the new version
For obvious reasons, even once 100% of the traffic is assigned to the new version, this setup can't stay in place forever: it wastes storage (each record is copied twice, ending up in three topics) as well as processing resources (each canary release implies three persistent queries). So a final procedure is required to safely reconfigure the new application to process messages from the original input topic (a sketch of the offset arithmetic follows the list):
- Pause the rating query
- Wait until the consumer groups of the two downstream services have zero lag on all partitions
- Collect the offsets for the consumer groups of the two applications
- For each partition, compute the offset the new version should start from in the original topic by adding up: the last offset committed by the former version in the original partition, the offset of the legacy service in its filtered partition, and the offset of the new version in its own partition; then reset the new version's consumer group to this resulting offset on that partition of the original topic
- Start the new version configured to consume the original topic and dispose of everything else, that's it!
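Below is a sketch of that per-partition arithmetic and of the offset reset, assuming hypothetical names (broker at localhost:9092, groups legacy-service and new-service, topics legacy_version, new_version, original) and a file original_offsets.txt holding "<partition> <offset>" pairs saved from the legacy group before the canary started. Note that kafka-consumer-groups can only reset offsets for a group with no active members, so the new service must already be stopped for reconfiguration at this point.

```bash
#!/usr/bin/env bash
set -euo pipefail
BOOTSTRAP=localhost:9092
LEGACY_GROUP=legacy-service; NEW_GROUP=new-service   # hypothetical group names
LEGACY_TOPIC=legacy_version; NEW_TOPIC=new_version
ORIGINAL=original

# Committed offset of a group on a given topic partition (0 if nothing committed)
offset_of() {
  kafka-consumer-groups --bootstrap-server "$BOOTSTRAP" --describe --group "$1" |
    awk -v t="$2" -v p="$3" \
      '$2 == t && $3 == p && $4 ~ /^[0-9]+$/ {print $4; found=1}
       END {if (!found) print 0}'
}

while read -r partition base_offset; do
  legacy=$(offset_of "$LEGACY_GROUP" "$LEGACY_TOPIC" "$partition")
  new=$(offset_of "$NEW_GROUP" "$NEW_TOPIC" "$partition")
  # Everything before base_offset + legacy + new has been processed exactly once
  target=$(( base_offset + legacy + new ))
  # Point the new service's group at the first unprocessed record of the original topic
  kafka-consumer-groups --bootstrap-server "$BOOTSTRAP" --group "$NEW_GROUP" \
    --topic "$ORIGINAL:$partition" --reset-offsets --to-offset "$target" --execute
done < original_offsets.txt
```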
Automate it!
Now that every step is detailed, why not build some tooling to make it automatic? This is what I did in Canary Router. It's based on a couple of shell scripts and comes with minimal requirements. It mainly leverages Confluent Cloud resources, so the only prerequisite is to sign up; you'll be awarded $400 of free credits to spin up streaming resources, which is more than enough for a basic Kafka cluster and a small ksqlDB cluster. It's definitely not production-ready, but it shows a path for running this kind of operation, and it could easily be reimplemented in Java or Go to make it a proper tool.
The demo is based on a Datagen source connector, and the downstream services are nothing more than containers running a dumb kafka-console-consumer. If you want to test it with real-life consumers, just implement the four (start|stop)_(legacy|new)_service shell functions to pass as a context to the scripts, like the ones implemented in context.sh.
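As an illustration only (the exact contract is defined by the scripts in the repository, so the argument convention, group names, and pid files below are assumptions), such a context could look like this:

```bash
# Hypothetical context.sh sketch: each function receives the topic to consume.
start_legacy_service() {
  nohup kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic "$1" --group legacy-service >> legacy.out 2>&1 &
  echo $! > legacy.pid
}
stop_legacy_service() { kill "$(cat legacy.pid)"; }

start_new_service() {
  nohup kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic "$1" --group new-service >> new.out 2>&1 &
  echo $! > new.pid
}
stop_new_service() { kill "$(cat new.pid)"; }
```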
Remarks and limitations
There's a script that checks that the number of messages in the new/legacy topics is consistent with there being no duplicates and no data loss. However, this check is weak: the only reliable way would be to compare every message, not just counts.
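For reference, a naive version of that count check could drain each topic once and compare sizes; a sketch, assuming a broker at localhost:9092 and the topic names used above:

```bash
# Crude record count: read the topic from the beginning until no message
# arrives for 10 seconds, then count the lines.
count() {
  kafka-console-consumer --bootstrap-server localhost:9092 --topic "$1" \
    --from-beginning --timeout-ms 10000 2>/dev/null | wc -l
}

rated=$(count original_rated)
split=$(( $(count legacy_version) + $(count new_version) ))
echo "original_rated=$rated, legacy+new=$split"
[ "$rated" -eq "$split" ] && echo "counts match" || echo "possible loss or duplication"
```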
This design works as long as processing order is not a requirement. Even though the messages are copied to the same partition numbers in the different topics, there's no guarantee that all messages with the same key end up in the same topic. And since the traffic is, by design, not evenly balanced, at some point some records will inevitably be processed out of the order in which they were produced. That being said, I think some logic could be implemented to enforce that once a key is assigned to the new service, all subsequent records with that key are sent to its topic. Feel free to comment, or fork the repository and propose a pull request 😉.