In this article, we'll explore how to implement a request-response pattern in Camunda Platform 8.5 using Kafka connectors.
The goal is to design a workflow where:
- A Kafka producer sends a request message to a Kafka topic.
- Multiple Kafka consumers wait for response messages on a different topic, correlating responses with requests using a unique
requestId
. - The workflow handles concurrent consumers efficiently, ensuring scalability and reliability.
- A Gateway evaluates the response
status
to determine the next steps in the workflow.
This setup is ideal for asynchronous interactions with external systems, where multiple instances may be processing requests concurrently.
Prerequisites
- Camunda Platform 8.5 installed and configured.
- Access to a Kafka cluster.
- Camunda's Kafka Connector installed.
Implementation Details
1. Publishing Requests with Kafka Producer Connector
The first step is sending request messages to a Kafka topic. Each message includes a unique requestId
to correlate responses.
Configuration:
- Connector Type: Kafka Producer Connector.
-
Topic:
request-topic
. -
Message Key: Use a unique
requestId
. -
Payload: Include
requestId
and any additional data required.
Example BPMN Model Element:
- Service Task: Configure with the Kafka Producer Connector.
Implementation Steps:
Generate a Unique Request ID: Before the Service Task, generate a
requestId
(e.g., using a script task or a UUID generator).Configure the Kafka Producer Task:
Topic: Set to
request-topic
.Key: Set to
${requestId}
to ensure messages can be partitioned or keyed appropriately.-
Payload: Create a JSON payload including the
requestId
and other necessary data.Example Payload:
{ "requestId": "${requestId}", "data": { "customerId": "C12345", "orderAmount": 250.00 } }
- Ensure Message Serialization: Configure the connector to serialize the message payload as JSON.
2. Consuming Responses with Kafka Consumer Connector
Next, the workflow needs to wait for matching response messages while handling concurrent consumers efficiently.
Configuration:
- Connector Type: Kafka Consumer Connector.
-
Topic:
response-topic
. - Consumer Group ID: Set to allow concurrent consumers to distribute the load.
-
Correlation: Use the
requestId
to correlate responses to the correct workflow instance. -
Activation Condition: Use an expression to filter messages based on the
requestId
.
Implementation Steps:
Intermediate Message Catch Event: Use this event in the BPMN model to pause the workflow until a response is received.
Configure the Kafka Consumer Task:
Topic: Set to
response-topic
.Consumer Group ID: Specify a group ID (e.g.,
camunda-consumers
) that allows multiple instances to consume messages in a load-balanced fashion.-
Activation Condition: Use a FEEL expression to ensure the consumer picks up messages where
requestId
matches the process variable.Example Activation Condition:
= kafkaMessage.requestId = requestId
- Handle Concurrent Consumers:
Enable Topic Partitioning: Ensure the Kafka topic is partitioned, and consumers are part of the same consumer group to allow concurrent consumption.
Configure Client IDs: If necessary, configure the connector to use unique client IDs for concurrent consumers.
- Ensure Idempotency: Design your workflow to handle any potential duplicates or out-of-order messages.
3. Evaluating Response Status
After receiving the response, the workflow evaluates the status
field to determine the next steps.
Implementation Steps:
Exclusive Gateway: Add an Exclusive Gateway after the Kafka Consumer task.
Define Gateway Conditions:
-
Success Path:
= kafkaMessage.status = "success"
-
Failure Path:
= kafkaMessage.status != "success"
- Proceed Accordingly:
- If Success: Continue to the next steps in the workflow.
- If Failure: Trigger compensating actions or error handling.
4. Handling Concurrent Consumers
To effectively handle concurrent consumers:
Use Consumer Groups: Ensure all Kafka consumers are part of the same consumer group. Kafka will distribute partitions among consumers in the same group, allowing for parallel processing.
Scale Consumers: Deploy multiple instances of the workflow engine or configure multiple replicas to scale out consumer capacity.
Monitor Lag and Throughput: Use Kafka monitoring tools to keep an eye on consumer lag and adjust resources as necessary.
Error Handling and Dead Letter Queues: Implement error handling strategies for failed message processing, such as retries or dead-letter topics.
5. Additional Enhancements
Use Headers for Correlation: If supported, utilize Kafka message headers to store
requestId
for more efficient filtering.Timeout Mechanisms: Implement a timer boundary event to handle cases where a response is not received within a reasonable time frame.
Transactional Messaging: Consider using Kafka's transactional features to ensure exactly-once processing semantics.
Sample BPMN Diagram Elements
Below is a high-level description of the BPMN elements:
Start Event: Begins the process.
Generate Request ID (Script Task): Generates a unique
requestId
.Kafka Producer Task (Service Task): Sends the request message to
request-topic
.Intermediate Message Catch Event: Configured with the Kafka Consumer Connector to wait for the response.
Exclusive Gateway: Evaluates the
status
field in the response.Success Path: Continues the process for successful responses.
Failure Path: Handles errors for unsuccessful responses.
End Event: Terminates the process.
Example BPMN XML Snippet
<bpmn:process id="KafkaRequestResponseProcess" isExecutable="true" xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0">
<!-- Start Event -->
<bpmn:startEvent id="StartEvent" />
<!-- Generate Request ID -->
<bpmn:scriptTask id="GenerateRequestId" name="Generate Request ID" scriptFormat="javascript">
<bpmn:incoming>StartEvent</bpmn:incoming>
<bpmn:script>
<![CDATA[
var javaUUID = Java.type('java.util.UUID');
var requestId = javaUUID.randomUUID().toString();
execution.setVariable('requestId', requestId);
]]>
</bpmn:script>
</bpmn:scriptTask>
<!-- Send Request -->
<bpmn:serviceTask id="SendRequest" name="Send Request">
<bpmn:extensionElements>
<zeebe:taskDefinition type="kafka:publish" />
<zeebe:taskHeaders>
<zeebe:header key="topic" value="request-topic" />
<zeebe:header key="key" value="= requestId" />
<zeebe:header key="message" value="={{ 'requestId': requestId, 'data': { 'customerId': 'C12345', 'orderAmount': 250.00 } }}" />
</zeebe:taskHeaders>
</bpmn:extensionElements>
</bpmn:serviceTask>
<!-- Wait for Response -->
<bpmn:intermediateCatchEvent id="WaitForResponse" name="Wait for Response">
<bpmn:extensionElements>
<zeebe:taskDefinition type="kafka:subscribe" />
<zeebe:taskHeaders>
<zeebe:header key="topic" value="response-topic" />
<zeebe:header key="groupId" value="camunda-consumers" />
<zeebe:header key="activationCondition" value="= kafkaMessage.requestId = requestId" />
</zeebe:taskHeaders>
</bpmn:extensionElements>
</bpmn:intermediateCatchEvent>
<!-- Evaluate Response -->
<bpmn:exclusiveGateway id="EvaluateResponse" name="Evaluate Response" />
<!-- Process Success -->
<bpmn:serviceTask id="ProcessSuccess" name="Process Success">
<bpmn:extensionElements>
<!-- Additional processing tasks -->
</bpmn:extensionElements>
</bpmn:serviceTask>
<!-- Handle Error -->
<bpmn:serviceTask id="HandleError" name="Handle Error">
<bpmn:extensionElements>
<!-- Error handling tasks -->
</bpmn:extensionElements>
</bpmn:serviceTask>
<!-- End Event -->
<bpmn:endEvent id="EndEvent" />
<!-- Sequence Flows -->
<bpmn:sequenceFlow id="Flow1" sourceRef="StartEvent" targetRef="GenerateRequestId" />
<bpmn:sequenceFlow id="Flow2" sourceRef="GenerateRequestId" targetRef="SendRequest" />
<bpmn:sequenceFlow id="Flow3" sourceRef="SendRequest" targetRef="WaitForResponse" />
<bpmn:sequenceFlow id="Flow4" sourceRef="WaitForResponse" targetRef="EvaluateResponse" />
<bpmn:sequenceFlow id="Flow5" sourceRef="EvaluateResponse" targetRef="ProcessSuccess">
<bpmn:conditionExpression xsi:type="bpmn:tFormalExpression"><![CDATA[= kafkaMessage.status = "success" ]]></bpmn:conditionExpression>
</bpmn:sequenceFlow>
<bpmn:sequenceFlow id="Flow6" sourceRef="EvaluateResponse" targetRef="HandleError">
<bpmn:conditionExpression xsi:type="bpmn:tFormalExpression"><![CDATA[= kafkaMessage.status != "success" ]]></bpmn:conditionExpression>
</bpmn:sequenceFlow>
<bpmn:sequenceFlow id="Flow7" sourceRef="ProcessSuccess" targetRef="EndEvent" />
<bpmn:sequenceFlow id="Flow8" sourceRef="HandleError" targetRef="EndEvent" />
</bpmn:process>
Note: Ensure that you use the correct namespaces and adjust the XML tags according to Camunda 8.5's BPMN standards.
Benefits of Handling Concurrent Consumers
Scalability: Multiple consumers can process messages in parallel, improving throughput.
Reliability: If a consumer fails, others can continue processing, minimizing downtime.
Load Balancing: Kafka distributes messages among consumers in the same group, balancing the workload.
Testing and Validation
Simulate Multiple Instances: Start multiple instances of the process to test concurrent consumption.
Monitor Consumer Groups: Use Kafka monitoring tools to observe consumer group performance and message lag.
Verify Message Correlation: Ensure that responses are correctly matched to their originating requests using
requestId
.Test Failure Scenarios: Simulate failures to test error handling and recovery mechanisms.
Best Practices
Use Dedicated Consumer Groups: For this workflow, use a specific consumer group to avoid interference with other consumers.
Manage Offsets Properly: Ensure that offsets are committed correctly to avoid message loss or duplication.
Implement Idempotent Processing: Design your workflow to handle duplicate messages gracefully.
Security Considerations: Secure Kafka topics and connections using authentication and encryption as needed.
Conclusion
By leveraging Camunda 8.5's Kafka Connector and properly configuring consumers, you can build robust workflows that handle concurrent request-response interactions via Kafka. The key considerations are:
Proper Connector Configuration: Both producer and consumer connectors must be set up correctly, with attention to correlation IDs and activation conditions.
Concurrency Handling: Utilize Kafka's consumer groups and topic partitioning to enable concurrent processing.
Workflow Design: Incorporate BPMN elements like gateways and boundary events to manage different response outcomes and handle timeouts or errors effectively.
This approach ensures your workflows are scalable, reliable, and capable of handling asynchronous communication with external systems, making full use of Camunda 8.5's capabilities.
Top comments (0)