I built my screen-analysis tool for one reason: I wanted something brutally practical that can look at a screen recording, infer what happened as a workflow, and then turn that understanding into automation artifacts (n8n flows, step lists, structured summaries, the whole pipeline).
The expensive part isn’t generating JSON or rendering a report. It’s the multimodal understanding step—every extra frame you send to a model is real money. Screen recordings are the worst-case input distribution for fixed-rate sampling: long stretches of a static UI, then sudden micro-bursts where the user clicks, types two characters, a dropdown opens, a modal flashes, or a tab swaps.
So I stopped treating frames like “data points” and started treating them like a budget.
What went wrong first (the failure that forced the change)
My first cut was the obvious one: sample every Nth frame.
That version failed in two ways, and both were visible in the product.
Failure #1: it wasted frames on dead air. A typical upload in my dashboard is a 6–12 minute recording. In many of those recordings, the user pauses to think, reads a page, or the cursor just sits there. Uniform sampling happily burns frames on those minutes of nothing. The analysis cost scales linearly with video length even when the information content doesn’t.
Failure #2: it missed the “blink-and-you-miss-it” UI moments. The worst incident (the one that made me rewrite the sampler) was a short admin workflow recording where a critical permission dropdown opened and closed quickly. The uniform sampler grabbed frames just before and just after the dropdown—so the model never saw the permission choice. The output summary looked confident and wrong: it inferred a different settings change because it saw the resulting page state but not the transient UI that caused it.
That combination—paying for static time while still missing the actual action—was intolerable. I needed a sampler that spends frames where the video is changing and stops paying rent on static UI.
The key idea: spend frames where the information is
The naive approach is “sample every Nth frame.” It feels fair. It’s also wrong.
A screen recording is not a movie. It’s closer to a ledger: long stable states punctuated by brief transitions. Uniform sampling guarantees two bad outcomes:
- You overpay for stable UI.
- You underfund short bursts where the workflow actually happens.
One analogy (I’ll use it once and move on): sampling uniformly is like paying every team the same bonus regardless of impact. It’s simple accounting, not good allocation.
The fix is to measure change cheaply, group time into segments, and allocate a fixed keyframe budget across those segments.
Runtime architecture: where the sampler sits
Operationally, the system is split into two cooperating pieces:
- A Python analysis service (deployed as a Cloud Run service) that processes the uploaded video, selects keyframes, runs the multimodal analysis, and produces a structured result payload.
- A Next.js app that receives the result via webhook and persists it (and drives the dashboard UI).
The sampler lives inside the analyzer, before the expensive model call.
flowchart TD
  subgraph cloudRun[Cloud Run: Python analyzer]
    upload[video file] --> reader[frame reader]
    reader --> sampler[adaptive keyframe sampler]
    sampler --> modelCall[multimodal model call]
    modelCall --> payload[analysis payload]
    payload --> sender[webhook sender]
  end
  subgraph nextApp[Next.js: app + API]
    receiver[webhook route] --> store[(database/storage)]
    receiver --> ui[dashboard updates]
  end
  sender -->|POST + signature| receiver
The non-obvious point: the sampler isn’t a “nice optimization.” It’s a control surface. It converts “how long is this video?” into “how much analysis do I want to pay for?”
The analyzer sends results back to the Next.js app via a signed webhook—HMAC-SHA256 over the exact JSON bytes, verified with timingSafeEqual on the receiving end. The important lesson there (expanded in the tuning notes): sign the bytes you actually send (data=body_bytes), not a Python dict that gets re-serialized by the HTTP library. But the webhook seam is a different post—what matters here is what happens before the payload leaves the analyzer.
Adaptive keyframe sampling: score → segment → allocate → pick indices
The sampler is a four-stage pipeline:
- Score change cheaply per frame (or per stride of frames).
- Segment the timeline into “mostly stable” and “high-change” runs.
- Allocate a fixed keyframe budget across segments with guardrails.
- Choose concrete frame indices inside each segment.
This structure is what keeps it practical. The scoring is cheap. The segmentation is linear. The allocation is predictable. The extraction step is mechanical.
Stage 1 — Scoring: cheap visual change
I’m aggressive about keeping scoring cheap. If scoring costs too much, I’m just moving the bill earlier in the pipeline.
The most reliable baseline signal for screen recordings is frame difference energy:
- Convert to grayscale.
- Compute absolute difference between consecutive frames.
- Take the mean of the diff image (optionally normalize).
That catches:
- Cursor movement
- Typing (blinking caret and text updates)
- Dropdowns and modals
- Page transitions
- Hover state changes
It’s not perfect, but it’s fast and correlates well with “something happened.”
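To see the signal concretely, here is a pure-NumPy sketch of the same score (the production path uses cv2.cvtColor and cv2.absdiff, but the arithmetic is identical) run on a synthetic "small panel appears" frame pair:

```python
import numpy as np

def diff_score(prev_bgr: np.ndarray, curr_bgr: np.ndarray) -> float:
    # Grayscale mean absolute difference, normalized to [0, 1].
    w = np.array([0.114, 0.587, 0.299])  # luminance weights in BGR order
    prev_gray = prev_bgr @ w
    curr_gray = curr_bgr @ w
    return float(np.abs(prev_gray - curr_gray).mean() / 255.0)

# Synthetic frames: a blank 64x64 screen, then a small white panel appears.
before = np.zeros((64, 64, 3), dtype=np.float64)
after = before.copy()
after[10:20, 10:30] = 255.0  # 200 of 4096 pixels change fully

print(round(diff_score(before, after), 4))  # ≈ 0.0488 — above the 0.030 hot threshold
```

A 200-pixel panel on a 64x64 frame already clears the hot threshold; a full page transition scores an order of magnitude higher, while a caret blink lands well below it.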
Stage 2 — Segment formation: turn a noisy score stream into runs
Per-frame scores are spiky. I don’t want the allocator to chase noise.
So I segment using a simple state machine with hysteresis:
- Maintain a rolling average score.
- Transition into a “hot” segment when the rolling score rises above a threshold.
- Transition back to “cold” when it falls below a lower threshold.
- Enforce a minimum segment length so I don’t create hundreds of micro-segments.
This isn’t academic change-point detection. It’s engineering: stable behavior, low tuning overhead, and predictable output.
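As a toy illustration of the hysteresis (the scores here are made up; the real defaults are hot=0.030, cold=0.020):

```python
def hot_states(scores, hot=0.03, cold=0.02):
    # Two-threshold state machine: enter "hot" above `hot`, leave below `cold`.
    state, out = False, []
    for s in scores:
        if not state and s > hot:
            state = True
        elif state and s < cold:
            state = False
        out.append(state)
    return out

noisy = [0.01, 0.04, 0.025, 0.04, 0.025, 0.01]
print(hot_states(noisy))  # [False, True, True, True, True, False]
# A single 0.03 threshold would flip cold on every 0.025 dip.
```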
Stage 3 — Budget allocation with guardrails
Pure proportional allocation is not enough. It fails on rounding and it can starve short segments.
So my allocator has three rules:
- Every segment gets a floor (min_frames_per_segment).
- No segment can exceed a cap (max_frames_per_segment).
- The remaining budget is distributed proportionally to segment utility.
This makes allocation stable and prevents pathologies.
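A simplified sketch of those three rules (it omits the overshoot-trimming path that the full script below handles):

```python
def allocate(utilities, budget, floor=1, cap=30):
    # Floor first, then distribute the remainder proportionally to utility,
    # capping each segment; leftovers go to the largest fractional parts.
    n = len(utilities)
    alloc = [floor] * n
    remaining = budget - floor * n
    total = sum(utilities) or 1.0  # guard against an all-zero score stream
    raw = [u / total * remaining for u in utilities]
    for i in range(n):
        alloc[i] = min(cap, alloc[i] + int(raw[i]))
    leftovers = budget - sum(alloc)
    order = sorted(range(n), key=lambda i: raw[i] - int(raw[i]), reverse=True)
    for i in order:
        if leftovers <= 0:
            break
        if alloc[i] < cap:
            alloc[i] += 1
            leftovers -= 1
    return alloc

# Two busy segments plus one tiny micro-burst, 10-frame budget:
print(allocate([0.50, 0.45, 0.05], budget=10))  # [5, 4, 1]
```

Note the third segment: pure proportional allocation would round its 0.5-frame share down to zero; the floor guarantees the micro-burst still gets a frame.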
Stage 4 — Index selection inside segments
Once a segment has been assigned K frames, I pick K indices spread across the segment:
- Always include the segment start (transitions matter).
- Always include the segment end (final state matters).
- Fill the rest with evenly spaced indices.
If I need more fidelity later, I can bias toward local maxima of the score, but evenly spaced selection is a strong baseline and keeps the code straightforward.
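The selection rule is essentially linspace with the endpoints pinned; a minimal sketch:

```python
import numpy as np

def spread(start: int, end: int, k: int) -> list[int]:
    # Evenly spaced indices across [start, end - 1], endpoints included.
    if k <= 0 or end <= start:
        return []
    if k == 1:
        return [start]
    return sorted({int(round(x)) for x in np.linspace(start, end - 1, num=k)})

print(spread(100, 160, 5))  # endpoints 100 and 159 are always included
```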
Complete runnable implementation (scoring + segmentation + allocation + extraction)
To make this post copyable, here is a single Python script you can run against any MP4. It selects keyframes adaptively and writes them to an output directory.
Dependencies:
- opencv-python
- numpy
Run:
python adaptive_keyframes.py --video input.mp4 --budget 60 --out ./keyframes
Here’s adaptive_keyframes.py:
import argparse
import os
from dataclasses import dataclass
from typing import List, Tuple
import cv2
import numpy as np
@dataclass
class Segment:
start: int # inclusive frame index
end: int # exclusive frame index
score: float
@property
def length(self) -> int:
return max(0, self.end - self.start)
def frame_diff_score(prev_bgr: np.ndarray, curr_bgr: np.ndarray) -> float:
"""Cheap per-frame change score in [0, 1] (roughly).
Uses grayscale mean absolute difference normalized by 255.
"""
prev_gray = cv2.cvtColor(prev_bgr, cv2.COLOR_BGR2GRAY)
curr_gray = cv2.cvtColor(curr_bgr, cv2.COLOR_BGR2GRAY)
diff = cv2.absdiff(prev_gray, curr_gray)
return float(diff.mean() / 255.0)
def compute_scores(
cap: cv2.VideoCapture,
stride: int = 1,
max_frames: int | None = None,
) -> Tuple[List[float], int]:
"""Return (scores, total_frames_read).
scores[i] is the change score between frame i and i+stride (based on sampled reads).
"""
scores: List[float] = []
ok, prev = cap.read()
if not ok:
return scores, 0
frame_idx = 1
frames_read = 1
while True:
# Skip stride-1 frames between comparisons.
for _ in range(stride - 1):
ok = cap.grab()
if not ok:
return scores, frames_read
frame_idx += 1
frames_read += 1
if max_frames is not None and frames_read >= max_frames:
return scores, frames_read
ok, curr = cap.read()
if not ok:
return scores, frames_read
frames_read += 1
s = frame_diff_score(prev, curr)
scores.append(s)
prev = curr
frame_idx += 1
if max_frames is not None and frames_read >= max_frames:
return scores, frames_read
def segment_scores(
scores: List[float],
window: int = 8,
hot_thresh: float = 0.030,
cold_thresh: float = 0.020,
min_len: int = 12,
) -> List[Segment]:
"""Convert per-step scores into segments with a utility score.
Uses a rolling mean with hysteresis to avoid segment flicker.
"""
if not scores:
return []
# Rolling mean via cumulative sum.
x = np.array(scores, dtype=np.float32)
c = np.cumsum(np.insert(x, 0, 0.0))
def roll_mean(i: int) -> float:
j0 = max(0, i - window + 1)
n = i - j0 + 1
return float((c[i + 1] - c[j0]) / n)
segments: List[Segment] = []
state_hot = False
seg_start = 0
seg_scores: List[float] = []
for i in range(len(scores)):
rm = roll_mean(i)
if state_hot:
seg_scores.append(scores[i])
if rm < cold_thresh:
# Close hot segment at i+1
seg_end = i + 1
            if seg_end - seg_start < min_len:
                # Too short: the merge pass below folds this into a neighbor.
                pass
segments.append(Segment(seg_start, seg_end, float(np.mean(seg_scores) if seg_scores else 0.0)))
# Start cold
state_hot = False
seg_start = seg_end
seg_scores = []
else:
if rm > hot_thresh:
# Close cold segment
seg_end = i + 1
cold_score = float(np.mean(scores[seg_start:seg_end]) if seg_end > seg_start else 0.0)
segments.append(Segment(seg_start, seg_end, cold_score))
# Start hot
state_hot = True
seg_start = seg_end
seg_scores = []
# Close tail
tail_end = len(scores)
if tail_end > seg_start:
tail_score = float(np.mean(scores[seg_start:tail_end]))
segments.append(Segment(seg_start, tail_end, tail_score))
# Merge very short segments to keep output stable.
merged: List[Segment] = []
for seg in segments:
if not merged:
merged.append(seg)
continue
if seg.length < min_len:
prev = merged[-1]
combined = Segment(prev.start, seg.end, (prev.score * prev.length + seg.score * seg.length) / max(1, (prev.length + seg.length)))
merged[-1] = combined
else:
merged.append(seg)
# One more pass: ensure non-empty and strictly increasing.
cleaned: List[Segment] = []
for seg in merged:
if seg.length <= 0:
continue
if cleaned and seg.start < cleaned[-1].end:
seg = Segment(cleaned[-1].end, seg.end, seg.score)
if seg.length > 0:
cleaned.append(seg)
return cleaned
def allocate_frames(
segments: List[Segment],
budget: int,
min_frames_per_segment: int = 1,
max_frames_per_segment: int = 30,
) -> List[int]:
"""Allocate keyframes to segments using floor + proportional + cap."""
if budget <= 0 or not segments:
return []
n = len(segments)
min_total = min_frames_per_segment * n
# If budget is smaller than the floor, distribute 1-by-1.
if min_total >= budget:
alloc = [0] * n
for i in range(budget):
alloc[i % n] += 1
return alloc
utilities = np.array([max(0.0, s.score) for s in segments], dtype=np.float64)
total_u = float(utilities.sum())
alloc = [min_frames_per_segment] * n
remaining = budget - min_total
if total_u == 0.0:
raw = np.full(n, remaining / n, dtype=np.float64)
else:
raw = utilities * (remaining / total_u)
# Add integer parts.
for i in range(n):
add = int(raw[i])
alloc[i] = min(max_frames_per_segment, alloc[i] + add)
allocated = sum(alloc)
# Distribute leftover by fractional parts, respecting caps.
if allocated < budget:
frac = raw - np.floor(raw)
order = np.argsort(-frac) # descending fractional
idx = 0
safety = 0
while allocated < budget and safety < 10_000:
i = int(order[idx % n])
if alloc[i] < max_frames_per_segment:
alloc[i] += 1
allocated += 1
idx += 1
safety += 1
# If we somehow exceeded budget due to caps/floor interplay, trim from lowest utility.
if allocated > budget:
order = np.argsort(utilities) # ascending utility
idx = 0
safety = 0
while allocated > budget and safety < 10_000:
i = int(order[idx % n])
if alloc[i] > 0 and alloc[i] > min_frames_per_segment:
alloc[i] -= 1
allocated -= 1
idx += 1
safety += 1
return alloc
def pick_indices_for_segment(seg: Segment, k: int) -> List[int]:
"""Pick k indices in [seg.start, seg.end] over the score-step domain.
Note: scores are defined between frames; we later map these to actual frames.
"""
if k <= 0 or seg.length <= 0:
return []
if k == 1:
return [seg.start]
# Evenly spaced across [start, end-1]
xs = np.linspace(seg.start, seg.end - 1, num=k)
idxs = sorted({int(round(x)) for x in xs})
# Ensure exactly k by filling gaps if rounding collapsed points.
while len(idxs) < k:
# Insert midpoints between existing points.
candidates = []
for a, b in zip(idxs, idxs[1:]):
if b - a >= 2:
candidates.append((a + b) // 2)
if not candidates:
# Fall back: walk forward.
x = idxs[-1]
if x + 1 < seg.end:
idxs.append(x + 1)
else:
break
else:
for c in candidates:
if c not in idxs and seg.start <= c < seg.end:
idxs.append(c)
if len(idxs) >= k:
break
idxs = sorted(idxs)
# Trim if we overshot.
return idxs[:k]
def select_keyframe_indices(segments: List[Segment], alloc: List[int], stride: int = 1) -> List[int]:
"""Return concrete frame indices (0-based) to extract from the video."""
chosen: List[int] = []
for seg, k in zip(segments, alloc):
step_idxs = pick_indices_for_segment(seg, k)
# Map score-step domain to frame indices.
# score i corresponds to diff between frame i and i+stride;
# picking frame i is a reasonable representative.
for si in step_idxs:
chosen.append(si * stride)
chosen = sorted(set(chosen))
return chosen
def extract_frames(video_path: str, frame_indices: List[int], out_dir: str) -> None:
os.makedirs(out_dir, exist_ok=True)
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise RuntimeError(f"Failed to open video: {video_path}")
frame_set = set(frame_indices)
max_idx = max(frame_set) if frame_set else -1
idx = 0
saved = 0
while idx <= max_idx:
ok, frame = cap.read()
if not ok:
break
if idx in frame_set:
path = os.path.join(out_dir, f"frame_{idx:06d}.jpg")
ok2 = cv2.imwrite(path, frame)
if not ok2:
raise RuntimeError(f"Failed to write: {path}")
saved += 1
idx += 1
cap.release()
if saved == 0 and frame_indices:
raise RuntimeError("No frames were saved; check indices and video decoding")
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("--video", required=True, help="Path to input video")
ap.add_argument("--out", required=True, help="Output directory for keyframes")
ap.add_argument("--budget", type=int, default=60, help="Total keyframes to extract")
ap.add_argument("--stride", type=int, default=2, help="Compare every Nth frame for scoring")
ap.add_argument("--window", type=int, default=8, help="Rolling window for segmentation")
ap.add_argument("--hot", type=float, default=0.030, help="Enter hot segment threshold")
ap.add_argument("--cold", type=float, default=0.020, help="Exit hot segment threshold")
args = ap.parse_args()
cap = cv2.VideoCapture(args.video)
if not cap.isOpened():
raise RuntimeError(f"Failed to open video: {args.video}")
scores, frames_read = compute_scores(cap, stride=args.stride)
cap.release()
segments = segment_scores(scores, window=args.window, hot_thresh=args.hot, cold_thresh=args.cold)
    # Note: the cap defaults to the full budget here; pass a tighter
    # max_frames_per_segment if one long busy segment dominates.
    alloc = allocate_frames(segments, budget=args.budget, min_frames_per_segment=1, max_frames_per_segment=max(2, args.budget))
keyframes = select_keyframe_indices(segments, alloc, stride=args.stride)
# Keep within a hard limit (rounding/uniqueness can change count).
if len(keyframes) > args.budget:
keyframes = keyframes[: args.budget]
extract_frames(args.video, keyframes, args.out)
print(f"frames_read={frames_read}")
print(f"scores={len(scores)} segments={len(segments)}")
print(f"budget={args.budget} selected={len(keyframes)}")
if segments:
hot_share = sum(1 for s in segments if s.score > args.hot) / len(segments)
print(f"segment_hot_share={hot_share:.2f}")
if __name__ == "__main__":
main()
This script is intentionally direct:
- It produces a stable segment list.
- It guarantees you never exceed your frame budget.
- It writes deterministic frame files for downstream analysis.
In my production service, the extracted frames feed into the multimodal call (Gemini via google-generativeai in my dependencies), and the resulting analysis payload is sent back to the Next.js app via signed webhook.
Practical tuning notes (the stuff that matters after the first demo)
Once you have the shape working, the wins come from tuning behavior under ugly real-world recordings.
1) Choose a stride that matches your content
If you score every frame on a 30 FPS recording, you’ll find tiny cursor movements everywhere. That’s not always bad, but it can inflate your “change” signal.
A stride of 2–5 is a good starting point for screen recordings. You’re still sensitive to UI changes, but you suppress sub-frame noise.
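The arithmetic is worth doing once per content type; a quick sanity check (frame rate and duration here are illustrative):

```python
fps = 30      # typical screen-recording capture rate
stride = 3    # compare every 3rd frame
minutes = 10

comparisons = fps * 60 * minutes // stride
print(comparisons)  # 6000 change scores for a 10-minute video, vs 18000 at stride=1
```

Since skipped frames use cap.grab() (decode without conversion), the stride cuts scoring cost roughly linearly.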
2) Hysteresis prevents segment flicker
Using hot_thresh and cold_thresh (two thresholds) matters. With a single threshold, you’ll bounce between hot/cold constantly around the cutoff.
Hysteresis gives you stable segments and makes budgets behave predictably.
3) Floors and caps are not optional
Without a floor, short segments can round down to zero and you’ll miss the exact micro-burst you cared about.
Without a cap, one long “busy” segment can consume your entire budget and you’ll lose context from the rest of the workflow.
4) Always sign the bytes you send
If you sign json.dumps(payload) but send json=payload, the HTTP library may serialize with different key ordering/spacing than what you signed. That produces intermittent verification failures that feel like ghosts.
Signing the actual bytes (data=body_bytes) eliminates that entire class of bugs. This matters here because the analysis payload—the one carrying your carefully selected keyframes—is the most expensive artifact in the pipeline. If it gets dropped by a signing mismatch, you’ve wasted the entire frame budget.
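A sketch of the sender side (the X-Signature header name and the sha256= prefix are illustrative conventions, not a standard):

```python
import hashlib
import hmac
import json

def signed_request_args(payload: dict, secret: bytes) -> dict:
    # Serialize ONCE, sign those exact bytes, send those exact bytes.
    body = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    signature = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return {
        "data": body,  # not json=payload — the HTTP library would re-serialize
        "headers": {
            "Content-Type": "application/json",
            "X-Signature": "sha256=" + signature,
        },
    }

args = signed_request_args({"video_id": "abc", "keyframes": 60}, b"webhook-secret")
```

On the receiving end, verify the HMAC over the raw request body with a constant-time comparison (timingSafeEqual in Node), never over a re-parsed and re-stringified object.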
Why this design scales in production
The sampler works because it respects two hard constraints:
- Cost scales with selected frames, not video length. Once the budget is fixed, the expensive multimodal step has an upper bound.
- Runtime stays predictable. Scoring is a linear pass; segmentation is a linear pass; allocation is linear with tiny constant factors.
Most importantly: the sampler makes my system behave like something I can operate.
Instead of arguing about whether a 12-minute video is “too long,” I decide how many frames I’m willing to buy. The analyzer spends that budget on the parts of the recording that actually change, and it stops paying for static UI.
That’s the whole point: disciplined allocation beats fair sampling.