VictoriaMetrics' stream aggregation is a powerful feature for reducing metric cardinality in real time. But when you push it to millions of time series across a distributed fleet, the native implementation starts to show cracks.
This post walks through what I found analyzing the stream aggregation source code, the real-world problems that emerged at scale, and the distributed gateway we built to solve them.
Community VM Stream Aggregation — Capability Analysis
Stream aggregation was integrated into vmagent starting from version 1.86 (GitHub issue #3460). Let's look at what it actually does under the hood.
Core Computation: The pushSample Function
The heart of stream aggregation lives in the pushSample function. Here's the simplified logic:
func (as *totalAggrState) pushSample(inputKey, outputKey string, value float64) {
	currentTime := fasttime.UnixTimestamp()
	deleteDeadline := currentTime + as.intervalSecs + (as.intervalSecs >> 1)

again:
	v, ok := as.m.Load(outputKey)
	if !ok {
		v = &totalStateValue{
			lastValues: make(map[string]*lastValueState),
		}
		vNew, loaded := as.m.LoadOrStore(outputKey, v)
		if loaded {
			v = vNew
		}
	}
	sv := v.(*totalStateValue)
	sv.mu.Lock()
	deleted := sv.deleted
	if !deleted {
		lv, ok := sv.lastValues[inputKey]
		if !ok {
			lv = &lastValueState{}
			sv.lastValues[inputKey] = lv
		}
		d := value
		if ok && lv.value <= value {
			d = value - lv.value
		}
		if ok || currentTime > as.ignoreInputDeadline {
			sv.total += d
		}
		lv.value = value
		lv.deleteDeadline = deleteDeadline
		sv.deleteDeadline = deleteDeadline
	}
	sv.mu.Unlock()
	if deleted {
		goto again
	}
}
The core idea: each incoming sample's value is compared against the last seen value for that time series, and the delta is accumulated into a running total. This works well for monotonically increasing counters, but there's a critical detail: timing is driven entirely by the wall clock (fasttime.UnixTimestamp()) at the moment a sample is pushed, not by the sample's own timestamp. The window logic is simple periodic checking, with no special handling for delayed or out-of-order arrivals.
What Stream Aggregation Looks Like in Practice
In theory, stream aggregation should cleanly reduce high-cardinality metrics down to manageable summaries. In practice, the picture is messier:
- Ideal model: every sample arrives on time, windows align perfectly, aggregation is lossless
- Reality: samples arrive late, retries flood old data, gaps appear from network or service issues
Common Issues with Native Stream Aggregation
The Collection Gap Problem
Collection gaps are inevitable. Network blips, service restarts, GC pauses — any of these can cause a gap in metric collection. For high-precision stream aggregation, gaps create a specific failure mode:
When a counter's last tracked value is 1000, and a gap causes the next received value to be 5000, the delta is 4000 — which may span an unknown number of actual increments. If the gap occurred within a single aggregation window, the inflated value corrupts that window's result. If the gap crosses window boundaries, you get compounding errors in downstream calculations.
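The effect of a gap can be reproduced with a small Go sketch. It is not VictoriaMetrics code: Sample and windowTotals are hypothetical names, and the sample timestamp stands in for arrival time. It only replays the same delta rule over a list of counter readings and attributes each delta to the window in which the sample arrives.

```go
package main

// Sample is a hypothetical counter reading used only for this illustration.
type Sample struct {
	TS    int64   // arrival time, unix seconds
	Value float64 // raw counter value
}

// windowTotals replays readings of one series through delta accumulation and
// returns the total attributed to each window, keyed by window start time.
func windowTotals(samples []Sample, intervalSecs int64) map[int64]float64 {
	totals := make(map[int64]float64)
	var last float64
	seen := false
	for _, s := range samples {
		d := s.Value
		if seen && last <= s.Value {
			d = s.Value - last
		}
		if seen {
			// The whole delta lands in the window where the sample arrives,
			// no matter how many windows were skipped since the last sample.
			totals[s.TS/intervalSecs*intervalSecs] += d
		}
		last = s.Value
		seen = true
	}
	return totals
}
```

With 60-second windows and readings of 900 at t=0, 1000 at t=30, and 5000 at t=210, the window starting at 0 gets 100 and the window starting at 180 gets the full 4000. The windows starting at 60 and 120 get nothing, even though the increments were spread across them.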
Distributed Computing Challenges
Stream aggregation doesn't persist historical data, so it's fast — but even the fastest service has single-node limits. When you need to scale horizontally, a cascade of new problems appears:
Is vmagent's built-in collection viable at scale?
In testing, enabling vmagent shard + replica collection with real-time stream aggregation caused significant resource spikes. At very large scales, collection gaps became more frequent, which amplified the calculation errors from gaps rather than mitigating them.
Which compute node should each sample be assigned to?
Without a consistent routing strategy, the same metric dimension gets split across nodes, producing partial results that can't be safely combined.
How do you handle the same dimension set being computed by multiple nodes with different values?
When multiple nodes produce results for the same dimension within the same time window, VictoriaMetrics triggers its out-of-order handling logic — which discards later values. You lose data silently.
How do you balance resources in distributed computation?
Uneven distribution means some nodes are overloaded while others sit idle.
What about the new dimensions introduced by routing?
If you insert a task ID to differentiate compute nodes, you've just added a new dimension that grows with every node you add — defeating part of the purpose of aggregation.
Design and Implementation: A Distributed Stream Aggregation Gateway
After analyzing these problems, it became clear that a frontend module was needed to address them at the entry point. Since vmgateway is an enterprise component, we built our own: vm-receive-route, a distributed stream aggregation gateway.
Key Insight from Source Code
Two aspects of the native implementation are particularly relevant:
Time window range:
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.intervalSecs + (as.intervalSecs >> 1)
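The deadline sits 1.5 times the configured interval past the current time: the interval itself plus a 50% grace period (intervalSecs >> 1). In pushSample it is stored on the per-series and per-output state (lv.deleteDeadline and sv.deleteDeadline), so, as the name suggests, it marks when idle state becomes eligible for deletion. This is the only protection against late-arriving data, and it is quite generous, which means stale data can still influence results within that extended window.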
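The arithmetic is easy to check in isolation. The helper below is a hypothetical wrapper around the same expression, with uint64 assumed for both operands.

```go
package main

// deleteDeadline mirrors the expression from the snippet above: the current
// time plus the interval plus half the interval (integer shift, rounds down).
// The function name and the uint64 types are assumptions for illustration.
func deleteDeadline(currentTime, intervalSecs uint64) uint64 {
	return currentTime + intervalSecs + (intervalSecs >> 1)
}
```

For a 60-second interval at currentTime 1000 the deadline is 1090. For an odd interval such as 15 seconds the shift rounds down, so the grace period is 7 seconds rather than 7.5.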
Calculation logic:
lv, ok := sv.lastValues[inputKey]
if !ok {
	lv = &lastValueState{}
	sv.lastValues[inputKey] = lv
}
d := value
if ok && lv.value <= value {
	d = value - lv.value
}
if ok || currentTime > as.ignoreInputDeadline {
	sv.total += d
}
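The aggregation only checks whether a previous value exists and is no larger than the new one. When the new value is lower (after a restart, for example, or when a late sample arrives out of order), no delta is computed: d keeps its initial value, so the full sample value is added to the total as if the counter had reset. A genuine reset and a stale sample are indistinguishable here. There's no deduplication, no gap detection, no intelligent merging.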
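Tracing that branch with concrete numbers shows what each case contributes. The function below is a hypothetical extraction of the branch for one series, where hasPrev stands for ok and prev for lv.value.

```go
package main

// delta reproduces the branch from the snippet above for a single series.
// It returns the amount that would be added to the running total, ignoring
// the separate ignoreInputDeadline check.
func delta(prev, value float64, hasPrev bool) float64 {
	d := value
	if hasPrev && prev <= value {
		d = value - prev
	}
	return d
}
```

delta(1000, 1200, true) yields 200, the normal case. delta(1200, 50, true) yields 50, which is reasonable for a restart. delta(1200, 1100, true) yields 1100: a slightly stale sample is treated like a reset, and its whole value is added.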
What the Gateway Solves
1. Asynchronous Processing
Most remote write adapters (like prometheus-kafka-adapter) do synchronous forwarding — they wait for the downstream (Kafka, etc.) to acknowledge before accepting the next batch. Stream aggregation has window constraints: if the write pipeline blocks and samples arrive late, they miss their window and calculations drift.
The gateway decouples ingestion from forwarding using an internal buffer. The remote write endpoint returns immediately, and samples are forwarded asynchronously to the stream aggregation backend. This prevents back-pressure from creating cascading delays.
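A minimal sketch of that decoupling in Go, assuming a bounded in-memory queue and a single forwarding worker. AsyncForwarder, Sample, and the drop-when-full policy are illustrative assumptions, not the actual vm-receive-route implementation.

```go
package main

import "sync"

// Sample is a hypothetical, minimal stand-in for a remote-write sample.
type Sample struct {
	Series string
	TS     int64
	Value  float64
}

// AsyncForwarder decouples ingestion from forwarding with a bounded queue.
type AsyncForwarder struct {
	queue chan []Sample
	wg    sync.WaitGroup
}

// NewAsyncForwarder starts one worker that drains the queue into forward.
func NewAsyncForwarder(size int, forward func([]Sample)) *AsyncForwarder {
	f := &AsyncForwarder{queue: make(chan []Sample, size)}
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()
		for batch := range f.queue {
			forward(batch)
		}
	}()
	return f
}

// Enqueue never blocks: it reports false when the buffer is full so the
// caller can count the drop instead of stalling the write path.
func (f *AsyncForwarder) Enqueue(batch []Sample) bool {
	select {
	case f.queue <- batch:
		return true
	default:
		return false
	}
}

// Close stops accepting batches and waits for the worker to drain the queue.
func (f *AsyncForwarder) Close() {
	close(f.queue)
	f.wg.Wait()
}
```

A remote write handler would call Enqueue and respond right away. Dropping on a full buffer is one possible policy; it trades completeness for bounded latency, which matches the window constraint described above.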
2. Time Window Filtering
Since stream aggregation already computes deltas between successive values, there's no need for complex out-of-order handling at this layer. The gateway simply cooperates with the aggregation window:
- Samples arriving within the window → forward normally
- Samples arriving outside the window → discard
This solves two problems at once:
- Prometheus retries that send large backlogs of old samples no longer corrupt real-time results
- The resource overhead of processing those retries is eliminated at the gateway level — the aggregation backend never sees them
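The forward-or-discard rule above can be sketched as a timestamp check. The function names, the windowSecs bound, and the skewSecs tolerance for clocks slightly ahead are hypothetical knobs, not settings from the gateway itself.

```go
package main

// inWindow reports whether a sample timestamp falls inside the accepted
// range [now-windowSecs, now+skewSecs]. All times are unix seconds.
func inWindow(sampleTS, now, windowSecs, skewSecs int64) bool {
	return sampleTS >= now-windowSecs && sampleTS <= now+skewSecs
}

// filterByWindow returns the timestamps that should be forwarded and the
// number of samples discarded for being outside the window.
func filterByWindow(timestamps []int64, now, windowSecs, skewSecs int64) (kept []int64, dropped int) {
	for _, ts := range timestamps {
		if inWindow(ts, now, windowSecs, skewSecs) {
			kept = append(kept, ts)
		} else {
			dropped++
		}
	}
	return kept, dropped
}
```

Because the check is a pair of integer comparisons per sample, a retry backlog costs the gateway very little and never reaches the aggregation backend.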
3. Dimension Control
The stream aggregation component inserts a node ID into each aggregated time series to distinguish labels across compute nodes. But as nodes scale horizontally, the cardinality of that label scales too. You need a way to:
- Control dimension growth from node identities
- Route time series by dimension to the correct node
We designed a dual hashmod scheduling algorithm:
- The gateway assigns a hash-based task ID to each time series based on its stable dimensions (not the node identity)
- The same series always routes to the same compute node, regardless of which gateway instance processed it
- The task ID dimension is bounded by the number of unique dimension combinations, not the number of gateway nodes
By moving the task ID labeling to the gateway layer, we eliminated the unbounded dimension growth that horizontal scaling would otherwise cause.
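One way to read "dual hashmod" is as two modulo steps: hash the stable labels into a bounded task ID, then map the task ID onto a compute node. The sketch below follows that reading. The FNV hash, the function names, and the numTasks and numNodes parameters are assumptions, since the post does not spell out the exact scheme.

```go
package main

import (
	"hash/fnv"
	"sort"
)

// seriesHash hashes the stable labels in sorted key order so every gateway
// instance computes the same value for the same series.
func seriesHash(labels map[string]string) uint64 {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	h := fnv.New64a()
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte{0}) // separator so "ab"+"c" differs from "a"+"bc"
		h.Write([]byte(labels[k]))
		h.Write([]byte{0})
	}
	return h.Sum64()
}

// route applies two modulo steps: hash -> task ID (bounded by numTasks),
// then task ID -> compute node index (bounded by numNodes).
func route(labels map[string]string, numTasks, numNodes uint64) (taskID, node uint64) {
	taskID = seriesHash(labels) % numTasks
	node = taskID % numNodes
	return taskID, node
}
```

Under this reading the task ID label can take at most numTasks distinct values, and any gateway instance given the same labels picks the same task ID and node.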
4. Backend Service Migration for Failures
When a compute node fails, its in-memory aggregation state is lost. The gateway detects failures via health checks and reroutes traffic to healthy nodes. Since the dual hashmod ensures consistent routing, the remaining nodes can immediately pick up the work, though there will be a brief period of incomplete aggregation until the state rebuilds.
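A rough sketch of the rerouting step, assuming the gateway keeps a per-node health flag from its checks. pickNode and the fallback rule are hypothetical; the post does not describe how a replacement node is chosen.

```go
package main

// pickNode maps a task ID onto the currently healthy nodes. healthy[i]
// reports whether node i passed its last health check.
// It returns -1 when no node is available.
func pickNode(taskID uint64, healthy []bool) int {
	n := uint64(len(healthy))
	if n == 0 {
		return -1
	}
	primary := int(taskID % n)
	if healthy[primary] {
		return primary
	}
	// Only tasks whose primary node failed are remapped, so tasks on
	// healthy nodes keep their assignment and their in-memory state.
	var alive []int
	for i, ok := range healthy {
		if ok {
			alive = append(alive, i)
		}
	}
	if len(alive) == 0 {
		return -1
	}
	return alive[taskID%uint64(len(alive))]
}
```

Tasks moved this way start from empty state on the new node, which is the brief period of incomplete aggregation mentioned above.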
Record Rule Dimension Task Generator
Stream aggregation is great for reducing cardinality of single metrics. But real-world monitoring scenarios require combining multiple metrics with functions — which is where Prometheus Record Rule comes in.
The Problem with Record Rules at Scale
Consider an HTTP request metric with a req_path dimension:
Before stream aggregation:
a_http_req_total{zone="bj", src_svr="192.168.1.2", src_port="30021", dis_svr="192.168.2.3", code="202", req_path="/api/foo?abc=xyz"}
a_http_req_total{zone="bj", src_svr="192.168.1.2", src_port="30023", dis_svr="192.168.2.3", code="202", req_path="/api/bar?abc=def"}
a_http_req_total{zone="bj", src_svr="192.168.1.2", src_port="10021", dis_svr="192.168.2.3", code="202", req_path="/api/baz?abc=ghi"}
...
After stream aggregation (dropping req_path and src_port):
agg_a_http_req_total{zone="bj", src_svr="192.168.1.2", dis_svr="192.168.2.3", code="202"}
agg_a_http_req_total{zone="bj", src_svr="192.168.1.2", dis_svr="192.168.2.3", code="500"}
agg_a_http_req_total{zone="bj", src_svr="192.168.1.2", dis_svr="192.168.2.3", code="400"}
But what you actually want to display is the success rate per target:
sum by (dis_svr) (
  rate(a_http_req_total{code=~"2.*"}[5m])
)
/
sum by (dis_svr) (
  rate(a_http_req_total{}[5m])
)
You can use Record Rule to precompute this:
groups:
  - name: a_http_req_total:sum:rate:5m
    rules:
      - expr: sum by (src_svr, dis_svr, code) (rate(a_http_req_total{}[5m]))
        record: a_http_req_total:sum:rate:5m
The problem? Record Rule loads all dimensions of the metric into memory. When dimension counts reach critical thresholds, it triggers OOM. Even below that threshold, higher cardinality means slower computation.
In production, istio_requests_total QPS could be delayed by 20 minutes at high dimension counts. After applying stream aggregation to reduce from tens of millions of time series down to tens of thousands, the delay dropped to 1-2 minutes — better, but still far from real-time.
Dynamic Dimension-Split Record Rules
The issue is that Record Rule evaluates one query per group, loading everything into memory. But if you split the query by specific dimension values, you can process them concurrently.
The static approach after stream aggregation:
groups:
  - name: agg_a_http_req_total:sum:rate:5m-2xx
    rules:
      - expr: sum by (src_svr, dis_svr, code) (rate(agg_a_http_req_total{code=~"2.*"}[5m]))
        record: agg_a_http_req_total:sum:rate:5m
  - name: agg_a_http_req_total:sum:rate:5m-4xx
    rules:
      - expr: sum by (src_svr, dis_svr, code) (rate(agg_a_http_req_total{code=~"4.*"}[5m]))
        record: agg_a_http_req_total:sum:rate:5m
  - name: agg_a_http_req_total:sum:rate:5m-5xx
    rules:
      - expr: sum by (src_svr, dis_svr, code) (rate(agg_a_http_req_total{code=~"5.*"}[5m]))
        record: agg_a_http_req_total:sum:rate:5m
But the problem is that in production, dimension labels are dynamic. You can't hardcode splits for every dimension value. You need a label metadata management system that watches dimension combinations and dynamically generates split queries.
The ruler-handle-process Component
We built a small metadata watch and rule builder that automates this. Its configuration looks like:
recode_rules:
  - interval: 5m
    recode_to: istio_requests_total:sum:rate:5m
    metric_name: istio_requests_total
    aggr_type: sum
    vector_type: rate
    vector_range: 5m
    group_by_and_filter:
      - source_workload
      - destination_workload
      - cluster
      - namespace
    group_by:
      - response_code
      - namespace
      - source_workload_namespace
      - destination_workload_namespace
      - destination_service_name
      - cluster
      - reporter
    filter_by:
      cluster: "k8s-hw-bj-xxxxxx"
    with_out:
      source_workload: "ingressgateway-workflows"
The component watches the actual dimension combinations under the metric name istio_requests_total and generates a set of Record Rule configurations — one per unique dimension combination. Combined with Prometheus's Rule component for concurrent evaluation, this reduced computation latency from minutes to seconds.
The generated rules look like:
groups:
  - name: istio_requests_total:sum:rate:5m-7218756fe8a0bc327e818812cefb02f7
    rules:
      - expr: sum by (...) (rate(istio_requests_total{cluster="k8s-hw-bj-1-prod", destination_workload="skyaxe-778-flink", ...}[5m]))
        record: istio_requests_total:sum:rate:5m
  - name: istio_requests_total:sum:rate:5m-8e30244048f8d5519a6332f309578ed4
    rules:
      - expr: sum by (...) (rate(istio_requests_total{cluster="k8s-hw-bj-1-prod", destination_workload="t-bean-portal", ...}[5m]))
        record: istio_requests_total:sum:rate:5m
Each unique dimension combination gets its own rule group, enabling true concurrent computation.
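The generation step can be sketched in Go as follows. This is not the ruler-handle-process source: RuleGroup, selector, and buildGroups are hypothetical names, and deriving the group-name suffix from an MD5 of the label selector is an assumption based only on the 32-character hex suffixes shown above.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// RuleGroup is a hypothetical, minimal model of one generated rule group.
type RuleGroup struct {
	Name   string
	Expr   string
	Record string
}

// selector renders label matchers in sorted key order, e.g. a="1", b="2".
func selector(combo map[string]string) string {
	keys := make([]string, 0, len(combo))
	for k := range combo {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, fmt.Sprintf("%s=%q", k, combo[k]))
	}
	return strings.Join(parts, ", ")
}

// buildGroups emits one rule group per observed dimension combination.
func buildGroups(metric, record, vectorRange string, groupBy []string, combos []map[string]string) []RuleGroup {
	groups := make([]RuleGroup, 0, len(combos))
	for _, c := range combos {
		sel := selector(c)
		sum := md5.Sum([]byte(sel)) // assumed naming scheme, see lead-in
		groups = append(groups, RuleGroup{
			Name: record + "-" + hex.EncodeToString(sum[:]),
			Expr: fmt.Sprintf("sum by (%s) (rate(%s{%s}[%s]))",
				strings.Join(groupBy, ", "), metric, sel, vectorRange),
			Record: record,
		})
	}
	return groups
}
```

Sorting the label keys keeps group names stable across regenerations, so a rule engine reloading the file sees unchanged groups as unchanged rather than as deletions and additions.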
Architecture Combinations for Different Scales
Using community open-source components alongside our custom gateway and rule builder, we assembled a tiered architecture that handles everything from small deployments to massive fleets.
Small Scale: Tens of Thousands of Single Metrics
Minimal setup — vmagent with built-in stream aggregation. No gateway needed. The single-node limits aren't reached yet.
Medium Scale: Tens of Thousands of Multi-Metrics
Add Record Rules for computed metrics. Use the dimension-split approach to keep computation fast. Stream aggregation reduces cardinality before Record Rule evaluation.
Large Scale: Millions+ of Single Metrics
This is where the distributed gateway comes in:
- Multiple vmagent instances behind the gateway for horizontal ingestion
- Dual hashmod scheduling ensures consistent routing
- Time window filtering at the gateway prevents retries from polluting results
- Asynchronous forwarding prevents back-pressure from creating cascading delays
Large Scale: Scenario-Based Computational Aggregation for Millions of Metrics
The full stack:
- Stream aggregation — first pass cardinality reduction at ingestion
- Distributed gateway — routing, filtering, and load distribution
- Dynamic rule builder — watches dimension metadata and generates concurrent Record Rule groups
- Rule engine — evaluates split queries in parallel
Summary
VictoriaMetrics' stream aggregation is a solid foundation, but it was designed for single-node operation. When you need to scale horizontally, you encounter a series of interconnected problems — collection gaps, dimension explosion, record rule performance bottlenecks — that the native implementation doesn't address.
The solutions aren't particularly complex individually (async processing, time window filtering, hash-based routing, dimension-aware rule generation), but they need to work together as a coherent system. The gateway pattern — intercepting the remote write path before it reaches the aggregation layer — proved to be the right abstraction point for injecting these capabilities without modifying upstream code.
The ruler-handle-process component was the second key insight: rather than fighting Prometheus Record Rule's single-query-per-group limitation, we embraced it by dynamically splitting queries by dimension and running them concurrently. This turned a 20-minute computation into a seconds-level operation.
If you're running VictoriaMetrics at scale and hitting these issues, the patterns described here should be applicable regardless of your specific stack. The gateway approach is generic enough to work with any Prometheus-compatible remote write backend.
Originally published on my blog