Read this tutorial on Signadot, by Muhammad Khabbab.
Imagine you are a developer in an engineering team building microservices that communicate asynchronously via Kafka. Setting up all microservices and Kafka infrastructure locally can be challenging, so integration and end-to-end testing are done in a shared staging environment. This setup, however, often means that one developer’s tests interfere with others’ tests. To avoid this, we need a way to test microservices in isolation; including those that produce and consume messages from Kafka. This tutorial will walk through how Signadot Sandboxes enables this.
Components of the Kafka Demo App
This demo app is designed to showcase how Kafka can facilitate selective message consumption in a microservices architecture. Each component of the app plays a specific role:
Producer: Sends messages to a Kafka topic. We will attach a routing key to each message header to allow sandbox-specific message consumption. Using OpenTelemetry, we’ll propagate the incoming request headers (which include the sandbox routing key) into the Kafka messages, allowing for sandbox-specific message consumption.
Consumer: Receives messages from Kafka. Each sandboxed consumer uses the routing key to decide whether to process or ignore messages, hence the selective consumption of message.
Signadot Operator: Manages the sandbox environment and ensures messages with specific routing keys are routed to the matching sandboxed consumers. The operator’s ‘Routes’ API also helps Kafka consumers selectively consume messages meant only for their designated sandbox.
Frontend: Provides a user interface to send messages and view message logs for both baseline and sandboxed consumer processing.
Request Flow
Baseline Flow (No matching Routing Key):
- The producer sends messages to Kafka without a routing key.
- Since there is no routing key (or a routing key that does not have an association with any of the Sandboxes for this consumer), only the baseline consumer, with its own dedicated consumer group, processes the message.
- The baseline consumer logs the message, while sandbox consumers ignore it due to the absence of a matching routing key.
Sandbox Flow (With Routing Key):
- The producer sends a message with a routing key added to the Kafka message header.
- The sandbox consumer processes the message if its routing key matches, whereas the baseline consumer ignores it due to the unmatched routing key and group ID.
Now that we understand the flow, let’s get our hands dirty with the detailed steps.
Step 1: Deploy the Demo App
This step will deploy the Kafka demo app (producer, consumer, frontend) to set up a baseline Kafka workflow.
1) Clone the Demo App Repository:
$ git clone https://github.com/signadot/examples.git && cd examples/selective-consumption-with-kafka
2) Deploy the Kafka Demo:
$ kubectl create ns kafka-demo
namespace/kafka-demo created
$ kubectl -n kafka-demo apply -f k8s/pieces
serviceaccount/consumer created
deployment.apps/consumer created
serviceaccount/frontend created
deployment.apps/frontend created
service/frontend created
serviceaccount/kafka created
service/kafka-headless created
statefulset.apps/kafka created
serviceaccount/producer created
deployment.apps/producer created
service/producer created
serviceaccount/redis created
deployment.apps/redis created
service/redis created
This will deploy:
- Kafka cluster
- Redis server
- Frontend (A)
- Producer (P)
- Consumer (C)
Let’s check the status of all the pods to ensure everything is up and running.
$ kubectl get pods -n kafka-demo
NAME READY STATUS RESTARTS AGE
consumer-7b94f77c95-vnmx6 1/1 Running 0 77s
frontend-8648dd67c9-kbhmf 2/2 Running 0 77s
kafka-0 1/1 Running 0 77s
producer-c4c79fdd9-sn9jg 2/2 Running 0 77s
redis-594957c647-jhzvh 1/1 Running 0 77s
3) Forward frontend for testing:
$ kubectl port-forward service/frontend -n kafka-demo 4000:4000
This will expose the frontend at localhost:4000 for interacting with the Kafka demo.
Step 2: Test Baseline Behavior Without Signadot Sandboxes
Open http://localhost:4000 in your browser to access the Kafka demo frontend and send a message. As there are no sandboxes created yet, the baseline consumer will process the message, which you can see in the frontend interface in below screenshot.
Step 3: Producer’s Code to Propagate Routing Key
(This step can be automated without modifying the producer code by using OpenTelemetry auto-instrumentation, as described here.)
So what does the producer currently do?
- It obtains the routing key from the incoming HTTP request's baggage header.
- It also includes the routing key in the message headers when sending messages to Kafka.
Let’s walk through the code of producer app.js regarding how it incorporates above two points:
// Extract the routing key from the 'baggage' header
let routingKey = extractRoutingKey(req.get('baggage'));
Then make sure the routing key is included when publishing the message to Kafka.
// Publish the message to Kafka with the routing key included in the headers
publishMessage(kafkaTopic, msg, { baggage: req.get('baggage') })
Although the producer includes the routing key, selective consumption based on this key is primarily handled by the consumer.. So it is consumer’s turn to factor in the presence of routing key and then decide whether to consume the message or not. Let’s make some changes to the consumer code.
Step 4: Modify the Consumer for Selective Message Consumption
What exactly will be managed by the consumer?:
- Create a New Consumer Group for Sandboxes: That will make sure that sandboxed consumers use a unique consumer group to prevent interference with the baseline consumer's offsets. Creating separate consumer groups isolates message consumption between baseline and sandboxed consumers. Note that we are following the standard Kafka messaging approach of “consumer group” mode and not the “stand-alone” mode.
- Manage Offsets Appropriately: Configure the consumer to start consuming messages from the latest offset, so sandboxed consumers focus on new messages instead of processing old messages.
- Discard Messages Not Meant for the Sandbox: Revise the consumer code to use the Signadot ‘Routes’ API, which maps routing keys to sandbox names, to ignore irrelevant messages. Platform teams often encapsulate this logic in a custom Kafka client library that engineering teams can easily use for selective message consumption..
- Propagate the Routing Key to Maintain Context: Just in case, If the consumer communicates with other services via synchronous APIs or asynchronously via Kafka messages, we need to ensure that the routing key is also included in the outgoing message/request headers.
Let’s go through important sections of the code one by one.
Connecting to the Routes API:
// Start fetching routing keys from the Signadot Routes API
run();
function run() {
// initial load of routes
getRoutes();
// reload routes every 5 seconds
setInterval(getRoutes, 5 * 1000);
}
The run method in the consumer code triggers periodic calls to Signadot’s Routes API to retrieve routing keys. This enables the consumer to determine if a message is intended for its specific sandbox. Every 5 seconds, the getRoutes method sends an HTTP request to the Routes API (routeServerURL) to fetch the latest routing keys.
Within getRoutes, the response is parsed to extract routing keys, which are then cached. These cached keys are referenced by the shouldProcess function to decide if a message should be processed based on its routing key. This setup ensures that the consumer is continuously updated with the latest routing information. As a result, consumer is able to isolate messages dynamically for each sandbox environment.
Creating Consumer Group on the Fly:
// Assign a unique consumer group ID based on the sandbox name
const groupId = process.env.SIGNADOT_SANDBOX_NAME
? `sandbox-consumer-${process.env.SIGNADOT_SANDBOX_NAME}`
: 'baseline-consumer';
In above code, consumer is making use of SIGNADOT_SANDBOX environment variable to create a unique group ID for sandboxed consumers. By using different consumer group IDs, the sandboxed consumers have their own offsets and do not interfere with the baseline consumer. Note that it is the Signadot Operator that automatically injects the SIGNADOT_SANDBOX_NAME environment variable into the pods running in a sandbox.
In Sandbox:
- The SIGNADOT_SANDBOX_NAME environment variable is set to the name of the sandbox.
- The consumer group ID becomes sandbox-consumer-[SANDBOX_NAME].
In Baseline:
- When running in the baseline environment, SIGNADOT_SANDBOX_NAME is not set or is empty.
- The consumer group ID defaults to 'baseline-consumer'.
Consuming Messages with the Specified Group ID:
// Consume messages from the Kafka topic with the specified groupId
consumeMessages(kafkaTopic, groupId, (msg, headers) => {
// ...
});
This will pass the groupId to the consumeMessages (Defined in kafka.js) function to ensure the consumer uses the correct consumer group.
Extracting Headers from Kafka Messages:
// Extract the 'baggage' header from the message headers
let baggage = headers['baggage'] ? headers['baggage'].toString() : '';
// Extract the routing key from the 'baggage' header
let routingKey = extractRoutingKey(baggage);
Above will retrieve the routing key from the message headers. This will determine whether the message is relevant to this consumer or not.
Deciding Whether to Process the Message:
The shouldProcess() function plays a crucial role in determining if this consumer should process a message based on its routing key. This function is part of the new code written in the custom kafka client wrapper, which matches the message routing key with the sandbox name as returned from the ‘Routes’ API.
// Determine if the message should be processed based on routing keys
if (!shouldProcess(routingKey)) {
// Skip the message if it doesn't match
return;
function shouldProcess(routingKey) {
const routingKeys = cache.get('routingKeys');
if (sandboxName !== "") {
// we are a sandboxed workload, only accept the received routing keys
return routingKeys.has(routingKey)
}
// we are a baseline workload, ignore received routing keys (they belong
// to sandboxed workloads)
return !routingKeys.has(routingKey)
}
The shouldProcess function determines if a message should be processed based on the presence of a routing key and whether the workload is a sandbox or a baseline. It retrieves a set of valid routingKeys from the cache. If the workload is sandboxed (indicated by sandboxName not being empty), it processes only messages with a routing key that matches the sandbox’s routing key set. For baseline workloads, it processes messages without a routing key, ignoring any messages designated for sandboxed workloads. This ensures that sandboxed messages are isolated from baseline processing.
Now that the consumer behavior is updated, the last thing to do is update the Kafka implementation to handle groupId and manage offsets appropriately by starting from the latest message. Let’s analyze the changes in Kakfa implementation.
Step 5: Modify Kafka Consumer Implementation for Offset Management
What will be changed in Kafka?
- Accept Consumer Group ID as a Parameter: Modify the consumeMessages function to accept a groupId parameter, allowing the consumer to specify the consumer group ID.
- Manage Offsets by Starting from the Latest Message: Configure the Kafka consumer to begin consuming messages from the latest offset when a new consumer group is created.
Let’s go through some of the core sections of kafka.js:
Modify the consumeMessages Function to Accept groupId:
// Function to process messages received from Kafka
const consumeMessages = async (topic, groupId, onNewMessage) => {
// ...
};
Consumer Creation having groupID:
// Create a Kafka consumer with the specified groupId
const consumer = kafka.consumer({ groupId: groupId });
Manage Offsets by Starting from the Latest Message:
// Subscribe to a Kafka topic, starting from the latest offset
await consumer.subscribe({ topic: topic, fromBeginning: false });
Step 6: Create Sandboxes
Let’s create the sandboxes for consumer and producer. Note that there is no change needed in the configuration YAML files for either producer or consumer. We will just use the standard Signadot commands to create these sandboxes.
Create Producer Sandbox
$ signadot sandbox apply -f producer.yaml --set cluster=testcluster
Created sandbox "producer-sbx" (routing key: l66wfqg7yuyuy) in cluster "testcluster".
Waiting (up to --wait-timeout=3m0s) for sandbox to be ready...
✓ Sandbox status: Ready: All desired workloads are available.
Dashboard page: https://app.signadot.com/sandbox/id/l66wfqg7yuyuy
SANDBOX ENDPOINT TYPE URL
frontend host https://frontend--producer-sbx.preview.signadot.com
The sandbox "producer-sbx" was applied and is ready.
Create Consumer Sandbox
$ signadot sandbox apply -f consumer.yaml --set cluster=testcluster
Created sandbox "consumer-sbx" (routing key: r2ht3px88nx5q) in cluster "testcluster".
Waiting (up to --wait-timeout=3m0s) for sandbox to be ready...
✓ Sandbox status: Ready: All desired workloads are available.
Dashboard page: https://app.signadot.com/sandbox/id/r2ht3px88nx5q
SANDBOX ENDPOINT TYPE URL
frontend host https://frontend--consumer-sbx.preview.signadot.com
The sandbox "consumer-sbx" was applied and is ready.
The sandboxes are created but the pods for consumer and producer are still running with old code. Let’s update the deployment.
Step 7: Test Sandbox Behavior with Routing Key
Let’s verify that sandboxed consumers process messages with routing keys while the baseline consumer ignores them.
- Use the Signadot Browser Extension to set the sandbox consumer.
- Open http://localhost:4000 in your browser.
- Send a message through the frontend UI.
Scenario 1 - Baseline Producer and Baseline Consumer
Scenario 2 - Baseline Producer and Sandbox Consumer
Select the consumer sandbox after enabling Signadot Chrome extension.
Send a message to see whether the baseline consumer or the sandbox consumer consumes it.
As you can see, the sandbox consumer consumes this message. The separate consumer group ID in the sandboxed consumer ensures that messages intended for the sandbox do not interfere with or get processed by the baseline consumer, even if routing keys are used. As you can see, the sandboxed consumer processes the message since the routing key matches, while the baseline consumer ignores it due to its empty routing key set. If you add logs to display the consumer group ID, they will appear in the consumer pod’s logs, confirming our implementation. If you stop your consumer pod and send a message through the demo app, these messages will be ignored, and the consumer will only consume those messages that were sent after it resumed running.
Scenario 3 - Sandbox Producer and Baseline Consumer
This time, we will select “Sandbox Producer” when enabling Signadot Chrome extension.
Now let’s send the message and see the results.
So even though you can see there is a routing key, but still the baseline consumer picked this message. This is because the routing key sent by the producer is unmatched. The baseline consumer is programmed to process any message not specifically routed to a sandbox. In cases where a routing key is present but doesn’t match any active sandbox, the baseline consumer treats it as general traffic and processes it by default.
Conclusion
This tutorial has demonstrated how to implement selective message consumption in a Kafka-based application, which enables isolated feature testing within a shared environment. By leveraging consumer groups, routing keys, and Signadot’s sandboxed environments, we were able to separate test traffic from production flows in a controlled manner.
As systems scale, Signadot’s sandboxing capabilities present an efficient way for teams to test feature-specific traffic within complex architectures, helping avoid the overhead associated with duplicating environments. For development teams seeking enhanced testing precision and speed without increased infrastructure costs, Signadot provides a powerful and scalable solution.
Top comments (0)