- [What to measure: the three pillars (metrics, logs, traces)]
- [How to instrument Kafka, Flink, and your clients so metrics actually help]
- [SLOs, alerts, and the escalation playbook that prevents page storms]
- [Tracing and lineage: bridging asynchronous hops for real-time debugging]
- [Automated reconciliation and continuous validation to close the data integrity loop]
- [Practical runbooks and code snippets you can apply in 60 minutes]
The hard truth: streaming systems look healthy until they quietly stop being correct. Small shifts—hidden consumer lag, slow checkpoints, or a single partition with silent IO errors—turn real-time pipelines into unreliable, expensive batch replays.
The symptoms you see—spikes in end-to-end latency, a subset of events not appearing in downstream tables, noisy dashboards that disagree with the reporting database—are not caused by one component. They’re caused by weak instrumentation and no reconciliation loop: metrics that measure CPU but not correctness, logs that lack trace ids, and alerting that pages on symptoms rather than root causes.
What to measure: the three pillars (metrics, logs, traces)
Measure three signals in concert: metrics for trends and SLAs, logs for context and forensics, and traces for causal flow between asynchronous hops.
- Metrics (what matters in streaming)
- Broker health: Under‑replicated partitions, Offline partitions, replication lag and controller status. These come from Kafka’s JMX MBeans and are the first line of defense for cluster-level issues.
- Broker throughput/latency:
MessagesInPerSec,BytesInPerSec,BytesOutPerSec, request/response latencies. Track both rate and cumulative counters because spike patterns differ by percentile. - Consumer/client health: consumer group lag per partition,
records-consumed-rate, commit latency and commit success/failure counts. Lag is the single most actionable indicator that your pipeline is not keeping up. - Flink job health: checkpoint success/failure counts, last checkpoint duration, checkpoint alignment time, state size, task backpressure indicators, and operator-level record in/out rates. These Flink metrics expose the runtime health and are critical for stateful correctness.
- End-to-end freshness: a sampled latency histogram from ingestion timestamp to final sink write (p50/p95/p99/p999). Capture event-time and processing-time latencies; percentiles reveal tail behavior that averages hide.
- Logs (what to capture)
- Structured JSON logs with
trace_id,message_key,topic,partition,offset,ingest_ts, andapp_instance. This lets you join logs to traces and to reconciliation outputs. - Operator and connector stack traces combined with the
jobIdand taskattempt identifiers from Flink for quick lookup in the UI.
- Structured JSON logs with
- Traces (what to propagate)
- Propagate W3C
traceparent/tracestateacross producers, Kafka headers, Flink tasks, connectors, and sinks so you can reconstruct asynchronous executions end-to-end. Use OpenTelemetry’s messaging semantic conventions for span naming and attributes.
- Propagate W3C
Key metric groups (quick reference)
Area Why it matters Example metric / source Kafka broker health Prevent data loss & leader churn UnderReplicatedPartitions(JMX).Consumer lag Shows processing backlog and correctness risk exporter: kafka_consumergroup_lag{group,topic,partition}.Flink checkpointing Determines snapshot consistency & recovery lastCheckpointDuration,checkpointFailedCount.E2E latency Business SLA for freshness histogram of (sink_ts - ingest_ts) or traced spans.
Citations: Kafka JMX docs and mapping: . Prometheus JMX exporter provides the path to make JMX metrics available to Prometheus: . Flink Prometheus integration and metrics explanation: .
How to instrument Kafka, Flink, and your clients so metrics actually help
The instrumentation job is threefold: expose, reduce cardinality, and correlate.
1) Expose component metrics
- Kafka brokers: run the Prometheus JMX exporter as a Java agent on each broker (or sidecar) to convert MBeans into Prometheus metrics. That surfaces
kafka.server:*and controller MBeans for scraping. Example JVM arg (shell):
export KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=9404:/etc/jmx_exporter/kafka.yml"
Prometheus scrapes the exporter endpoint.
- Flink: use the built-in
PrometheusReporter(drop theflink-metrics-prometheusjar intoflink/liband configureflink-conf.yaml) so job managers and task managers expose metrics for Prometheus to scrape. Example config:
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249
Flink exposes checkpoint metrics, operator-level rates, and backpressure gauges.
2) Instrument clients (producers/consumers)
- JVM clients: bind Kafka client metrics into your application registry via Micrometer’s
KafkaClientMetrics. This yieldskafka.*metric names that integrate with your existingMeterRegistryand Prometheus push/scrape setup. Example Java:
Producer<String,String> producer = new KafkaProducer<>(props);
KafkaClientMetrics metrics = new KafkaClientMetrics(producer);
metrics.bindTo(meterRegistry);
Micrometer provides a consistent tags model so you can group by client id, application, and environment.
3) Correlate metrics, logs, and traces
- Distributed tracing: instrument Kafka producers/consumers with OpenTelemetry. Use either the Java agent or the
opentelemetry-kafka-clientsinstrumentation; inject trace context into message headers and extract it downstream so spans form a coherent trace across asynchronous hops. Example producer-side injection (Java + OpenTelemetry):
TextMapPropagator propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
Span span = tracer.spanBuilder("produce").startSpan();
try (Scope s = span.makeCurrent()) {
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, value);
propagator.inject(Context.current(), record.headers(),
(headers, k, v) -> headers.add(k, v.getBytes(StandardCharsets.UTF_8)));
producer.send(record);
} finally {
span.end();
}
OpenTelemetry documents Kafka client instrumentation and recommends using messaging semantic conventions for attributes. [19search0]
4) Practical telemetry hygiene rules
- Choose low‑cardinality labels for metrics (service, topic-template, environment), and avoid raw ids (user id, order id) in metric labels.
- Histogram buckets: use well-chosen latency buckets for p50/p95/p99; precompute percentile-friendly buckets server-side where possible.
- Sampling: trace a fraction of messages (for high-QPS topics) but ensure synthetic transactions / complete traces for critical flows.
SLOs, alerts, and the escalation playbook that prevents page storms
SLOs guide alerting. Define SLOs that reflect user-facing freshness and correctness rather than node-level CPU.
-
Starter SLOs (examples you can adapt)
- Freshness (latency): 99% of events have end-to-end latency < 500 ms measured on a rolling 30-day window.
- Completeness (reconciliation): 99.99% of produced messages appear in the sink within 5 minutes of production for steady-state traffic.
- Availability (pipeline): Job/process availability >= 99.9% per month (no prolonged checkpointing failures). Use error budgets to balance releases vs reliability.
-
Alerting strategy aligned to SLOs
- Alert at symptom-level (page) only when SLO breach or imminent burn-rate is high. Use a small set of actionable page alerts and promote less-critical signals to tickets or dashboards. Google SRE’s error budget model applies directly here: alerts consume the budget; paging should be reserved for budget burn or severe degradations.
- Use Alertmanager routing for severity and grouping: group alerts by
service,pipeline,clusterto avoid storms. Use inhibition to suppress lower-priority noise when critical cluster-level alerts are firing.
Example Prometheus alert rules (conceptual)
groups:
- name: streaming.rules
rules:
- alert: KafkaUnderreplicatedPartitions
expr: sum(kafka_server_replica_underreplicatedpartitions) > 0
for: 2m
labels:
severity: critical
annotations:
summary: "Broker has under-replicated partitions"
- alert: HighConsumerLag
expr: sum by (group)(kafka_consumergroup_lag{group=~"orders-.*"}) > 100000
for: 10m
labels:
severity: critical
annotations:
summary: "Consumer group {{ $labels.group }} lag above threshold"
Label names differ by exporter—adapt expressions to your exporter’s metric names.
- Escalation playbook (concise)
- Page on-call for a critical alert (HighConsumerLag, UnderReplicatedPartitions, CheckpointingStuck).
- On-call triage steps (ordered checklist):
- Confirm alert & scope (which topics, partitions, job IDs).
- Check Kafka broker metrics (
UnderReplicatedPartitions, network errors) and controller logs. - Check Flink UI for failed checkpoints, backpressure, or task failures.
- If consumer lag: query
kafka-consumer-groups.sh --describeto view partition-level lag and reassign or scale consumers as required. - If checkpointing is failing: take savepoint and restart job if necessary (see Flink savepoint docs). [20search0]
- Update PagerDuty/incident channel with clear status, mitigation, and next steps.
Callout: Configure a low-volume synthetic transaction for every critical pipeline to act as a living SLO probe—one that produces, consumes, and asserts correctness end-to-end at a known cadence (e.g., every 20s). Synthetic probes measure availability as clients see it, not only system internals.
Tracing and lineage: bridging asynchronous hops for real-time debugging
Tracing real-time pipelines differs from request/response tracing because messages are decoupled and asynchronous. Use tracing to reconstruct causal chains and to track data lineage.
- Propagate context across Kafka
- Write
traceparentand key metadata into Kafka message headers when producing. Extract them on consumption and start a child span (or an extracted parent) in the consumer or Flink operator. The W3C trace context ensures interop across vendors.
- Write
- Choose span model carefully
- Producer span:
send topicX - Broker span (optional if instrumented):
kafka.broker:write(often provided by instrumentation) - Consumer span:
process topicX— uselinksto associate the consumer work with the original producer span if parent-child semantics are not straightforward due to asynchronous decoupling. OpenTelemetry’s semantic conventions document covers messaging spans and attributes to standardize instrumentation. [19search2]
- Producer span:
- Data lineage metadata
- Add headers/attributes for
schema_id(schema registry),source_system,ingest_ts,offset, andpartition. Persist lineage metadata into a lightweight lineage store (or data catalog) keyed by trace id so you can show a trace → data change → sink row mapping during post-mortem.
- Add headers/attributes for
- Collector & storage
- Use an OpenTelemetry Collector and backend (Jaeger, Tempo, or commercial APM) to aggregate traces; enable a Kafka receiver in the collector if you want to stream tracing records through Kafka itself. This lets you query traces that cross Kafka and Flink boundaries.
Example Flink operator extraction (pseudo-Java):
// inside a map/flatMap that has access to Kafka headers
Context extracted = propagator.extract(Context.current(), record.headers(), extractor);
Span span = tracer.spanBuilder("flink.process").setParent(extracted).startSpan();
try (Scope s = span.makeCurrent()) {
// process record
} finally {
span.end();
}
Tracing provides the exact path and latency contributions (producer → broker → consumer → sink) so you can triage whether the problem is a broker commit, network, consumer processing, or sink write.
Automated reconciliation and continuous validation to close the data integrity loop
Metrics and traces tell when something is wrong; reconciliation tells what data is wrong.
-
Two reconciliation patterns
- Offset and count reconciliation (fast, lightweight): Periodically compare message counts or per-key aggregates over identical time windows between source (Kafka offsets or topic aggregates) and the sink (warehouse table partitions). Surface mismatch ratios and sample offending keys for inspection.
- Record-level reconciliation (heavy but exact): For critical datasets, compute a deterministic checksum (e.g., hash of canonical serialized record) in both source and sink and diff the hashes on windows. Use partition-aware jobs to parallelize reconciliation.
-
Practical reconciliation workflow
- Schedule a reconciliation job every N minutes (window size tied to SLO; e.g., every 5 minutes for a 5-minute freshness SLO).
- For each topic-window: record
produced_count,produced_checksum, and highest offsets per partition; compare tosink_countandsink_checksum. - Emit reconciliation metrics (e.g.,
reconciliation_mismatch_ratio,reconciliation_latency_seconds) so Alertmanager can page on persistent mismatches. - If mismatch crosses threshold, trigger a forensics run and mark affected keys for reprocessing via savepoint + targeted replay or a backfill job.
-
Continuous validation frameworks
- Use Great Expectations style checks for minibatches or checkpointed windows: run expectation suites per window to validate schema, null rates, distribution shifts, and aggregate constraints. Great Expectations’ checkpoint model is useful as a standardized runner for validations and alert actions.
- Combine small in‑pipeline checks (lightweight asserts, schema rejection) with offline windowed validations that are strict and produce incidents.
Example reconciliation metric (pseudo-query)
-- produced counts per 5-minute window (from a compact sink of topic events)
SELECT window_start, COUNT(*) AS produced
FROM kafka_topics.orders
WHERE ingest_ts >= now() - interval '1 day'
GROUP BY window_start;
-- compare to sink counts and compute mismatch percent
- Automate remediation (playbooks)
- On mismatch: tag the affected time-window and partition, capture savepoint, run targeted replay from earliest affected offset (or a backup store like S3), and verify reconciliation result before closing incident.
Practical runbooks and code snippets you can apply in 60 minutes
A compact checklist and a few runnable examples to get a baseline.
-
Quick checklist to establish core observability (60 min)
- Add Prometheus JMX exporter to Kafka brokers and confirm
/metricsis reachable. - Drop
flink-metrics-prometheusjar intoflink/liband enablePrometheusReporterinflink-conf.yaml. Confirmjobmanagerandtaskmanagermetrics endpoints. - Bind Kafka client metrics via Micrometer or enable the OpenTelemetry Java agent for Kafka clients to get traces.
- Create a
synthetic-slatopic and consumer/producer that perform a write-read-assert every 20s; measure end-to-end latency and error counts as an SLO probe.
- Add Prometheus JMX exporter to Kafka brokers and confirm
Immediate Prometheus alert examples (copy-edit for exporter names)
groups:
- name: stream-critical
rules:
- alert: FlinkCheckpointStuck
expr: increase(flink_job_checkpoint_failed_count[10m]) > 0
for: 5m
labels:
severity: critical
annotations:
summary: "Flink job {{ $labels.job }} has failing checkpoints"
- alert: ConsumerLagHigh
expr: sum(kafka_consumergroup_lag{group="orders-consumers"}) by (group) > 200000
for: 10m
labels:
severity: critical
-
Rapid triage runbook for "High end-to-end latency" (ordered)
- Check end-to-end latency metric and percentile graphs (p95/p99).
- Check producer-side produce latency and broker request latency (
RequestHandlerAvgIdlePercentto find thread starvation). - Check Kafka broker disk IO and replication metrics for hotspots.
- Check Flink operator backpressure and CPU/memory on TaskManagers; inspect checkpoint durations.
- If backlog found: scale consumers or task parallelism, apply backpressure mitigation (increase task slots or accelerate sink throughput), and consider temporary rate limiting upstream.
-
Quick command recipes
- Describe consumer group lag:
bin/kafka-consumer-groups.sh --bootstrap-server broker:9092 --describe --group orders-consumers
- Trigger a Flink savepoint:
bin/flink savepoint <jobId> hdfs:///flink/savepoints
- Inspect Flink checkpoints and job metrics via the Flink Web UI (JobManager endpoint). [20search0]
Sources
Apache Kafka — Monitoring - Kafka’s official monitoring guidance and the JMX MBean names (e.g., BrokerTopicMetrics, replication/partition metrics) used to derive the key broker and client metrics.
Prometheus JMX Exporter (jmx_exporter) - The Java agent and exporter used to expose Java MBeans (used for Kafka brokers and many Java clients) as Prometheus metrics.
Flink and Prometheus: Cloud-native monitoring of streaming applications - Flink project blog explaining the PrometheusReporter integration and practical setup patterns.
Apache Flink — Metrics - Flink official metrics documentation covering checkpoint metrics, operator/task metrics, and recommended metrics to observe.
TwoPhaseCommitSinkFunction (Flink API) - Flink’s base class documentation used to implement two‑phase commit sinks (the pattern behind end‑to‑end exactly‑once for sinks like Kafka).
KafkaProducer (Apache Kafka Java client) - Documentation describing idempotent and transactional producers and the transactional.id semantics used for exactly‑once behavior.
W3C Trace Context Specification - The standard for traceparent/tracestate headers used to propagate trace context cross-process and across messaging boundaries.
Instrumenting Apache Kafka clients with OpenTelemetry (OpenTelemetry blog) - Operational guidance and examples for Kafka client instrumentation with OpenTelemetry and propagation patterns.
Micrometer — Apache Kafka Metrics (reference) - Shows KafkaClientMetrics binder and practical bindings for producer/consumer metrics into Micrometer registries.
Prometheus — Alertmanager - Alertmanager concepts for grouping, inhibition, and routing alerts to avoid notification storms and to implement escalation policies.
Great Expectations — GitHub (project) - The open-source framework for data expectations, checkpointing and validation that teams commonly use for continuous validation (checkpoints and actionable validation results).
OpenTelemetry Collector Kafka receiver (opentelemetry-collector-contrib) - Collector receiver that can extract Kafka message headers and include them in telemetry, useful for pipeline-level collection and header extraction.
A clear, correlated telemetry plane — Prometheus metrics from Kafka and Flink, structured logs keyed by trace_id, and sampled OpenTelemetry traces that ride in Kafka headers — turns silent failures into fast remediation. Implement the short checklist above, bake SLOs into your alerting, and automate reconciliation windows; you will catch correctness issues when they are cheap to fix and keep your pipelines truly real-time.
Top comments (0)