HyperscaleDesignHub

Real-Time Data Streaming Platform: From 140K to 1 Million Messages/Sec - A Flink Performance Tuning Journey

Performance tuning a distributed streaming system is a journey of discovery, experimentation, and learning. This is the story of how I scaled a Flink streaming job from 140K messages/sec to 1 million messages/sec - a 7x improvement through systematic optimization.

Spoiler alert: The bottleneck wasn't where I expected!

🏗️ Real-Time Data Streaming Platform Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                Real-Time Data Streaming Platform                        │
│                        AWS EKS (Kubernetes 1.31)                       │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌───────────┐ │
│  │   PRODUCER  │───▶│   PULSAR    │───▶│    FLINK    │───▶│CLICKHOUSE │ │
│  │             │    │             │    │             │    │           │ │
│  │ IoT Sensors │    │ Message     │    │ Stream      │    │ Analytics │ │
│  │ AVRO Data   │    │ Broker      │    │ Processing  │    │ Database  │ │
│  │             │    │             │    │             │    │           │ │
│  │ 4x c5.4xl   │    │ 6x i7i.8xl  │    │ 4x c5.4xl   │    │6x r6id.4xl│ │
│  │ 250K/sec    │    │ Partitions  │    │ Parallelism │    │ Real-time │ │
│  │ each node   │    │ 64          │    │ 64          │    │ Queries   │ │
│  └─────────────┘    └─────────────┘    └─────────────┘    └───────────┘ │
│                                                                         │
│  Data Flow:                                                             │
│  300-byte AVRO ──▶ Pulsar Topics ──▶ keyBy(device_id) ──▶ ClickHouse   │
│  IoT Messages       (Persistent)      1-min Windows       Analytics    │
│                                                                         │
│  Performance Target: 1,000,000 messages/sec end-to-end                 │
└─────────────────────────────────────────────────────────────────────────┘

Pipeline Components:

  • Producers: Generate 300-byte AVRO-serialized IoT sensor data (a hypothetical record layout is sketched after this list)
  • Pulsar: Distributed message broker with 64 partitions for parallel processing
  • Flink: Stream processing engine with 64-way parallelism for aggregations
  • ClickHouse: Real-time analytics database for sub-second queries
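
For reference, here is a minimal sketch of what the 300-byte sensor record's Avro schema might look like, built with Avro's SchemaBuilder. Only device_id is confirmed by the pipeline code (it feeds the keyBy); the other fields are assumptions, and the real schema lives with the producer code.

// SensorSchemaSketch.java - hypothetical record layout, for illustration only
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class SensorSchemaSketch {
    // Hypothetical field layout; only device_id is used later by keyBy()
    public static final Schema SENSOR_SCHEMA = SchemaBuilder
        .record("SensorRecord").namespace("com.example.iot")
        .fields()
        .requiredString("device_id")
        .requiredLong("event_time")
        .requiredDouble("temperature")
        .requiredDouble("humidity")
        .endRecord();

    public static GenericRecord sample() {
        GenericRecord record = new GenericData.Record(SENSOR_SCHEMA);
        record.put("device_id", "sensor-0001");
        record.put("event_time", System.currentTimeMillis());
        record.put("temperature", 21.5);
        record.put("humidity", 0.43);
        return record;
    }
}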

The Challenge: The initial setup only achieved 140K msg/sec instead of the 1M msg/sec target!

🎯 The Starting Point

Initial Setup:

  • Goal: Process 1 million messages/sec from Pulsar
  • Message Size: 300 bytes (AVRO-serialized sensor data)
  • Job: Source → keyBy → Window (1-min) → Aggregate → Sink (ClickHouse)
  • Result: Only 140K msg/sec 😱

Something was clearly wrong. Time to dig in.

🔍 Understanding the Flink Job Structure

Before tuning, I needed to understand what I was working with. You can find the complete implementation in the flink-load directory:

// JDBCFlinkConsumer.java - The Pipeline
PulsarSource<SensorRecord> source = PulsarSource.builder()
    .setServiceUrl(pulsarUrl)
    .setTopics("persistent://public/default/iot-sensor-data")
    .setDeserializationSchema(new AvroSensorDataDeserializationSchema())
    .build();

DataStream<SensorRecord> sensorStream = env.fromSource(source, ...);

sensorStream
    .keyBy(record -> record.device_id)  // ← Data shuffle happens here!
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .aggregate(new SensorAggregator())
    .addSink(new ClickHouseJDBCSink(clickhouseUrl));
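
The job references SensorAggregator (and a ClickHouseJDBCSink) without showing them. As a rough idea of what the window stage computes, here is a minimal sketch of an AggregateFunction that produces a per-device count and average; the real class lives in the flink-load directory, and the numeric value field below is my assumption.

// SensorAggregator.java - minimal sketch of the windowed aggregation
import org.apache.flink.api.common.functions.AggregateFunction;

public class SensorAggregator
        implements AggregateFunction<SensorRecord, SensorAggregator.Acc, SensorAggregator.Result> {

    public static class Acc {
        String deviceId;
        long count;
        double sum;
    }

    public static class Result {
        public String deviceId;
        public long count;
        public double avg;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public Acc add(SensorRecord record, Acc acc) {
        acc.deviceId = record.device_id;
        acc.count++;
        acc.sum += record.value;   // hypothetical numeric reading on SensorRecord
        return acc;
    }

    @Override
    public Result getResult(Acc acc) {
        Result r = new Result();
        r.deviceId = acc.deviceId;
        r.count = acc.count;
        r.avg = acc.count == 0 ? 0.0 : acc.sum / acc.count;
        return r;
    }

    @Override
    public Acc merge(Acc a, Acc b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }
}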

The Operator Chain

Flink optimizes by chaining operators together:

Task 1 (Source Group):
  └─ Pulsar Source (I/O-bound)
     └─ AVRO Deserialize
        └─ keyBy (compute hash, network shuffle)

Task 2 (Window Group):
  └─ Window Aggregate (CPU-bound)
     └─ ClickHouse Sink (I/O-bound)

Key Insight: The keyBy() causes a network shuffle between Task 1 and Task 2, splitting the job into two chained task groups.

Thanks to Flink's default slot sharing, one subtask from each task group can run in the same slot, so total slots needed = parallelism (not parallelism × 4 operators, and not parallelism × 2 task groups).
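
If you ever do want the two task groups in separate slots - for example, to guarantee the CPU-heavy window stage never shares a slot with the source - the DataStream API lets you assign explicit slot sharing groups. A sketch continuing the pipeline snippet above (note that with a second group, slot demand does become parallelism × 2):

// Sketch only: isolate the window/aggregate/sink stage in its own slot sharing group
sensorStream
    .keyBy(record -> record.device_id)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .aggregate(new SensorAggregator())
    .slotSharingGroup("window-agg")          // separate group => separate slots
    .addSink(new ClickHouseJDBCSink(clickhouseUrl))
    .slotSharingGroup("window-agg");         // keep the sink with the window group

// For debugging operator boundaries, chaining can also be disabled globally:
// env.disableOperatorChaining();

The job here sticks with the default shared slots; this is just a useful knob when one stage needs isolated resources.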

📊 Phase 1: Initial Configuration (140K msg/sec)

Configuration

# FlinkDeployment configuration
parallelism: 8
pulsar_partitions: 8

# Resource allocation
taskmanager:
  replicas: 4
  resources:
    cpu: 2      # 2 vCPUs per TaskManager
    memory: 4Gi

# Task slot mapping
taskmanager.numberOfTaskSlots: 2  # 2 slots per TaskManager

Calculation:

  • 4 TaskManagers × 2 slots = 8 total slots
  • Parallelism = 8
  • Each slot gets: 2 vCPUs / 2 slots = 1 vCPU per slot

Instance Type: c5.2xlarge (8 vCPU, 16 GB RAM)

Results

# Metrics after 5 minutes
Records In:  140,000 msg/sec
Records Out: 2,300 aggregated records/min
CPU Usage:   85-95% (maxed out!)
Backpressure: HIGH on source operators

Problem Identified: With only 8 partitions and parallelism 8, there simply wasn't enough parallelism for the target load - every subtask was saturated.

🚀 Phase 2: Scale Parallelism (480K msg/sec)

The Hypothesis

If 8 parallel instances handle 140K msg/sec, then:

  • Per-instance rate: 140K / 8 = 17,500 msg/sec
  • For 1M msg/sec: 1,000,000 / 17,500 ≈ 57 instances needed

Let's try 64 for a clean power-of-2.
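
The same back-of-the-envelope math as a tiny helper, so the numbers are easy to reproduce (the 17,500 msg/sec per-subtask rate is simply the Phase 1 measurement divided by its parallelism):

// CapacityPlan.java - back-of-the-envelope scaling estimate
public class CapacityPlan {
    public static void main(String[] args) {
        double measuredThroughput = 140_000;   // msg/sec observed at parallelism 8 (Phase 1)
        int measuredParallelism = 8;
        double targetThroughput = 1_000_000;   // msg/sec goal

        double perSubtask = measuredThroughput / measuredParallelism;   // 17,500 msg/sec
        double neededSubtasks = targetThroughput / perSubtask;          // ~57.1

        // Round up to the next power of 2 for a clean partition/parallelism mapping
        int parallelism = Integer.highestOneBit((int) Math.ceil(neededSubtasks) - 1) * 2;

        System.out.printf("Per-subtask rate: %.0f msg/sec%n", perSubtask);
        System.out.printf("Subtasks needed:  %.1f -> choose parallelism %d%n", neededSubtasks, parallelism);
    }
}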

Configuration Changes

# Increase Pulsar partitions to 64
pulsar_partitions: 64

# Increase Flink parallelism to match
parallelism: 64

# Task slot mapping (2:1 slot-to-vCPU ratio maintained)
taskmanager.numberOfTaskSlots: 16  # 2 slots per vCPU × 8 vCPUs per TaskManager

# Calculate required TaskManagers:
# 64 slots needed / 2 slots per vCPU = 32 vCPUs
# OR with 8 vCPU machines: 64 slots / 16 slots per machine = 4 machines
taskmanager:
  replicas: 16  # Using c5.2xlarge (8 vCPU, 16 slots per machine)

Wait... 16 TaskManagers on c5.2xlarge?

Let me recalculate:

  • c5.2xlarge: 8 vCPUs
  • Task slots: 2 per vCPU = 16 slots per machine
  • Need 64 slots total
  • Machines needed: 64 / 16 = 4 machines
# Corrected configuration
taskmanager:
  replicas: 4  # 4 × c5.2xlarge machines
  resources:
    cpu: 8      # Full 8 vCPUs
    memory: 16Gi

taskmanager.numberOfTaskSlots: 16  # 16 slots × 4 TMs = 64 slots total

Results - Phase 2

# Metrics after deployment
Records In:  480,000 msg/sec  (3.4x improvement!)
Records Out: 8,000 aggregated records/min
CPU Usage:   65-75% per TaskManager
Backpressure: MEDIUM

Progress: 140K → 480K msg/sec ✅

But still far from 1M! What's the bottleneck now?

🔧 Phase 3: CPU Resource Tuning (600K msg/sec)

The Investigation

Looking at the Flink deployment YAML in the repository:

# flink-job-deployment.yaml - JobManager pod spec
spec:
  job:
    jarURI: local:///opt/flink/usrlib/flink-consumer-1.0.0.jar
    parallelism: 64
    upgradeMode: stateless
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    # CPU limit in deployment

Found it! The TaskManager pod resource definition:

resources:
  requests:
    cpu: "1"       # Only 1 CPU requested! 🚨
    memory: "4Gi"
  limits:
    cpu: "2"       # And max 2 CPUs
    memory: "8Gi"

Problem: Each TaskManager was throttled to 2 CPUs max, but we have 8 vCPU machines!
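
A quick way to confirm this kind of throttling from inside the pod: ask the JVM how many CPUs it actually sees. A tiny sketch (assumes a JDK with container support, the default since JDK 10, so the count reflects the Kubernetes CPU limit rather than the node's 8 vCPUs):

// CpuCheck.java - run inside a TaskManager pod to see the effective CPU budget
public class CpuCheck {
    public static void main(String[] args) {
        // With a pod CPU limit of 2, this typically prints 2, even on an 8-vCPU c5.2xlarge node
        int cpus = Runtime.getRuntime().availableProcessors();
        long maxHeapMb = Runtime.getRuntime().maxMemory() / (1024 * 1024);
        System.out.println("JVM sees " + cpus + " CPUs, max heap " + maxHeapMb + " MB");
    }
}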

The Fix

# Updated TaskManager resources
resources:
  requests:
    cpu: "5"       # Request 5 CPUs (was 1)
    memory: "8Gi"
  limits:
    cpu: "8"       # Allow up to 8 CPUs (was 2)
    memory: "16Gi"

Results - Phase 3

# After CPU increase
Records In:  600,000 msg/sec  (1.25x improvement!)
CPU Usage:   75-85% per TaskManager (better utilization)
Backpressure: LOW → MEDIUM

Progress: 480K → 600K msg/sec ✅

Still not 1M. Time for the real detective work!

🔎 Phase 4: The Backlog Experiment (890K msg/sec)

The Eureka Moment

I noticed a huge backlog forming in Pulsar (100M+ messages). So I tried an experiment:

Stop all producers and let Flink catch up.

# Stop producers
kubectl scale deployment iot-producer -n iot-pipeline --replicas=0

# Watch Flink metrics
kubectl exec -n flink-benchmark <jobmanager> -- curl localhost:8081/jobs/<job-id>/metrics
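
One way to put a number on "let Flink catch up" is to sample the job overview twice and diff the cumulative counters. A rough sketch against the Flink REST API; it assumes the JobManager is port-forwarded to localhost:8081, that the first vertex listed is the source chain (usually the case), and that its write-records counter approximates messages consumed from Pulsar. The job id is passed in rather than hard-coded.

// ThroughputProbe.java - sample the source chain's write-records twice and derive msg/sec
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ThroughputProbe {
    // First "write-records" in the /jobs/<job-id> response belongs to the first vertex (the source chain)
    private static final Pattern WRITE_RECORDS = Pattern.compile("\"write-records\":(\\d+)");

    static long sourceWriteRecords(String jobJson) {
        Matcher m = WRITE_RECORDS.matcher(jobJson);
        return m.find() ? Long.parseLong(m.group(1)) : 0L;
    }

    public static void main(String[] args) throws Exception {
        String jobId = args[0];  // pass the Flink job id as the first argument
        String url = "http://localhost:8081/jobs/" + jobId;
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest req = HttpRequest.newBuilder(URI.create(url)).GET().build();

        long before = sourceWriteRecords(client.send(req, HttpResponse.BodyHandlers.ofString()).body());
        Thread.sleep(10_000);   // sample window: 10 seconds
        long after = sourceWriteRecords(client.send(req, HttpResponse.BodyHandlers.ofString()).body());

        System.out.printf("Consumption rate: %,d msg/sec%n", (after - before) / 10);
    }
}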

The Shocking Result

# Flink consuming from backlog (no new messages)
Records In:  890,000 msg/sec  😲

# CPU and memory usage
CPU: 85-90% (near max)
Memory: Stable
Network: ~270 MB/sec ingress

Key Insight: Flink could process 890K msg/sec when reading from Pulsar backlog, but only 600K msg/sec with live producers!

Conclusion: Pulsar was the bottleneck! 🎯

🏎️ Phase 5: Upgrade Pulsar Infrastructure (1M msg/sec)

The Pulsar Problem

Previous Setup:

  • Instance: i3en.6xlarge (24 vCPU, 192GB RAM, 2x 7.5TB NVMe)
  • Bookies: 4 nodes
  • Brokers: 4 nodes

Issues:

  • Older generation (i3en)
  • Only 4 bookies for 1M msg/sec
  • Journal and ledger on same NVMe device

The Pulsar Upgrade

# Updated Terraform configuration
pulsar_broker_bookie_config = {
  instance_types = ["i7i.8xlarge"]  # Upgraded from i3en.6xlarge
  desired_size = 6                   # Increased from 4
}

i7i.8xlarge Advantages:

  • Newer generation (better CPU IPC)
  • 32 vCPUs (vs 24 on i3en)
  • 2× NVMe devices (3.75TB each)
  • Lower per-device latency

NVMe Device Separation (Critical!):

# Device mapping on each i7i.8xlarge bookie
/dev/nvme1n1 → /mnt/bookkeeper/journal  # Journal (WAL)
/dev/nvme2n1 → /mnt/bookkeeper/ledgers  # Ledgers (Data)

This separation eliminates I/O contention between:

  • Journal: Sequential writes (low latency critical)
  • Ledgers: Random reads/writes (capacity critical)

Results - Phase 5

# After Pulsar upgrade
Records In:  1,040,000 msg/sec  🎉
CPU (Flink): 80-85% per TaskManager
CPU (Pulsar): 70-75% per Broker/Bookie
Backpressure: NONE to LOW
End-to-end latency: <2 seconds

SUCCESS: 600K → 1,040K msg/sec ✅

🎯 Phase 6: Final Flink Optimization (1.04M msg/sec sustained)

The Last Mile

Even with Pulsar fixed, I wanted to optimize Flink further. The 2:1 slot-to-CPU ratio was still suboptimal for our CPU-heavy aggregation workload.

Configuration Change

# Final Flink configuration
taskmanager:
  replicas: 4  # 4 × c5.4xlarge machines
  resources:
    cpu: 16     # Full 16 vCPUs per TM
    memory: 32Gi

# Slot configuration
taskmanager.numberOfTaskSlots: 16  # 1:1 ratio (was 2:1)

New Calculation:

  • 4 TaskManagers × 16 slots = 64 total slots
  • Each slot gets: 16 vCPUs / 16 slots = 1 vCPU per slot

Instance Upgrade: c5.2xlarge → c5.4xlarge (16 vCPU, 32 GB RAM)

Why 1:1 Ratio Works Better

Our Pipeline Analysis:

Source (I/O) ──▶ keyBy ──▶ Window (CPU) ──▶ Aggregate (CPU) ──▶ Sink (I/O)
    🟢                          🔴                🔴                🟢

CPU-intensive operators: 2/4 (Window + Aggregate)
I/O-bound operators: 2/4 (Source + Sink)

Decision: Since aggregation is CPU-heavy, 1:1 gives each task dedicated CPU.

Final Results

# Sustained performance metrics
Records In:  1,040,000 msg/sec
Records Out: 17,333 aggregated records/min
CPU Usage:   75-80% per TaskManager (optimal)
Memory Usage: 60-70% (plenty of headroom)
Backpressure: NONE
GC Pressure: LOW
Checkpoint Duration: 5-8 seconds

Final Achievement: 1,040,000 messages/sec sustained 🏆

🧪 The Backlog Test - A Critical Technique

Why This Test Matters

The backlog consumption test reveals your true system capacity:

# The test process
1. Run producers at max speed (build backlog)
2. Stop producers completely
3. Measure Flink consumption from backlog
4. Compare: Backlog rate vs Live rate

What it tells you:

  • Shows true Flink capacity
  • Reveals whether Pulsar or Flink is the bottleneck

In our case:

  • Live: 600K msg/sec
  • Backlog: 890K msg/sec
  • Conclusion: Pulsar was limiting, not Flink!

🔧 Final Architecture

Flink Configuration

# FlinkDeployment - Final configuration
spec:
  flinkVersion: v1_18

  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "16"
    parallelism.default: "64"
    state.backend: rocksdb
    state.checkpoints.dir: s3://benchmark-high-infra-state/checkpoints
    execution.checkpointing.interval: 60000
    execution.checkpointing.mode: EXACTLY_ONCE

  jobManager:
    resource:
      memory: "8Gi"
      cpu: 4
    replicas: 1

  taskManager:
    resource:
      memory: "32Gi"
      cpu: 16  # Full 16 vCPUs
    replicas: 4  # 4 × c5.4xlarge machines

Resource Summary:

  • 4 TaskManagers on c5.4xlarge (16 vCPU, 32 GB each)
  • 64 task slots total (16 per TM)
  • 1:1 slot-to-vCPU ratio
  • 64 parallelism (matches Pulsar partitions)

Pulsar Configuration

# Pulsar values - Final configuration
bookkeeper:
  replicaCount: 6  # Increased from 4

  # NVMe device configuration
  volumes:
    journal:
      size: 200Gi
      storageClassName: local-nvme  # /dev/nvme1n1
    ledgers:
      size: 1000Gi
      storageClassName: local-nvme  # /dev/nvme2n1

  configData:
    journalMaxSizeMB: "2048"
    journalSyncData: "false"
    journalAdaptiveGroupWrites: "true"
    ledgerStorageClass: "org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage"

broker:
  replicaCount: 6
  nodeSelector:
    node-type: broker-bookie

Instance Type: i7i.8xlarge (32 vCPU, 256GB RAM, 2× 3.75TB NVMe)

📊 Performance Comparison Table

Phase                    Config                                   Throughput       Bottleneck            Fix
Phase 1                  8 parallel, c5.2xlarge, 2:1              140K msg/sec     Low parallelism       Increase parallelism to 64
Phase 2                  64 parallel, c5.2xlarge, 2:1             480K msg/sec     CPU throttling        Raise pod CPU limits
Phase 3                  64 parallel, c5.2xlarge, 2:1, 8 CPU      600K msg/sec     Pulsar capacity       Upgrade Pulsar
Phase 4 (backlog test)   Same config, producers stopped           890K msg/sec     Flink CPU per slot    Upgrade to c5.4xlarge, 1:1
Phase 5-6                Upgraded Pulsar, c5.4xlarge, 1:1         1,040K msg/sec   None! ✅              Success

💡 Best Practices for Flink at Scale

1. Match Parallelism to Source Partitions

Pulsar Partitions = Flink Parallelism

This ensures:

  • Optimal work distribution
  • No partition skew
  • Maximum throughput
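
One way to make this explicit in code is to pin the source parallelism to the partition count rather than relying only on the cluster default, so a config drift can't silently break the 1:1 partition-to-subtask mapping. A small sketch continuing the earlier pipeline (the constant name is mine):

// Keep exactly one consuming subtask per Pulsar partition
final int PULSAR_PARTITIONS = 64;

env.setParallelism(PULSAR_PARTITIONS);   // job-wide default

DataStream<SensorRecord> sensorStream = env
    .fromSource(source, WatermarkStrategy.noWatermarks(), "pulsar-source")
    .setParallelism(PULSAR_PARTITIONS);  // explicit on the source itself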

2. Use the Right Slot-to-CPU Ratio

// Analyze your job operators
Source (I/O) ──▶ keyBy ──▶ Window (CPU) ──▶ Aggregate (CPU) ──▶ Sink (I/O)
    🟢                          🔴                🔴                🟢

// Count CPU-bound operators
CPU-bound: 2 operators (window, aggregate)
I/O-bound: 2 operators (source, sink)

// Decision: 1:1 ratio (due to the CPU-heavy aggregate)

3. Right-Size Your TaskManager Instances

Options for 64 slots with 1:1 ratio:

Instance      vCPU   Machines   Cost/mo   Network         Best For
c5.2xlarge    8      8          $2,200    Up to 10 Gbps   Budget
c5.4xlarge    16     4          $2,400    Up to 10 Gbps   Balanced
c5.9xlarge    36     2          $2,700    10 Gbps         High memory
c5.12xlarge   48     2          $3,600    12 Gbps         Max performance

Our choice: c5.4xlarge - Best balance of cost and manageability

4. Monitor These Metrics

# Critical Flink metrics to watch

# 1. Backpressure (should be LOW)
flink_taskmanager_job_task_backPressuredTimeMsPerSecond

# 2. Records per second
rate(flink_taskmanager_job_task_numRecordsInPerSecond[1m])

# 3. Checkpoint duration (should be < 10% of interval)
flink_jobmanager_job_lastCheckpointDuration

# 4. CPU usage (should be 70-85%)
container_cpu_usage_seconds_total{pod=~"flink-taskmanager.*"}

# 5. Memory usage
flink_taskmanager_Status_JVM_Memory_Heap_Used
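
Beyond the built-ins, it can help to expose an application-level meter right where records enter the job, so the dashboard rate and the job's own view agree. A sketch using Flink's metric group API inside a pass-through RichMapFunction (the operator and metric names are mine); chained right after the source, e.g. .map(new ThroughputMeter()), it adds negligible overhead and is exported by whatever metrics reporter is configured.

// ThroughputMeter.java - pass-through operator exposing a records-per-second meter
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

public class ThroughputMeter extends RichMapFunction<SensorRecord, SensorRecord> {

    private transient Meter throughput;

    @Override
    public void open(Configuration parameters) {
        // 60-second sliding average, updated by Flink once per second
        this.throughput = getRuntimeContext()
                .getMetricGroup()
                .meter("recordsPerSecond", new MeterView(60));
    }

    @Override
    public SensorRecord map(SensorRecord record) {
        throughput.markEvent();
        return record;   // measurement only; the record passes through unchanged
    }
}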

5. Test with Backlog

Always do the backlog test:

# 1. Build backlog (run producers at full speed)
# 2. Stop producers
# 3. Measure Flink consumption rate
# 4. This is your TRUE Flink capacity

If backlog consumption > live consumption:
Upstream system (Pulsar) is the bottleneck

If backlog consumption ≈ live consumption:
Flink is the bottleneck

🎯 Key Takeaways

What Worked

  • Parallelism = Partitions (64 = 64)
  • 1:1 slot-to-CPU ratio for CPU-bound workloads
  • Bigger instances (c5.4xlarge) over many small ones
  • Upgraded Pulsar (i3en → i7i, 4 → 6 nodes)
  • NVMe device separation (journal vs ledgers)
  • Disabled journalSyncData in BookKeeper
  • Backlog testing to identify bottlenecks

What Didn't Work

  • 2:1 slot-to-CPU ratio (insufficient CPU per task)
  • Low CPU limits in pod specs (throttling)
  • Too few Pulsar bookies (4 → needed 6)
  • Single NVMe device for journal+ledger (I/O contention)
  • Older instance types (i3en vs i7i)

💰 Final Cost

Component                Instance      Count   Monthly Cost
Flink JM                 c5.4xlarge    1       $480
Flink TM                 c5.4xlarge    4       $1,920
Flink Total                                    $2,400
Pulsar (Broker+Bookie)   i7i.8xlarge   6       $12,960
Infrastructure Total                           ~$15,360/month

Cost per 1M messages: ~$0.006 (≈ $15,360/month ÷ ~2.6 trillion messages processed per month at 1M msg/sec)

🚀 Scaling Beyond 1M

Want to go higher? Here's the roadmap:

2M msg/sec:

  • Pulsar: 8 bookies (i7i.8xlarge)
  • Flink: 8 TaskManagers (c5.4xlarge), parallelism 128
  • Partitions: 128
  • Cost: ~$23K/month

5M msg/sec:

  • Pulsar: 15 bookies (i7i.8xlarge)
  • Flink: 16 TaskManagers (c5.4xlarge), parallelism 256
  • Partitions: 256
  • Cost: ~$40K/month

10M msg/sec:

  • Pulsar: 30 bookies (i7i.16xlarge)
  • Flink: 32 TaskManagers (c5.9xlarge), parallelism 512
  • Network: Upgrade to 100 Gbps instances
  • Cost: ~$80K/month

🎓 Conclusion

Going from 140K to 1 million messages/sec required:

  1. Understanding the architecture (operators, tasks, slots)
  2. Systematic testing (change one thing at a time)
  3. Bottleneck identification (backlog test was key!)
  4. Right-sizing resources (not just throwing more hardware)
  5. Infrastructure matching (Pulsar + Flink capacity aligned)

The biggest lesson?

The bottleneck is rarely where you think it is. Measure, test, iterate.

In our case, we assumed Flink was the problem. Turned out:

  • Phase 1-3: Flink configuration issues (parallelism, CPU limits)
  • Phase 4-5: Pulsar was limiting Flink! (revealed by the backlog test, fixed by the infrastructure upgrade)
  • Phase 6: Back to Flink (needed more CPU per slot)

Both systems needed optimization to achieve 1M msg/sec.

The journey taught me:

  • Start with fundamentals: Parallelism, partitions, resource allocation
  • Use systematic testing: Change one variable at a time
  • Leverage diagnostic tools: Backlog testing, metrics monitoring
  • Think holistically: Tune the entire pipeline, not just one component

Have you tuned Flink for high throughput? What challenges did you face? Share your optimization stories in the comments! 👇

Follow me for more deep dives on stream processing, performance optimization, and distributed systems architecture!

Next in the series: "ClickHouse Performance: Ingesting 1M Events/Sec with Sub-Second Queries"


📋 Quick Reference

Final Configuration Checklist

✅ Pulsar partitions: 64
✅ Flink parallelism: 64
✅ TaskManager slots: 16 (per TM)
✅ Slot-to-CPU ratio: 1:1
✅ TaskManager count: 4
✅ Instance type: c5.4xlarge
✅ Total vCPUs: 64
✅ Total slots: 64
✅ Pulsar bookies: 6
✅ Pulsar instance: i7i.8xlarge
✅ NVMe separation: Yes
✅ journalSyncData: false

Performance Validation Commands

# Check Flink throughput
kubectl logs -n flink-benchmark deployment/iot-flink-job | grep "records"

# Check parallelism
kubectl exec -n flink-benchmark <jm-pod> -- \
  curl localhost:8081/jobs/<job-id> | jq '.vertices[].parallelism'

# Check CPU usage
kubectl top pods -n flink-benchmark

# Check backpressure
kubectl exec -n flink-benchmark <jm-pod> -- \
  curl localhost:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure

# Run the backlog test
kubectl scale deployment iot-producer -n iot-pipeline --replicas=0
# Watch throughput increase as Flink catches up

Tags: #flink #performance #streaming #aws #optimization #parallelism #tuning #apacheflink #realtimedata
