Synchronization in distributed systems is tough. You probably aim to avoid it as much as you can. But sometimes business requirements introduce a need to coordinate different services that have a hard dependency on data freshness.
For the sake of generalization, suppose the architecture consists of Service-A, Service-B and Service-C. All of them consume messages from the same Kafka topic, but obviously each processes them differently according to its respective business logic, APIs and SLAs. When Service-A processes a message, it calls the APIs of Service-B and Service-C and expects this message to already be included there. This is why Service-A can process a message only after Service-B and Service-C have processed it successfully. If one of them is down, or its data is not yet updated, the only thing Service-A can do is stop and wait. In other words, there is a hard dependency on Service-B and Service-C from both the availability and the data-freshness aspects.
The architecture challenge is how to synchronize Service-A with its dependencies in the most scalable, cost-effective and simple way. Yes, I believe that the uber system design principle is complexity reduction (and if you are not convinced, have a look at the great book A Philosophy of Software Design, which opens with a high-minded explanation of the dangers of complexity in software as well as in system design).
So why did we initially create this problem for ourselves?
Now you are probably asking yourself: if Service-A is so sensitive to message order, the simplest (and maybe the only!) way to guarantee order is to perform the processing done by Service-B and Service-C as serial steps within Service-A. Why did you distribute them all around?
So no doubt micro (or mini) service architecture comes with its cost, but this architecture style improves team velocity, as every single service is easier to develop, test, deploy and, more importantly, scale, maintain and operate. It’s definitely not unique to our organization, but personally I’ve come to be convinced of and appreciate those benefits by observing the Dev org’s reality. Imagine that Service-A is an AML detection process that consumes payments and evaluates their risk. Service-B handles entity relations extracted from the same payments topic, and Service-C is responsible for aggregating those payments and providing effective time-aware aggregation queries over them. Like AML detection (Service-A), there is also Fraud detection (let’s name it Service-A1) with completely different business logic, but it still requires querying the entity-relationship graph and the aggregated profile. We don’t want every team to invest in Time-Series DB and Graph DB technology selection, or to gain the experience of how those are scaled (or not… ) and operated, instead of focusing on their respective business, which is AML or Fraud detection.
Returning back to our initial problem…
If we keep using AML detection as a concrete example of Service-A, how can it make sure that once it calls the relationships API (exposed by Service-B) or the aggregation API (exposed by Service-C), it will get up-to-date results? What if they are temporarily down or slowed down for some reason? The detection process can NOT process the message until all required information is available. Progressing with stale information may end up missing money laundering of millions of dollars!
The Architecture Pattern
The core idea here is leveraging Kafka offset management as a single source of truth for tracking the progress of the various services. The offset is a simple integer that Kafka uses to maintain the current position of a consumer. The current offset is a pointer to the last record that Kafka has already sent to a consumer in the most recent poll; it ensures the consumer doesn’t get the same record twice. As Kafka manages offsets per topic, consumer group and partition, each individual record can be identified only by the combination of topic, partition and offset. There are different strategies for how consumers can commit offsets to Kafka; here we rely on a commit that is done only after a message has been processed successfully by the consumer.
When Service-A polls a message from the topic, it should first extract the partition and offset of this message. Then it calls the Kafka admin API to verify that this offset has already been processed by the consumer groups associated with Service-B and Service-C. The Kafka admin API is called for each consumer group to retrieve a map of topic partitions to committed offsets (have a look at the listConsumerGroupOffsets function of the Kafka admin Java client). The minimal offset per partition represents the offset of the slowest service.
Notice that a service may potentially consume from various topics, like Service-C, which consumes different event types to aggregate, so Service-A should clearly define its dependency as a combination of service and topic, which technically is translated to a single consumer group.
As long as the minimal offset between Service-B and Service-C is lower than the message offset that is consumed by Service-A, it just waits.
The algorithm in more detail…
Let’s assume a situation where the committed offsets of topic T1 are as follows:
Consumer Group CG-A (Service-A):
- partition p1 (C1): 0
- partition p2 (C1): 0
- partition p3 (C2): 0
Consumer Group CG-B (Service-B):
- partition p1 (C1): 30
- partition p2 (C2): 30
- partition p3 (C3): 10
Consumer Group CG-C (Service-C):
- partition p1 (C1): 23
- partition p2 (C2): 52
- partition p3 (C3): 15
As you can see, topic T1 is partitioned into 3 partitions. The CG-B and CG-C groups consist of 3 consumers each, so every consumer handles a single partition only. But in CG-A, both p1 and p2 are handled by consumer C1.
Every consumer in CG-A should keep an updated partition map. This map can be updated periodically:
for each dependency consumer group // CG-B, CG-C
{
admin.listConsumerGroupOffsets
/// (CG-B -> ( p1 -> 30, p2 -> 30, p3 -> 10)), (CG-C -> ( p1 -> 23, p2 -> 52, p3 -> 15))
update partitionMinOffset map with minimal offset
/// (p1 -> 23), (p2 -> 30), (p3 -> 10)
}
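To make this a bit more concrete, here is a minimal Java sketch of that refresh step using the Kafka admin client. The class and names (DependencyOffsetTracker, partitionMinOffset, the CG-B/CG-C group IDs) are illustrative only, not code from the actual services:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class DependencyOffsetTracker {
    private static final String TOPIC = "T1";                              // the shared topic
    private static final List<String> DEPENDENCY_GROUPS = List.of("CG-B", "CG-C");

    private final AdminClient admin;
    // partition number -> minimal committed offset across all dependency groups
    private final Map<Integer, Long> partitionMinOffset = new ConcurrentHashMap<>();

    DependencyOffsetTracker(AdminClient admin) {
        this.admin = admin;
    }

    // called periodically, e.g. by a scheduled executor
    void refresh() throws Exception {
        Map<Integer, Long> fresh = new ConcurrentHashMap<>();
        for (String group : DEPENDENCY_GROUPS) {                           // CG-B, CG-C
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();
            committed.forEach((tp, offsetAndMetadata) -> {
                if (TOPIC.equals(tp.topic())) {
                    // keep the minimum, i.e. the offset of the slowest dependency
                    fresh.merge(tp.partition(), offsetAndMetadata.offset(), Math::min);
                }
            });
        }
        partitionMinOffset.putAll(fresh);                                  // e.g. (p1 -> 23), (p2 -> 30), (p3 -> 10)
    }

    long minOffsetFor(int partition) {
        // -1 means "no committed offset seen yet", so the caller will keep waiting
        return partitionMinOffset.getOrDefault(partition, -1L);
    }
}

Note that a partition for which one of the dependency groups has not committed anything yet simply won’t appear in that group’s map, so a real implementation would need to handle that case explicitly.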
During the ongoing message polling:
for every consumed message /// (sorted from earliest to latest)
{
set message.offset = extract the offset of the current message
set message.partition = extract the partition of the current message
while (message.offset > partitionMinOffset[message.partition])
wait
process /// only now Service-A can process the message
manual commit offset /// commit offset after processing successfully
}
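And here is a sketch of the polling loop itself, assuming the DependencyOffsetTracker above and a caller-supplied business-logic callback (again, the names are illustrative and error/rebalance handling is omitted):

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class ServiceALoop {
    void run(KafkaConsumer<String, String> consumer,
             DependencyOffsetTracker tracker,
             Consumer<ConsumerRecord<String, String>> businessLogic) throws InterruptedException {
        consumer.subscribe(Collections.singletonList("T1"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                // wait until the slowest dependency has passed this record's offset
                // (whether the check is > or >= depends on whether the committed offset
                // denotes the last processed record, as assumed in this article, or the
                // next record to be consumed)
                while (record.offset() > tracker.minOffsetFor(record.partition())) {
                    Thread.sleep(1_000);      // the tracker keeps refreshing in the background
                }
                businessLogic.accept(record); // only now can Service-A process the message
                // manual commit, only after successful processing
                consumer.commitSync(Map.of(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)));
            }
        }
    }
}

Keep in mind that blocking inside the poll loop for too long can get the consumer evicted from its group (max.poll.interval.ms), so in practice you would pause the partition or tune that timeout rather than sleep indefinitely.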
Since the best practice is to avoid a static assignment of partitions to consumers, a consumer can’t assume a fixed list of handled partitions, but needs to keep offsets for all topic partitions, as it can potentially handle any one of them at any given point in time.
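For illustration, this is what dynamic assignment looks like from the consumer’s point of view; the listener below is purely hypothetical and only prints the partitions that move in and out of this instance:

import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class DynamicAssignmentExample {
    static void subscribe(KafkaConsumer<String, String> consumer) {
        // subscribe() (as opposed to a static assign()) lets the group coordinator move
        // partitions between consumer instances on every rebalance, which is why the
        // partitionMinOffset map above covers all partitions of the topic
        consumer.subscribe(List.of("T1"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("now handling: " + partitions);
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("no longer handling: " + partitions);
            }
        });
    }
}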
Assumptions
This approach relies heavily on the Kafka offset reflecting the actual progress of a service. Actually, there are different strategies for how to commit offsets to Kafka. We internally use flink-connector-kafka, which commits the offset only after checkpoint completion (when OffsetCommitMode is set to ON_CHECKPOINTS). Anyway, regardless of the specific technology, the assumption is that consumers disable the default auto-commit and manually commit the offset only after the message has been processed successfully, even in the case of asynchronous operations.
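To make that assumption concrete, here is a hedged sketch of the two set-ups mentioned above; the group ID, topic name and checkpoint interval are placeholders:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

class CommitStrategyExamples {
    // plain Kafka consumer: auto-commit disabled, the application calls commitSync() itself
    // only after a message has been processed successfully
    static Properties manualCommitProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "CG-A");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }

    // Flink (legacy FlinkKafkaConsumer API): offsets are committed back to Kafka only when a
    // checkpoint completes, which corresponds to OffsetCommitMode.ON_CHECKPOINTS
    static FlinkKafkaConsumer<String> flinkSource(StreamExecutionEnvironment env, Properties props) {
        env.enableCheckpointing(60_000);   // checkpoint (and therefore commit) every minute
        FlinkKafkaConsumer<String> source =
            new FlinkKafkaConsumer<>("T1", new SimpleStringSchema(), props);
        source.setCommitOffsetsOnCheckpoints(true);
        return source;
    }
}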
Suggested Alternative
Another approach that was considered is that Service-B and Service-C would emit the message ID of each processed message into a dedicated output topic, and Service-A would join the output topics with the original topic to get the full payload. Only once the message ID has been consumed from all three topics would Service-A process the message.
This approach has been rejected due to the following downsides:
Complexity from the Producer side: the fact that Service-B (and Service-C) needs to write to its own internal DB as well as to an output topic (and commit the Kafka offset, of course) increases the probability that in some cases those two sinks, the DB and the output topic, will get out of sync.
Complexity from the Consumer side: Service-A needs to join the original T1 topic with the two output topics and carefully handle a situation where one (or more) of them falls far behind. It can NOT crash (due to OOM); it must just stop consuming and wait. It’s true that stream processing engines like Flink can handle backpressure by managing a blocking queue per source, but this increases the complexity dramatically.
Massive message volume: this approach significantly increases the number of messages, as every message will practically be duplicated (or tripled), which can impact the event bus’s performance and cost.
To Summarize
If you are a Kafka expert, this might be obvious to you, but surprisingly enough, it was not so trivial to convince people to get rid of the additional output topics, as those are basically already captured by the Kafka offset itself.
This pattern works perfectly for multiple consumers of the same topic. Actually, it can be extended even to cases where they consume different topics, as long as those all originate from a single common topic. But that is definitely a more complex pattern and justifies a follow-up article…