DEV Community

Gaurav Singh

Streaming anomaly detection in Node.js with Iterflow

If you need to flag latency spikes in a stream of request times, the usual approach is to collect everything, compute mean and std dev, then filter. That doesn't work well on continuous streams where you don't have all the data upfront.

Iterflow is a streaming statistics library for JS/TS that lets you do this incrementally. This post walks through three approaches to anomaly detection using it.

npm install @mathscapes/iterflow

The data

Some HTTP response times. Mostly 40-50ms, with a couple of spikes mixed in:

const latencies = [
  45, 42, 48, 44, 46, 43, 47, 45, 44, 43,
  46, 41, 48, 45, 200,  // spike
  44, 43, 46, 42, 45, 47, 44, 43, 45, 46,
  42, 48, 44, 250,      // spike
  45, 43, 47, 44, 46
];

The key thing: you want to flag values that are weird relative to what came before them. In a real stream you don't have the full dataset, so you can't just take the global mean.

Streaming z-score

Z-score = how many standard deviations from the mean. Iterflow's streamingZScore() computes this incrementally -- each value is scored against the mean and std dev of everything before it. The current value doesn't affect its own score.

import { iter } from '@mathscapes/iterflow';

const anomalies = iter(latencies)
  .streamingZScore()
  .enumerate()
  .filter(([_, z]) => Math.abs(z) > 3)
  .map(([i, z]) => ({ index: i, value: latencies[i], zScore: +z.toFixed(2) }))
  .toArray();

console.log(anomalies);
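// Flags the two spikes, at index 14 (value 200) and index 28 (value 250).
// Scored against prior values only, as described above, the z-scores work out
// to roughly 76 and 6.9 with population std dev (roughly 73 and 6.8 with
// sample std dev). The second is lower because the first spike has already
// inflated the running std dev.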

Breaking it down:

  1. streamingZScore() emits a z-score per element using Welford's algorithm internally. First two elements come out as NaN (need at least two prior observations for a meaningful std dev).
  2. enumerate() pairs each z-score with its index: [0, NaN], [1, NaN], [2, 3.0], ...
  3. filter keeps anything beyond 3 standard deviations.
  4. map reshapes it into something readable.
  5. toArray() kicks off the pipeline.

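The z-score stage holds 3 numbers in memory (count, mean, M2), so the lazy stages use O(1) memory no matter how long the input is; only the final toArray() grows, and only with the number of anomalies kept. If you threw a .take(1) in before toArray() it'd stop after the first anomaly -- nothing upstream gets processed past that point.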
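To make the count/mean/M2 bookkeeping concrete, here is a minimal hand-rolled sketch of the same idea. It is not Iterflow's source: the function name `zScoresAgainstHistory` is made up for illustration, and dividing M2 by the count (population std dev) is an assumption.

```typescript
// Hand-rolled sketch, not Iterflow's implementation: scores each value
// against the running mean and population std dev of the values before it.
function* zScoresAgainstHistory(values: Iterable<number>): Generator<number> {
  let count = 0; // number of prior observations
  let mean = 0;  // running mean of prior observations
  let m2 = 0;    // running sum of squared deviations from the mean

  for (const x of values) {
    // Score first, so the current value never influences its own baseline.
    // With fewer than two prior observations the score is NaN.
    const std = count >= 2 ? Math.sqrt(m2 / count) : NaN;
    yield (x - mean) / std;

    // Welford update: fold x into the running statistics.
    count += 1;
    const delta = x - mean;
    mean += delta / count;
    m2 += delta * (x - mean);
  }
}

const sample = [45, 42, 48, 44, 46, 43, 47, 45, 44, 43, 46, 41, 48, 45, 200, 44, 43];
const flagged: number[] = [];
let position = 0;
for (const z of zScoresAgainstHistory(sample)) {
  if (Math.abs(z) > 3) flagged.push(position);
  position += 1;
}
console.log(flagged); // only index 14 (the 200) is flagged
```

The state never grows: three numbers go in, three numbers come out, whatever the stream length. A run of identical values gives a std dev of 0, so a production version would want to guard that division.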

Windowed variant

The above uses the full history, so the mean gets increasingly stable over time. If you care more about recent behavior (say, a value that looks normal against the last hour but weird against the last 5 minutes), use a window instead:

const windowSize = 10;

const localAnomalies = iter(latencies)
  .window(windowSize)
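  .map(w => {
    // Score the newest value against the rest of the window. If it were part
    // of its own baseline, its z-score could never exceed 3 in a 10-element
    // window, so the filter below would never fire.
    const prior = w.slice(0, -1);
    const mean = iter(prior).mean();
    const std = iter(prior).stdDev();
    return { mean, std, last: w[w.length - 1] };
  })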
  .enumerate()
  .filter(([_, { mean, std, last }]) => std > 0 && Math.abs(last - mean) > 3 * std)
  .map(([i, { last, mean, std }]) => ({
    index: i + windowSize - 1,
    value: last,
    windowMean: +mean.toFixed(1),
    windowStd: +std.toFixed(1),
  }))
  .toArray();

console.log(localAnomalies);

window(10) slides a 10-element window across the stream. Memory is O(k) where k is the window size, not O(n).
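For intuition on the O(k) claim, a sliding window can be sketched as a generator that buffers at most k values. This is a hypothetical stand-in (`slidingWindows` is not an Iterflow function), and Iterflow's window() may be implemented differently.

```typescript
// Hypothetical helper, not Iterflow's implementation: yields every full
// window of `size` consecutive values while buffering at most `size` items.
function* slidingWindows<T>(values: Iterable<T>, size: number): Generator<T[]> {
  const buffer: T[] = [];
  for (const x of values) {
    buffer.push(x);
    if (buffer.length > size) buffer.shift(); // drop the oldest value
    if (buffer.length === size) yield buffer.slice(); // copy so callers can keep it
  }
}

const windows = Array.from(slidingWindows([1, 2, 3, 4, 5], 3));
console.log(windows); // [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
```

A stream of n values produces n - k + 1 windows, and the j-th window ends at element j + k - 1. That is why the pipeline above maps the enumerate index back with i + windowSize - 1.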

EWMA approach

Exponentially weighted moving average -- weights recent values more heavily. Compare raw values against the smoothed trend:

const alpha = 0.3;
const threshold = 50;

const smoothed = iter(latencies).ewma(alpha).toArray();

const ewmaAnomalies = iter(latencies)
  .enumerate()
  .filter(([i]) => i > 0)
  .filter(([i, val]) => Math.abs(val - smoothed[i - 1]) > threshold)
  .map(([i, val]) => ({ index: i, value: val, ewma: +smoothed[i - 1].toFixed(1) }))
  .toArray();

console.log(ewmaAnomalies);
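// Assuming the usual recurrence s[t] = alpha * x[t] + (1 - alpha) * s[t-1],
// seeded with the first value, this flags:
//   index 14 (value 200, ewma about 45.0)
//   index 28 (value 250, ewma about 45.3)
//   index 29 (value 45,  ewma about 106.7)
// The last one is a false positive: the 250 spike drags the smoothed value
// far enough above normal to trip the threshold on the next reading.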

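The EWMA itself needs only one scalar of state, though this example materializes the whole smoothed series with toArray() to keep the code short. Higher alpha = reacts faster to changes. Lower alpha = smoother line.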
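If you want the one-scalar version in a single pass, a rough sketch looks like this. It assumes the common recurrence s = alpha * x + (1 - alpha) * s seeded with the first value; `ewmaOutliers` and `EwmaFlag` are illustrative names, not part of Iterflow.

```typescript
interface EwmaFlag {
  index: number;
  value: number;
  baseline: number; // smoothed value before this reading was folded in
}

// Sketch only: compares each value with the EWMA of everything before it,
// then folds the value into the average. State is a single number.
function* ewmaOutliers(
  values: Iterable<number>,
  alpha: number,
  threshold: number
): Generator<EwmaFlag> {
  let smoothed: number | undefined;
  let index = 0;
  for (const value of values) {
    if (smoothed === undefined) {
      smoothed = value; // seed with the first observation (an assumption)
    } else {
      if (Math.abs(value - smoothed) > threshold) {
        yield { index, value, baseline: smoothed };
      }
      smoothed = alpha * value + (1 - alpha) * smoothed;
    }
    index += 1;
  }
}

const ewmaDemo = Array.from(ewmaOutliers([45, 44, 46, 200, 45, 44], 0.3, 50));
console.log(ewmaDemo); // flags only index 3 (value 200)
```

Because a spike is folded into the average, readings right after a large spike sit far from the inflated baseline and can get flagged too. Skipping the update for flagged values is one way around that.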

With real streams

Same pattern works with generators. Doesn't have to be an array:

function* readLatencies(): Generator<number> {
  // getNextLatency() is a placeholder for your own source:
  // reading from a log file, message queue, API, whatever
  while (true) {
    yield getNextLatency();
  }
}

// infinite stream, stop after 5 anomalies
const first5 = iter(readLatencies())
  .streamingZScore()
  .enumerate()
  .filter(([_, z]) => Math.abs(z) > 3)
  .take(5)
  .toArray();

Every stage is a generator, so this processes one element at a time. Constant memory no matter how long the stream runs. .take(5) propagates back through the chain -- once you have 5 anomalies, upstream generators just stop.
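You can watch the early stop happen with plain generators. The helpers below (`counter`, `filterLazy`, `takeLazy`) are hypothetical stand-ins written for this demo, not Iterflow's internals, but they rely on the same language mechanism: leaving a for...of loop early closes the upstream generator.

```typescript
let pulled = 0; // how many values the source was asked to produce

function* counter(): Generator<number> {
  let n = 0;
  while (true) {
    pulled += 1;
    yield n++;
  }
}

function* filterLazy<T>(source: Iterable<T>, keep: (x: T) => boolean): Generator<T> {
  for (const x of source) {
    if (keep(x)) yield x;
  }
}

function* takeLazy<T>(source: Iterable<T>, limit: number): Generator<T> {
  if (limit <= 0) return;
  let taken = 0;
  for (const x of source) {
    yield x;
    taken += 1;
    if (taken >= limit) return; // exiting for...of closes the upstream generator
  }
}

const firstThree = Array.from(
  takeLazy(filterLazy(counter(), n => n % 10 === 0), 3)
);
console.log(firstThree, pulled); // [0, 10, 20] and 21 values pulled
```

The source is infinite, yet it is only asked for 21 values (0 through 20): exactly enough to find the third match.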

When this isn't worth it

Small fixed dataset that fits in memory? Just do array.filter(x => x > mean + 3 * std). Iterflow's generators add 3-5x overhead per element vs a hand-written loop. The payoff is when you're chaining multiple stages together, dealing with large/infinite streams, or need early termination.
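For comparison, the in-memory version on the sample data is only a few lines of plain TypeScript. This sketch uses population std dev and checks both directions with Math.abs; both choices are assumptions, adjust to taste.

```typescript
const batch = [
  45, 42, 48, 44, 46, 43, 47, 45, 44, 43,
  46, 41, 48, 45, 200,
  44, 43, 46, 42, 45, 47, 44, 43, 45, 46,
  42, 48, 44, 250,
  45, 43, 47, 44, 46,
];

// Two passes over the array: one for the mean, one for the variance.
const batchMean = batch.reduce((sum, x) => sum + x, 0) / batch.length;
const batchVariance =
  batch.reduce((sum, x) => sum + (x - batchMean) * (x - batchMean), 0) / batch.length;
const batchStd = Math.sqrt(batchVariance);

// Keep anything more than 3 standard deviations from the global mean.
const batchOutliers = batch.filter(x => Math.abs(x - batchMean) > 3 * batchStd);
console.log(batchOutliers); // [200, 250]
```

Note that the spikes are part of the global mean and std dev here, so each outlier makes the others a little harder to detect. On this data both still clear the threshold.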


GitHub: github.com/mathscapes/iterflow
npm: npm install @mathscapes/iterflow
Paper: doi.org/10.5281/zenodo.18610143
