Performance tuning a distributed streaming system is a journey of discovery, experimentation, and learning. This is the story of how I scaled a Flink streaming job from 140K messages/sec to 1 million messages/sec - a 7x improvement through systematic optimization.
Spoiler alert: The bottleneck wasn't where I expected!
Real-Time Data Streaming Platform Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│                    Real-Time Data Streaming Platform                    │
│                        AWS EKS (Kubernetes 1.31)                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐   ┌───────────┐    │
│  │  PRODUCER   │──▶│   PULSAR    │──▶│    FLINK    │──▶│CLICKHOUSE │    │
│  │             │   │             │   │             │   │           │    │
│  │ IoT Sensors │   │  Message    │   │   Stream    │   │ Analytics │    │
│  │ AVRO Data   │   │  Broker     │   │ Processing  │   │ Database  │    │
│  │             │   │             │   │             │   │           │    │
│  │ 4x c5.4xl   │   │ 6x i7i.8xl  │   │ 4x c5.4xl   │   │6x r6id.4xl│    │
│  │ 250K/sec    │   │ Partitions  │   │ Parallelism │   │ Real-time │    │
│  │ each node   │   │ 64          │   │ 64          │   │ Queries   │    │
│  └─────────────┘   └─────────────┘   └─────────────┘   └───────────┘    │
│                                                                         │
│  Data Flow:                                                             │
│  300-byte AVRO ──▶ Pulsar Topics ──▶ keyBy(device_id) ──▶ ClickHouse    │
│  IoT Messages      (Persistent)      1-min Windows        Analytics     │
│                                                                         │
│  Performance Target: 1,000,000 messages/sec end-to-end                  │
└─────────────────────────────────────────────────────────────────────────┘
Pipeline Components:
- Producers: Generate 300-byte AVRO-serialized IoT sensor data
- Pulsar: Distributed message broker with 64 partitions for parallel processing
- Flink: Stream processing engine with 64-way parallelism for aggregations
- ClickHouse: Real-time analytics database for sub-second queries
The Challenge: The initial setup achieved only 140K msg/sec against the 1M msg/sec target!
The Starting Point
Initial Setup:
- Goal: Process 1 million messages/sec from Pulsar
- Message Size: 300 bytes (AVRO-serialized sensor data)
- Job: Source → keyBy → Window (1-min) → Aggregate → Sink (ClickHouse)
- Result: Only 140K msg/sec
Something was clearly wrong. Time to dig in.
Understanding the Flink Job Structure
Before tuning, I needed to understand what I was working with. You can find the complete implementation in the flink-load directory:
// JDBCFlinkConsumer.java - The Pipeline
PulsarSource<SensorRecord> source = PulsarSource.builder()
    .setServiceUrl(pulsarUrl)
    .setTopics("persistent://public/default/iot-sensor-data")
    .setDeserializationSchema(new AvroSensorDataDeserializationSchema())
    .build();

DataStream<SensorRecord> sensorStream = env.fromSource(source, ...);

sensorStream
    .keyBy(record -> record.device_id) // ← data shuffle happens here!
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .aggregate(new SensorAggregator())
    .addSink(new ClickHouseJDBCSink(clickhouseUrl));
The Operator Chain
Flink optimizes by chaining operators together:
Task 1 (Source Group):
├─ Pulsar Source (I/O-bound)
├─ AVRO Deserialize
└─ keyBy (compute hash, network shuffle)

Task 2 (Window Group):
├─ Window Aggregate (CPU-bound)
└─ ClickHouse Sink (I/O-bound)
Key Insight: The keyBy() causes data shuffling between Task 1 and Task 2. This creates 2 separate task groups that need slots.
With Flink's default slot sharing, one subtask from each task group can share a slot, so total slots needed = parallelism (not parallelism × 2 task groups, and not parallelism × 4 operators, thanks to chaining and slot sharing!)
Phase 1: Initial Configuration (140K msg/sec)
Configuration
# FlinkDeployment configuration
parallelism: 8
pulsar_partitions: 8
# Resource allocation
taskmanager:
  replicas: 4
  resources:
    cpu: 2        # 2 vCPUs per TaskManager
    memory: 4Gi

# Task slot mapping
taskmanager.numberOfTaskSlots: 2  # 2 slots per TaskManager (1 vCPU per slot)
Calculation:
- 4 TaskManagers × 2 slots = 8 total slots
- Parallelism = 8
- Each slot gets: 2 vCPUs / 2 slots = 1 vCPU per slot
Instance Type: c5.2xlarge (8 vCPU, 16 GB RAM)
Results
# Metrics after 5 minutes
Records In: 140,000 msg/sec
Records Out: 2,300 aggregated records/min
CPU Usage: 85-95% (maxed out!)
Backpressure: HIGH on source operators
Problem Identified: 8-way parallelism is simply too little compute for the target; both the Flink parallelism and the Pulsar partition count need to scale up dramatically.
Phase 2: Scale Parallelism (480K msg/sec)
The Hypothesis
If 8 parallel instances handle 140K msg/sec, then:
- Per-instance rate: 140K / 8 = 17,500 msg/sec
- For 1M msg/sec: 1,000,000 / 17,500 ≈ 57 instances needed
Let's try 64 for a clean power-of-2.
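This back-of-envelope sizing can be captured in a few lines of plain Java (no Flink dependencies; the helper and its name are my own):

```java
public class CapacityPlan {
    // Derive the parallelism needed for a target rate from a measured
    // baseline, rounded up to the next power of two for clean partitioning.
    public static int requiredParallelism(long targetRate, long measuredRate,
                                          int measuredParallelism) {
        double perInstance = (double) measuredRate / measuredParallelism; // 140K/8 = 17,500
        int needed = (int) Math.ceil(targetRate / perInstance);           // ceil(57.1) = 58
        int p = 1;
        while (p < needed) p <<= 1;                                       // 58 -> 64
        return p;
    }

    public static void main(String[] args) {
        System.out.println(requiredParallelism(1_000_000, 140_000, 8)); // 64
    }
}
```

This assumes throughput scales linearly with parallelism, which held roughly true here but is worth re-measuring at each step.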
Configuration Changes
# Increase Pulsar partitions to 64
pulsar_partitions: 64
# Increase Flink parallelism to match
parallelism: 64
# Task slot mapping (2:1 ratio maintained)
taskmanager.numberOfTaskSlots: 2 # first attempt; see the recalculation below
# Calculate required TaskManagers:
# 64 slots needed / 2 slots per TM = 32 TaskManagers
# OR with 8 vCPU machines: 64 slots / 16 slots per machine = 4 machines
taskmanager:
  replicas: 16 # Using c5.2xlarge (8 vCPU, 16 slots per machine)
Wait... 16 TaskManagers on c5.2xlarge?
Let me recalculate:
- c5.2xlarge: 8 vCPUs
- Task slots: 2 per vCPU = 16 slots per machine
- Need 64 slots total
- Machines needed: 64 / 16 = 4 machines
# Corrected configuration
taskmanager.numberOfTaskSlots: 16 # 2 slots per vCPU × 8 vCPUs
taskmanager:
  replicas: 4 # 4 × c5.2xlarge machines
  resources:
    cpu: 8 # Full 8 vCPUs
    memory: 16Gi
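The machine-count arithmetic generalizes; a small helper (my own sketch) makes the ceiling division explicit:

```java
public class SlotMath {
    // TaskManagers required for a slot budget, assuming one TaskManager
    // per machine and a fixed slots-per-vCPU ratio.
    public static int taskManagersNeeded(int totalSlots, int vcpusPerMachine,
                                         int slotsPerVcpu) {
        int slotsPerTm = vcpusPerMachine * slotsPerVcpu;   // c5.2xlarge at 2:1 -> 16
        return (totalSlots + slotsPerTm - 1) / slotsPerTm; // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(taskManagersNeeded(64, 8, 2));  // 4 machines, not 16
        System.out.println(taskManagersNeeded(64, 16, 1)); // 4 (c5.4xlarge at 1:1)
    }
}
```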
Results - Phase 2
# Metrics after deployment
Records In: 480,000 msg/sec (3.4x improvement!)
Records Out: 8,000 aggregated records/min
CPU Usage: 65-75% per TaskManager
Backpressure: MEDIUM
Progress: 140K → 480K msg/sec ✅
But still far from 1M! What's the bottleneck now?
Phase 3: CPU Resource Tuning (600K msg/sec)
The Investigation
Looking at the Flink deployment YAML in the repository:
# flink-job-deployment.yaml - JobManager pod spec
spec:
  job:
    jarURI: local:///opt/flink/usrlib/flink-consumer-1.0.0.jar
    parallelism: 64
    upgradeMode: stateless
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
# CPU limit in deployment
Found it! The TaskManager pod resource definition:
resources:
  requests:
    cpu: "1"      # Only 1 CPU requested!
    memory: "4Gi"
  limits:
    cpu: "2"      # And max 2 CPUs
    memory: "8Gi"
Problem: Each TaskManager was throttled to 2 CPUs max, but we have 8 vCPU machines!
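One way to confirm throttling (rather than inferring it from CPU graphs) is to read the pod's cgroup stats, e.g. `kubectl exec <tm-pod> -- cat /sys/fs/cgroup/cpu/cpu.stat` (the path differs under cgroup v2). A minimal parser sketch, with illustrative sample values:

```java
public class ThrottleCheck {
    // Extract a counter from a cgroup-v1 cpu.stat dump; a steadily climbing
    // nr_throttled means the kernel is enforcing the pod's CPU limit.
    public static long cgroupStat(String cpuStat, String key) {
        for (String line : cpuStat.split("\n")) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length == 2 && parts[0].equals(key)) {
                return Long.parseLong(parts[1]);
            }
        }
        throw new IllegalArgumentException("key not found: " + key);
    }

    public static void main(String[] args) {
        // Sample values for illustration only
        String sample = "nr_periods 12000\nnr_throttled 4310\nthrottled_time 98000000000";
        System.out.println(cgroupStat(sample, "nr_throttled")); // 4310
    }
}
```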
The Fix
# Updated TaskManager resources
resources:
  requests:
    cpu: "5"      # Request 5 CPUs (was 1)
    memory: "8Gi"
  limits:
    cpu: "8"      # Allow up to 8 CPUs (was 2)
    memory: "16Gi"
Results - Phase 3
# After CPU increase
Records In: 600,000 msg/sec (1.25x improvement!)
CPU Usage: 75-85% per TaskManager (better utilization)
Backpressure: LOW to MEDIUM
Progress: 480K → 600K msg/sec ✅
Still not 1M. Time for the real detective work!
Phase 4: The Backlog Experiment (890K msg/sec)
The Eureka Moment
I noticed a huge backlog forming in Pulsar (100M+ messages). So I tried an experiment:
Stop all producers and let Flink catch up.
# Stop producers
kubectl scale deployment iot-producer -n iot-pipeline --replicas=0
# Watch Flink metrics
kubectl exec -n flink-benchmark <jobmanager> -- curl localhost:8081/jobs/<job-id>/metrics
The Shocking Result
# Flink consuming from backlog (no new messages)
Records In: 890,000 msg/sec
# CPU and memory usage
CPU: 85-90% (near max)
Memory: Stable
Network: ~270 MB/sec ingress
Key Insight: Flink could process 890K msg/sec when reading from Pulsar backlog, but only 600K msg/sec with live producers!
Conclusion: Pulsar was the bottleneck!
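A quick cross-check on those numbers: the reported ~270 MB/sec ingress lines up with the message rate times the 300-byte payload:

```java
public class BandwidthCheck {
    // Expected network ingress (decimal MB/sec) from message rate x size.
    public static double ingressMBps(double msgsPerSec, int msgBytes) {
        return msgsPerSec * msgBytes / 1_000_000.0;
    }

    public static void main(String[] args) {
        System.out.println(ingressMBps(890_000, 300)); // 267.0, matching ~270 observed
    }
}
```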
Phase 5: Upgrade Pulsar Infrastructure (1M msg/sec)
The Pulsar Problem
Previous Setup:
- Instance: i3en.6xlarge (24 vCPU, 192GB RAM, 2x 7.5TB NVMe)
- Bookies: 4 nodes
- Brokers: 4 nodes
Issues:
- Older generation (i3en)
- Only 4 bookies for 1M msg/sec
- Journal and ledger on same NVMe device
The Pulsar Upgrade
# Updated Terraform configuration
pulsar_broker_bookie_config = {
  instance_types = ["i7i.8xlarge"] # Upgraded from i3en.6xlarge
  desired_size   = 6               # Increased from 4
}
i7i.8xlarge Advantages:
- Newer generation (better CPU IPC)
- 32 vCPUs (vs 24 on i3en)
- 2× NVMe devices (3.75TB each)
- Lower per-device latency
NVMe Device Separation (Critical!):
# Device mapping on each i7i.8xlarge bookie
/dev/nvme1n1 → /mnt/bookkeeper/journal # Journal (WAL)
/dev/nvme2n1 → /mnt/bookkeeper/ledgers # Ledgers (Data)
This separation eliminates I/O contention between:
- Journal: Sequential writes (low latency critical)
- Ledgers: Random reads/writes (capacity critical)
Results - Phase 5
# After Pulsar upgrade
Records In: 1,040,000 msg/sec
CPU (Flink): 80-85% per TaskManager
CPU (Pulsar): 70-75% per Broker/Bookie
Backpressure: NONE to LOW
End-to-end latency: <2 seconds
SUCCESS: 600K → 1,040K msg/sec ✅
Phase 6: Final Flink Optimization (1.04M msg/sec sustained)
The Last Mile
Even with Pulsar fixed, I wanted to optimize Flink further. The 2:1 slot-to-CPU ratio was still suboptimal for our CPU-heavy aggregation workload.
Configuration Change
# Final Flink configuration
taskmanager:
  replicas: 4   # 4 × c5.4xlarge machines
  resources:
    cpu: 16     # Full 16 vCPUs per TM
    memory: 32Gi

# Slot configuration
taskmanager.numberOfTaskSlots: 16 # 1:1 ratio (was 2:1)
New Calculation:
- 4 TaskManagers × 16 slots = 64 total slots
- Each slot gets: 16 vCPUs / 16 slots = 1 vCPU per slot
Instance Upgrade: c5.2xlarge → c5.4xlarge (16 vCPU, 32 GB RAM)
Why 1:1 Ratio Works Better
Our Pipeline Analysis:
Source (I/O) → keyBy → Window (CPU) → Aggregate (CPU) → Sink (I/O)

CPU-intensive operators: 2/4 (Window + Aggregate)
I/O-bound operators: 2/4 (Source + Sink)
Decision: Since aggregation is CPU-heavy, 1:1 gives each task dedicated CPU.
Final Results
# Sustained performance metrics
Records In: 1,040,000 msg/sec
Records Out: 17,333 aggregated records/min
CPU Usage: 75-80% per TaskManager (optimal)
Memory Usage: 60-70% (plenty of headroom)
Backpressure: NONE
GC Pressure: LOW
Checkpoint Duration: 5-8 seconds
Final Achievement: 1,040,000 messages/sec sustained
The Backlog Test - A Critical Technique
Why This Test Matters
The backlog consumption test reveals your true system capacity:
# The test process
1. Run producers at max speed (build backlog)
2. Stop producers completely
3. Measure Flink consumption from backlog
4. Compare: Backlog rate vs Live rate
What it tells you:
- Shows true Flink capacity
- Reveals whether Pulsar or Flink is the bottleneck
In our case:
- Live: 600K msg/sec
- Backlog: 890K msg/sec
- Conclusion: Pulsar was limiting, not Flink!
Final Architecture
Flink Configuration
# FlinkDeployment - Final configuration
spec:
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "16"
    parallelism.default: "64"
    state.backend: rocksdb
    state.checkpoints.dir: s3://benchmark-high-infra-state/checkpoints
    execution.checkpointing.interval: 60000
    execution.checkpointing.mode: EXACTLY_ONCE
  jobManager:
    resource:
      memory: "8Gi"
      cpu: 4
    replicas: 1
  taskManager:
    resource:
      memory: "32Gi"
      cpu: 16     # Full 16 vCPUs
    replicas: 4   # 4 × c5.4xlarge machines
Resource Summary:
- 4 TaskManagers on c5.4xlarge (16 vCPU, 32 GB each)
- 64 task slots total (16 per TM)
- 1:1 slot-to-vCPU ratio
- 64 parallelism (matches Pulsar partitions)
Pulsar Configuration
# Pulsar values - Final configuration
bookkeeper:
  replicaCount: 6 # Increased from 4
  # NVMe device configuration
  volumes:
    journal:
      size: 200Gi
      storageClassName: local-nvme # /dev/nvme1n1
    ledgers:
      size: 1000Gi
      storageClassName: local-nvme # /dev/nvme2n1
  configData:
    journalMaxSizeMB: "2048"
    journalSyncData: "false"
    journalAdaptiveGroupWrites: "true"
    ledgerStorageClass: "org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage"
broker:
  replicaCount: 6
  nodeSelector:
    node-type: broker-bookie
Instance Type: i7i.8xlarge (32 vCPU, 256GB RAM, 2× 3.75TB NVMe)
Performance Comparison Table
| Phase | Config | Throughput | Bottleneck | Fix |
|---|---|---|---|---|
| Phase 1 | 8 parallel, c5.2xlarge | 140K msg/sec | Low parallelism | Increase to 64 |
| Phase 2 | 64 parallel, c5.2xlarge, 2-CPU pod limit | 480K msg/sec | CPU throttling | Raise pod CPU limits |
| Phase 3 | 64 parallel, c5.2xlarge, 8 CPU per TM | 600K msg/sec | Pulsar capacity | Upgrade Pulsar |
| Phase 4 (backlog test) | Same config, no producers | 890K msg/sec | Confirmed Pulsar as the limit | Diagnostic only |
| Phase 5 | 64 parallel + Pulsar on 6× i7i.8xlarge | 1,040K msg/sec | Suboptimal slot ratio | 1:1 slots on c5.4xlarge |
| Phase 6 | 64 parallel, c5.4xlarge, 1:1 | 1,040K msg/sec sustained | None! | ✅ Success |
Best Practices for Flink at Scale
1. Match Parallelism to Source Partitions
Pulsar Partitions = Flink Parallelism
This ensures:
- Optimal work distribution
- No partition skew
- Maximum throughput
2. Use the Right Slot-to-CPU Ratio
// Analyze your job operators
Source (I/O) → keyBy → Window (CPU) → Aggregate (CPU) → Sink (I/O)

// Count CPU-bound operators
CPU-bound: 2 operators (window, aggregate)
I/O-bound: 2 operators (source, sink)

// Decision: 1:1 ratio (due to CPU-heavy aggregate)
3. Right-Size Your TaskManager Instances
Options for 64 slots with 1:1 ratio:
| Instance | vCPU | Machines | Cost/mo | Network | Best For |
|---|---|---|---|---|---|
| c5.2xlarge | 8 | 8 | $2,200 | Up to 10 Gbps | Budget |
| c5.4xlarge | 16 | 4 | $2,400 | Up to 10 Gbps | Balanced β |
| c5.9xlarge | 36 | 2 | $2,700 | 10 Gbps | High memory |
| c5.12xlarge | 48 | 2 | $3,600 | 12 Gbps | Max performance |
Our choice: c5.4xlarge - Best balance of cost and manageability
4. Monitor These Metrics
# Critical Flink metrics to watch
# 1. Backpressure (should be LOW)
flink_taskmanager_job_task_backPressuredTimeMsPerSecond
# 2. Records per second
sum(flink_taskmanager_job_task_numRecordsInPerSecond)
# 3. Checkpoint duration (should be < 10% of interval)
flink_jobmanager_job_lastCheckpointDuration
# 4. CPU usage (should be 70-85%)
container_cpu_usage_seconds_total{pod=~"flink-taskmanager.*"}
# 5. Memory usage
flink_taskmanager_Status_JVM_Memory_Heap_Used
5. Test with Backlog
Always do the backlog test:
# 1. Build backlog (run producers at full speed)
# 2. Stop producers
# 3. Measure Flink consumption rate
# 4. This is your TRUE Flink capacity
If backlog consumption > live consumption:
→ Upstream system (Pulsar) is the bottleneck
If backlog consumption ≈ live consumption:
→ Flink is the bottleneck
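The decision rule above boils down to a one-liner; the 10% threshold is my own choice to absorb measurement noise:

```java
public class BacklogDiagnosis {
    // Compare live vs backlog consumption rates; a clear backlog advantage
    // means the upstream broker, not Flink, caps live throughput.
    public static String diagnose(double liveRate, double backlogRate) {
        return backlogRate > liveRate * 1.10
                ? "upstream (Pulsar) is the bottleneck"
                : "Flink is the bottleneck";
    }

    public static void main(String[] args) {
        System.out.println(diagnose(600_000, 890_000)); // upstream (Pulsar) is the bottleneck
    }
}
```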
Key Takeaways
What Worked
✅ Parallelism = Partitions (64 = 64)
✅ 1:1 slot-to-CPU ratio for CPU-bound workloads
✅ Bigger instances (c5.4xlarge) over many small ones
✅ Upgraded Pulsar (i3en → i7i, 4 → 6 nodes)
✅ NVMe device separation (journal vs ledgers)
✅ Disabled journalSyncData in BookKeeper
✅ Backlog testing to identify bottlenecks
What Didn't Work
❌ 2:1 slot-to-CPU ratio (insufficient CPU per task)
❌ Low CPU limits in pod specs (throttling)
❌ Too few Pulsar bookies (4; needed 6)
❌ Single NVMe device for journal+ledger (I/O contention)
❌ Older instance types (i3en vs i7i)
Final Cost
| Component | Instance | Count | Monthly Cost |
|---|---|---|---|
| Flink JobManager | c5.4xlarge | 1 | $480 |
| Flink TaskManagers | c5.4xlarge | 4 | $1,920 |
| Flink subtotal | | | $2,400 |
| Pulsar (Broker+Bookie) | i7i.8xlarge | 6 | $12,960 |
| Infrastructure total | | | ~$15,360/month |
Cost per message: ~$0.0057 per 1M messages (at 1.04M msg/sec sustained, roughly 2.7 trillion messages per month)
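The per-message math is worth making explicit (a 30-day month and sustained throughput are assumed):

```java
public class CostPerMessage {
    // Monthly infrastructure cost spread over a month of messages,
    // expressed as dollars per one million messages.
    public static double costPerMillion(double monthlyCostUsd, double msgsPerSec) {
        double msgsPerMonth = msgsPerSec * 30 * 24 * 3600; // ~2.7 trillion messages
        return monthlyCostUsd / msgsPerMonth * 1_000_000;
    }

    public static void main(String[] args) {
        System.out.printf("$%.4f per 1M messages%n",
                costPerMillion(15_360, 1_040_000)); // ~$0.0057
    }
}
```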
Scaling Beyond 1M
Want to go higher? Here's the roadmap:
2M msg/sec:
- Pulsar: 8 bookies (i7i.8xlarge)
- Flink: 8 TaskManagers (c5.4xlarge), parallelism 128
- Partitions: 128
- Cost: ~$23K/month
5M msg/sec:
- Pulsar: 15 bookies (i7i.8xlarge)
- Flink: 16 TaskManagers (c5.4xlarge), parallelism 256
- Partitions: 256
- Cost: ~$40K/month
10M msg/sec:
- Pulsar: 30 bookies (i7i.16xlarge)
- Flink: 32 TaskManagers (c5.9xlarge), parallelism 512
- Network: Upgrade to 100 Gbps instances
- Cost: ~$80K/month
Conclusion
Going from 140K to 1 million messages/sec required:
- Understanding the architecture (operators, tasks, slots)
- Systematic testing (change one thing at a time)
- Bottleneck identification (backlog test was key!)
- Right-sizing resources (not just throwing more hardware)
- Infrastructure matching (Pulsar + Flink capacity aligned)
The biggest lesson?
The bottleneck is rarely where you think it is. Measure, test, iterate.
In our case, we assumed Flink was the problem. Turned out:
- Phases 1-3: Flink configuration issues
- Phase 4: the backlog test proved Pulsar was limiting Flink!
- Phases 5-6: Pulsar upgraded, then back to Flink (needed more CPU per slot)
Both systems needed optimization to achieve 1M msg/sec.
The journey taught me:
- Start with fundamentals: Parallelism, partitions, resource allocation
- Use systematic testing: Change one variable at a time
- Leverage diagnostic tools: Backlog testing, metrics monitoring
- Think holistically: Tune the entire pipeline, not just one component
Resources
- Flink Load Repository: flink-load
- Complete Implementation: RealtimeDataPlatform
- Apache Flink Performance Tuning: flink.apache.org/performance
- Flink Resource Configuration: Flink Memory Setup Guide
Have you tuned Flink for high throughput? What challenges did you face? Share your optimization stories in the comments!
Follow me for more deep dives on stream processing, performance optimization, and distributed systems architecture!
Next in the series: "ClickHouse Performance: Ingesting 1M Events/Sec with Sub-Second Queries"
Quick Reference
Final Configuration Checklist
✅ Pulsar partitions: 64
✅ Flink parallelism: 64
✅ TaskManager slots: 16 (per TM)
✅ Slot-to-CPU ratio: 1:1
✅ TaskManager count: 4
✅ Instance type: c5.4xlarge
✅ Total vCPUs: 64
✅ Total slots: 64
✅ Pulsar bookies: 6
✅ Pulsar instance: i7i.8xlarge
✅ NVMe separation: Yes
✅ journalSyncData: false
Performance Validation Commands
# Check Flink throughput
kubectl logs -n flink-benchmark deployment/iot-flink-job | grep "records"
# Check parallelism
kubectl exec -n flink-benchmark <jm-pod> -- \
curl localhost:8081/jobs/<job-id> | jq '.vertices[].parallelism'
# Check CPU usage
kubectl top pods -n flink-benchmark
# Check backpressure
kubectl exec -n flink-benchmark <jm-pod> -- \
curl localhost:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure
# Run the backlog test
kubectl scale deployment iot-producer -n iot-pipeline --replicas=0
# Watch throughput increase as Flink catches up
Tags: #flink #performance #streaming #aws #optimization #parallelism #tuning #apacheflink #realtimedata