When you're running a real-time streaming platform processing 1 million messages per second, you can't afford to be blind. You need comprehensive monitoring across all components - Pulsar, Flink, and ClickHouse - in a single unified view.
In this guide, I'll show you how to build a production-grade monitoring stack that provides real-time visibility into your entire streaming pipeline using VictoriaMetrics and Grafana.
What We're Building
A unified monitoring solution with:
- A single Grafana instance for all components
- VictoriaMetrics as the metrics backend (Prometheus-compatible)
- Real-time dashboards for Pulsar, Flink, and ClickHouse
- Automated setup with scripts and Helm charts
- Pre-built dashboards ready to import
- Scalability to 1M+ metrics/sec
Architecture Overview
┌───────────────────────────────────────────────────────────────────┐
│                  Unified Monitoring Architecture                  │
└───────────────────────────────────────────────────────────────────┘

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│    Pulsar    │     │    Flink     │     │  ClickHouse  │
│   Metrics    │     │   Metrics    │     │   Metrics    │
│    :9090     │     │    :9249     │     │    :8123     │
└──────┬───────┘     └──────┬───────┘     └──────┬───────┘
       │ Prometheus         │ Prometheus         │ SQL
       │ Exposition         │ Reporter           │ Queries
       └────────────────────┼────────────────────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │     VMAgent     │
                   │   (Collector)   │
                   │                 │
                   │  Scrapes all    │
                   │  /metrics       │
                   │  endpoints      │
                   └────────┬────────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │ VictoriaMetrics │
                   │    (Storage)    │
                   │                 │
                   │   Time-series   │
                   │    database     │
                   └────────┬────────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │     Grafana     │
                   │  (Dashboards)   │
                   │                 │
                   │   • Pulsar      │
                   │   • Flink       │
                   │   • ClickHouse  │
                   └─────────────────┘
The Monitoring Stack
1. VictoriaMetrics Kubernetes Stack
The Pulsar Helm chart includes victoria-metrics-k8s-stack as a dependency:
# pulsar-load/helm/pulsar/Chart.yaml
dependencies:
  - condition: victoria-metrics-k8s-stack.enabled
    name: victoria-metrics-k8s-stack
    repository: https://victoriametrics.github.io/helm-charts/
    version: 0.38.x
What's included:
| Component | Purpose | Port |
|---|---|---|
| VMAgent | Metrics collector (replaces Prometheus) | 8429 |
| VMSingle | Time-series database storage | 8429 |
| Grafana | Visualization and dashboards | 3000 |
| Kube-State-Metrics | Kubernetes cluster metrics | 8080 |
| Node-Exporter | Node-level metrics | 9100 |
Why VictoriaMetrics over Prometheus?
- 10x better compression (less storage)
- Faster queries (optimized for large datasets)
- Lower memory usage (~2 GB vs Prometheus's ~16 GB)
- Prometheus-compatible (drop-in replacement)
- Better retention (handles months of data)
2. Configuration in pulsar-values.yaml
# Enable Victoria Metrics stack
victoria-metrics-k8s-stack:
  enabled: true

  # VMAgent - Metrics collector
  vmagent:
    enabled: true
    spec:
      scrapeInterval: 15s
      externalLabels:
        cluster: benchmark-high-infra
        environment: production

  # Grafana configuration
  grafana:
    enabled: true
    adminPassword: "admin123"  # Change in production!

    # Persistent storage for dashboards
    persistence:
      enabled: true
      size: 10Gi
      storageClassName: "gp3"

    # Service exposure
    service:
      type: LoadBalancer  # Accessible externally
      port: 80

    # Default datasource (VictoriaMetrics)
    datasources:
      datasources.yaml:
        apiVersion: 1
        datasources:
          - name: VictoriaMetrics
            type: prometheus
            url: http://vmsingle-pulsar-victoria-metrics-k8s-stack:8429
            isDefault: true
Key Features:
- Persistent dashboards: survive pod restarts
- LoadBalancer service: direct external access
- Auto-discovery: automatically scrapes Pulsar pods
- Pre-configured: works out of the box
Setting Up Flink Metrics
Step 1: Run the setup-flink-metrics.sh Script
This script does the heavy lifting:
cd realtime-platform-1million-events/flink-load
./setup-flink-metrics.sh
What it does (7 steps):
Step 1: Create Flink Configuration
# Applies flink-config-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: flink-benchmark
data:
  flink-conf.yaml: |
    metrics.reporters: prometheus
    metrics.reporter.prometheus.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prometheus.port: 9249-9259
    metrics.system-resource: "true"
    metrics.system-resource-probing-interval: "5000"
Step 2: Setup Prometheus Integration
# Creates a VMPodScrape for VictoriaMetrics
apiVersion: operator.victoriametrics.com/v1beta1
kind: VMPodScrape
metadata:
  name: flink-metrics
  namespace: pulsar  # Created in the Pulsar namespace!
spec:
  selector:
    matchLabels:
      app: iot-flink-job
  namespaceSelector:
    matchNames:
      - flink-benchmark  # Scrapes from the Flink namespace
  podMetricsEndpoints:
    - port: metrics
      path: /metrics
      interval: 15s  # Scrape every 15 seconds
Why a VMPodScrape in the Pulsar namespace?
- VMAgent runs in the Pulsar namespace
- It needs permission to scrape other namespaces
- Cross-namespace scraping is configured via namespaceSelector
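For reference, cross-namespace scraping only works because VMAgent's service account can list and watch pods in other namespaces. The victoria-metrics-k8s-stack chart normally provisions this RBAC for you; the sketch below only illustrates the kind of ClusterRole involved (the name is hypothetical):
# Illustrative only - the Helm chart typically creates equivalent RBAC itself
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: vmagent-scrape   # hypothetical name
rules:
  - apiGroups: [""]
    resources: ["pods", "services", "endpoints"]
    verbs: ["get", "list", "watch"]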
Step 3-4: Patch Flink Deployments
# Downloads the Prometheus reporter JAR via an initContainer
initContainers:
  - name: download-prometheus-jar
    image: curlimages/curl:latest
    command:
      - sh
      - -c
      - |
        curl -L https://repo1.maven.org/maven2/org/apache/flink/flink-metrics-prometheus/1.18.0/flink-metrics-prometheus-1.18.0.jar \
          -o /flink-prometheus/flink-metrics-prometheus-1.18.0.jar
    volumeMounts:
      - name: flink-prometheus-jar
        mountPath: /flink-prometheus

# Mounts the JAR into the Flink lib directory
containers:
  - name: flink-main-container
    volumeMounts:
      - name: flink-prometheus-jar
        mountPath: /opt/flink/lib/flink-metrics-prometheus-1.18.0.jar
        subPath: flink-metrics-prometheus-1.18.0.jar
    ports:
      - containerPort: 9249
        name: metrics
Why download at runtime?
- No need to rebuild Flink Docker image
- Easy to update JAR version
- Works with official Flink images
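One piece the patch snippets above rely on but don't show is the shared volume itself. A minimal sketch, assuming an emptyDir whose name matches the volumeMounts above:
# Shared scratch volume so the JAR downloaded by the initContainer
# is visible to the main Flink container
volumes:
  - name: flink-prometheus-jar
    emptyDir: {}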
Step 4.5: Install ClickHouse Plugin
# Installs grafana-clickhouse-datasource plugin
kubectl exec -n pulsar <grafana-pod> -- \
grafana-cli plugins install grafana-clickhouse-datasource
# Restarts Grafana to load plugin
kubectl rollout restart deployment/pulsar-grafana -n pulsar
Why is this needed?
- ClickHouse is queried over its own native/HTTP protocol, not scraped as Prometheus metrics
- The plugin lets Grafana run SQL queries against ClickHouse
- It's required for the ClickHouse dashboard
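With the plugin installed, the ClickHouse datasource can also be provisioned declaratively instead of being clicked together in the UI. A sketch under stated assumptions: the service name, port, and credentials are placeholders, and exact jsonData field names vary between plugin versions, so check the grafana-clickhouse-datasource docs for yours:
apiVersion: 1
datasources:
  - name: ClickHouse
    type: grafana-clickhouse-datasource
    access: proxy
    jsonData:
      host: clickhouse.clickhouse.svc    # placeholder service name
      port: 9000                         # ClickHouse native protocol port
      username: default
    secureJsonData:
      password: "<clickhouse-password>"  # placeholder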
Step 5-6: Verify Setup
# Tests metrics endpoint
kubectl exec -n flink-benchmark <jobmanager-pod> -- \
curl localhost:9249/metrics | grep flink_
# Should show 200+ Flink metrics
Step 7: Restart VMAgent
# Reloads scrape configuration
kubectl rollout restart deployment/vmagent-pulsar-victoria-metrics-k8s-stack -n pulsar
Result: Flink metrics now flowing to VictoriaMetrics!
Step 2: Import Dashboards
Now import the pre-built dashboards:
cd ../../monitoring/grafana-dashboards
./import-dashboards.sh
What it does:
- Port-forwards to Grafana:
kubectl port-forward -n pulsar svc/pulsar-grafana 3000:80 &
- Imports Flink Dashboard:
curl -X POST \
-H "Content-Type: application/json" \
-u admin:admin123 \
-d @flink-iot-pipeline-dashboard.json \
http://localhost:3000/api/dashboards/db
- Imports ClickHouse Dashboard:
curl -X POST \
-H "Content-Type: application/json" \
-u admin:admin123 \
-d @clickhouse-iot-data-dashboard.json \
http://localhost:3000/api/dashboards/db
Result: Three dashboards available in Grafana!
Step 3: Access Your Unified Dashboard
# Port-forward to Grafana
kubectl port-forward -n pulsar svc/pulsar-grafana 3000:80
# Open http://localhost:3000
# Login: admin / admin123
Dashboard Overview:
- Pulsar Overview (default landing page)
- Flink IoT Pipeline Metrics (streaming processing)
- ClickHouse IoT Data Metrics (analytical queries)
Real-Time Monitoring at Scale
Pulsar Dashboard - Message Ingestion
Key Metrics:
- Message Rate: Real-time ingestion rate (target: 1M msg/sec)
- Consumer Lag: Backlog across all topics
- Storage: Bookie disk usage and write latency
- Throughput: Bytes in/out per second
Critical Alerts:
- Consumer lag > 1M messages
- Bookie write latency > 2ms (p99)
- Disk usage > 85%
- Broker CPU > 90%
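To turn those thresholds into alerts the VictoriaMetrics operator can evaluate, they can be shipped as a VMRule. Here's a sketch for the consumer-lag condition; the metric name (pulsar_msg_backlog) and labels are assumptions to verify against your broker's /metrics output:
apiVersion: operator.victoriametrics.com/v1beta1
kind: VMRule
metadata:
  name: pulsar-critical-alerts
  namespace: pulsar
spec:
  groups:
    - name: pulsar.critical
      rules:
        - alert: PulsarConsumerLagHigh
          # pulsar_msg_backlog is an assumption - confirm the exact metric on your brokers
          expr: sum(pulsar_msg_backlog) by (topic) > 1000000
          for: 5m
          labels:
            severity: critical
          annotations:
            summary: "Topic {{ $labels.topic }} backlog is above 1M messages"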
Flink Dashboard - Stream Processing
Key Metrics:
- Records Processing: Input/output rates with watermarks
- Checkpointing: Duration and success rate
- Backpressure: Task-level processing delays
- Resource Usage: CPU, memory, network per TaskManager
Sample Queries:
# Flink processing rate
rate(flink_taskmanager_job_task_numRecordsIn[1m])
# Checkpoint duration
flink_jobmanager_job_lastCheckpointDuration
# Backpressure time
flink_taskmanager_job_task_backPressureTimeMsPerSecond
ClickHouse Dashboard - Analytical Performance
Key Metrics:
- Query Performance: Latency percentiles (p50, p95, p99)
- Insert Rate: Rows inserted per second
- Storage: Table sizes and compression ratios
- Memory Usage: Query memory consumption
Custom SQL Panels:
-- Real-time insert rate
SELECT
    toStartOfMinute(time) AS minute,
    COUNT(*) AS inserts_per_minute
FROM benchmark.sensors_local
WHERE time >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;

-- Top devices by message volume
SELECT
    device_id,
    COUNT(*) AS message_count
FROM benchmark.sensors_local
WHERE time >= now() - INTERVAL 1 DAY
GROUP BY device_id
ORDER BY message_count DESC
LIMIT 10;
Advanced Dashboard Features
1. Cross-Component Correlation
Create panels that show end-to-end pipeline health:
# End-to-end latency calculation
(flink_taskmanager_job_task_currentProcessingTime - flink_taskmanager_job_task_currentInputWatermark) / 1000
2. Capacity Planning Views
Track resource utilization trends:
# Storage growth over the last hour (bytes per hour)
delta(pulsar_storage_size[1h])
# Memory utilization trend
avg_over_time(flink_taskmanager_Status_JVM_Memory_Heap_Used[24h])
3. SLA Monitoring
Define and track Service Level Objectives:
Pulsar SLOs:
- Message delivery: 99.9% success rate
- End-to-end latency: <100ms (p95)
- Availability: 99.95% uptime
Flink SLOs:
- Processing latency: <5 seconds (p99)
- Checkpoint success: >99%
- Job availability: 99.9% uptime
ClickHouse SLOs:
- Query latency: <200ms (p95)
- Insert success: 99.99%
- Data freshness: <60 seconds
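SLOs only help if something tracks them continuously. One option is to encode each objective as VictoriaMetrics recording and alerting rules; here's a sketch for the Flink checkpoint-success objective (the checkpoint counters are standard Flink metric names, but verify them against what your job actually exports):
apiVersion: operator.victoriametrics.com/v1beta1
kind: VMRule
metadata:
  name: pipeline-slos
  namespace: pulsar
spec:
  groups:
    - name: slo.flink
      rules:
        # Rolling checkpoint success ratio over the last hour
        - record: slo:flink_checkpoint_success_ratio:1h
          expr: |
            increase(flink_jobmanager_job_numberOfCompletedCheckpoints[1h])
              /
            (increase(flink_jobmanager_job_numberOfCompletedCheckpoints[1h])
              + increase(flink_jobmanager_job_numberOfFailedCheckpoints[1h]))
        # Fire when the ratio drops below the 99% objective
        - alert: FlinkCheckpointSLOBreach
          expr: slo:flink_checkpoint_success_ratio:1h < 0.99
          for: 10m
          labels:
            severity: warning
          annotations:
            summary: "Flink checkpoint success ratio below 99% over the last hour"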
Production Best Practices
1. Enable Persistent Storage
grafana:
  persistence:
    enabled: true
    size: 10Gi
    storageClassName: "gp3"
Why?
- Dashboard configurations persist across pod restarts
- Datasources don't need re-configuration
- Custom dashboards aren't lost
2. Organize Dashboards by Tags
When importing, add tags:
{
  "dashboard": {
    "tags": ["pulsar", "messaging", "production"],
    ...
  }
}
Benefits:
- Easy filtering
- Logical grouping
- Better navigation
3. Set Up Alerts
# Example: Alert on high backpressure
- alert: FlinkHighBackpressure
  expr: flink_taskmanager_job_task_backPressureTimeMsPerSecond > 500
  for: 5m
  annotations:
    summary: "Flink task {{ $labels.task_name }} has high backpressure"
Common alerts:
- Pulsar consumer lag > 1M messages
- Flink checkpoint failures
- ClickHouse query latency > 1s
- Broker CPU > 90%
- Disk usage > 85%
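Rules like these don't evaluate themselves: you also need VMAlert (to evaluate them against VictoriaMetrics) and an Alertmanager (to route notifications), both of which the victoria-metrics-k8s-stack chart can deploy. A sketch of the values involved; key names can shift between chart versions, and the Slack receiver is purely illustrative:
victoria-metrics-k8s-stack:
  # Evaluates recording/alerting rules against VictoriaMetrics
  vmalert:
    enabled: true
  # Routes fired alerts to receivers
  alertmanager:
    enabled: true
    config:
      route:
        receiver: slack-oncall
      receivers:
        - name: slack-oncall
          slack_configs:
            - api_url: "<slack-webhook-url>"   # placeholder
              channel: "#streaming-alerts"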
4. Create Custom Views
# Row for each component
Row 1: Pulsar (Message ingestion)
Row 2: Flink (Stream processing)
Row 3: ClickHouse (Data storage)
Row 4: Infrastructure (CPU, memory, disk)
Example panel:
{
  "title": "End-to-End Latency",
  "targets": [
    {
      "expr": "flink_taskmanager_job_latency_source_id_operator_id{quantile=\"0.99\"}",
      "legendFormat": "Flink p99 latency"
    }
  ]
}
5. Export and Version Control
# Export all dashboards
for dashboard in flink clickhouse pulsar; do
curl -u admin:admin123 \
"http://localhost:3000/api/dashboards/uid/$dashboard" | \
jq '.dashboard' > ${dashboard}-dashboard.json
done
# Commit to git
git add grafana-dashboards/*.json
git commit -m "Update Grafana dashboards"
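Beyond keeping the JSON in git, you can have Grafana pick dashboards up automatically: the chart's dashboard sidecar watches ConfigMaps carrying a specific label. A sketch assuming the Grafana chart's default grafana_dashboard label (check your values for the configured label; the JSON body here is abbreviated):
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-iot-pipeline-dashboard
  namespace: pulsar
  labels:
    grafana_dashboard: "1"   # label the sidecar watches (chart default - verify in your values)
data:
  flink-iot-pipeline-dashboard.json: |
    {
      "title": "Flink IoT Pipeline Metrics",
      "panels": []
    }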
The Complete Monitoring Workflow
Initial Setup (One Time)
# 1. Deploy Pulsar with VictoriaMetrics stack
cd pulsar-load
./deploy.sh
# Grafana and VictoriaMetrics installed automatically
# 2. Setup Flink metrics integration
cd ../flink-load
./setup-flink-metrics.sh
# Flink pods now expose metrics
# VMAgent configured to scrape Flink
# 3. Import custom dashboards
cd ../../monitoring/grafana-dashboards
./import-dashboards.sh
# Flink and ClickHouse dashboards imported
Daily Operations
# Access Grafana
kubectl port-forward -n pulsar svc/pulsar-grafana 3000:80
# Open http://localhost:3000
# View dashboards:
# 1. Pulsar Overview (default)
# 2. Dashboards -> Flink IoT Pipeline Metrics
# 3. Dashboards -> ClickHouse IoT Data Metrics
Monitoring at Scale (1M msg/sec)
What to watch:
Pulsar:
- Message rate: Should be ~1M msg/sec consistently
- Consumer lag: Should be near 0
- Bookie write latency: <2ms p99
- Storage growth: ~300 MB/sec
Flink:
- Records in: ~1M msg/sec
- Records out: ~17K records/min (after aggregation)
- Checkpoint duration: <10 seconds
- Backpressure: LOW
- CPU: 75-85% utilization
ClickHouse:
- Insert rate: ~289 inserts/sec (17,333/60)
- Query latency: <200ms for aggregations
- Table size: Growing at ~50 MB/sec
- Compression ratio: 10-15x
Performance Impact of Monitoring
Resource overhead:
| Component | CPU | Memory | Storage |
|---|---|---|---|
| VMAgent | 0.1 vCPU | 200 MB | - |
| VMSingle | 0.5 vCPU | 2 GB | 50 GB/month |
| Grafana | 0.1 vCPU | 400 MB | 10 GB |
| Total | 0.7 vCPU | 2.6 GB | 60 GB/month |
Cost impact: ~$50/month (minimal!)
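The storage number above is driven mostly by retention. If you want to cap it, the VMSingle spec in the Helm values lets you set retention and resources explicitly; a sketch with one-month retention and modest limits (tune the values for your metric volume):
victoria-metrics-k8s-stack:
  vmsingle:
    spec:
      retentionPeriod: "1"    # months by default; values like "30d" also work
      resources:
        requests:
          cpu: 500m
          memory: 2Gi
        limits:
          memory: 4Gi
      storage:
        resources:
          requests:
            storage: 60Gi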
Benefits:
- Real-time visibility
- Faster troubleshooting
- Capacity planning
- Performance optimization
- SLA monitoring
Advanced: Custom Metrics
Add Custom Flink Metrics
// In your Flink job
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

// Event is your pipeline's record type
public class CustomMetricsMapper extends RichMapFunction<Event, Event> {
    private transient Counter eventCounter;
    private transient Meter eventRate;

    @Override
    public void open(Configuration parameters) {
        // Register a counter and a 60-second meter under this operator's metric group
        eventCounter = getRuntimeContext()
            .getMetricGroup()
            .counter("custom_events_processed");
        eventRate = getRuntimeContext()
            .getMetricGroup()
            .meter("custom_events_per_second", new MeterView(60));
    }

    @Override
    public Event map(Event event) {
        eventCounter.inc();
        eventRate.markEvent();
        return event;
    }
}
View in Grafana:
rate(flink_taskmanager_job_task_operator_custom_events_processed[1m])
Add Custom ClickHouse Queries
{
  "targets": [
    {
      "datasource": "ClickHouse",
      "rawSql": "SELECT toStartOfMinute(time) as t, COUNT(*) as count FROM benchmark.sensors_local WHERE time >= now() - INTERVAL 1 HOUR GROUP BY t ORDER BY t",
      "format": "time_series"
    }
  ]
}
Conclusion
You now have a unified monitoring stack that provides:
- A single Grafana instance monitoring the entire pipeline
- VictoriaMetrics for efficient metric storage
- Three pre-built dashboards (Pulsar, Flink, ClickHouse)
- Automated setup with shell scripts
- Cross-namespace monitoring (Pulsar -> Flink)
- Real-time visibility at 1M msg/sec scale
- Production-ready alerting and SLO tracking
The beauty of this setup:
- Deploy once: monitoring comes with Pulsar
- Add Flink: one script (setup-flink-metrics.sh)
- Import dashboards: one script (import-dashboards.sh)
- Access everything: a single Grafana instance
No separate Prometheus installations, no complex federation, no metric duplication. Just a clean, unified monitoring solution!
Resources
- GitHub Repository: RealtimeDataPlatform/monitoring
- VictoriaMetrics Docs: docs.victoriametrics.com
- Grafana Docs: grafana.com/docs
- Flink Metrics: nightlies.apache.org/flink/flink-docs-stable/docs/ops/metrics/
How do you monitor your streaming pipelines? Share your setup in the comments!
Follow me for more posts on observability, real-time systems, and production DevOps!
Tags: #monitoring #grafana #victoriametrics #flink #pulsar #clickhouse #kubernetes #observability
Quick Reference
Port-Forward Commands
# Grafana (all dashboards)
kubectl port-forward -n pulsar svc/pulsar-grafana 3000:80
# VictoriaMetrics (raw metrics)
kubectl port-forward -n pulsar svc/vmsingle-pulsar-victoria-metrics-k8s-stack 8429:8429
# Flink UI (job details)
kubectl port-forward -n flink-benchmark svc/flink-jobmanager-rest 8081:8081
# Prometheus-compatible API
kubectl port-forward -n pulsar svc/vmsingle-pulsar-victoria-metrics-k8s-stack 9090:8429
Dashboard URLs
Pulsar: http://localhost:3000 (default landing page)
Flink: http://localhost:3000/d/flink-iot-pipeline
ClickHouse: http://localhost:3000/d/clickhouse-iot-metrics
Useful Queries
# Total message rate across all Pulsar topics
sum(rate(pulsar_in_messages_total[1m]))
# Flink processing lag
flink_taskmanager_job_task_currentInputWatermark - flink_taskmanager_job_task_currentOutputWatermark
# ClickHouse disk usage
clickhouse_metric_DiskDataBytes