Building a Resilient Event-Driven Architecture with Apache Kafka and Stream Processing
In modern distributed systems, the ability to process data in real time while maintaining system resilience is critical. An event-driven architecture (EDA) decouples services through asynchronous messaging, allowing systems to scale independently and recover gracefully from failures. This guide walks you through designing and implementing a production-ready event-driven system using Apache Kafka for messaging and Apache Flink for stream processing.
Why Event-Driven Architecture?
Traditional request-response architectures tightly couple services, creating brittle systems where failures cascade. Event-driven architecture solves this by:
- Decoupling: Services communicate through events, not direct calls
- Scalability: Independent scaling of producers and consumers
- Resilience: Failures don't cascade; events persist in brokers
- Replayability: Historical events can be reprocessed for recovery or new features
System Overview
We'll build a real-time order processing system with these components:
Order Service → Kafka → Stream Processor → Kafka → Notification Service + Analytics DB
Architecture Diagram
┌─────────────┐    events    ┌──────────┐   processed   ┌──────────────┐
│    Order    │ ────────────►│  Kafka   │ ─────────────►│    Flink     │
│   Service   │              │ Cluster  │               │    Stream    │
└─────────────┘              └──────────┘               │  Processor   │
                                                        └──────┬───────┘
                                                               │
                    ┌──────────────────────────────────────────┘
                    │
         ┌──────────▼──────────┐          ┌──────────────────┐
         │    Notification     │          │    Analytics     │
         │       Service       │          │     Database     │
         └─────────────────────┘          └──────────────────┘
Step 1: Setting Up Apache Kafka
Installing Kafka Locally
Download and extract Kafka:
wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0
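Start ZooKeeper (this guide runs Kafka in ZooKeeper mode; since Kafka 3.3, KRaft mode is production-ready and removes the ZooKeeper dependency):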
bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka broker in a new terminal:
bin/kafka-server-start.sh config/server.properties
Creating Topics
Create three topics for our pipeline:
# Raw order events
bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic orders_raw \
  --partitions 3 \
  --replication-factor 1 \
  --config retention.ms=604800000
# Processed orders
bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic orders_processed \
  --partitions 3 \
  --replication-factor 1
# Dead letter queue for failed processing
bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic orders_dlq \
  --partitions 1 \
  --replication-factor 1
Key topic configurations:
- Partitions: Determine parallelism (3 partitions allow up to 3 consumers in one group to read in parallel)
- Retention: retention.ms=604800000 (7 days) on orders_raw allows replay
- Replication-factor: 1 for local dev; use 3+ in production
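To make the partitions-versus-consumers point concrete, here is a simplified Python model of range-style assignment for a single topic within one consumer group. It mirrors the idea behind Kafka's RangeAssignor (sort the members, hand out contiguous blocks, give the remainder to the first members) but it is not Kafka's code, and the consumer names are made up.

```python
from __future__ import annotations


def assign_partitions(num_partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Simplified range-style assignment of one topic's partitions within a group.

    Members are sorted, each gets a contiguous block of partitions, and the first
    (num_partitions % members) consumers get one extra. Consumers beyond the
    partition count receive nothing and sit idle.
    """
    members = sorted(consumers)
    base, extra = divmod(num_partitions, len(members))
    assignment: dict[str, list[int]] = {}
    start = 0
    for i, member in enumerate(members):
        count = base + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


# orders_raw has 3 partitions: a fourth consumer in the same group gets no work.
print(assign_partitions(3, ["consumer-1", "consumer-2", "consumer-3", "consumer-4"]))
# {'consumer-1': [0], 'consumer-2': [1], 'consumer-3': [2], 'consumer-4': []}
```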
Step 2: Building the Order Producer Service
Create a Node.js service that publishes order events:
// order-service/index.js
const crypto = require('crypto'); // built-in module; provides randomUUID() (Node.js 14.17+)
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['localhost:9092']
});
const producer = kafka.producer();
async function publishOrderEvent(order) {
  const event = {
    event_id: crypto.randomUUID(),
    event_type: 'ORDER_CREATED',
    event_timestamp: new Date().toISOString(),
    data: {
      order_id: order.id,
      user_id: order.userId,
      items: order.items,
      total_amount: order.total,
      currency: order.currency
    }
  };
  await producer.send({
    topic: 'orders_raw',
    messages: [{
      key: order.userId, // same user -> same partition -> per-user ordering
      value: JSON.stringify(event)
    }]
  });
  console.log(`Order ${order.id} published to Kafka`);
}
async function start() {
  await producer.connect();
  // Simulate order creation every 2 seconds
  setInterval(() => {
    const order = {
      id: `order_${Date.now()}`,
      userId: `user_${Math.floor(Math.random() * 100)}`,
      items: [{ product: 'laptop', quantity: 1, price: 999.99 }],
      total: 999.99,
      currency: 'USD'
    };
    // Catch failures so a broker hiccup doesn't become an unhandled promise rejection
    publishOrderEvent(order).catch(console.error);
  }, 2000);
}
start().catch(console.error);
Install dependencies:
npm init -y
npm install kafkajs
Key Producer Patterns
Partitioning by key: Using userId as the key sends all orders from one user to the same partition. Kafka guarantees ordering only within a partition, so this preserves per-user order.
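Under the hood, Kafka's Java client picks the partition for a keyed message by hashing the key bytes with murmur2, clearing the sign bit, and taking the result modulo the partition count; KafkaJS 2.x's default partitioner aims to be compatible with it. The Python port below is a sketch for building intuition, not a drop-in replacement, so check your client's partitioner before depending on exact partition numbers.

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 hash as used by Kafka's Java client, returned unsigned."""
    mask = 0xFFFFFFFF
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask  # Kafka's fixed seed
    tail, rem = length - length % 4, length % 4
    # Mix the input four bytes at a time (little-endian)
    for i in range(0, tail, 4):
        k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = ((h * m) & mask) ^ k
    # Mix the remaining 1-3 bytes (same fall-through as the Java switch)
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & mask
    # Final avalanche
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for_key(key: str, num_partitions: int) -> int:
    """Keyed-message partition choice: positive hash modulo partition count."""
    return (murmur2(key.encode("utf-8")) & 0x7FFFFFFF) % num_partitions


for user in ["user_7", "user_42", "user_7"]:
    print(user, "->", partition_for_key(user, 3))
```

The same key maps to the same partition only while the partition count stays fixed.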
Error handling: Always implement retry logic with exponential backoff:
async function publishWithRetry(order, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      await publishOrderEvent(order);
      return;
    } catch (error) {
      if (i === maxRetries - 1) throw error;
      // Back off 1s, 2s, 4s, ... between attempts
      await new Promise(r => setTimeout(r, Math.pow(2, i) * 1000));
    }
  }
}
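The same backoff schedule as a small Python helper, with the sleep function injectable so the delays can be checked without waiting. `call_with_retry` is a hypothetical name, and wrapping any zero-argument callable is a design choice of this sketch.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(fn: Callable[[], T], max_retries: int = 3, base_delay: float = 1.0,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, retrying failures after delays of base_delay * 2**attempt.

    With max_retries=3 the delays are 1s and 2s, and the third failure is re-raised,
    matching publishWithRetry above.
    """
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise
            sleep(base_delay * (2 ** attempt))
    raise ValueError("max_retries must be at least 1")
```

When many producers retry in lockstep they can hammer a recovering broker, so adding random jitter to each delay is a common refinement.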
Step 3: Stream Processing with Apache Flink
Setting Up Flink
Download Flink:
wget https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
tar -xzf flink-1.17.1-bin-scala_2.12.tgz
cd flink-1.17.1
./bin/start-cluster.sh
Access the Flink dashboard at http://localhost:8081.
Creating the Stream Processor (Java)
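// OrderProcessor.java
// Assumes flink-streaming-java, flink-connector-kafka (matching your Flink version) and
// jackson-databind on the classpath; the Kafka connector is not bundled with the Flink distribution.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerRecord;
public class OrderProcessor {
    // Thread-safe JSON mapper; static, so it is not serialized with the job graph
    private static final ObjectMapper MAPPER = new ObjectMapper();
    public static class OrderEvent {
        public String event_id;
        public String event_type;
        public String event_timestamp;
        public OrderData data;
    }
    public static class OrderData {
        public String order_id;
        public String user_id;
        public Item[] items;
        public double total_amount;
        public String currency;
    }
    public static class Item {
        public String product;
        public int quantity;
        public double price;
    }
    public static class ProcessedOrder {
        public String order_id;
        public String user_id;
        public double total_amount;
        public String status;
        public String processed_at;
        public boolean high_value;
    }
    // Writes ProcessedOrder as JSON to orders_processed, keyed by user_id
    // so per-user ordering carries over from orders_raw
    public static class ProcessedOrderSerializer implements KafkaRecordSerializationSchema<ProcessedOrder> {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                ProcessedOrder order, KafkaRecordSerializationSchema.KafkaSinkContext context, Long timestamp) {
            try {
                return new ProducerRecord<>("orders_processed",
                        order.user_id.getBytes(StandardCharsets.UTF_8),
                        MAPPER.writeValueAsBytes(order));
            } catch (JsonProcessingException e) {
                throw new RuntimeException("Could not serialize order " + order.order_id, e);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        // Source offsets are committed and sink buffers flushed on each checkpoint
        env.enableCheckpointing(60000);
        // Kafka source configuration
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("orders_raw")
                .setGroupId("flink-order-processor")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStream<String> rawStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        // Process orders: parse, transform, flag high-value orders
        DataStream<ProcessedOrder> processedStream = rawStream
                .map((MapFunction<String, ProcessedOrder>) json -> {
                    OrderEvent event = MAPPER.readValue(json, OrderEvent.class);
                    ProcessedOrder order = new ProcessedOrder();
                    order.order_id = event.data.order_id;
                    order.user_id = event.data.user_id;
                    order.total_amount = event.data.total_amount;
                    order.status = "PROCESSED";
                    order.processed_at = java.time.Instant.now().toString();
                    order.high_value = event.data.total_amount > 500;
                    return order;
                })
                .name("Order Transformation");
        // Sink to the processed orders topic
        KafkaSink<ProcessedOrder> sink = KafkaSink.<ProcessedOrder>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(new ProcessedOrderSerializer())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        processedStream.sinkTo(sink)
                .name("Kafka Sink")
                .setParallelism(3);
        env.execute("Order Processing Pipeline");
    }
}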
Building and Deploying
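# Build a fat JAR (e.g. with maven-shade-plugin) so the Kafka connector and Jackson are bundled
mvn clean package
# Submit to the Flink cluster (run from the Flink directory; adjust the JAR path to your project)
./bin/flink run -c OrderProcessor target/order-processor-1.0.jar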
Stream Processing Patterns Implemented
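Stateful processing: Flink maintains checkpointed state for aggregations and windowing
Exactly-once semantics: Kafka + Flink can deliver exactly-once results, but only with checkpointing enabled, the KafkaSink configured with DeliveryGuarantee.EXACTLY_ONCE and a transactional ID prefix, and downstream consumers reading with isolation.level=read_committed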
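Error handling with DLQ: Route records that fail processing to the orders_dlq topic through a side output:
// Needs OutputTag, ProcessFunction, Collector and SingleOutputStreamOperator imports
final OutputTag<String> dlqTag = new OutputTag<String>("orders-dlq") {};
SingleOutputStreamOperator<ProcessedOrder> parsed = rawStream.process(new ProcessFunction<String, ProcessedOrder>() {
    public void processElement(String json, Context ctx, Collector<ProcessedOrder> out) {
        try { out.collect(toProcessedOrder(json)); } // toProcessedOrder: hypothetical helper holding the map logic
        catch (Exception e) { ctx.output(dlqTag, MAPPER.createObjectNode().put("error", e.toString()).put("original", json).toString()); }
    }});
parsed.getSideOutput(dlqTag).sinkTo(dlqSink); // dlqSink: assumed KafkaSink<String> writing to orders_dlq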
Step 4: Consumer Services
Notification Service (Python)
# notification-service/main.py
from confluent_kafka import Consumer, KafkaException
import json
import requests
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'notification-service',
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(conf)
consumer.subscribe(['orders_processed'])
def send_notification(order):
    """Send an email/SMS notification for a processed order."""
    # timeout=5 is an assumed value; without a timeout, a hung API call stalls the consumer
    if order['high_value']:
        # Send priority notification
        requests.post('https://api.notifications.com/priority', json={
            'user_id': order['user_id'],
            'order_id': order['order_id'],
            'amount': order['total_amount'],
            'channel': ['email', 'sms']
        }, timeout=5)
    else:
        # Standard notification
        requests.post('https://api.notifications.com/standard', json={
            'user_id': order['user_id'],
            'order_id': order['order_id']
        }, timeout=5)
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        order = json.loads(msg.value().decode('utf-8'))
        send_notification(order)
        print(f"Notification sent for order {order['order_id']}")
except KeyboardInterrupt:
    pass
finally:
    # Leave the consumer group cleanly
    consumer.close()
Install dependencies:
pip install confluent-kafka requests
Consumer Group Best Practices
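- Group ID: Each service uses its own group ID, so every service receives all events independently of the others
- Offset management: Auto-commit is enabled by default; in critical paths, set enable.auto.commit to false and commit only after processing succeeds
- Parallelism: Active consumers in a group ≤ number of partitions; extra consumers sit idle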
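A minimal sketch of the commit-after-processing pattern, assuming a confluent_kafka Consumer created with 'enable.auto.commit': False. `process_and_commit` and the `handle` callback are hypothetical names. Because the helper only touches poll(), error(), value(), and commit(message=..., asynchronous=False), it can be unit-tested with a stub consumer.

```python
from typing import Any, Callable


def process_and_commit(consumer: Any, handle: Callable[[bytes], None], timeout: float = 1.0) -> bool:
    """Poll one message, handle it, and only then commit its offset.

    Assumes a confluent_kafka.Consumer created with 'enable.auto.commit': False.
    Returns False when no message arrived. If handle() raises, nothing is
    committed, so a restarted consumer resumes from the last committed offset
    and sees the message again (at-least-once delivery).
    """
    msg = consumer.poll(timeout)
    if msg is None:
        return False
    if msg.error():
        raise RuntimeError(f"consumer error: {msg.error()}")
    handle(msg.value())
    # Synchronously commit this message's offset + 1 before moving on
    consumer.commit(message=msg, asynchronous=False)
    return True
```

Calling it in a loop gives at-least-once processing, so `handle` should be idempotent (for example, skip notifications already sent for an order_id).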
Step 5: Monitoring and Observability
Kafka Metrics Dashboard
Track consumer lag, the gap between the newest offset in each partition and the group's committed offset:
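# monitoring/kafka_metrics.py
import time
from confluent_kafka import Consumer, TopicPartition
def get_consumer_lag(topic, group_id, bootstrap_servers='localhost:9092'):
    """Total lag = sum over partitions of (log end offset - committed offset)."""
    # A consumer that never subscribes can read the group's committed offsets without joining it
    consumer = Consumer({'bootstrap.servers': bootstrap_servers,
                         'group.id': group_id,
                         'enable.auto.commit': False})
    try:
        metadata = consumer.list_topics(topic, timeout=10)
        partitions = [TopicPartition(topic, p) for p in metadata.topics[topic].partitions]
        lag = 0
        for tp in consumer.committed(partitions, timeout=10):
            low, high = consumer.get_watermark_offsets(tp, timeout=10)
            # A negative offset means the group has not committed on this partition yet
            lag += high - (tp.offset if tp.offset >= 0 else low)
        return lag
    finally:
        consumer.close()
def alert(message):
    # Placeholder: forward to your paging or chat tool
    print(f"ALERT: {message}")
def monitor_pipeline():
    while True:
        lag = get_consumer_lag('orders_processed', 'notification-service')
        if lag > 1000:
            alert(f"High consumer lag: {lag}")
        time.sleep(30)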
Flink Monitoring
Access Flink web UI at http://localhost:8081 to monitor:
- Checkpoint status and duration
- Backpressure indicators
- Task throughput and latency
Distributed Tracing
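Integrate OpenTelemetry for tracing (end-to-end traces across services additionally require propagating the trace context in Kafka message headers):
// Add to order-service (OpenTelemetry JS SDK 1.x package names)
const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node');
const { SimpleSpanProcessor } = require('@opentelemetry/sdk-trace-base');
const { JaegerExporter } = require('@opentelemetry/exporter-jaeger');
const provider = new NodeTracerProvider();
const exporter = new JaegerExporter({
  endpoint: 'http://localhost:14268/api/traces'
});
// SimpleSpanProcessor exports each span immediately; prefer BatchSpanProcessor in production
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));
provider.register();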
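Spans from different services only join into one trace if the trace context travels with each Kafka message. In practice OpenTelemetry's propagation API (or a Kafka instrumentation package) handles this; the hand-rolled Python sketch below only shows what travels in the header, using the W3C Trace Context `traceparent` format and confluent_kafka's list-of-tuples header shape. All function names are hypothetical.

```python
import re
import secrets
from typing import List, Optional, Tuple

# confluent_kafka accepts and returns headers as a list of (str, bytes) tuples
Headers = List[Tuple[str, bytes]]
# W3C Trace Context: version-traceid-parentid-flags, lowercase hex
TRACEPARENT = re.compile(r"00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})")


def new_ids() -> Tuple[str, str]:
    """Random 16-byte trace id and 8-byte span id, hex-encoded."""
    return secrets.token_hex(16), secrets.token_hex(8)


def inject_traceparent(headers: Headers, trace_id: str, span_id: str, sampled: bool = True) -> Headers:
    """Return a copy of headers with a traceparent entry for the producer's span."""
    value = f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"
    return headers + [("traceparent", value.encode("ascii"))]


def extract_traceparent(headers: Optional[Headers]) -> Optional[Tuple[str, str, bool]]:
    """Return (trace_id, parent_span_id, sampled) from message headers, or None."""
    for key, raw in headers or []:
        if key != "traceparent" or not raw:
            continue
        match = TRACEPARENT.fullmatch(raw.decode("ascii", errors="replace"))
        # All-zero ids are invalid per the spec
        if match and match.group(1) != "0" * 32 and match.group(2) != "0" * 16:
            return match.group(1), match.group(2), (int(match.group(3), 16) & 0x01) == 1
    return None


trace_id, span_id = new_ids()
headers = inject_traceparent([("source", b"order-service")], trace_id, span_id)
print(extract_traceparent(headers))
```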
Step 6: Production Deployment Checklist
Infrastructure Requirements
| Component | Minimum | Production |
|-----------|---------|------------|
| Kafka brokers | 1 | 3+ |
| ZooKeeper nodes | 1 | 3+ (odd number) |
| Flink TaskManagers | 1 | 5+, with JobManager HA |
| Replication factor | 1 | 3 |
| Partitions per topic | 1 | 12+ (scale based on throughput) |
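A widely used rule of thumb turns "scale based on throughput" into a number: measure what one partition sustains when producing (p) and when consuming (c), then provision at least max(t/p, t/c) partitions for a target throughput t. The helper name and the figures in the usage line are hypothetical placeholders, not benchmarks.

```python
import math


def required_partitions(target_mb_s: float, producer_mb_s_per_partition: float,
                        consumer_mb_s_per_partition: float, minimum: int = 1) -> int:
    """Rule of thumb: max(target / producer rate, target / consumer rate), rounded up."""
    needed = max(target_mb_s / producer_mb_s_per_partition,
                 target_mb_s / consumer_mb_s_per_partition)
    return max(minimum, math.ceil(needed))


# Hypothetical measurements: 60 MB/s target, 10 MB/s per partition when producing,
# 5 MB/s per partition when consuming (e.g. a slow downstream sink).
print(required_partitions(60, 10, 5))  # 12
```

Leave headroom: adding partitions later changes which partition each key hashes to, which breaks per-key ordering across the change.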
Key Production Configurations
Kafka broker configuration (server.properties):
num.partitions=12
default.replication.factor=3
min.insync.replicas=2
log.retention.hours=168
auto.create.topics.enable=false
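min.insync.replicas is enforced only for producers that write with acks=all. For those writes, the number of replicas of a partition that can be lost while writes still succeed is the replication factor minus min.insync.replicas; past that point the broker rejects the writes instead of accepting under-replicated data. The helper name below is hypothetical.

```python
def write_fault_tolerance(replication_factor: int, min_insync_replicas: int) -> int:
    """Replicas of a partition that can fail while acks=all writes keep succeeding."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("need 1 <= min.insync.replicas <= replication factor")
    return replication_factor - min_insync_replicas


print(write_fault_tolerance(3, 2))  # 1
```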
Flink high availability:
# flink-conf.yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: s3://flink-ha/
state.backend: rocksdb
state.checkpoints.dir: s3://flink-checkpoints/
Security Hardening
- Enable SSL/TLS for all broker communication
- SASL authentication (SCRAM or OAuth2)
- Authorization with ACLs per topic/group
- Network segmentation to isolate the broker network
# Allow the notification-service principal to read the topic and use its consumer group
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:notification-service \
  --operation Read --topic orders_processed \
  --group notification-service
Step 7: Testing Strategies
Local Testing with Testcontainers
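// OrderProcessorTest.java (JUnit 5, Testcontainers, AssertJ)
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
@Testcontainers
public class OrderProcessorTest {
    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.4.0")
    );
    @Test
    public void testOrderProcessing() throws Exception {
        // Produce a test order with total_amount > 500 to orders_raw
        produceTestEvent(kafka.getBootstrapServers());
        // With the pipeline running against the container, read the result from orders_processed
        OrderProcessor.ProcessedOrder result = consumeProcessedEvent(kafka.getBootstrapServers());
        // ProcessedOrder exposes public fields, not getters
        assertThat(result.status).isEqualTo("PROCESSED");
        assertThat(result.high_value).isTrue();
    }
    // Hypothetical helpers: implement with a KafkaProducer / KafkaConsumer
    private void produceTestEvent(String bootstrapServers) {
        throw new UnsupportedOperationException("TODO");
    }
    private OrderProcessor.ProcessedOrder consumeProcessedEvent(String bootstrapServers) {
        throw new UnsupportedOperationException("TODO");
    }
}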
Chaos Testing
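Simulate failures to validate resilience (broker failover needs a multi-broker cluster with replication factor 3; the single-broker local setup will simply stop):
# Kill a Kafka broker (the container name depends on your setup)
docker stop kafka-broker-2
# Verify the system continues processing
# Failed Flink tasks should restart from the latest checkpoint automatically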
Common Pitfalls and Solutions
Pitfall 1: Partition Key Misconfiguration
Problem: All events go to one partition, limiting parallelism
Solution: Choose a partition key with enough distinct values to spread load evenly, while keeping events that must stay in order under the same key:
// BAD: All events to same partition
{ key: 'constant', value: event }
// GOOD: User-based partitioning maintains order per user
{ key: userId, value: event }
// GOOD: No key for unordered events; the producer's partitioner spreads them across partitions
{ key: null, value: event }
Pitfall 2: Ignoring Backpressure
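Problem: A slow operator or sink can't keep up; Flink throttles upstream tasks, throughput drops, and Kafka consumer lag grows
Solution: Monitor backpressure and scale parallelism to match the load:
// Flink propagates backpressure automatically (credit-based flow control),
// but you still need to monitor it and scale the job accordingly.
// calculateRequiredParallelism() is a placeholder for your own sizing logic.
env.setParallelism(calculateRequiredParallelism());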
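One hedged way to fill in the sizing placeholder, sketched in Python: divide the peak input rate (times a safety margin) by the rate one subtask handles, measured from your own job, and flag when the result exceeds the source topic's partition count, because Kafka source subtasks beyond that count receive no partitions. The 1.5 headroom factor and all numbers are assumptions.

```python
import math
from typing import NamedTuple


class ParallelismPlan(NamedTuple):
    parallelism: int
    needs_more_partitions: bool  # source subtasks beyond the partition count would idle


def calculate_required_parallelism(peak_events_per_s: float, events_per_s_per_subtask: float,
                                   source_partitions: int, headroom: float = 1.5) -> ParallelismPlan:
    """ceil(peak * headroom / per-subtask capacity), at least 1."""
    needed = max(1, math.ceil(peak_events_per_s * headroom / events_per_s_per_subtask))
    return ParallelismPlan(needed, needed > source_partitions)


# Hypothetical load: 20k events/s at peak, 5k events/s per subtask, 3 source partitions.
print(calculate_required_parallelism(20_000, 5_000, source_partitions=3))
# ParallelismPlan(parallelism=6, needs_more_partitions=True)
```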
Pitfall 3: No Dead Letter Queue
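Problem: A poison message that fails on every retry blocks its entire partition
Solution: Always implement the DLQ pattern:
try:
    process(message)
except Exception as e:
    # Park the message and the error on the DLQ topic (process/produce_to_dlq are your own helpers)
    produce_to_dlq(message, str(e))
    # Don't re-raise; commit the offset so the partition keeps moving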
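The snippet above dead-letters every failure immediately. A common refinement, sketched below under the assumption that transient errors can be told apart by exception type, is to retry those a few times and dead-letter only when retries run out or the error is clearly permanent. `process` and `send_to_dlq` stand in for your own handler and DLQ producer, which exceptions count as retryable is an assumption to adapt, and real code would add a delay between attempts.

```python
from typing import Callable, Tuple, Type

# Assumption: these exception types signal transient failures worth retrying
RETRYABLE: Tuple[Type[BaseException], ...] = (TimeoutError, ConnectionError)


def handle_with_dlq(message: bytes,
                    process: Callable[[bytes], None],
                    send_to_dlq: Callable[[bytes, str], None],
                    max_attempts: int = 3) -> bool:
    """Return True if processed, False if the message was dead-lettered.

    Transient errors are retried up to max_attempts times; any other exception is
    treated as a poison message and dead-lettered at once. A bad message never
    raises out of here, so the caller can commit the offset and move on.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            process(message)
            return True
        except RETRYABLE as exc:
            if attempt == max_attempts:
                send_to_dlq(message, f"gave up after {attempt} attempts: {exc!r}")
                return False
        except Exception as exc:
            send_to_dlq(message, repr(exc))
            return False
    return False
```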
Performance Optimization Tips
Kafka Tuning
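# Producer tuning
batch.size=32768
# Wait up to 20 ms to fill batches: more throughput, slightly higher latency
linger.ms=20
compression.type=lz4
# Consumer tuning
# The broker answers a fetch once 1 MB is available or 500 ms have passed, whichever comes first
fetch.min.bytes=1048576
fetch.max.wait.ms=500
max.poll.records=1000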
Flink Tuning
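// Checkpoint every minute with exactly-once state consistency
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE); // org.apache.flink.streaming.api.CheckpointingMode
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
// RocksDB state backend (flink-statebackend-rocksdb); `true` enables incremental checkpoints
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// Where checkpoints are stored
env.getCheckpointConfig().setCheckpointStorage("hdfs://path/to/backups");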
Next Steps and Extensions
Add Real-time Analytics
Extend the pipeline with windowed aggregations:
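// 5-minute tumbling window: total order value per user
// (imports: TumblingProcessingTimeWindows, org.apache.flink.streaming.api.windowing.time.Time)
processedStream
    .keyBy(order -> order.user_id)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .sum("total_amount")
    .sinkTo(analyticsSink); // analyticsSink: a sink for your analytics store (not defined here)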
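To reason about what the windowed job computes, here is a plain-Python model of per-user tumbling windows: each event is assigned to the window starting at `timestamp - timestamp % size`, which is how Flink aligns tumbling windows when no offset is given. Timestamps are treated as epoch milliseconds; processing-time windows use the operator's wall clock rather than a timestamp carried in the event. It is a model for reasoning and testing, not a replacement for Flink's windowing, and the function names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

WINDOW_MS = 5 * 60 * 1000  # 5-minute tumbling windows


def window_start(timestamp_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Tumbling windows are aligned to the epoch: [start, start + size)."""
    return timestamp_ms - (timestamp_ms % size_ms)


def tumbling_sums(events: Iterable[Tuple[str, int, float]],
                  size_ms: int = WINDOW_MS) -> Dict[Tuple[str, int], float]:
    """Sum amounts per (user_id, window_start) for (user_id, timestamp_ms, amount) events."""
    totals: Dict[Tuple[str, int], float] = defaultdict(float)
    for user_id, ts, amount in events:
        totals[(user_id, window_start(ts, size_ms))] += amount
    return dict(totals)


events = [("user_1", 0, 100.0), ("user_1", 60_000, 50.0),
          ("user_1", 300_000, 25.0), ("user_2", 1_000, 10.0)]
print(tumbling_sums(events))
# {('user_1', 0): 150.0, ('user_1', 300000): 25.0, ('user_2', 0): 10.0}
```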
Implement Event Sourcing
Store all state changes as events for complete audit trail and time-travel debugging.
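At its core, event sourcing derives current state by folding over the event log. The sketch below assumes the ORDER_CREATED payload from Step 2 plus two hypothetical event types (ORDER_PAID, ORDER_CANCELLED) that this pipeline does not emit. Replaying a prefix of the history gives the state at that point, which is what makes time-travel debugging possible; because orders are keyed by user, one order's events stay in order on a single partition.

```python
from dataclasses import dataclass, replace
from typing import Iterable, Optional


@dataclass(frozen=True)
class OrderState:
    order_id: str
    status: str
    total_amount: float


def apply_event(state: Optional[OrderState], event: dict) -> Optional[OrderState]:
    """Pure transition function: current state + one event -> next state."""
    kind, data = event["event_type"], event["data"]
    if kind == "ORDER_CREATED":
        return OrderState(data["order_id"], "CREATED", data["total_amount"])
    if state is None:
        raise ValueError(f"{kind} received before ORDER_CREATED")
    if kind == "ORDER_PAID":  # hypothetical event type
        return replace(state, status="PAID")
    if kind == "ORDER_CANCELLED":  # hypothetical event type
        return replace(state, status="CANCELLED")
    return state  # unknown event types are ignored


def rebuild(events: Iterable[dict]) -> Optional[OrderState]:
    """Replay an order's events in order to recover its state."""
    state = None
    for event in events:
        state = apply_event(state, event)
    return state


history = [
    {"event_type": "ORDER_CREATED", "data": {"order_id": "order_1", "total_amount": 999.99}},
    {"event_type": "ORDER_PAID", "data": {"order_id": "order_1"}},
]
print(rebuild(history))          # OrderState(order_id='order_1', status='PAID', total_amount=999.99)
print(rebuild(history[:1]).status)  # CREATED
```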
Add Schema Registry
Use Confluent Schema Registry for schema evolution and compatibility checking:
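const { SchemaRegistry, SchemaType } = require('@kafkajs/confluent-schema-registry');
// 8081 is Schema Registry's default port, which the local Flink dashboard also uses; move one of them
const registry = new SchemaRegistry({ host: 'http://localhost:8081' });
async function registerOrderSchema(orderSchema) { // orderSchema: your Avro schema object
  const { id } = await registry.register(
    { type: SchemaType.AVRO, schema: JSON.stringify(orderSchema) },
    { subject: 'orders_raw-value' } // default TopicNameStrategy subject: <topic>-value
  );
  return id;
}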
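Registering a schema returns an integer ID. Confluent-compatible serializers then prefix every message with a magic byte (0) and that ID as a 4-byte big-endian integer, so consumers can fetch the right schema before decoding; Protobuf framing adds message indexes after the ID. The Python sketch below handles only that framing for Avro or JSON Schema payloads, not the encoding itself, and uses made-up payload bytes.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0
HEADER = struct.Struct(">BI")  # 1-byte magic + 4-byte big-endian schema ID = 5 bytes


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prefix an already-serialized payload with the 5-byte wire-format header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, payload)."""
    if len(message) < HEADER.size:
        raise ValueError("message shorter than the 5-byte header")
    magic, schema_id = HEADER.unpack(message[:HEADER.size])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte {magic}")
    return schema_id, message[HEADER.size:]


framed = frame(42, b"avro-bytes")
print(unframe(framed))  # (42, b'avro-bytes')
```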
Conclusion
Building a resilient event-driven architecture requires careful attention to messaging patterns, state management, and operational excellence. By combining Apache Kafka's durable messaging with Flink's powerful stream processing, you create systems that are:
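- Scalable: Grow throughput toward millions of events per second by adding partitions, brokers, and parallel consumers
- Resilient: Survive component failures without data loss, given replication, acks=all producers, and checkpointing
- Maintainable: Clear separation of concerns through event contracts
- Observable: Consumer lag, Flink metrics, and distributed traces show how data moves through the pipeline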
Start small with a single pipeline, establish monitoring and testing practices, then gradually expand to more complex event-driven patterns. The investment in proper event-driven architecture pays dividends in system reliability and team velocity.
Key takeaways:
- Always design for failure: implement DLQs and retry patterns
- Monitor consumer lag and backpressure continuously
- Use partition keys wisely to balance order and parallelism
- Test chaos scenarios regularly to validate resilience
- Plan for schema evolution from day one
Ready to implement? Start by setting up local Kafka and Flink, then build your first producer-consumer pipeline following the patterns in this guide.
Need help implementing event-driven architecture for your specific use case? Share your challenges and I'll provide targeted guidance.
Rizwan Saleem | https://rizwansaleem.co