The first part of the Kafka Streams API blog series covered stateless functions such as filter and map. In this part, we will explore stateful operations in the Kafka Streams DSL API. It focuses on aggregation operations such as aggregate, count and reduce, along with a discussion of related concepts such as state stores and windowing.
Aggregation
Aggregation operations are applied to records with the same key. Kafka Streams supports the following aggregations: aggregate, count and reduce. As mentioned in the previous blog, grouping is a prerequisite for aggregation. You can run groupBy (or one of its variations) on a KStream or a KTable, which results in a KGroupedStream or a KGroupedTable respectively.
(KTable grouping was not covered in the stateless operations blog.)
aggregate
The aggregate function has two key components: an Initializer and an Aggregator. When the first record for a key is received, the Initializer is invoked and its result is used as the starting point for the Aggregator. For each subsequent record, the Aggregator combines the current record with the aggregate computed so far. Conceptually, this is a stateful computation performed on an infinite data set: it is stateful because computing the new aggregate takes into account both the current key-value record and the previous state (the aggregate computed until now). This can be used for scenarios such as moving average, sum, count, etc.
Here is an example of how you can calculate the count, i.e. the number of times a specific key was received.
Code examples are available on GitHub.
```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);

// countSerde is a hypothetical Serde<Count>: with default String serdes, the state store
// backing this aggregation could not (de)serialize the custom Count type
KTable<String, Count> aggregate = stream.groupByKey()
        .aggregate(
                // Initializer: provides the starting aggregate the first time a key is seen
                () -> new Count("", 0),
                // Aggregator: combines the incoming record with the aggregate computed so far
                (key, value, aggKeyCount) -> new Count(key, aggKeyCount.getCount() + 1),
                Materialized.with(Serdes.String(), countSerde));

// Emit the latest count per key as an Integer
aggregate.toStream()
        .map((k, v) -> new KeyValue<>(k, v.getCount()))
        .to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));
```
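To make the Initializer/Aggregator contract more concrete, here is a small, dependency-free Java model of what aggregate does for each key. It is a sketch, not the Kafka Streams implementation: the class AggregateModel, its Aggregator interface (which only mirrors the shape of Kafka's Aggregator) and the Avg type are hypothetical, a HashMap stands in for the state store, and real concerns such as caching, serdes and fault tolerance are ignored. It computes a per-key average by aggregating a running sum and count and, like the KGroupedStream aggregations, skips records with a null key or value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, dependency-free model of KGroupedStream#aggregate semantics
final class AggregateModel {

    // Mirrors the shape of Kafka's Aggregator: (key, value, current aggregate) -> new aggregate
    interface Aggregator<K, V, VA> {
        VA apply(K key, V value, VA aggregate);
    }

    // Aggregate type for an average: the sum and count of the values seen so far
    record Avg(long sum, long count) {
        double value() {
            return (double) sum / count;
        }
    }

    // Processes records in order and returns the updated aggregate emitted for each record
    static <K, V, VA> List<Map.Entry<K, VA>> aggregate(List<Map.Entry<K, V>> records,
                                                        Supplier<VA> initializer,
                                                        Aggregator<K, V, VA> aggregator) {
        Map<K, VA> store = new HashMap<>();  // stands in for the local state store
        List<Map.Entry<K, VA>> updates = new ArrayList<>();
        for (Map.Entry<K, V> rec : records) {
            if (rec.getKey() == null || rec.getValue() == null) {
                continue;  // records with a null key or value are skipped
            }
            // The Initializer only provides the starting point the first time a key is seen
            VA current = store.containsKey(rec.getKey()) ? store.get(rec.getKey()) : initializer.get();
            VA updated = aggregator.apply(rec.getKey(), rec.getValue(), current);
            store.put(rec.getKey(), updated);
            updates.add(Map.entry(rec.getKey(), updated));
        }
        return updates;
    }
}
```

Each accepted input record produces one updated aggregate for its key, which corresponds to one update of the resulting KTable (Kafka Streams may coalesce some of these updates when record caching is enabled).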
count
count is such a commonly used form of aggregation that it is offered as a first-class operation. Once you have the stream records grouped by key (a KGroupedStream), you can use this operation to count the number of records for each key.
The aggregate way of doing things can be replaced by a single method call!
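```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
KTable<String, Long> counts = stream.groupByKey().count(); // latest count per key
counts.toStream().to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
```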
reduce
You can use reduce to combine the stream of values for each key. The aggregate operation covered earlier is a generalized form of reduce: reduce has no Initializer, and its result must have the same type as the input values. You can implement functionality such as sum, min, max etc. Here is an example of max:
```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.Long()));
stream.groupByKey()
        // the Reducer combines the current max for the key with the incoming value
        .reduce((currentMax, v) -> Math.max(currentMax, v))
        .toStream()
        // explicit serdes, since the values are Longs rather than Strings
        .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
return builder.build();
```
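The main difference from aggregate is the missing Initializer: the first value seen for a key is stored as the aggregate as-is, and the Reducer is only called from the second value onwards. A hypothetical plain-Java sketch of that behavior (ReduceModel is an illustrative name, and the map stands in for the state store) can lean on Map.merge, which has the same first-value-then-combine semantics. Like the KGroupedStream operations, it skips records with a null key or value.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical, dependency-free model of KGroupedStream#reduce semantics
final class ReduceModel {

    // Returns the final reduced value per key after processing the records in order
    static <K, V> Map<K, V> reduce(List<Map.Entry<K, V>> records, BinaryOperator<V> reducer) {
        Map<K, V> store = new LinkedHashMap<>();  // stands in for the local state store
        for (Map.Entry<K, V> rec : records) {
            if (rec.getKey() == null || rec.getValue() == null) {
                continue;  // records with a null key or value are skipped
            }
            // merge() stores the first value for a key as-is (no Initializer) and
            // calls reducer.apply(currentAggregate, newValue) for every later value
            store.merge(rec.getKey(), rec.getValue(), reducer);
        }
        return store;
    }
}
```

With Long::max as the reducer, this reproduces the max example: for the values 3, 9 and 5 under key "a", the stored result is 9.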
Note that all the aggregation operations ignore records with a null key, which is expected since the very goal of this set of functions is to operate on records of a specific key.
Aggregation and state stores
In the aggregate and reduce examples above, the aggregated values were pushed to an output topic; this is not mandatory though. Aggregation results are kept in local state stores, and you can give such a store a name so that it can be accessed later. Here is an example:
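```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
// materialize the count in a state store named "count-store" so it can be queried later
stream.groupByKey().count(Materialized.as("count-store"));
```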
In the above example, the call to count also creates a local state store named count-store which can then be introspected using Interactive Queries.
These state stores can either be in-memory or persisted on disk using RocksDB. This allows for scalability: each instance of a Kafka Streams application keeps local state stores only for the topic partitions it processes, so the overall state is distributed across (potentially) multiple instances of your application (except in the case of GlobalKTables). Another key property is high availability: the contents of these state stores are backed up to Kafka in changelog topics, which are compacted (changelogging can be disabled, though). If an application instance crashes, its state store contents can be restored from Kafka itself.
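Why a compacted topic is enough for restoring state can be illustrated with a small model: replaying the changelog in offset order rebuilds the store, and only the latest record per key (or a tombstone, i.e. a null value, for a deleted key) affects the end result, which is exactly what log compaction retains. The sketch below is a hypothetical plain-Java illustration (ChangelogRestoreModel is an invented name), not the actual restoration code in Kafka Streams.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of rebuilding a key-value state store by replaying its changelog
final class ChangelogRestoreModel {

    // Changelog entries in offset order; a null value is a tombstone (the key was deleted)
    static <K, V> Map<K, V> restore(List<Map.Entry<K, V>> changelog) {
        Map<K, V> store = new HashMap<>();
        for (Map.Entry<K, V> change : changelog) {
            if (change.getValue() == null) {
                store.remove(change.getKey());                  // tombstone: drop the key
            } else {
                store.put(change.getKey(), change.getValue());  // later offsets overwrite earlier ones
            }
        }
        return store;
    }
}
```

Because earlier values for a key are always overwritten during replay, compaction can discard them without changing the restored state.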
KGroupedTable
A KGroupedTable is obtained when groupBy* operations are invoked on a KTable. Just like a KGroupedStream, a KGroupedTable is a prerequisite for applying aggregations on a KTable. aggregate, count and reduce work the same way on a KGroupedTable as they do on a KGroupedStream, but there is an important difference that needs to be highlighted.
A KTable is conceptually different from a KStream in the sense that it represents a snapshot of the data at a point in time (very much like a database table). It is a mutable entity, as opposed to a KStream, which represents an immutable, infinite sequence of records. To account for this difference, the aggregate and reduce functions on a KGroupedTable take an additional Aggregator (or Reducer), known as the subtractor. It is invoked when the value for a key is updated (to remove the old value's contribution from the aggregate) or when a null value is received, i.e. the key is deleted.
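As an illustration of how the regular aggregator (the adder) and the subtractor work together, consider a hypothetical KTable of userId to region that is re-grouped by region and counted. The plain-Java model below (TableCountModel is an invented name, with HashMaps in place of the KTable and the state store) shows why the subtractor is needed: when user1 moves from EU to US, the old value has to be subtracted from the EU group before the new value is added to the US group, and a deletion only triggers the subtractor.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of counting a KTable (userId -> region) re-grouped by region
final class TableCountModel {
    private final Map<String, String> table = new HashMap<>();        // upstream KTable: userId -> region
    private final Map<String, Long> countsByRegion = new HashMap<>(); // aggregate per group (region)

    // Applies one KTable update; a null region means the user was deleted (tombstone)
    void update(String userId, String newRegion) {
        String oldRegion = (newRegion == null) ? table.remove(userId) : table.put(userId, newRegion);
        if (oldRegion != null) {
            // subtractor: remove the old value's contribution from the group it used to belong to
            countsByRegion.merge(oldRegion, -1L, Long::sum);
        }
        if (newRegion != null) {
            // adder: add the new value's contribution to its (possibly different) group
            countsByRegion.merge(newRegion, 1L, Long::sum);
        }
    }

    long count(String region) {
        return countsByRegion.getOrDefault(region, 0L);
    }
}
```

Without the subtractor, the EU count would still include user1 after the move, which is exactly the stale result a mutable KTable must avoid.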
Windowing
Stateful Kafka Streams operations also support windowing. This allows you to scope your stream processing pipelines to a specific time window/range, e.g. to track the number of link clicks per minute or the number of unique page views per hour.
To perform windowed aggregations on a group of records, you have to create a KGroupedStream (as explained above) using groupBy on a KStream and then call the windowedBy operation (available in overloaded forms). You can choose between traditional time windows (tumbling, hopping or sliding) and session-based time windows.
Using windowedBy(Windows<W> windows) on a KGroupedStream returns a TimeWindowedKStream on which you can invoke the aggregate operations mentioned above. For example, if you want the number of clicks over a specific time range (say 5 minutes), choose a tumbling time window. This ensures that records are clearly segregated across the given time boundaries, i.e. clicks from user1 from 10:00 AM to 10:05 AM are aggregated (counted) separately, and a new time block (window) starts at 10:05 AM, during which the click counter is reset to zero and counted again.
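```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
// TimeWindows.of(Duration) is deprecated since Kafka 3.0 in favor of TimeWindows.ofSizeWithNoGrace(Duration)
TimeWindowedKStream<String, String> windowed = stream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(5)));
// The result is keyed by Windowed<String>, so flatten it to a String key before writing it out
windowed.count()
        .toStream()
        .map((windowedKey, count) -> KeyValue.pair(windowedKey.key() + "@" + windowedKey.window().start(), count))
        .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
```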
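Tumbling windows in Kafka Streams are aligned to the epoch, so the window a record falls into can be computed from its timestamp alone. A minimal sketch of that arithmetic follows; the helper name TumblingWindowModel is hypothetical, and it assumes non-negative timestamps in milliseconds.

```java
// Hypothetical helper: which tumbling window does a timestamp fall into?
final class TumblingWindowModel {

    // Windows are aligned to the epoch: [start, start + sizeMs) where start is a multiple of sizeMs.
    // Assumes non-negative timestamps in milliseconds.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;  // exclusive end
    }
}
```

For 5-minute windows, a click at 10:04:59 UTC falls into the window starting at 10:00, while a click at 10:05:00 UTC already belongs to the window starting at 10:05.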
The traditional time window types are:
- Tumbling time windows never overlap, i.e. a record will only be part of one window.
- Hopping time windows, in contrast, can overlap, so a record can be part of one or more windows.
- Sliding time windows were originally meant for use with join operations; since Kafka 2.7 they can also be used for aggregations (SlidingWindows).
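For hopping windows, a single record belongs to every window whose range covers its timestamp. The following hypothetical helper (HoppingWindowModel is an invented name) enumerates those windows for a given size and advance, assuming epoch-aligned windows and non-negative millisecond timestamps; a tumbling window is the special case where the advance equals the size. It illustrates the arithmetic only and is not a Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper listing the start times of every hopping window that contains a timestamp
final class HoppingWindowModel {

    // Windows have length sizeMs and start every advanceMs, aligned to the epoch;
    // a tumbling window is the special case advanceMs == sizeMs
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // latest window start at or before the timestamp
        long lastStart = timestampMs - (timestampMs % advanceMs);
        // walk backwards while the window [start, start + sizeMs) still contains the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs && start >= 0; start -= advanceMs) {
            starts.add(0, start);  // keep the result in ascending order
        }
        return starts;
    }
}
```

For example, with a size of 10 and an advance of 5, timestamp 7 lies in the windows starting at 0 and 5, i.e. [0, 10) and [5, 15).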
There is another type of stateful operation: joining. It is an extensive topic that deserves an entire post (or maybe another series?) by itself.
If you want to take into account the "session", i.e. a period of activity separated by a defined gap of inactivity, use windowedBy(SessionWindows windows), which returns a SessionWindowedKStream.
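```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
// SessionWindows.with(Duration) is deprecated since Kafka 3.0 in favor of SessionWindows.ofInactivityGapWithNoGrace(Duration)
stream.groupByKey()
        .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
        .count()  // an aggregation is required: SessionWindowedKStream has no toStream()
        .toStream()
        // session results are keyed by Windowed<String>; flatten the key to a String for the output topic
        .map((windowedKey, count) -> KeyValue.pair(windowedKey.key() + "@" + windowedKey.window().start(), count))
        .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
return builder.build();
```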
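To illustrate how an inactivity gap turns a key's activity into sessions, here is a simplified, hypothetical model for a single key (SessionWindowModel and its Session record are invented names). Unlike Kafka Streams, which also merges existing sessions when an out-of-order record bridges the gap between them, it assumes the timestamps arrive sorted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how events for ONE key are grouped into sessions by an inactivity gap
final class SessionWindowModel {

    record Session(long start, long end, int eventCount) {}

    // Timestamps must be sorted ascending; an event joins the current session if it arrives
    // at most gapMs after the session's last event, otherwise it starts a new session
    static List<Session> sessionize(List<Long> sortedTimestamps, long gapMs) {
        List<Session> sessions = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!sessions.isEmpty()) {
                Session last = sessions.get(sessions.size() - 1);
                if (ts - last.end() <= gapMs) {
                    // extend the current session to include this event
                    sessions.set(sessions.size() - 1, new Session(last.start(), ts, last.eventCount() + 1));
                    continue;
                }
            }
            sessions.add(new Session(ts, ts, 1));  // gap exceeded (or first event): new session
        }
        return sessions;
    }
}
```

With a 5-minute gap, events at minutes 0, 2, 4, 12 and 13 form two sessions: one from minute 0 to 4 with three events and one from minute 12 to 13 with two.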
That's all for this part of the Kafka Streams blog series. Stay tuned for the next part, which will demonstrate how to test Kafka Streams applications using the built-in test utilities.