Enriching an event with data from another source is one of the most common use cases in event streaming. But where does the extra enrichment information for the event come from? In Kafka Streams it could easily be written like this:
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream("my_topic")
    .mapValues(order -> enrichRecord(order, findCustomerById(order.getCustomerId())))
    .to("enriched_records");
Or in ksqlDB with a User Defined Function:
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import org.apache.kafka.connect.data.Struct;

@UdfDescription(name = "find_customer_by_id",
                author = "Brice",
                version = "1.0.2",
                description = "Finds a Customer entity based on its id.")
public class FindCustomerByIdUdf {

    @Udf(schema = "...")
    public Struct findCustomerById(@UdfParameter int customerId) {
        [...]
        return customer;
    }
}
used like this in ksqlDB:
create stream enriched_records with (kafka_topic='enriched_records', ...) as
  select
    order_id,
    [...]
    find_customer_by_id(customer_id) as customer
  from orders
  emit changes;
You can replace the database query with an external call of any kind: REST API request, lookup in a file, etc.
It compiles, it works, and it looks like what we've been doing forever, so what's the problem with doing this?
First of all, there's a semantic issue: stream processing is expected to be idempotent, meaning that processing the same stream of events over and over should produce the same values (unless you change the implementation of the application, obviously). Involving a third party to provide the data that enriches your stream breaks this property, because there's no guarantee that the external call returns the same value each time you invoke it with the same arguments.
Then let's talk about the architecture concerns. Kafka is a distributed system: dealing with failures is part of its DNA, and there are multiple architecture patterns to face almost any kind of outage. This is why Kafka is the first-class choice as a central nervous system for many organizations. If you put a dependency on an external datastore that doesn't provide the same guarantees in the middle of your pipeline, then the resilience and the performance of your application are now the ones offered by this foreign system. Don't get me wrong, it's not uncommon to fetch data from a traditional RDBMS; those are really good tools providing great features, but not with the same guarantees, and when it's not available the whole pipeline is down, ruining your efforts to provide a resilient streaming platform.
My next point against this kind of design is when the external call produces a side effect (meaning each call creates or updates foreign data). In addition to the previous point, it breaks the exactly-once semantics offered out of the box by ksqlDB and Kafka Streams (and by the vanilla Kafka client at the cost of some boilerplate), because in case of a failure of any kind during the processing of a record, there's no way to automatically roll back the changes in the remote system. Let's illustrate it with a practical scenario: imagine the remote request increments a counter and, during operations, one of the ksqlDB workers becomes unreachable for any reason. The workload is rebalanced to the surviving instances and the last uncommitted batch of records is processed once again, meaning there are also unexpected increments in the foreign system. Hashtag data corruption.
This is a well-known issue of lack of distributed transaction management... though "lack" may not be the right term, because this is not something that's expected to be implemented. Indeed, in the past there were options, like XA, to deal with distributed transactions, but they were really cumbersome to set up and raised real scalability concerns by design. So this is definitely not what you expect when building a data streaming platform able to process gigabytes of records per second!
So how to sort this out?
Usually data enrichment is nothing more than data lookup and record merging, so the best way to do it is to onboard that data into Kafka topics and put a table abstraction on top of it (more details about that concept in this blog post), then join the stream of events to this table in order to merge the records. The good thing about this is that the external datastore is no longer interrogated, so this point of failure is gone: even if the remote system is unavailable, it won't have any effect on the pipeline.
And this can be translated into ksqlDB like this (assuming co-partitioning):
create stream enriched_records with (kafka_topic='enriched_records_by_customer_id', ...) as
  select
    o.order_id,
    [...]
  from orders o
    join customers c on o.customer_id = c.customer_id
  emit changes;
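The same pattern can be sketched in Kafka Streams with a stream-table join. A minimal, hedged sketch, assuming the topic names above, a customers topic keyed by customer id, appropriate serdes, and the hypothetical Order/Customer types and enrichRecord helper from the first example:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder streamsBuilder = new StreamsBuilder();

// Table abstraction over the customers topic (assumed keyed by customer id).
KTable<Integer, Customer> customers = streamsBuilder.table("customers");

// Re-key the orders by customer id so the join is co-partitioned,
// then merge each order with the matching customer; no external call involved.
streamsBuilder.<String, Order>stream("orders")
    .selectKey((orderId, order) -> order.getCustomerId())
    .join(customers, (order, customer) -> enrichRecord(order, customer))
    .to("enriched_records_by_customer_id");

Note that selectKey marks the stream for repartitioning, so Kafka Streams will shuffle the orders on the customer id before performing the join, which is how the co-partitioning requirement is satisfied here.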
There are multiple options to make this data available in a topic: if the remote system is an application already onboarded onto Kafka, it can be updated to stream its changes to the destination topic. If it's a database or a legacy system not expected to share records in Kafka, you can use a source connector such as a Change Data Capture (CDC) or JDBC connector.
What if the remote system is outside my organisation?
This happens when you have to deal with a partner API or any kind of remote system under the control of another business unit, so it's not possible to onboard this data in Kafka. It looks like you're doomed to make the call in the stream processor... Well, not so fast, because there's another concern, a bit more technical, that you should not pass over. To understand it, we have to go deeper in the layers, down to the Kafka client library. At the end of the day, processing a stream of records is nothing more than implementing a kind of loop:
final Consumer<String, DataRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
    // Fetch the next batch of records (up to max.poll.records of them).
    ConsumerRecords<String, DataRecord> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, DataRecord> record : records) {
        // Doing stuff
        [...]
    }
}
Whether you’re writing ksql queries or Kafka Streams Java code, it will result in that kind of poll loop. The Kafka Java client library comes with the following configuration properties:
- max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. […] If poll() is not called before expiration of this timeout, then the consumer is considered failed and the consumer group coordinator will trigger a rebalance in order to reassign the partitions to another member.
- max.poll.records: The maximum number of records returned in a single call to poll().
Now let's say that the remote system slows down for any reason and each query/request has a one-second response time. The default value for max.poll.records is 500, so one iteration of the poll loop can take up to 500 seconds... And the default value for max.poll.interval.ms is 300000 (5 minutes), so what will happen in this context is that the GroupCoordinator will consider the client as down and trigger a rebalance. But your Kafka Streams application (or ksqlDB persistent query) is not actually down, so the batch of records won't be committed, and after the rebalance the same records will be processed again and again, continuously increasing the consumer lag. This can lead to a snowball effect, because the root cause is a slow remote system and, since it's slow, it gets invoked more and more, without any chance to recover...
Don't think of these as theoretical concerns, because it's something I've seen in the field!
The cheapest answer is to tune the values of max.poll.records or max.poll.interval.ms, which is fine to adapt to the usual latency and response time, but it can be risky to push the limits to absorb occasional spikes, because this can lead to a vicious circle.
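For the record, this tuning boils down to a couple of consumer properties. A minimal sketch; the broker address, group id and values are arbitrary examples, not recommendations:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "enrichment-app");
// Fewer records per poll keeps one iteration well below the poll interval...
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
// ...and/or a larger interval tolerates slower iterations (here 10 minutes).
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000);

In a Kafka Streams application, the same properties can be passed through the streams configuration, which forwards consumer settings to its embedded consumers.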
What about using an asynchronous client to avoid blocking the poll loop thread? This design doesn't work at all, because KafkaConsumer is not thread-safe. And it's not an accidental lack of thread safety: it's enforced by the Kafka processing model, because otherwise you would lose the ordering guarantee.
So is there any viable option?
Yes there is, at the cost of a less straightforward design... The basic idea is to split the process in two and delegate the request processing to another component that can take advantage of an asynchronous design:
Here, requests are records stored in a topic, consumed by a dedicated processor that runs the request asynchronously to avoid blocking the poll loop and eventually writes the result to an output topic. The second part of the initial pipeline can then move forward by joining the results with the pending jobs.
Wait a minute, it sounds exactly like what was just described as irrelevant in a Kafka context, doesn't it? Not exactly: as long as the result of the request is not required in order to commit the offsets of the requests topic, that's fine. On the other hand, it requires implementing, at a higher level, things like timeouts, maximum in-flight requests and crash recovery.
It obviously increases the complexity, but it offers a real opportunity to implement rich error-management scenarios, like retries with various back-off policies.
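To make the idea more concrete, here is a minimal, non-production sketch of such a dedicated request processor, assuming hypothetical topic names (enrichment_requests, enrichment_results) and a hypothetical partner REST endpoint; the timeouts, maximum in-flight requests and crash recovery mentioned above are deliberately left out:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EnrichmentRequestProcessor {

    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "enrichment-request-processor");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        // KafkaProducer is thread-safe, so it can be used from the HTTP client's callback threads.
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        HttpClient httpClient = HttpClient.newHttpClient();

        consumer.subscribe(Collections.singletonList("enrichment_requests"));
        while (true) {
            ConsumerRecords<String, String> requests = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> request : requests) {
                HttpRequest httpRequest = HttpRequest.newBuilder()
                        .uri(URI.create("https://partner.example.com/customers/" + request.value()))
                        .build();
                // The call is issued asynchronously, so the poll loop is never blocked;
                // the result is published to the output topic whenever the response arrives.
                httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString())
                        .thenAccept(response -> producer.send(
                                new ProducerRecord<>("enrichment_results", request.key(), response.body())));
            }
            // Offsets are committed (here, auto-committed) without waiting for in-flight responses:
            // this is precisely where timeouts, in-flight limits and crash recovery have to be handled.
        }
    }
}

A second stream processor (or ksqlDB persistent query) can then join the enrichment_results topic back with the pending records, as described above.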
You can check out an implementation example of that kind of architecture here. It’s not battle tested but it can give you inspiration for your own needs.