1. Overview: Kafka Streams Basics
Objective:
Learn the fundamental building blocks of Kafka Streams by examining a simple example that:
- Reads messages from a Kafka topic.
- Processes them by grouping and counting based on keys.
- Writes the aggregated results to an output topic.
Key Takeaways:
- Kafka Streams is a powerful client library for building real-time, stream processing applications.
- Keys play a critical role in data partitioning, grouping, and ensuring consistent processing.
- The concepts covered here lay the foundation for more advanced operations such as joins and windowing.
2. Setting Up a Kafka Streams Application
a. Application Configuration
- Bootstrap Server & Security: Configure the application with the same bootstrap server and security settings used for consumers (especially when connecting to managed services like Confluent Cloud).
- Application ID: Set an application ID (similar to a consumer group ID) to identify your Kafka Streams application.
  Example: `KafkaTutorial.stream.countPickupLocation`
- Auto Offset Reset: Configure the offset reset policy (commonly set to “latest” so the stream processes only new incoming messages). The policy takes effect only when the application has no committed offsets.
- Cache Configuration: Set the record cache (e.g., `cache.max.bytes.buffering`) to zero during development so that every processed message is visible immediately. This minimizes buffering delays during testing.
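The settings above can be collected into a `Properties` object. The following is a minimal sketch that uses plain string keys so it compiles without the Kafka client on the classpath. The broker address is a placeholder, the security settings a managed cluster requires are left out, and real code would normally use the `StreamsConfig` constants instead of string literals.

```java
import java.util.Properties;

// Minimal sketch of the configuration described above.
// Assumptions: "localhost:9092" is a placeholder broker address, and the
// security settings needed for a managed cluster are omitted.
class StreamsConfigSketch {

    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("application.id", "KafkaTutorial.stream.countPickupLocation");
        props.put("auto.offset.reset", "latest");
        // Zero disables the record cache so every update is forwarded at once.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        buildConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In production the cache is usually left enabled, since it reduces the number of downstream updates. Recent Kafka Streams releases also introduce `statestore.cache.max.bytes` as the replacement for the cache setting, so check the version you run.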
b. Serializer/Deserializer (SerDe) Setup
- SerDe Definition: Create a custom SerDe (serializer and deserializer) for your data objects (for example, a ride record).
  - Serializer: Converts a Java object to a JSON string for Kafka.
  - Deserializer: Converts JSON back into a Java object.
- Configuration: Use the `KafkaJsonSerializer` and `KafkaJsonDeserializer` classes (provided by Confluent's serializer library) along with a configuration property (e.g., `json.value.type`) to specify the target class.
3. Building the Kafka Streams Topology
a. Stream Builder & Source
- StreamsBuilder: Instantiate a `StreamsBuilder` to define your stream processing topology.
- Source Topic: Define the source topic (e.g., “rides”) where the input messages are published.
  - Use a `Consumed` instance with the appropriate key SerDe (typically String) and your custom JSON value SerDe.
b. Transformation and Aggregation
- Group By Key: Group incoming records by key (for example, pickup location ID). This step is crucial because the key determines:
  - How messages are partitioned across the cluster.
  - Which messages are aggregated together.
- Count Operation: Apply the count aggregation on the grouped stream, resulting in a KTable that maintains a running count of records per key.
- Output Topic: Convert the KTable back to a stream and write the resulting counts to an output topic (e.g., a rides pickup-location count topic) using a `Produced` instance whose SerDes match the output key and value formats (String keys and Long counts).
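To make the running-count semantics concrete, here is a plain-Java simulation of what grouping by key and counting emits. It involves no Kafka classes: a `HashMap` stands in for the KTable's state, and the key values are only illustrative. With caching disabled, each input record yields one updated count for its key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the updates a group-by-key count produces.
class RunningCountSketch {

    // Returns one "key=count" update per input record, in arrival order.
    static List<String> countUpdates(List<String> keys) {
        Map<String, Long> table = new HashMap<>(); // stands in for the KTable state
        List<String> updates = new ArrayList<>();
        for (String key : keys) {
            long count = table.merge(key, 1L, Long::sum);
            updates.add(key + "=" + count);
        }
        return updates;
    }

    public static void main(String[] args) {
        // prints [100=1, 186=1, 100=2]
        System.out.println(countUpdates(Arrays.asList("100", "186", "100")));
    }
}
```

The output topic therefore behaves like a changelog: a later record for the same key supersedes the earlier count rather than adding a second, independent result.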
4. The Role of Keys in Kafka Streams
a. Determining Partition Assignment
- Hashing Mechanism: When a producer sends a message with a key, the default partitioner hashes the serialized key and takes the result modulo the number of partitions to determine the partition for that message.
- Consistency: As long as the number of partitions does not change, the same key always maps to the same partition, ensuring that all records with that key are processed together.
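The hash-modulo rule can be sketched in a few lines. Note the assumption: Kafka's default partitioner applies a murmur2 hash to the serialized key, whereas this sketch substitutes `Arrays.hashCode` purely for illustration, so the partition numbers it prints will not match what a real producer chooses.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative key-to-partition mapping: hash(key bytes) mod partition count.
class PartitionSketch {

    // Stand-in hash (assumption): the real default partitioner uses murmur2.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"100", "186", "100"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 4));
        }
    }
}
```

Both occurrences of key `100` print the same partition, which is the property that key-based grouping relies on.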
b. Handling Null Keys
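- Round-Robin Assignment: If a key is null, the producer spreads messages across partitions without looking at their content (strict round-robin in older clients; since Kafka 2.4 the default partitioner sticks to one partition per batch before moving on). The load is balanced, but related records are not kept together, which is a problem if grouping by key is intended.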
c. Data Skew Concerns
- Skewed Data: If a particular key (e.g., a very popular pickup location) dominates, it may lead to uneven partition loads.
- Mitigation Strategies:
  - Introduce controlled randomness or key-splitting strategies to distribute heavy-hitter keys across multiple partitions.
  - Monitor processing lags and rebalance consumer instances if necessary.
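One common form of key splitting is "salting": append a small random bucket number to a hot key so its records spread over several partitions, then strip the suffix before a final aggregation. The sketch below is a plain-Java illustration; the `#` separator, the bucket count, and the method names are hypothetical choices, not part of any Kafka API.

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative key salting for heavy-hitter keys (names are hypothetical).
class KeySaltingSketch {
    static final String SEPARATOR = "#";

    // Spread a hot key over `buckets` sub-keys, e.g. "186" -> "186#2".
    static String salt(String key, int buckets) {
        int bucket = ThreadLocalRandom.current().nextInt(buckets);
        return key + SEPARATOR + bucket;
    }

    // Recover the original key before the second-stage aggregation.
    static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf(SEPARATOR));
    }

    public static void main(String[] args) {
        String salted = salt("186", 4);
        System.out.println(salted + " -> " + unsalt(salted));
    }
}
```

The trade-off is a second aggregation step: partial counts per salted key have to be re-keyed to the original key and summed to obtain the true total.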
5. Execution and Testing
a. Running the Application
- Topology Deployment: Build and start the Kafka Streams topology.
  - Use a shutdown hook to gracefully close streams during application termination.
- Producer Simulation: In the example, the producer sends messages with slight delays (using thread sleep) to simulate real-time data flow.
b. Observing Results
- Output Verification: Check the output topic to verify that counts are being updated correctly for each key.
- Multiple Instance Consideration: Understand that in a distributed deployment (multiple stream instances), partitions are assigned to different application instances, but the consistency of key-based grouping is maintained.
6. Advanced Considerations and Best Practices
a. Scaling Out
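- Multiple Instances: Running several instances of your Kafka Streams application with the same application ID lets Kafka Streams distribute the input partitions across them. The number of input partitions caps the useful parallelism, so make sure your keys are chosen to spread load across partitions.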
- Stateless vs. Stateful Operations: Recognize that operations like grouping and counting are stateful. Proper state store configuration and replication are crucial for fault tolerance.
b. Performance Optimization
- Caching and Buffering: Adjust cache sizes and buffering settings based on throughput needs versus latency sensitivity.
- Monitoring: Use Kafka Streams metrics and external tools (e.g., Confluent Control Center, Prometheus, Grafana) to monitor application performance and lag.
c. Data Integrity and Error Handling
- SerDe Accuracy: Ensure that your custom SerDe handles edge cases and errors gracefully.
- Fault Tolerance: Leverage Kafka Streams’ built-in state store changelog topics for recovering state in case of failures.
1. Overview: Kafka Streams Joins
Objective:
Learn how to join data from two Kafka topics using Kafka Streams. This technique is essential for combining related streams (for example, matching taxi drop-offs with pickup requests) in real-time applications.
Use Case Example:
- Scenario: In a ride-hailing service, one topic contains ride events (with drop-off location as the key), and another topic holds pickup location events. By joining these streams on their location IDs within a specific time window, you can match drivers with passengers effectively.
2. Core Concepts in Kafka Streams Joins
a. Stream Join Basics
- Join Key Requirement: Kafka Streams joins work only on records with matching keys. In this example, the producer is modified to use the drop-off location ID as the key on the rides topic.
- Types of Joins: While the video focuses on a basic join, Kafka Streams supports various join types (inner join, left join, outer join) based on your use case.
b. Importance of Co-partitioning
- Co-partitioning: Both topics (rides and pickup locations) must have the same number of partitions and an identical partitioning strategy. This ensures that records with the same key are in the same partition across topics, which is a prerequisite for a correct join.
- Why It Matters: Mismatched partitions can lead to keys not aligning, resulting in incomplete or inaccurate join results.
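A small experiment shows why the partition counts must match. The sketch below maps the same sample keys onto two topics and counts how many keys end up on different partition numbers. As an assumption for illustration it uses `Arrays.hashCode` in place of Kafka's murmur2 hash, and the sample keys are simply the numbers 0 to 99 as strings.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrates key misalignment when two topics have different partition counts.
class CoPartitioningSketch {

    // Stand-in hash (assumption): the real default partitioner uses murmur2.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    // Counts sample keys that land on different partition numbers in the two topics.
    static int misalignedKeys(int partitionsA, int partitionsB, int sampleSize) {
        int misaligned = 0;
        for (int i = 0; i < sampleSize; i++) {
            String key = Integer.toString(i);
            if (partitionFor(key, partitionsA) != partitionFor(key, partitionsB)) {
                misaligned++;
            }
        }
        return misaligned;
    }

    public static void main(String[] args) {
        System.out.println("4 vs 4 partitions: " + misalignedKeys(4, 4, 100) + " misaligned keys");
        System.out.println("4 vs 6 partitions: " + misalignedKeys(4, 6, 100) + " misaligned keys");
    }
}
```

With equal partition counts no key is misaligned; with unequal counts some keys land on different partition numbers, so the task that owns one side of the join never sees the matching records from the other side.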
3. Building the Join Topology
a. Setting Up the Streams Environment
- Configuration:
  - Security & Bootstrap: Use the same security protocols and bootstrap server settings as in other Kafka Streams examples.
  - Application ID: Define a unique application ID (analogous to a consumer group ID) for the streams application.
  - Consumer Settings: Set properties like auto offset reset (commonly “latest”) to control where the stream begins processing.
  - Caching: For development, setting the cache size to zero can provide immediate visibility of processed records.
b. Reading and Preparing Input Streams
- Input Topics:
  - Rides Topic: Contains ride events. Modify the producer to set the drop-off location ID as the key.
  - Pickup Location Topic: Contains pickup events, where the pickup location ID is available in the message.
- Custom SerDes: Define serializers/deserializers (SerDes) for both streams to handle JSON conversion. This ensures both topics use a consistent format during processing.
c. Key Transformation and Stream Grouping
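- Selecting the Key: Use the `selectKey` transformation to re-key the pickup location stream so that its key matches the drop-off location ID in the rides stream.
- Grouping: A stream-stream join needs no separate grouping step; both streams only have to share the common key (the location ID) before joining. Because `selectKey` changes the key, Kafka Streams repartitions the re-keyed stream through an internal topic so that matching keys land in the same partition.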
d. Performing the Join Operation
- Join Syntax and Logic:
  - Value Joiner: Define a lambda (or a dedicated function) that specifies how to merge the values from the two streams. For example, combine ride details with pickup information into a vendor info object.
  - Join Window: Set a time window (e.g., 20 minutes) during which events from both streams must occur to be joined. Also, define a grace period (e.g., 5 minutes) to allow for late-arriving records.
  - Join Condition Example: Compare timestamps from both events. If the time difference is below a threshold (e.g., 10 minutes), generate a join result; otherwise, skip (or return an empty result).
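The window rule itself is simple arithmetic: two records with the same key are join candidates when their timestamps differ by no more than the window size, in either order. Below is a minimal plain-Java sketch of that check. The timestamps are made-up sample values, and `withinWindow` is a hypothetical helper rather than a Kafka Streams method.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrates the time-difference test behind a windowed stream-stream join.
class JoinWindowSketch {

    // True when the two timestamps are at most `window` apart, in either order.
    static boolean withinWindow(Instant left, Instant right, Duration window) {
        return Duration.between(left, right).abs().compareTo(window) <= 0;
    }

    public static void main(String[] args) {
        Instant dropoff = Instant.parse("2024-01-01T10:00:00Z"); // sample value
        Duration window = Duration.ofMinutes(20);
        // prints true: a pickup 5 minutes after the drop-off is inside the window
        System.out.println(withinWindow(dropoff, dropoff.plus(Duration.ofMinutes(5)), window));
        // prints false: a pickup 25 minutes earlier is outside the window
        System.out.println(withinWindow(dropoff, dropoff.minus(Duration.ofMinutes(25)), window));
    }
}
```

The stricter 10-minute condition in the value joiner is a business rule applied on top of this: the window decides which pairs the joiner sees, and the joiner decides which of those pairs produce output.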
e. Writing the Joined Output
- Output Stream: After the join, filter out any empty results and map the joined Optional to extract the final vendor info.
- Publishing: Produce the joined results to an output topic (e.g., “vendor information messages”) using appropriate SerDes.
4. Detailed Example Flow
- Input Preparation:
  - Modify the rides topic producer to use the drop-off location ID as the key.
  - Produce pickup location events to a separate topic with pickup location IDs.
- Building the Topology:
  - Create a `StreamsBuilder` to read from both topics.
  - Read the rides stream:
```java
KStream<String, Ride> ridesStream = builder.stream("rides-topic",
        Consumed.with(Serdes.String(), rideSerDe));
```
  - Read the pickup locations stream and re-key it:
```java
KStream<String, PickupLocation> pickupStream = builder
        .stream("pickup-location-topic", Consumed.with(Serdes.String(), pickupSerDe))
        .selectKey((key, value) -> value.getPickupLocationId());
```
- Join Operation:
  - Use the join operator with a value joiner and a defined time window:
```java
// Kafka Streams 3.0+ API; older releases used JoinWindows.of(...).grace(...) and Joined.with(...).
KStream<String, VendorInfo> joinedStream = ridesStream.join(
        pickupStream,
        (ride, pickup) -> {
            // Join only when drop-off and pickup are less than 10 minutes apart, in either order.
            Duration gap = Duration.between(ride.getDropoffTime(), pickup.getPickupTime()).abs();
            if (gap.toMinutes() < 10) {
                return new VendorInfo(ride.getVendorId(), pickup.getPickupLocationId(), pickup.getPickupTime());
            }
            return null; // no match; filtered out downstream
        },
        JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(20), Duration.ofMinutes(5)),
        StreamJoined.with(Serdes.String(), rideSerDe, pickupSerDe));
```
  - Filter out null (or empty Optional) join results and send valid records to the output topic:
```java
joinedStream.filter((key, value) -> value != null)
        .to("vendor-info-topic", Produced.with(Serdes.String(), vendorInfoSerDe));
```
- Running and Monitoring:
  - Deploy the Kafka Streams application and monitor the internal join topics as well as the output topic.
  - Ensure that both topics used for the join are co-partitioned.
5. Best Practices and Supplementary Information
a. Co-partitioning & Partition Strategy
- Ensure Matching Partitions: Always create both input topics with the same number of partitions. This alignment is critical for join correctness.
- Avoid Data Skew: If a key (e.g., a popular location) becomes a hotspot, consider strategies such as key randomization or additional sharding to distribute load evenly.
b. Time Windows and Grace Periods
- Join Windows: Carefully choose the time window based on your application’s latency and business logic. Too narrow a window might miss valid joins; too wide may join unrelated events.
- Grace Period: Allow a grace period to account for network delays and out-of-order events, ensuring that slightly late records can still be joined.
c. Robust Error Handling and Testing
- Use Optionals: Returning an `Optional` from your value joiner can help prevent null pointer exceptions and force consumers to check join validity.
- Testing with TopologyTestDriver: Leverage Kafka Streams’ testing utilities (e.g., TopologyTestDriver) to simulate stream processing, validate join logic, and catch edge cases early.
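The Optional-returning joiner can be sketched without any Kafka dependency. In the example below a `BiFunction` stands in for Kafka Streams' `ValueJoiner`, the inputs are reduced to the two event timestamps, and the joined value is a plain description string; all of these are simplifications for illustration, not the course's actual classes.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.function.BiFunction;

// Sketch of a value joiner that returns Optional instead of null.
class OptionalJoinerSketch {

    // BiFunction stands in for Kafka Streams' ValueJoiner (assumption).
    static final BiFunction<Instant, Instant, Optional<String>> JOINER =
            (dropoffTime, pickupTime) -> {
                Duration gap = Duration.between(dropoffTime, pickupTime).abs();
                if (gap.toMinutes() < 10) {
                    return Optional.of("matched, gap=" + gap.toMinutes() + "min");
                }
                return Optional.<String>empty();
            };

    public static void main(String[] args) {
        Instant dropoff = Instant.parse("2024-01-01T10:00:00Z"); // sample value
        // prints matched, gap=2min
        JOINER.apply(dropoff, dropoff.plusSeconds(120)).ifPresent(System.out::println);
        // prints false: one hour apart exceeds the 10-minute threshold
        System.out.println(JOINER.apply(dropoff, dropoff.plusSeconds(3600)).isPresent());
    }
}
```

Downstream, the stream would keep only values where `isPresent()` is true and then unwrap them with `get()`, which replaces the null check in the filter step.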
d. Monitoring and Metrics
- Internal Topics: Kafka Streams automatically creates internal topics for state stores and joins. Monitor these for performance and correctness.
- Metrics and Logging: Utilize Kafka’s metrics (through JMX, Prometheus, or Confluent Control Center) to track join performance and lag.
1. Overview: Kafka Streams Testing
Objective:
Understand how to effectively test Kafka Streams topologies using unit tests. This involves verifying the logic of your stream processing (e.g., aggregations and joins) by testing the topology in isolation with tools like the TopologyTestDriver.
Key Topics:
- Extracting a topology from your stream application for testing.
- Using the TopologyTestDriver to simulate Kafka behavior.
- Creating input and output topics for testing.
- Asserting expected outcomes for both count and join topologies.
- Best practices in unit testing Kafka Streams.
2. Extracting and Testing the Topology
a. Topology Extraction
- Design Principle: Structure your Kafka Streams application so that the topology (the definition of how your streams are built) can be extracted separately from the application startup.
  - Method: Create a method (e.g., `createTopology()`) that builds and returns the topology. This makes the topology testable in isolation.
- Benefits: Allows you to focus on testing your stream logic without concerning yourself with runtime concerns like stream startup/shutdown.
b. The Role of TopologyTestDriver
- Purpose: The `TopologyTestDriver` simulates a Kafka cluster, enabling you to run your topology synchronously and verify its behavior.
- Key Advantages:
  - No need for an actual Kafka broker.
  - Fast execution of tests.
  - Ability to feed input and read output records immediately.
- Usage: Instantiate the `TopologyTestDriver` with your extracted topology and a set of configuration properties. Then, create `TestInputTopic` and `TestOutputTopic` objects for your input and output topics.
3. Setting Up the Test Environment
a. Configuration and Properties
- Dummy Bootstrap Server: Since you’re not connecting to a real broker, set a dummy value (e.g., `"dummy:1234"`).
- Key Configuration Properties: Include essential properties such as:
  - `StreamsConfig.APPLICATION_ID_CONFIG`
  - `StreamsConfig.BOOTSTRAP_SERVERS_CONFIG`
  - Any additional configuration for caching, offset reset, etc.
- Example:
```java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
// Additional dummy configs as needed...
```
b. Creating Test Topics
- TestInputTopic: Use `testDriver.createInputTopic(...)` to simulate the input stream.
  - Specify serializers for the key and value.
  - Optionally, set a start timestamp and auto-advance duration.
- TestOutputTopic: Similarly, create a `TestOutputTopic` to capture output records, providing appropriate deserializers.
- Setup Example:
```java
TestInputTopic<String, Long> inputTopic = testDriver.createInputTopic(
        "rides-topic", Serdes.String().serializer(), Serdes.Long().serializer());
TestOutputTopic<String, Long> outputTopic = testDriver.createOutputTopic(
        "rides-count-topic", Serdes.String().deserializer(), Serdes.Long().deserializer());
```
4. Writing Unit Tests for Kafka Streams
a. Testing a Count Topology
- Scenario: A topology that aggregates and counts messages per key.
- Test Steps:
  - Pipe Input: Feed a message with a given key and value using `pipeInput()`.
  - Assert Output: Check that the output topic receives the correct key-value pair.
  - Edge Cases:
    - Test multiple messages with the same key to verify that the count increments as expected.
    - Validate that after processing, the output topic is empty (i.e., all messages have been consumed).
- Example Assertion:
```java
inputTopic.pipeInput("100", someRideValue);
assertEquals(1, outputTopic.getQueueSize());
KeyValue<String, Long> result = outputTopic.readKeyValue();
assertEquals("100", result.key);
assertEquals(1L, result.value.longValue());
```
b. Testing a Join Topology
- Scenario: A join operation between two streams (e.g., matching ride events with pickup location events based on a common key).
- Test Steps:
  - Simulate Input: Pipe input records into both input topics using their respective `TestInputTopic` instances.
  - Join Logic: Ensure that both records have the same key (by setting the key correctly in your producer or via a `selectKey` transformation).
  - Assert Output: Validate that the join output (e.g., a vendor info object) is produced as expected.
- Handling Time: If your join logic depends on timestamps (e.g., within a join window), supply timestamps in your `pipeInput()` calls.
- Example Assertion:
```java
// Pipe matching records with the same key and timestamps exactly 30 seconds apart.
Instant start = Instant.now();
inputTopic1.pipeInput("186", rideValue, start);
inputTopic2.pipeInput("186", pickupValue, start.plus(Duration.ofSeconds(30)));
// Read the join result from the output topic.
VendorInfo joinResult = outputTopic.readValue();
assertNotNull(joinResult);
```
c. Testing Multiple Scenarios
- One-Message Test: Validate that a single message produces the expected count or join result.
- Multiple-Message Test: Feed multiple messages with the same key to verify incremental aggregation.
- Edge Case Test: Test for empty input (no output) and validate that the topology behaves gracefully.
- Time-Based Punctuation: Advance the test driver’s wall-clock time using `testDriver.advanceWallClockTime(Duration)` to trigger any scheduled punctuations (e.g., flushing buffered results).
d. Using Assertions and Cleanup
- Assertions: Use your favorite assertion library (e.g., JUnit assertions or AssertJ) to validate the output.
- Resource Cleanup: Always close the `TopologyTestDriver` in an `@AfterEach` method to release resources and avoid interference between tests.
5. Best Practices in Kafka Streams Testing
a. Separation of Concerns
- Extract Topology: Separate topology construction from runtime logic so that tests focus solely on processing logic.
- Unit vs. Integration Tests:
  - Use the TopologyTestDriver for fast, isolated unit tests.
  - Consider integration tests with an embedded Kafka cluster if you need to test producer/consumer interactions end-to-end.
b. State Store Verification
- Pre-Population and Verification: If your topology uses state stores, pre-populate them before running tests and assert their state after processing. This can verify that stateful operations (like aggregations) are working correctly.
c. Handling Time and Punctuation
- Timestamp Management: When testing time-dependent logic (such as windowed joins or aggregations), supply specific timestamps to records and advance the driver's wall-clock time as needed.
- Punctuation Testing: Explicitly trigger punctuations by advancing time, then assert that scheduled operations produce the expected outputs.
d. Reusability and Maintainability
- Helper Classes: Create helper methods or classes to generate test data (e.g., a data generator for ride events) to reduce duplication.
- Consistent Configurations: Use constants for topic names and configuration properties to avoid mismatches between production code and tests.
1. Overview: Kafka Streams Windowing
Objective:
Understand how Kafka Streams implements windowing for time-based aggregations and joins. This includes the concepts behind window types, their impact on stateful processing, and how to configure and use them in real-time stream processing.
Key Topics:
- Global KTables and their role in minimizing data shuffling
- Different join types in Kafka Streams
- Types of windowing functions: tumbling, hopping, sliding, and session windows
- Practical windowing operations using the DSL
2. Global KTables and Joins
a. Global KTables
- Concept: Global KTables broadcast the complete dataset to every node. Unlike standard KTables, which are partitioned across nodes, a global KTable ensures that each instance has full data locally.
- Benefits:
  - Prevents costly shuffling during join operations since every node has a complete copy.
  - Ideal for small, relatively static datasets (e.g., postal codes).
- Considerations:
  - Memory limitations: Global KTables are best used for data that doesn’t grow unbounded.
- Usage:
```java
GlobalKTable<String, SomeValue> globalTable = builder.globalTable("small-dataset-topic");
```
b. Join Types in Kafka Streams
- Inner Join: Combines records from two streams (or a stream and a table) only if matching keys exist in both.
- Left Join: Returns all records from the left side and the matching records from the right side (or null if no match).
- Outer Join: Available only for stream-to-stream or table-to-table joins, ensuring that all records from both sides are included, with nulls for non-matches.
- Key Note: Joins in Kafka Streams require matching keys. For stream joins, ensure the keys are set appropriately (e.g., using `selectKey()`).
3. Windowing Concepts in Kafka Streams
Windowing allows you to group events that occur within a specific time frame for aggregation or join operations.
a. Tumbling Windows
- Definition: Fixed-size, non-overlapping windows. Each record falls into exactly one window.
- Use Case: Counting events per fixed interval (e.g., every 10 seconds).
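- Example: `TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10))` (Kafka Streams 3.0+; earlier releases used `TimeWindows.of(Duration.ofSeconds(10))`)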
b. Hopping Windows
- Definition: Fixed-size windows that overlap; defined by a window size and a hop interval. A record can belong to multiple windows.
- Use Case: When you want periodic aggregates with overlapping data.
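- Example: `TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(5))` (Kafka Streams 3.0+; earlier releases used `TimeWindows.of(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(5))`)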
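The difference between tumbling and hopping windows comes down to which window start times contain a given record timestamp. The plain-Java sketch below computes them, assuming windows are aligned to the epoch and cover the half-open interval from start (inclusive) to start plus size (exclusive); a tumbling window is the special case where the advance equals the size. `windowStartsFor` is a hypothetical helper, not a Kafka Streams method.

```java
import java.util.ArrayList;
import java.util.List;

// Computes which epoch-aligned windows contain a given timestamp.
class WindowAssignmentSketch {

    // Start times (epoch millis) of every window [start, start + sizeMs)
    // that contains timestampMs, for windows advancing by advanceMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest candidate start, rounded down to a multiple of the advance.
        long firstStart = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = firstStart; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Tumbling, 10 s: prints [10000]
        System.out.println(windowStartsFor(12_345L, 10_000L, 10_000L));
        // Hopping, 10 s size with 5 s advance: prints [5000, 10000]
        System.out.println(windowStartsFor(12_345L, 10_000L, 5_000L));
    }
}
```

A record at 12.345 s therefore contributes to one tumbling window but to two hopping windows, which is why hopping aggregates count the same record more than once.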
c. Sliding Windows
- Definition: Fixed-size windows that slide continuously along the time axis; two records fall into the same window when their timestamps differ by no more than the window size.
- Use Case: When the window boundaries are not fixed but should adjust based on event times.
- Configuration: Sliding windows are similar to hopping windows but are more dynamic: the window boundaries are determined by the record timestamps rather than by a fixed advance interval.
d. Session Windows
- Definition: Windows based on periods of activity separated by a gap of inactivity (the session gap). They capture bursts of events that are grouped together.
- Use Case: Tracking user sessions where activity is intermittent.
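- Example: `SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5))` (Kafka Streams 3.0+; earlier releases used `SessionWindows.with(Duration.ofMinutes(5))`)
- Consideration: Events arriving after the inactivity gap start a new session.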
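Session grouping can be illustrated with a short plain-Java sketch that splits one key's timestamps into sessions whenever the pause between consecutive events exceeds the inactivity gap. It assumes the timestamps arrive in ascending order; Kafka Streams additionally merges existing sessions when out-of-order records arrive, which this sketch does not model. `sessions` is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Splits ascending timestamps into sessions separated by an inactivity gap.
class SessionWindowSketch {

    // A new session starts when the pause since the previous event exceeds gapMs.
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Events at 0, 1, 2 and 10 minutes with a 5-minute gap.
        List<Long> events = Arrays.asList(0L, 60_000L, 120_000L, 600_000L);
        // prints [[0, 60000, 120000], [600000]]
        System.out.println(sessions(events, 300_000L));
    }
}
```

Unlike tumbling and hopping windows, the number and length of sessions depend entirely on the data, so the same gap can yield very short or very long windows.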
4. Practical Windowing Operations
a. Windowed Aggregations
- Basic Flow:
  - Group by Key: Group incoming events by key.
  - Apply Windowing: Use `.windowedBy()` with the chosen window type (e.g., tumbling window).
  - Aggregate: Perform a count or sum within each window.
- Example Code:
```java
// Kafka Streams 3.0+ API; older releases used TimeWindows.of(...).grace(...).
KStream<String, Long> stream = builder.stream("input-topic");
KTable<Windowed<String>, Long> windowedCounts = stream
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(10), Duration.ofSeconds(5)))
        .count();
// Output to a topic (the key becomes a Windowed<String> type).
// The windowed SerDe is given the window size so window end times can be restored on read.
windowedCounts.toStream().to("output-topic",
        Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class, Duration.ofSeconds(10).toMillis()),
                Serdes.Long()));
```
b. Impact on Join Operations
- Windowed Joins: When joining streams with windowing, ensure that the join window (e.g., 10 seconds) captures the expected event time differences.
- Grace Periods: Configure a grace period to allow for late-arriving events. Late events outside this window are dropped.
- Example:
```java
// Kafka Streams 3.0+ API; older releases used JoinWindows.of(...).grace(...) and Joined.with(...).
KStream<String, ViewEvent> views = builder.stream("views-topic");
KStream<String, ClickEvent> clicks = builder.stream("clicks-topic");
KStream<String, JoinedResult> joinedStream = views.join(
        clicks,
        (view, click) -> new JoinedResult(view, click),
        JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(10), Duration.ofSeconds(5)),
        StreamJoined.with(Serdes.String(), viewSerde, clickSerde));
```
5. Best Practices and Considerations
- Memory Management: Global KTables should only be used with small datasets to avoid memory overload.
- Window Size and Grace Period: Choose window sizes based on business requirements. Adjust grace periods to balance between late-arriving events and timely output.
- Monitoring and Metrics: Monitor the number of windows created and their durations. Use Kafka Streams metrics and logs to diagnose windowing behavior.
- Data Skew: Consider potential skew in key distribution that might affect windowed joins and aggregations. Adjust partitioning strategy if necessary.
- Testing: Use the TopologyTestDriver to simulate windowed operations and verify window boundaries, aggregates, and join outcomes (see additional testing notes in DE Zoomcamp 6.9 testing modules).