DEV Community

Daniel Romitelli

Posted on • Originally published at craftedbydaniel.com

Adaptive Keyframe Sampling: How I Spend a Frame Budget Like It’s Cash

I built my screen-analysis tool for one reason: I wanted something brutally practical that can look at a screen recording, infer what happened as a workflow, and then turn that understanding into automation artifacts (n8n flows, step lists, structured summaries, the whole pipeline).

The expensive part isn’t generating JSON or rendering a report. It’s the multimodal understanding step—every extra frame you send to a model is real money. Screen recordings are the worst-case input distribution for fixed-rate sampling: long stretches of a static UI, then sudden micro-bursts where the user clicks, types two characters, a dropdown opens, a modal flashes, or a tab swaps.

So I stopped treating frames like “data points” and started treating them like a budget.

What went wrong first (the failure that forced the change)

My first cut was the obvious one: sample every Nth frame.

That version failed in two ways, and both were visible in the product.

Failure #1: it wasted frames on dead air. A typical upload in my dashboard is a 6–12 minute recording. In many of those recordings, the user pauses to think, reads a page, or the cursor just sits there. Uniform sampling happily burns frames on those minutes of nothing. The analysis cost scales linearly with video length even when the information content doesn’t.

Failure #2: it missed the “blink-and-you-miss-it” UI moments. The worst incident (the one that made me rewrite the sampler) was a short admin workflow recording where a critical permission dropdown opened and closed quickly. The uniform sampler grabbed frames just before and just after the dropdown—so the model never saw the permission choice. The output summary looked confident and wrong: it inferred a different settings change because it saw the resulting page state but not the transient UI that caused it.

That combination—paying for static time while still missing the actual action—was intolerable. I needed a sampler that spends frames where the video is changing and stops paying rent on static UI.

The key idea: spend frames where the information is

The naive approach is “sample every Nth frame.” It feels fair. It’s also wrong.

A screen recording is not a movie. It’s closer to a ledger: long stable states punctuated by brief transitions. Uniform sampling guarantees two bad outcomes:

  • You overpay for stable UI.
  • You underfund short bursts where the workflow actually happens.

One analogy (I’ll use it once and move on): sampling uniformly is like paying every team the same bonus regardless of impact. It’s simple accounting, not good allocation.

The fix is to measure change cheaply, group time into segments, and allocate a fixed keyframe budget across those segments.

Runtime architecture: where the sampler sits

Operationally, the system is split into two cooperating pieces:

  • A Python analysis service (deployed as a Cloud Run service) that processes the uploaded video, selects keyframes, runs the multimodal analysis, and produces a structured result payload.
  • A Next.js app that receives the result via webhook and persists it (and drives the dashboard UI).

The sampler lives inside the analyzer, before the expensive model call.

flowchart TD
  subgraph cloudRun[Cloud Run: Python analyzer]
    upload[video file] --> reader[frame reader]
    reader --> sampler[adaptive keyframe sampler]
    sampler --> modelCall[multimodal model call]
    modelCall --> payload[analysis payload]
    payload --> sender[webhook sender]
  end

  subgraph nextApp[Next.js: app + API]
    receiver[webhook route] --> store[(database/storage)]
    receiver --> ui[dashboard updates]
  end

  sender -->|POST + signature| receiver

The non-obvious point: the sampler isn’t a “nice optimization.” It’s a control surface. It converts “how long is this video?” into “how much analysis do I want to pay for?”

The analyzer sends results back to the Next.js app via a signed webhook—HMAC-SHA256 over the exact JSON bytes, verified with timingSafeEqual on the receiving end. The important lesson there: sign the bytes you actually send (data=body_bytes), not a Python dict that gets re-serialized by the HTTP library. But the webhook seam is a different post—what matters here is what happens before the payload leaves the analyzer.

Adaptive keyframe sampling: score → segment → allocate → pick indices

The sampler is a four-stage pipeline:

  1. Score change cheaply per frame (or per stride of frames).
  2. Segment the timeline into “mostly stable” and “high-change” runs.
  3. Allocate a fixed keyframe budget across segments with guardrails.
  4. Choose concrete frame indices inside each segment.

This structure is what keeps it practical. The scoring is cheap. The segmentation is linear. The allocation is predictable. The extraction step is mechanical.

Stage 1 — Scoring: cheap visual change

I’m aggressive about keeping scoring cheap. If scoring costs too much, I’m just moving the bill earlier in the pipeline.

The most reliable baseline signal for screen recordings is frame difference energy:

  • Convert to grayscale.
  • Compute absolute difference between consecutive frames.
  • Take the mean of the diff image (optionally normalize).

That catches:

  • Cursor movement
  • Typing (blinking caret and text updates)
  • Dropdowns and modals
  • Page transitions
  • Hover state changes

It’s not perfect, but it’s fast and correlates well with “something happened.”
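To make the scale of that signal concrete, here is a minimal sketch of the same scoring rule using plain NumPy in place of OpenCV (it starts from grayscale arrays, and the frame contents are made up for illustration):

```python
import numpy as np

def diff_score(prev_gray: np.ndarray, curr_gray: np.ndarray) -> float:
    # Mean absolute difference between grayscale frames, normalized to [0, 1].
    # Cast to int16 first: subtracting uint8 arrays directly would wrap around.
    diff = np.abs(prev_gray.astype(np.int16) - curr_gray.astype(np.int16))
    return float(diff.mean() / 255.0)

# A static 100x100 "screen": identical frames score exactly 0.
screen = np.full((100, 100), 200, dtype=np.uint8)
assert diff_score(screen, screen) == 0.0

# A small "dropdown" appears: 1% of pixels change by 100 gray levels.
# Score = 0.01 * 100 / 255 ≈ 0.0039 — tiny, which is why the sampler's
# thresholds live down in the low hundredths.
with_dropdown = screen.copy()
with_dropdown[:10, :10] = 100
score = diff_score(screen, with_dropdown)
```

The int16 cast matters: cv2.absdiff handles saturation for you, but a naive uint8 subtraction in NumPy silently wraps and corrupts the score.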

Stage 2 — Segment formation: turn a noisy score stream into runs

Per-frame scores are spiky. I don’t want the allocator to chase noise.

So I segment using a simple state machine with hysteresis:

  • Maintain a rolling average score.
  • Transition into a “hot” segment when the rolling score rises above a threshold.
  • Transition back to “cold” when it falls below a lower threshold.
  • Enforce a minimum segment length so I don’t create hundreds of micro-segments.

This isn’t academic change-point detection. It’s engineering: stable behavior, low tuning overhead, and predictable output.

Stage 3 — Budget allocation with guardrails

Pure proportional allocation is not enough. It fails on rounding and it can starve short segments.

So my allocator has three rules:

  • Every segment gets a floor (min_frames_per_segment).
  • No segment can exceed a cap (max_frames_per_segment).
  • The remaining budget is distributed proportionally to segment utility.

This makes allocation stable and prevents pathologies.

Stage 4 — Index selection inside segments

Once a segment has been assigned K frames, I pick K indices spread across the segment:

  • Always include the segment start (transitions matter).
  • Always include the segment end (final state matters).
  • Fill the rest with evenly spaced indices.

If I need more fidelity later, I can bias toward local maxima of the score, but evenly spaced selection is a strong baseline and keeps the code straightforward.

Complete runnable implementation (scoring + segmentation + allocation + extraction)

To make this post copyable, here is a single Python script you can run against any MP4. It selects keyframes adaptively and writes them to an output directory.

Dependencies:

  • opencv-python
  • numpy

Run:

python adaptive_keyframes.py --video input.mp4 --budget 60 --out ./keyframes

Here’s adaptive_keyframes.py:

import argparse
import os
from dataclasses import dataclass
from typing import List, Tuple

import cv2
import numpy as np


@dataclass
class Segment:
    start: int  # inclusive frame index
    end: int    # exclusive frame index
    score: float

    @property
    def length(self) -> int:
        return max(0, self.end - self.start)


def frame_diff_score(prev_bgr: np.ndarray, curr_bgr: np.ndarray) -> float:
    """Cheap per-frame change score in [0, 1] (roughly).

    Uses grayscale mean absolute difference normalized by 255.
    """
    prev_gray = cv2.cvtColor(prev_bgr, cv2.COLOR_BGR2GRAY)
    curr_gray = cv2.cvtColor(curr_bgr, cv2.COLOR_BGR2GRAY)
    diff = cv2.absdiff(prev_gray, curr_gray)
    return float(diff.mean() / 255.0)


def compute_scores(
    cap: cv2.VideoCapture,
    stride: int = 1,
    max_frames: int | None = None,
) -> Tuple[List[float], int]:
    """Return (scores, total_frames_read).

    scores[i] is the change score between consecutive sampled frames
    (roughly frames i*stride and (i+1)*stride).
    """
    scores: List[float] = []

    ok, prev = cap.read()
    if not ok:
        return scores, 0

    frame_idx = 1
    frames_read = 1

    while True:
        # Skip stride-1 frames between comparisons.
        for _ in range(stride - 1):
            ok = cap.grab()
            if not ok:
                return scores, frames_read
            frame_idx += 1
            frames_read += 1
            if max_frames is not None and frames_read >= max_frames:
                return scores, frames_read

        ok, curr = cap.read()
        if not ok:
            return scores, frames_read
        frames_read += 1

        s = frame_diff_score(prev, curr)
        scores.append(s)

        prev = curr
        frame_idx += 1

        if max_frames is not None and frames_read >= max_frames:
            return scores, frames_read


def segment_scores(
    scores: List[float],
    window: int = 8,
    hot_thresh: float = 0.030,
    cold_thresh: float = 0.020,
    min_len: int = 12,
) -> List[Segment]:
    """Convert per-step scores into segments with a utility score.

    Uses a rolling mean with hysteresis to avoid segment flicker.
    """
    if not scores:
        return []

    # Rolling mean via cumulative sum.
    x = np.array(scores, dtype=np.float32)
    c = np.cumsum(np.insert(x, 0, 0.0))

    def roll_mean(i: int) -> float:
        j0 = max(0, i - window + 1)
        n = i - j0 + 1
        return float((c[i + 1] - c[j0]) / n)

    segments: List[Segment] = []
    state_hot = False

    seg_start = 0
    seg_scores: List[float] = []

    for i in range(len(scores)):
        rm = roll_mean(i)

        if state_hot:
            seg_scores.append(scores[i])
            if rm < cold_thresh:
                # Close the hot segment at i+1. Segments shorter than
                # min_len are merged in the post-pass below.
                seg_end = i + 1
                segments.append(Segment(seg_start, seg_end, float(np.mean(seg_scores)) if seg_scores else 0.0))

                # Start cold
                state_hot = False
                seg_start = seg_end
                seg_scores = []
        else:
            if rm > hot_thresh:
                # Close cold segment
                seg_end = i + 1
                cold_score = float(np.mean(scores[seg_start:seg_end]) if seg_end > seg_start else 0.0)
                segments.append(Segment(seg_start, seg_end, cold_score))

                # Start hot
                state_hot = True
                seg_start = seg_end
                seg_scores = []

    # Close tail
    tail_end = len(scores)
    if tail_end > seg_start:
        tail_score = float(np.mean(scores[seg_start:tail_end]))
        segments.append(Segment(seg_start, tail_end, tail_score))

    # Merge very short segments to keep output stable.
    merged: List[Segment] = []
    for seg in segments:
        if not merged:
            merged.append(seg)
            continue
        if seg.length < min_len:
            prev = merged[-1]
            total_len = max(1, prev.length + seg.length)
            combined_score = (prev.score * prev.length + seg.score * seg.length) / total_len
            merged[-1] = Segment(prev.start, seg.end, combined_score)
        else:
            merged.append(seg)

    # One more pass: ensure non-empty and strictly increasing.
    cleaned: List[Segment] = []
    for seg in merged:
        if seg.length <= 0:
            continue
        if cleaned and seg.start < cleaned[-1].end:
            seg = Segment(cleaned[-1].end, seg.end, seg.score)
        if seg.length > 0:
            cleaned.append(seg)

    return cleaned


def allocate_frames(
    segments: List[Segment],
    budget: int,
    min_frames_per_segment: int = 1,
    max_frames_per_segment: int = 30,
) -> List[int]:
    """Allocate keyframes to segments using floor + proportional + cap."""
    if budget <= 0 or not segments:
        return []

    n = len(segments)
    min_total = min_frames_per_segment * n

    # If budget is smaller than the floor, distribute 1-by-1.
    if min_total >= budget:
        alloc = [0] * n
        for i in range(budget):
            alloc[i % n] += 1
        return alloc

    utilities = np.array([max(0.0, s.score) for s in segments], dtype=np.float64)
    total_u = float(utilities.sum())

    alloc = [min_frames_per_segment] * n
    remaining = budget - min_total

    if total_u == 0.0:
        raw = np.full(n, remaining / n, dtype=np.float64)
    else:
        raw = utilities * (remaining / total_u)

    # Add integer parts.
    for i in range(n):
        add = int(raw[i])
        alloc[i] = min(max_frames_per_segment, alloc[i] + add)

    allocated = sum(alloc)

    # Distribute leftover by fractional parts, respecting caps.
    if allocated < budget:
        frac = raw - np.floor(raw)
        order = np.argsort(-frac)  # descending fractional
        idx = 0
        safety = 0
        while allocated < budget and safety < 10_000:
            i = int(order[idx % n])
            if alloc[i] < max_frames_per_segment:
                alloc[i] += 1
                allocated += 1
            idx += 1
            safety += 1

    # If we somehow exceeded budget due to caps/floor interplay, trim from lowest utility.
    if allocated > budget:
        order = np.argsort(utilities)  # ascending utility
        idx = 0
        safety = 0
        while allocated > budget and safety < 10_000:
            i = int(order[idx % n])
            if alloc[i] > 0 and alloc[i] > min_frames_per_segment:
                alloc[i] -= 1
                allocated -= 1
            idx += 1
            safety += 1

    return alloc


def pick_indices_for_segment(seg: Segment, k: int) -> List[int]:
    """Pick k indices in [seg.start, seg.end) over the score-step domain.

    Note: scores are defined between frames; we later map these to actual frames.
    """
    if k <= 0 or seg.length <= 0:
        return []

    if k == 1:
        return [seg.start]

    # Evenly spaced across [start, end-1]
    xs = np.linspace(seg.start, seg.end - 1, num=k)
    idxs = sorted({int(round(x)) for x in xs})

    # Ensure exactly k by filling gaps if rounding collapsed points.
    while len(idxs) < k:
        # Insert midpoints between existing points.
        candidates = []
        for a, b in zip(idxs, idxs[1:]):
            if b - a >= 2:
                candidates.append((a + b) // 2)
        if not candidates:
            # Fall back: walk forward.
            x = idxs[-1]
            if x + 1 < seg.end:
                idxs.append(x + 1)
            else:
                break
        else:
            for c in candidates:
                if c not in idxs and seg.start <= c < seg.end:
                    idxs.append(c)
                    if len(idxs) >= k:
                        break
        idxs = sorted(idxs)

    # Trim if we overshot.
    return idxs[:k]


def select_keyframe_indices(segments: List[Segment], alloc: List[int], stride: int = 1) -> List[int]:
    """Return concrete frame indices (0-based) to extract from the video."""
    chosen: List[int] = []

    for seg, k in zip(segments, alloc):
        step_idxs = pick_indices_for_segment(seg, k)
        # Map score-step domain to frame indices:
        # score i covers the diff between frames ~i*stride and (i+1)*stride,
        # so frame i*stride is a reasonable representative.
        for si in step_idxs:
            chosen.append(si * stride)

    chosen = sorted(set(chosen))
    return chosen


def extract_frames(video_path: str, frame_indices: List[int], out_dir: str) -> None:
    os.makedirs(out_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Failed to open video: {video_path}")

    frame_set = set(frame_indices)
    max_idx = max(frame_set) if frame_set else -1

    idx = 0
    saved = 0
    while idx <= max_idx:
        ok, frame = cap.read()
        if not ok:
            break

        if idx in frame_set:
            path = os.path.join(out_dir, f"frame_{idx:06d}.jpg")
            ok2 = cv2.imwrite(path, frame)
            if not ok2:
                raise RuntimeError(f"Failed to write: {path}")
            saved += 1

        idx += 1

    cap.release()

    if saved == 0 and frame_indices:
        raise RuntimeError("No frames were saved; check indices and video decoding")


def main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument("--video", required=True, help="Path to input video")
    ap.add_argument("--out", required=True, help="Output directory for keyframes")
    ap.add_argument("--budget", type=int, default=60, help="Total keyframes to extract")
    ap.add_argument("--stride", type=int, default=2, help="Compare every Nth frame for scoring")
    ap.add_argument("--window", type=int, default=8, help="Rolling window for segmentation")
    ap.add_argument("--hot", type=float, default=0.030, help="Enter hot segment threshold")
    ap.add_argument("--cold", type=float, default=0.020, help="Exit hot segment threshold")
    args = ap.parse_args()

    cap = cv2.VideoCapture(args.video)
    if not cap.isOpened():
        raise RuntimeError(f"Failed to open video: {args.video}")

    scores, frames_read = compute_scores(cap, stride=args.stride)
    cap.release()

    segments = segment_scores(scores, window=args.window, hot_thresh=args.hot, cold_thresh=args.cold)

    # Note: the cap here defaults to the full budget (effectively uncapped);
    # tighten max_frames_per_segment for recordings with long busy stretches.
    alloc = allocate_frames(segments, budget=args.budget, min_frames_per_segment=1, max_frames_per_segment=max(2, args.budget))
    keyframes = select_keyframe_indices(segments, alloc, stride=args.stride)

    # Keep within a hard limit (rounding/uniqueness can change count).
    if len(keyframes) > args.budget:
        keyframes = keyframes[: args.budget]

    extract_frames(args.video, keyframes, args.out)

    print(f"frames_read={frames_read}")
    print(f"scores={len(scores)} segments={len(segments)}")
    print(f"budget={args.budget} selected={len(keyframes)}")
    if segments:
        hot_share = sum(1 for s in segments if s.score > args.hot) / len(segments)
        print(f"segment_hot_share={hot_share:.2f}")


if __name__ == "__main__":
    main()

This script is intentionally direct:

  • It produces a stable segment list.
  • It guarantees you never exceed your frame budget.
  • It writes deterministic frame files for downstream analysis.

In my production service, the extracted frames feed into the multimodal call (Gemini via google-generativeai in my dependencies), and the resulting analysis payload is sent back to the Next.js app via signed webhook.

Practical tuning notes (the stuff that matters after the first demo)

Once you have the shape working, the wins come from tuning behavior under ugly real-world recordings.

1) Choose a stride that matches your content

If you score every frame on a 30 FPS recording, you’ll find tiny cursor movements everywhere. That’s not always bad, but it can inflate your “change” signal.

A stride of 2–5 is a good starting point for screen recordings. You’re still sensitive to UI changes, but you suppress sub-frame noise.

2) Hysteresis prevents segment flicker

Using hot_thresh and cold_thresh (two thresholds) matters. With a single threshold, you’ll bounce between hot/cold constantly around the cutoff.

Hysteresis gives you stable segments and makes budgets behave predictably.
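To see the flicker concretely, here is a toy comparison with made-up scores hovering around a 0.03 cutoff. (The real sampler smooths with a rolling mean first; this sketch applies thresholds to raw scores, and setting hot == cold degenerates to a single threshold.)

```python
def label_states(scores, hot, cold):
    """Label each step hot/cold; hot == cold is a single-threshold cutoff."""
    states, is_hot = [], False
    for s in scores:
        if is_hot and s < cold:
            is_hot = False
        elif not is_hot and s > hot:
            is_hot = True
        states.append(is_hot)
    return states

def transitions(states):
    # Count hot/cold flips — each one is a segment boundary.
    return sum(1 for a, b in zip(states, states[1:]) if a != b)

# A noisy score stream oscillating around 0.03.
noisy = [0.028, 0.032, 0.029, 0.033, 0.027, 0.034, 0.026, 0.035]

single = transitions(label_states(noisy, hot=0.03, cold=0.03))    # 7 flips
hyst = transitions(label_states(noisy, hot=0.033, cold=0.025))    # 1 flip
```

Seven segment boundaries versus one, on the same input: that gap is the entire argument for the two-threshold state machine.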

3) Floors and caps are not optional

Without a floor, short segments can round down to zero and you’ll miss the exact micro-burst you cared about.

Without a cap, one long “busy” segment can consume your entire budget and you’ll lose context from the rest of the workflow.
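Here is a compressed sketch of that allocation scheme on made-up utilities, just to show the failure mode and the fix side by side (the full allocator in the script above also handles the over-budget trim):

```python
import numpy as np

def allocate(utilities, budget, floor=1, cap=6):
    """Floor + proportional + cap, a simplified sketch of the scheme above."""
    n = len(utilities)
    u = np.asarray(utilities, dtype=float)
    alloc = [floor] * n                      # every segment gets the floor
    rest = budget - floor * n
    raw = u / u.sum() * rest                 # proportional share of the rest
    for i in range(n):
        alloc[i] = min(cap, alloc[i] + int(raw[i]))
    # Hand out the rounding leftover by largest fractional part, respecting caps.
    order = np.argsort(-(raw - np.floor(raw)))
    j = 0
    while sum(alloc) < budget and j < 10 * n:
        i = int(order[j % n])
        if alloc[i] < cap:
            alloc[i] += 1
        j += 1
    return alloc

# Three segments: a long idle stretch, a short intense burst, medium activity.
utilities = [0.004, 0.200, 0.050]

# Pure proportional with truncation starves the idle segment to zero
# and hands most of the budget to one segment: [0, 7, 1].
naive = [int(x) for x in np.asarray(utilities) / sum(utilities) * 10]

# With guardrails every segment keeps its floor and the busy one hits
# the cap: [1, 6, 3].
guarded = allocate(utilities, budget=10)
```

The numbers are arbitrary, but the shape of the failure is not: truncation plus a dominant segment is exactly how a micro-burst rounds down to zero frames.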

4) Always sign the bytes you send

If you sign json.dumps(payload) but send json=payload, the HTTP library may serialize with different key ordering/spacing than what you signed. That produces intermittent verification failures that feel like ghosts.

Signing the actual bytes (data=body_bytes) eliminates that entire class of bugs. This matters here because the analysis payload—the one carrying your carefully selected keyframes—is the most expensive artifact in the pipeline. If it gets dropped by a signing mismatch, you’ve wasted the entire frame budget.
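A minimal sketch of that discipline in Python (the secret and payload are placeholders; hmac.compare_digest is the Python counterpart of Node's timingSafeEqual):

```python
import hashlib
import hmac
import json

SECRET = b"shared-secret"  # placeholder for illustration

# Sender: serialize ONCE, sign those exact bytes, send those exact bytes.
payload = {"video_id": "abc123", "keyframes": 42}
body_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8")
signature = hmac.new(SECRET, body_bytes, hashlib.sha256).hexdigest()
# requests.post(url, data=body_bytes,
#               headers={"X-Signature": signature,
#                        "Content-Type": "application/json"})

# Receiver: verify against the raw request body, never a re-parsed dict.
def verify(raw_body: bytes, received_sig: str) -> bool:
    expected = hmac.new(SECRET, raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, received_sig)

assert verify(body_bytes, signature)

# Re-serializing the parsed JSON changes the bytes (default json.dumps
# adds spaces after separators) — and therefore changes the MAC.
reserialized = json.dumps(json.loads(body_bytes)).encode("utf-8")
assert reserialized != body_bytes
assert not verify(reserialized, signature)
```

The final two assertions are the whole bug class in miniature: same logical payload, different bytes, failed verification.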

Why this design scales in production

The sampler works because it respects two hard constraints:

  • Cost scales with selected frames, not video length. Once the budget is fixed, the expensive multimodal step has an upper bound.
  • Runtime stays predictable. Scoring is a linear pass; segmentation is a linear pass; allocation is linear with tiny constant factors.

Most importantly: the sampler makes my system behave like something I can operate.

Instead of arguing about whether a 12-minute video is “too long,” I decide how many frames I’m willing to buy. The analyzer spends that budget on the parts of the recording that actually change, and it stops paying for static UI.

That’s the whole point: disciplined allocation beats fair sampling.
