Performance tuning a distributed streaming system is a journey of discovery, experimentation, and learning. This is the story of how I scaled a Flink streaming job from 140K messages/sec to 1 million messages/sec - a 7x improvement through systematic optimization.
Spoiler alert: The bottleneck wasn't where I expected!
🏗️ Real-Time Data Streaming Platform Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│ Real-Time Data Streaming Platform │
│ AWS EKS (Kubernetes 1.31) │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────────┐ │
│ │ PRODUCER │───▶│ PULSAR │───▶│ FLINK │───▶│CLICKHOUSE │ │
│ │ │ │ │ │ │ │ │ │
│ │ IoT Sensors │ │ Message │ │ Stream │ │ Analytics │ │
│ │ AVRO Data │ │ Broker │ │ Processing │ │ Database │ │
│ │ │ │ │ │ │ │ │ │
│ │ 4x c5.4xl │ │ 6x i7i.8xl │ │ 4x c5.4xl │ │6x r6id.4xl│ │
│ │ 250K/sec │ │ Partitions │ │ Parallelism │ │ Real-time │ │
│ │ each node │ │ 64 │ │ 64 │ │ Queries │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └───────────┘ │
│ │
│ Data Flow: │
│ 300-byte AVRO ──▶ Pulsar Topics ──▶ keyBy(device_id) ──▶ ClickHouse │
│ IoT Messages (Persistent) 1-min Windows Analytics │
│ │
│ Performance Target: 1,000,000 messages/sec end-to-end │
└─────────────────────────────────────────────────────────────────────────┘
Pipeline Components:
- Producers: Generate 300-byte AVRO-serialized IoT sensor data
- Pulsar: Distributed message broker with 64 partitions for parallel processing
- Flink: Stream processing engine with 64-way parallelism for aggregations
- ClickHouse: Real-time analytics database for sub-second queries
The Challenge: Initial setup only achieved 140K msg/sec instead of target 1M msg/sec!
🎯 The Starting Point
Initial Setup:
- Goal: Process 1 million messages/sec from Pulsar
- Message Size: 300 bytes (AVRO-serialized sensor data)
- Job: Source → keyBy → Window (1-min) → Aggregate → Sink (ClickHouse)
- Result: Only 140K msg/sec 😱
Something was clearly wrong. Time to dig in.
🔍 Understanding the Flink Job Structure
Before tuning, I needed to understand what I was working with. You can find the complete implementation in the flink-load directory:
// JDBCFlinkConsumer.java - The Pipeline
PulsarSource<SensorRecord> source = PulsarSource.builder()
.setServiceUrl(pulsarUrl)
.setTopics("persistent://public/default/iot-sensor-data")
.setSubscriptionName("flink-benchmark-sub") // required by PulsarSource; subscription name shown here is illustrative
.setDeserializationSchema(new AvroSensorDataDeserializationSchema())
.build();
DataStream<SensorRecord> sensorStream = env.fromSource(source, ...);
sensorStream
.keyBy(record -> record.device_id) // ← Data shuffle happens here!
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.aggregate(new SensorAggregator())
.addSink(new ClickHouseJDBCSink(clickhouseUrl));
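The SensorAggregator above is what turns raw readings into one output record per device per window. The real implementation lives in the repository; the sketch below only shows the shape such a class takes against Flink's AggregateFunction interface, with a hypothetical numeric field (record.value) and a simple count/average accumulator standing in for the actual sensor metrics:
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
// Sketch only: the accumulator layout and the "value" field are illustrative,
// not the repository's actual SensorAggregator.
public class SensorAggregator
        implements AggregateFunction<SensorRecord, Tuple2<Long, Double>, Tuple2<Long, Double>> {
    @Override
    public Tuple2<Long, Double> createAccumulator() {
        return Tuple2.of(0L, 0.0);                              // (count, sum)
    }
    @Override
    public Tuple2<Long, Double> add(SensorRecord record, Tuple2<Long, Double> acc) {
        return Tuple2.of(acc.f0 + 1, acc.f1 + record.value);    // "value" is a hypothetical numeric field
    }
    @Override
    public Tuple2<Long, Double> getResult(Tuple2<Long, Double> acc) {
        return Tuple2.of(acc.f0, acc.f0 == 0 ? 0.0 : acc.f1 / acc.f0);   // (count, average)
    }
    @Override
    public Tuple2<Long, Double> merge(Tuple2<Long, Double> a, Tuple2<Long, Double> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}
Flink calls this once per record and emits one result per key per window, which is why "Records Out" later in the post is counted per minute rather than per second.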
The Operator Chain
Flink optimizes by chaining operators together:
Task 1 (Source Group):
└─ Pulsar Source (I/O-bound)
└─ AVRO Deserialize
└─ keyBy (compute hash, network shuffle)
Task 2 (Window Group):
└─ Window Aggregate (CPU-bound)
└─ ClickHouse Sink (I/O-bound)
Key Insight: The keyBy() causes data shuffling between Task 1 and Task 2. This creates 2 separate task groups that need slots.
Total slots needed = Parallelism (not parallelism × 2 task groups, and not parallelism × 4 operators): chaining collapses the pipeline into two tasks, and Flink's default slot sharing lets one subtask of each task run in the same slot - so 64-way parallelism needs exactly 64 slots.
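Flink applies chaining and slot sharing automatically, but both are controllable from the DataStream API if you ever need to separate the I/O-bound source from the CPU-bound window stage. A short sketch for illustration only - the production job keeps the defaults, and the slotSharingGroup call here is an assumption, not something the repository does:
// Illustrative only: the job in this post keeps Flink's default chaining and slot sharing.
// env.disableOperatorChaining();           // debugging aid: every operator becomes its own task
sensorStream
    .keyBy(record -> record.device_id)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .aggregate(new SensorAggregator())
    .slotSharingGroup("window")             // isolate the CPU-heavy stage in its own slots
    .addSink(new ClickHouseJDBCSink(clickhouseUrl))
    .slotSharingGroup("window");            // keep the sink co-located with the aggregate
Keep in mind that opting out of the default slot sharing group raises the slot count (each group needs its own slots per parallel instance) - which is exactly why the default of one shared group lets 64-way parallelism fit into 64 slots.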
📊 Phase 1: Initial Configuration (140K msg/sec)
Configuration
# FlinkDeployment configuration
parallelism: 8
pulsar_partitions: 8
# Resource allocation
taskmanager:
replicas: 4
resources:
cpu: 2 # 2 vCPUs per TaskManager
memory: 4Gi
# Task slot mapping
taskmanager.numberOfTaskSlots: 2 # 2 slots per TaskManager
Calculation:
- 4 TaskManagers × 2 slots = 8 total slots
- Parallelism = 8
- Each slot gets: 2 vCPUs / 2 slots = 1 vCPU per slot
Instance Type: c5.2xlarge (8 vCPU, 16 GB RAM)
Results
# Metrics after 5 minutes
Records In: 140,000 msg/sec
Records Out: 2,300 aggregated records/min
CPU Usage: 85-95% (maxed out!)
Backpressure: HIGH on source operators
Problem Identified: 8 partitions and 8-way parallelism simply don't provide enough parallel capacity for 1M msg/sec - both the topic and the job needed to scale out.
🚀 Phase 2: Scale Parallelism (480K msg/sec)
The Hypothesis
If 8 parallel instances handle 140K msg/sec, then:
- Per-instance rate: 140K / 8 = 17,500 msg/sec
- For 1M msg/sec: 1,000,000 / 17,500 ≈ 57 instances needed
Let's try 64 for a clean power-of-2.
Configuration Changes
# Increase Pulsar partitions to 64
pulsar_partitions: 64
# Increase Flink parallelism to match
parallelism: 64
# Task slot mapping (2:1 ratio maintained: 2 slots per vCPU)
taskmanager.numberOfTaskSlots: 16 # 16 slots per 8-vCPU TaskManager
# Calculate required TaskManagers:
# 64 slots needed / 16 slots per TaskManager = 4 TaskManagers
taskmanager:
replicas: 16 # Using c5.2xlarge (8 vCPU, 16 slots per machine)
Wait... 16 TaskManagers on c5.2xlarge?
Let me recalculate:
- c5.2xlarge: 8 vCPUs
- Task slots: 2 per vCPU = 16 slots per machine
- Need 64 slots total
- Machines needed: 64 / 16 = 4 machines
# Corrected configuration
taskmanager:
replicas: 4 # 4 × c5.2xlarge machines
resources:
cpu: 8 # Full 8 vCPUs
memory: 16Gi
Results - Phase 2
# Metrics after deployment
Records In: 480,000 msg/sec (3.4x improvement!)
Records Out: 8,000 aggregated records/min
CPU Usage: 65-75% per TaskManager
Backpressure: MEDIUM
Progress: 140K → 480K msg/sec ✅
But still far from 1M! What's the bottleneck now?
🔧 Phase 3: CPU Resource Tuning (600K msg/sec)
The Investigation
Looking at the Flink deployment YAML in the repository:
# flink-job-deployment.yaml - JobManager pod spec
spec:
job:
jarURI: local:///opt/flink/usrlib/flink-consumer-1.0.0.jar
parallelism: 64
upgradeMode: stateless
flinkConfiguration:
taskmanager.numberOfTaskSlots: "16"
# The CPU limits are set in the TaskManager pod spec
Found it! The TaskManager pod resource definition:
resources:
requests:
cpu: "1" # Only 1 CPU requested! 🚨
memory: "4Gi"
limits:
cpu: "2" # And max 2 CPUs
memory: "8Gi"
Problem: Each TaskManager was throttled to 2 CPUs max, but we have 8 vCPU machines!
The Fix
# Updated TaskManager resources
resources:
requests:
cpu: "5" # Request 5 CPUs (was 1)
memory: "8Gi"
limits:
cpu: "8" # Allow up to 8 CPUs (was 2)
memory: "16Gi"
Results - Phase 3
# After CPU increase
Records In: 600,000 msg/sec (1.25x improvement!)
CPU Usage: 75-85% per TaskManager (better utilization)
Backpressure: LOW → MEDIUM
Progress: 480K → 600K msg/sec ✅
Still not 1M. Time for the real detective work!
🔎 Phase 4: The Backlog Experiment (890K msg/sec)
The Eureka Moment
I noticed a huge backlog forming in Pulsar (100M+ messages). So I tried an experiment:
Stop all producers and let Flink catch up.
# Stop producers
kubectl scale deployment iot-producer -n iot-pipeline --replicas=0
# Watch Flink metrics
kubectl exec -n flink-benchmark <jobmanager> -- curl localhost:8081/jobs/<job-id>/metrics
The Shocking Result
# Flink consuming from backlog (no new messages)
Records In: 890,000 msg/sec 😲
# CPU and memory usage
CPU: 85-90% (near max)
Memory: Stable
Network: ~270 MB/sec ingress
Key Insight: Flink could process 890K msg/sec when reading from Pulsar backlog, but only 600K msg/sec with live producers!
Conclusion: Pulsar was the bottleneck! 🎯
🏎️ Phase 5: Upgrade Pulsar Infrastructure (1M msg/sec)
The Pulsar Problem
Previous Setup:
- Instance: i3en.6xlarge (24 vCPU, 192GB RAM, 2x 7.5TB NVMe)
- Bookies: 4 nodes
- Brokers: 4 nodes
Issues:
- Older generation (i3en)
- Only 4 bookies for 1M msg/sec
- Journal and ledger on same NVMe device
The Pulsar Upgrade
# Updated Terraform configuration
pulsar_broker_bookie_config = {
instance_types = ["i7i.8xlarge"] # Upgraded from i3en.6xlarge
desired_size = 6 # Increased from 4
}
i7i.8xlarge Advantages:
- Newer generation (better CPU IPC)
- 32 vCPUs (vs 24 on i3en)
- 2× NVMe devices (3.75TB each)
- Lower per-device latency
NVMe Device Separation (Critical!):
# Device mapping on each i7i.8xlarge bookie
/dev/nvme1n1 → /mnt/bookkeeper/journal # Journal (WAL)
/dev/nvme2n1 → /mnt/bookkeeper/ledgers # Ledgers (Data)
This separation eliminates I/O contention between:
- Journal: Sequential writes (low latency critical)
- Ledgers: Random reads/writes (capacity critical)
Results - Phase 5
# After Pulsar upgrade
Records In: 1,040,000 msg/sec 🎉
CPU (Flink): 80-85% per TaskManager
CPU (Pulsar): 70-75% per Broker/Bookie
Backpressure: NONE to LOW
End-to-end latency: <2 seconds
SUCCESS: 600K → 1,040K msg/sec ✅
🎯 Phase 6: Final Flink Optimization (1.04M msg/sec sustained)
The Last Mile
Even with Pulsar fixed, I wanted to optimize Flink further. The 2:1 slot-to-CPU ratio was still suboptimal for our CPU-heavy aggregation workload.
Configuration Change
# Final Flink configuration
taskmanager:
replicas: 4 # 4 × c5.4xlarge machines
resources:
cpu: 16 # Full 16 vCPUs per TM
memory: 32Gi
# Slot configuration
taskmanager.numberOfTaskSlots: 16 # 1:1 ratio (was 2:1)
New Calculation:
- 4 TaskManagers × 16 slots = 64 total slots
- Each slot gets: 16 vCPUs / 16 slots = 1 vCPU per slot
Instance Upgrade: c5.2xlarge → c5.4xlarge (16 vCPU, 32 GB RAM)
Why 1:1 Ratio Works Better
Our Pipeline Analysis:
Source (I/O) → keyBy → Window (CPU) → Aggregate (CPU) → Sink (I/O)
🟢 🔴 🔴 🟢
CPU-intensive operators: 2/4 (Window + Aggregate)
I/O-bound operators: 2/4 (Source + Sink)
Decision: Since aggregation is CPU-heavy, the 1:1 ratio gives every slot (and therefore every subtask) a dedicated vCPU.
Final Results
# Sustained performance metrics
Records In: 1,040,000 msg/sec
Records Out: 17,333 aggregated records/min
CPU Usage: 75-80% per TaskManager (optimal)
Memory Usage: 60-70% (plenty of headroom)
Backpressure: NONE
GC Pressure: LOW
Checkpoint Duration: 5-8 seconds
Final Achievement: 1,040,000 messages/sec sustained 🏆
🧪 The Backlog Test - A Critical Technique
Why This Test Matters
The backlog consumption test reveals your true system capacity:
# The test process
1. Run producers at max speed (build backlog)
2. Stop producers completely
3. Measure Flink consumption from backlog
4. Compare: Backlog rate vs Live rate
What it tells you:
- Shows true Flink capacity
- Reveals whether Pulsar or Flink is the bottleneck
In our case:
- Live: 600K msg/sec
- Backlog: 890K msg/sec
- Conclusion: Pulsar was limiting, not Flink!
🔧 Final Architecture
Flink Configuration
# FlinkDeployment - Final configuration
spec:
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "16"
parallelism.default: "64"
state.backend: rocksdb
state.checkpoints.dir: s3://benchmark-high-infra-state/checkpoints
execution.checkpointing.interval: 60000
execution.checkpointing.mode: EXACTLY_ONCE
jobManager:
resource:
memory: "8Gi"
cpu: 4
replicas: 1
taskManager:
resource:
memory: "32Gi"
cpu: 16 # Full 16 vCPUs
replicas: 4 # 4 × c5.4xlarge machines
Resource Summary:
- 4 TaskManagers on c5.4xlarge (16 vCPU, 32 GB each)
- 64 task slots total (16 per TM)
- 1:1 slot-to-vCPU ratio
- 64 parallelism (matches Pulsar partitions)
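As a side note, the same checkpointing and state-backend choices can be made in code instead of flinkConfiguration. A minimal sketch against Flink 1.18's APIs - the S3 path is the one from the deployment above, everything else mirrors the YAML, and it assumes the flink-statebackend-rocksdb dependency is on the classpath:
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once checkpoints every 60 s (execution.checkpointing.* above)
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
        // RocksDB keeps window state off-heap; checkpoint data goes to S3
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.getCheckpointConfig()
           .setCheckpointStorage("s3://benchmark-high-infra-state/checkpoints");
        // parallelism.default equivalent
        env.setParallelism(64);
        // ... build the pipeline from JDBCFlinkConsumer.java here, then env.execute(...)
    }
}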
Pulsar Configuration
# Pulsar values - Final configuration
bookkeeper:
replicaCount: 6 # Increased from 4
# NVMe device configuration
volumes:
journal:
size: 200Gi
storageClassName: local-nvme # /dev/nvme1n1
ledgers:
size: 1000Gi
storageClassName: local-nvme # /dev/nvme2n1
configData:
journalMaxSizeMB: "2048"
journalSyncData: "false"
journalAdaptiveGroupWrites: "true"
ledgerStorageClass: "org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage"
broker:
replicaCount: 6
nodeSelector:
node-type: broker-bookie
Instance Type: i7i.8xlarge (32 vCPU, 256GB RAM, 2× 3.75TB NVMe)
📊 Performance Comparison Table
| Phase | Config | Throughput | Bottleneck | Fix |
|---|---|---|---|---|
| Phase 1 | Parallelism 8, 4× c5.2xlarge, 2 slots/TM | 140K msg/sec | Low parallelism | Scale partitions + parallelism to 64 |
| Phase 2 | Parallelism 64, 4× c5.2xlarge, 2:1 slots | 480K msg/sec | Pod CPU throttling | Raise CPU requests/limits |
| Phase 3 | Same, with 8 CPUs per TaskManager | 600K msg/sec | Pulsar capacity | Upgrade Pulsar (Phase 5) |
| Phase 4 (backlog test) | Same config, producers stopped | 890K msg/sec | Confirmed Pulsar as the live bottleneck; Flink near its own ceiling | Upgrade Pulsar, then Flink CPU |
| Phase 5-6 | Pulsar on 6× i7i.8xlarge; parallelism 64, c5.4xlarge, 1:1 | 1,040K msg/sec | None! | ✅ Success |
💡 Best Practices for Flink at Scale
1. Match Parallelism to Source Partitions
Pulsar Partitions = Flink Parallelism
This ensures:
- Optimal work distribution
- No partition skew
- Maximum throughput
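In the job itself this is mostly a matter of pinning parallelism to the partition count; the PulsarSource then assigns one partition to each source subtask. A sketch of how the source call might look with the elided arguments filled in - the WatermarkStrategy is an assumption on my part (processing-time windows don't need event-time watermarks), not a quote from the repository:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
// env and source are the objects from the JDBCFlinkConsumer.java excerpt earlier.
DataStream<SensorRecord> sensorStream = env
        .fromSource(source, WatermarkStrategy.noWatermarks(), "pulsar-iot-source")
        .setParallelism(64);   // 64 Pulsar partitions → 64 source subtasks, one partition each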
2. Use the Right Slot-to-CPU Ratio
// Analyze your job operators
Source (I/O) → keyBy → Window (CPU) → Aggregate (CPU) → Sink (I/O)
🟢 🔴 🔴 🟢
// Count CPU-bound operators
CPU-bound: 2 operators (window, aggregate)
I/O-bound: 2 operators (source, sink)
// Decision: 1:1 ratio (due to CPU-heavy aggregate)
3. Right-Size Your TaskManager Instances
Options for 64 slots with 1:1 ratio:
| Instance | vCPU | Machines | Cost/mo | Network | Best For |
|---|---|---|---|---|---|
| c5.2xlarge | 8 | 8 | $2,200 | Up to 10 Gbps | Budget |
| c5.4xlarge | 16 | 4 | $2,400 | Up to 10 Gbps | Balanced ✅ |
| c5.9xlarge | 36 | 2 | $2,700 | 10 Gbps | High memory |
| c5.12xlarge | 48 | 2 | $3,600 | 12 Gbps | Max performance |
Our choice: c5.4xlarge - Best balance of cost and manageability
4. Monitor These Metrics
# Critical Flink metrics to watch
# 1. Backpressure (should be LOW)
flink_taskmanager_job_task_backPressuredTimeMsPerSecond
# 2. Records per second
sum(flink_taskmanager_job_task_numRecordsInPerSecond)  # already a per-second gauge, so sum across subtasks instead of rate()
# 3. Checkpoint duration (should be < 10% of interval)
flink_jobmanager_job_lastCheckpointDuration
# 4. CPU usage (should be 70-85%)
container_cpu_usage_seconds_total{pod=~"flink-taskmanager.*"}
# 5. Memory usage
flink_taskmanager_Status_JVM_Memory_Heap_Used
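These built-in metrics cover throughput, backpressure, and checkpoints. If you also want application-level counters (say, malformed AVRO records), any rich function can register its own metric through Flink's metric group. A small sketch - the class and metric names are hypothetical:
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
// Counts records as they pass through; the counter shows up under the task's
// metric scope and is exported by whatever reporter (e.g., Prometheus) is configured.
public class CountingMap extends RichMapFunction<SensorRecord, SensorRecord> {
    private transient Counter processed;   // hypothetical metric
    @Override
    public void open(Configuration parameters) {
        processed = getRuntimeContext()
                .getMetricGroup()
                .counter("sensorRecordsProcessed");
    }
    @Override
    public SensorRecord map(SensorRecord record) {
        processed.inc();
        return record;
    }
}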
5. Test with Backlog
Always do the backlog test:
# 1. Build backlog (run producers at full speed)
# 2. Stop producers
# 3. Measure Flink consumption rate
# 4. This is your TRUE Flink capacity
If backlog consumption > live consumption:
→ Upstream system (Pulsar) is the bottleneck
If backlog consumption ≈ live consumption:
→ Flink is the bottleneck
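If you want to capture the backlog-test numbers programmatically instead of eyeballing the UI, the JobManager's REST API exposes per-vertex rates. The sketch below polls the aggregated numRecordsInPerSecond for the source vertex; the endpoint path follows Flink's documented REST API, but treat the exact response shape as an assumption and adjust for your version - the job and vertex IDs are placeholders you look up via /jobs:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
// Polls the source vertex's aggregated numRecordsInPerSecond once per second
// while Flink drains the backlog. Run it against a port-forwarded JobManager.
public class BacklogProbe {
    public static void main(String[] args) throws Exception {
        String jobId = args[0];       // placeholder: from GET /jobs
        String vertexId = args[1];    // placeholder: source vertex, from GET /jobs/<job-id>
        String url = "http://localhost:8081/jobs/" + jobId
                + "/vertices/" + vertexId
                + "/subtasks/metrics?get=numRecordsInPerSecond";
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        for (int i = 0; i < 60; i++) {   // sample for one minute
            HttpResponse<String> resp = client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(resp.body());   // aggregated min/max/avg/sum across the source subtasks
            Thread.sleep(1_000);
        }
    }
}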
🎯 Key Takeaways
What Worked
✅ Parallelism = Partitions (64 = 64)
✅ 1:1 slot-to-CPU ratio for CPU-bound workloads
✅ Bigger instances (c5.4xlarge) over many small ones
✅ Upgraded Pulsar (i3en → i7i, 4 → 6 nodes)
✅ NVMe device separation (journal vs ledgers)
✅ Disabled journalSyncData in BookKeeper
✅ Backlog testing to identify bottlenecks
What Didn't Work
❌ 2:1 slot-to-CPU ratio (insufficient CPU per task)
❌ Low CPU limits in pod specs (throttling)
❌ Too few Pulsar bookies (4 → needed 6)
❌ Single NVMe device for journal+ledger (I/O contention)
❌ Older instance types (i3en vs i7i)
💰 Final Cost
| Component | Instance | Count | Monthly Cost |
|---|---|---|---|
| Flink JobManager | c5.4xlarge | 1 | $480 |
| Flink TaskManagers | c5.4xlarge | 4 | $1,920 |
| Flink subtotal | | | $2,400 |
| Pulsar (Broker + Bookie) | i7i.8xlarge | 6 | $12,960 |
| Infrastructure total | | | ~$15,360/month |
Cost per message: at a sustained 1M msg/sec, a 30-day month is roughly 2.6 trillion messages, so $15,360/month works out to about $0.006 per million messages.
🚀 Scaling Beyond 1M
Want to go higher? Here's the roadmap:
2M msg/sec:
- Pulsar: 8 bookies (i7i.8xlarge)
- Flink: 8 TaskManagers (c5.4xlarge), parallelism 128
- Partitions: 128
- Cost: ~$23K/month
5M msg/sec:
- Pulsar: 15 bookies (i7i.8xlarge)
- Flink: 16 TaskManagers (c5.4xlarge), parallelism 256
- Partitions: 256
- Cost: ~$40K/month
10M msg/sec:
- Pulsar: 30 bookies (i7i.16xlarge)
- Flink: 32 TaskManagers (c5.9xlarge), parallelism 512
- Network: Upgrade to 100 Gbps instances
- Cost: ~$80K/month
🎓 Conclusion
Going from 140K to 1 million messages/sec required:
- Understanding the architecture (operators, tasks, slots)
- Systematic testing (change one thing at a time)
- Bottleneck identification (backlog test was key!)
- Right-sizing resources (not just throwing more hardware)
- Infrastructure matching (Pulsar + Flink capacity aligned)
The biggest lesson?
The bottleneck is rarely where you think it is. Measure, test, iterate.
In our case, we assumed Flink was the problem. Turned out:
- Phase 1-3: Flink configuration issues (parallelism, pod CPU limits)
- Phase 4-5: Pulsar was limiting Flink - the backlog test exposed it, the i7i upgrade fixed it
- Phase 6: Back to Flink (more CPU per slot via the 1:1 ratio)
Both systems needed optimization to achieve 1M msg/sec.
The journey taught me:
- Start with fundamentals: Parallelism, partitions, resource allocation
- Use systematic testing: Change one variable at a time
- Leverage diagnostic tools: Backlog testing, metrics monitoring
- Think holistically: Tune the entire pipeline, not just one component
📚 Resources
- Flink Load Repository: flink-load
- Complete Implementation: RealtimeDataPlatform
- Apache Flink Performance Tuning: flink.apache.org/performance
- Flink Resource Configuration: Flink Memory Setup Guide
Have you tuned Flink for high throughput? What challenges did you face? Share your optimization stories in the comments! 👇
Follow me for more deep dives on stream processing, performance optimization, and distributed systems architecture!
Next in the series: "ClickHouse Performance: Ingesting 1M Events/Sec with Sub-Second Queries"
📋 Quick Reference
Final Configuration Checklist
✅ Pulsar partitions: 64
✅ Flink parallelism: 64
✅ TaskManager slots: 16 (per TM)
✅ Slot-to-CPU ratio: 1:1
✅ TaskManager count: 4
✅ Instance type: c5.4xlarge
✅ Total vCPUs: 64
✅ Total slots: 64
✅ Pulsar bookies: 6
✅ Pulsar instance: i7i.8xlarge
✅ NVMe separation: Yes
✅ journalSyncData: false
Performance Validation Commands
# Check Flink throughput
kubectl logs -n flink-benchmark deployment/iot-flink-job | grep "records"
# Check parallelism
kubectl exec -n flink-benchmark <jm-pod> -- \
curl localhost:8081/jobs/<job-id> | jq '.vertices[].parallelism'
# Check CPU usage
kubectl top pods -n flink-benchmark
# Check backpressure
kubectl exec -n flink-benchmark <jm-pod> -- \
curl localhost:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure
# Run the backlog test
kubectl scale deployment iot-producer -n iot-pipeline --replicas=0
# Watch throughput increase as Flink catches up
Tags: #flink #performance #streaming #aws #optimization #parallelism #tuning #apacheflink #realtimedata