Most AI tools on the market today are fundamentally text-based. Even when they "listen" to audio, they act on static transcripts after the fact. But human communication doesn't happen after the fact. It happens in the moment—in our tone of voice, our pacing, our posture, and our eye contact.
“Vision agents don’t just see pixels — they interpret intent, context, and meaning in motion.”
I wanted to build an AI that could actually coach people on how they communicate, rather than just summarizing what they said. I wanted an AI that could see if you were slouching, hear if you were speaking too fast, and interrupt you politely with actionable advice.
The result is Visions, an open-source real-time AI Communication Coach.
"Words are only part of the message. Your posture, your pace, your pauses — they say everything the words don't."
In this post, I want to pull back the curtain on the architecture behind Visions. We'll look at how we orchestrated the Gemini Realtime API, GetStream, Deepgram STT, ElevenLabs TTS, and Ultralytics YOLO into a seamless, unified real-time loop.
The Architecture Overview
Building a multimodal agent that operates on a live video call requires a robust, low-latency event loop. The system needs to juggle five concurrent responsibilities:
- The Edge (WebRTC Video/Audio streaming)
- The Ears (Speech-to-Text inference)
- The Eyes (Computer Vision pose estimation)
- The Brain (The LLM reasoning engine)
- The Voice (Text-to-Speech response generation)
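All five responsibilities share a single asyncio event loop. Here is a minimal sketch of that pattern; the coroutine names are stand-ins for illustration, not the real performance_agent.py API:

```python
import asyncio

# Records which responsibilities ran, so the sketch is observable
events: list[str] = []

async def listen_audio():
    events.append("stt")    # the Deepgram STT stream would run here

async def watch_video():
    events.append("pose")   # YOLO pose estimation would run here

async def coaching_loop():
    events.append("coach")  # the 30-second evaluator would run here

async def run_agent():
    # All responsibilities run concurrently on one event loop
    await asyncio.gather(listen_audio(), watch_video(), coaching_loop())

asyncio.run(run_agent())
```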
To manage this, the core orchestrator (located in performance_agent.py) relies on an async event loop that subscribes to an incoming WebRTC media track.
1. The Edge: GetStream WebRTC
We use GetStream as our WebRTC interface. When the Python agent spins up, it generates a unique call_id and opens a staging URL in the user's browser. Once the user clicks "Join", GetStream establishes the peer-to-peer connection and begins piping real-time audio and video frames directly into our Python backend.
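The call_id/staging-URL handshake amounts to a few lines. The URL format below is invented for illustration; the real one is produced by the GetStream SDK inside the agent:

```python
from uuid import uuid4

def make_join_url(base: str = "https://example.com/join") -> tuple[str, str]:
    """Generate a unique call_id and the staging URL to open in the browser.
    (Illustrative only; the real URL comes from the GetStream SDK.)"""
    call_id = str(uuid4())  # unique ID for this coaching call
    return call_id, f"{base}?call_id={call_id}"

call_id, url = make_join_url()
```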
2. The Ears & Voice: Deepgram + ElevenLabs
Low latency is non-negotiable for a natural conversation. For the "ears," we use Deepgram STT configured for eager turn detection. As the user speaks, Deepgram streams STTPartialTranscriptEvent objects into our SpeechProcessor.
The SpeechProcessor doesn't just store text; it actively tracks:
- WPM (Words Per Minute)
- Filler Word Frequency ("um", "uh", "like")
- Hesitation Index (derived from average pause durations)
For the "Voice," we use ElevenLabs TTS (specifically the Multilingual V2 model) because it offers some of the most expressive, human-like cadence available.
3. The Eyes: Real-Time Pose Estimation with YOLO
This is where Visions goes beyond a standard voice-bot. As GetStream pipes video frames into the event loop, we route them through our CommunicationVisionProcessor.
We use Ultralytics YOLO (yolo11n-pose.pt specifically) for lightweight pose estimation. By running at a deliberate 5 FPS to conserve CPU, YOLO identifies key landmarks on the speaker's body.
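Throttling inference to a fixed frame rate is a simple time check. Here is one way to sketch the 5 FPS gate; this is our own illustration of the idea, not the Ultralytics API:

```python
import time

class FrameThrottle:
    """Let at most `target_fps` frames per second through; drop the rest."""

    def __init__(self, target_fps: float = 5.0):
        self.min_interval = 1.0 / target_fps
        self._last = 0.0

    def should_process(self) -> bool:
        # Process a frame only if enough time has passed since the last one
        now = time.monotonic()
        if now - self._last >= self.min_interval:
            self._last = now
            return True
        return False

throttle = FrameThrottle(target_fps=5.0)
first = throttle.should_process()   # first frame is processed
second = throttle.should_process()  # immediate next frame is dropped
```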
From these landmarks, the Vision Processor calculates:
- Posture ("upright", "slouched", "leaning")
- Eye Contact (tracking face orientation relative to the camera center)
- Hand Activity (measuring how wildly the wrists are moving relative to the torso)
Suddenly, the AI doesn't just know what you're saying; it knows how you look while saying it.
4. The Brain: The 30-Second Coaching Loop
Wiring Deepgram, YOLO, and ElevenLabs into Gemini is powerful, but it can be overwhelming for the user if the AI is constantly interrupting.
To solve this, we implemented a custom SessionManager that runs a strict 30-second evaluation cadence.
Every 30 seconds, the manager pauses and queries the pipelines. It grabs:
- The `speech_state` (WPM, Hesitation Index)
- The `vision_state` (Posture, Eye Contact, Engagement)
- The `language_scores` (Clarity, Structure)
It bundles these metrics into a structured "coaching context" prompt and fires it to the Gemini Realtime API.
Gemini evaluates the context against its system instructions, which dictate the current mode (Interview, Public Speaking, or Debate). Because those instructions tell Gemini to be "positive-first and actionable," it generates a spoken response of at most three sentences.
For example, Gemini might see that the user's WPM is perfect, but their posture is poor. It will interrupt politely: "Great pacing on that last point. But I notice you're slouching—sit up straight to project more confidence before your next sentence."
To visualize how these separate components move data in real time inside the orchestrator, here is a diagram of the call flow:
Graceful Degradation and Reporting
Because live calls are chaotic (tabs get closed, internet connections drop), the performance_agent.py script wraps the session in a massive try/except block. Even if the session ends abruptly, the agent gathers all the 30-second snapshots it collected and generates a comprehensive Markdown and JSON Session Performance Report.
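The shape of that safety net looks roughly like this. The function and snapshot fields below are illustrative stand-ins, not the actual performance_agent.py code:

```python
def run_session(snapshots: list[dict]) -> dict:
    """Sketch of graceful degradation: even if the call dies mid-stream,
    the report is still generated from whatever snapshots were collected."""
    report = {}
    try:
        # ... the live call loop would run here; any crash is caught below ...
        raise ConnectionError("tab closed")  # simulate an abrupt drop
    except Exception as exc:
        report["ended_abruptly"] = str(exc)
    finally:
        # Report generation always runs, abrupt end or not
        report["snapshot_count"] = len(snapshots)
    return report

report = run_session([{"wpm": 130}, {"wpm": 125}])
```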
How the Code Actually Flows
To understand the value of this architecture in real-time, let's walk through exactly what happens when you start speaking.
If we look inside performance_agent.py, the core orchestration doesn’t rely on a single, monolithic LLM text generation. It relies on a multi-stage event loop.
Here is the exact timeline of a single 30-second window in the life of the AI Coach:
Step 1: Ingesting the Audio & Video Frames (Seconds 0–29)
As you talk into your camera, GetStream routes your audio to Deepgram STT and your video frames to YOLO.
Our agent listens for words. Every time Deepgram processes a word, it triggers an event that we've subscribed to. We pipe that transcript directly into our custom SpeechProcessor:
```python
# ── Wire Deepgram transcripts → SpeechProcessor ──
@agent.events.subscribe
async def on_transcript(event: STTTranscriptEvent):
    speech_proc.on_transcript(event.text or "", is_final=True)

@agent.events.subscribe
async def on_partial_transcript(event: STTPartialTranscriptEvent):
    speech_proc.on_transcript(event.text or "", is_final=False)
```
Why we wrote this: Partial transcripts (is_final=False) keep the pause clock ticking while you are still mid-sentence, so the system can measure hesitation in real time; final transcripts then feed the word counts for Words Per Minute and the "um"/"uh" tally. Nothing has to wait for you to finish a sentence.
Step 2: The Coaching Trigger (Second 30)
Exactly every 30 seconds, the SessionManager interrupts the background collection and fires the run_coaching_cycle function.
This is where the magic happens. The orchestrator asks all the processors to hand over their current state simultaneously:
```python
async def run_coaching_cycle(agent, speech_proc, lang_intel, vision_proc, session):
    # 1. Grab raw statistics
    speech_state = speech_proc.get_state()  # Gets WPM, Filler Count, Hesitation
    vision_state = vision_proc.get_state()  # Gets Posture, Eye Contact

    # 2. Build the LLM Prompt
    lang_scores = lang_intel.get_last_scores()
    coaching_context = lang_intel.build_coaching_context(lang_scores, speech_state)
```
Why we wrote this: If we just fed the raw transcript to the LLM and asked "how did they do?", the LLM would hallucinate or give vague feedback. Instead, we hand the LLM hard data. The coaching_context string essentially tells Gemini: "The user spoke at 110 WPM, used 3 filler words, and their posture is slouched. Give them a 3-sentence coaching tip."
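For illustration, here is a stripped-down version of how such a context string could be assembled. The field names mirror the get_state() dicts shown later in this post; the numbers are invented:

```python
def build_coaching_context(speech: dict, vision: dict) -> str:
    """Turn hard metrics into an explicit instruction for the LLM."""
    return (
        f"The user spoke at {speech['wpm']} WPM, "
        f"used {speech['filler_count']} filler words, "
        f"and their posture is {vision['posture']}. "
        "Give them a 3-sentence coaching tip."
    )

ctx = build_coaching_context(
    {"wpm": 110, "filler_count": 3},
    {"posture": "slouched"},
)
```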
Step 3: Delivering the Voice (Second 31)
Once the coaching_context prompt is built, we hand it off to the Gemini Realtime API, which generates the text response.
But we don't just print text to a screen. We use the `agent.simple_response()` method provided by the vision_agents framework to automatically route Gemini's output through ElevenLabs TTS:
```python
# Send coaching trigger to Gemini Realtime (it will speak via ElevenLabs TTS)
await agent.simple_response(coaching_context)

# Store snapshot after coaching
session.record_snapshot(
    speech_state=speech_state,
    language_scores=lang_scores,
    vision_state=vision_state,
    coaching_message=coaching_context[:200],  # Save to Session Report
)

# Rotate the window to start tracking the NEXT 30 seconds
speech_proc.reset_window()
```
Why we wrote this: Calling agent.simple_response() seamlessly generates the highly-emotive ElevenLabs voice (which we defined earlier in the file to use the voice ID "VR6AewLTigWG4xSOukaG") and streams the AI's audio back into the WebRTC call.
Finally, we clear the speech_proc window so the AI doesn't evaluate the same "ums" and "uhs" during the next 30-second loop.
The Value of This Flow
By breaking the architecture into discrete processors (Speech, Vision, Language), we ensure the LLM is acting as a true "Coach"—giving feedback based on quantitative data, rather than just acting as a conversational chatbot.
The Project Structure: Why Architecture Dictates Latency
When building an AI agent that listens, watches, evaluates, and speaks all at the same time, you cannot shove everything into one massive app.py script. The latency will skyrocket, and the AI will constantly interrupt itself.
Decoupling is the secret to a fast multimodal coach.
Here is the folder structure we designed for Visions, and why each piece is strictly isolated:
c:\Visions
├── performance_agent.py # The Main Event Loop & Orchestrator
├── performance_agent.md # The Gemini Instruct Prompt & Coaching Rules
├── pipelines/ # The Real-Time Analytical Engines
│ ├── speech_processor.py # Tracks WPM, Pauses, and Fillers
│ ├── vision_processor.py # Runs YOLO for Body Language
│ └── language_intelligence.py# Evaluates STAR structure & Grammar
├── session/ # Data Persistence & Reporting
│ ├── session_manager.py # The 30-Second Metronome
│ └── performance_report.py # Generates the final Markdown Summaries
└── reports/ # Where local `<session_id>.md` reports save
How the Folders Interact During an Active Call
When you run python performance_agent.py, you start an asynchronous event loop that relies directly on the separation of these folders.
Here is why this structure is so important during the running state:
- The Orchestrator (`performance_agent.py`)
This file is intentionally "dumb." It does not calculate words per minute. It does not look at pixels. It acts purely as an air-traffic controller, establishing the GetStream WebRTC connection and the WebSocket links to ElevenLabs, Deepgram, and Gemini.
Because we offload the heavy data tracking to the /pipelines folder, the main orchestrator's thread is completely free to stream the AI's spoken audio back to the user without stuttering.
- The Analytical Engines (`/pipelines`)
The files inside /pipelines are isolated state machines.
- `vision_processor.py`: This script subscribes independently to GetStream's video track. It swallows frames without halting the audio loops, calculating the bounding boxes and angles of your shoulders and eyes using YOLO.
"""
Vision Processing Pipeline
Extends YOLOPoseProcessor to compute higher-level body language metrics:
- Face orientation, eye contact, posture, hand movement
- Generates confidence index, engagement score, body language rating
"""
import logging
import math
import time
from typing import Any, Optional
from vision_agents.plugins.ultralytics import YOLOPoseProcessor
logger = logging.getLogger(__name__)
# COCO keypoint indices
KP_NOSE = 0
KP_LEFT_EYE = 1
KP_RIGHT_EYE = 2
KP_LEFT_EAR = 3
KP_RIGHT_EAR = 4
KP_LEFT_SHOULDER = 5
KP_RIGHT_SHOULDER = 6
KP_LEFT_HIP = 11
KP_RIGHT_HIP = 12
KP_LEFT_WRIST = 9
KP_RIGHT_WRIST = 10
KP_LEFT_ELBOW = 7
KP_RIGHT_ELBOW = 8
class CommunicationVisionProcessor(YOLOPoseProcessor):
"""
Extended YOLO Pose Processor for communication coaching.
Adds body language interpretation on top of raw pose detection:
- Eye contact estimation (camera-facing heuristic from nose/ear alignment)
- Posture scoring (shoulder height symmetry and vertical alignment)
- Hand movement tracking (wrist velocity)
- Confidence and engagement composite scores
get_state() returns a structured dict injected into the LLM each turn.
"""
name = "communication_vision"
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
# Wrist velocity tracking
self._prev_wrist_positions: dict[str, tuple[float, float]] = {}
self._prev_wrist_time: float = time.time()
self._wrist_velocity_history: list[float] = [] # last N frames
# Rolling score history (for smoothing)
self._eye_contact_history: list[float] = []
self._posture_history: list[float] = []
self._confidence_history: list[float] = []
self._engagement_history: list[float] = []
self._history_len = 30 # ~1 second at 30fps
# Latest state for get_state()
self._latest_state: dict = {
"confidence_index": 70.0,
"engagement_score": 70.0,
"body_language_rating": "good",
"posture": "upright",
"eye_contact": "strong",
"hand_activity": "natural",
"persons_detected": 0,
}
def _safe_keypoint(
self, kpts: list, index: int, conf_threshold: float = 0.3
) -> Optional[tuple[float, float]]:
"""Return (x, y) if keypoint confidence exceeds threshold, else None."""
if index >= len(kpts):
return None
x, y, c = kpts[index]
return (float(x), float(y)) if float(c) > conf_threshold else None
def _score_eye_contact(self, kpts: list) -> float:
"""
Estimate eye contact (0-100) using nose/ear symmetry.
When looking at camera: nose is centered, ears are equidistant.
When looking away: nose shifts toward one side, ear asymmetry increases.
"""
nose = self._safe_keypoint(kpts, KP_NOSE)
left_ear = self._safe_keypoint(kpts, KP_LEFT_EAR)
right_ear = self._safe_keypoint(kpts, KP_RIGHT_EAR)
if not nose or not left_ear or not right_ear:
return 60.0 # neutral when keypoints unavailable
nose_x = nose[0]
left_ear_x = left_ear[0]
right_ear_x = right_ear[0]
ear_width = abs(right_ear_x - left_ear_x)
if ear_width < 1e-3:
return 60.0
# Ratio of nose position within ear span (0.5 = perfectly centered)
nose_ratio = (nose_x - left_ear_x) / ear_width
center_deviation = abs(nose_ratio - 0.5) # 0 = perfect center
# Convert to 0-100 score (0 deviation = 100, 0.5 deviation = 0)
score = max(0.0, 100.0 - (center_deviation / 0.5) * 100.0)
return round(score, 1)
def _score_posture(self, kpts: list) -> tuple[float, str]:
"""
Score posture (0-100) and classify it.
Uses shoulder height symmetry and shoulder-to-hip vertical alignment.
Returns (score, label).
"""
left_shoulder = self._safe_keypoint(kpts, KP_LEFT_SHOULDER)
right_shoulder = self._safe_keypoint(kpts, KP_RIGHT_SHOULDER)
left_hip = self._safe_keypoint(kpts, KP_LEFT_HIP)
right_hip = self._safe_keypoint(kpts, KP_RIGHT_HIP)
if not left_shoulder or not right_shoulder:
return 65.0, "upright"
# Shoulder symmetry (y coordinates should be similar)
shoulder_height_diff = abs(left_shoulder[1] - right_shoulder[1])
shoulder_width = abs(left_shoulder[0] - right_shoulder[0])
if shoulder_width < 1e-3:
return 65.0, "upright"
symmetry_score = max(0.0, 100.0 - (shoulder_height_diff / shoulder_width) * 200)
# Shoulder vertical position (higher on screen = better posture for seated person)
# We use shoulder midpoint y as proxy for upright-ness
shoulder_mid_y = (left_shoulder[1] + right_shoulder[1]) / 2.0
posture_score = symmetry_score # Simplified; extend with hip alignment if available
if left_hip and right_hip:
hip_mid_y = (left_hip[1] + right_hip[1]) / 2.0
# Torso height ratio: larger = more upright
torso_height = abs(hip_mid_y - shoulder_mid_y)
shoulder_to_hip_ratio = torso_height / max(1, shoulder_width)
# Ideal ratio ~1.3-1.8 for upright seated posture
ratio_score = min(100.0, (shoulder_to_hip_ratio / 1.5) * 100.0)
posture_score = (symmetry_score * 0.5) + (ratio_score * 0.5)
posture_score = round(max(0.0, min(100.0, posture_score)), 1)
if posture_score >= 75:
label = "upright"
elif posture_score >= 50:
label = "leaning"
else:
label = "slouched"
return posture_score, label
def _score_hand_activity(self, kpts: list) -> tuple[float, str]:
"""
Score hand movement and classify as natural/excessive/minimal.
Uses wrist velocity relative to body width.
"""
now = time.time()
dt = now - self._prev_wrist_time
self._prev_wrist_time = now
if dt < 1e-6:
return 50.0, "natural"
velocities: list[float] = []
shoulder_width = 1.0
left_shoulder = self._safe_keypoint(kpts, KP_LEFT_SHOULDER)
right_shoulder = self._safe_keypoint(kpts, KP_RIGHT_SHOULDER)
if left_shoulder and right_shoulder:
shoulder_width = max(
1.0, abs(right_shoulder[0] - left_shoulder[0])
)
for side, kp_idx in [("left", KP_LEFT_WRIST), ("right", KP_RIGHT_WRIST)]:
wrist = self._safe_keypoint(kpts, kp_idx)
if wrist:
prev = self._prev_wrist_positions.get(side)
if prev:
dx = wrist[0] - prev[0]
dy = wrist[1] - prev[1]
velocity = math.sqrt(dx**2 + dy**2) / (shoulder_width * dt)
velocities.append(velocity)
self._prev_wrist_positions[side] = wrist
if not velocities:
return 50.0, "natural"
avg_velocity = sum(velocities) / len(velocities)
self._wrist_velocity_history.append(avg_velocity)
if len(self._wrist_velocity_history) > self._history_len:
self._wrist_velocity_history.pop(0)
smoothed = sum(self._wrist_velocity_history) / len(self._wrist_velocity_history)
# Classify velocity ranges (normalized to shoulder width per second)
if smoothed < 0.3:
return smoothed * 100, "minimal"
elif smoothed > 2.5:
return min(100.0, smoothed * 20), "excessive"
else:
# Map 0.3-2.5 to "natural" range, score 60-90
score = 60.0 + ((smoothed - 0.3) / 2.2) * 30.0
return round(score, 1), "natural"
def _smooth_score(self, history: list[float], new_val: float) -> float:
"""Exponential smoothing for score stability."""
history.append(new_val)
if len(history) > self._history_len:
history.pop(0)
if not history:
return new_val
# Weighted average (recent values weighted more)
n = len(history)
weights = [i + 1 for i in range(n)]
return sum(v * w for v, w in zip(history, weights)) / sum(weights)
def _process_person_keypoints(self, kpts: list) -> dict[str, Any]:
"""Compute all body language metrics for one detected person."""
eye_contact_raw = self._score_eye_contact(kpts)
posture_raw, posture_label = self._score_posture(kpts)
hand_score, hand_label = self._score_hand_activity(kpts)
eye_contact = self._smooth_score(self._eye_contact_history, eye_contact_raw)
posture_score = self._smooth_score(self._posture_history, posture_raw)
# Composite confidence index
confidence_raw = (eye_contact * 0.4) + (posture_score * 0.4) + (hand_score * 0.2)
confidence = self._smooth_score(self._confidence_history, confidence_raw)
# Engagement score (eye contact + energy signals)
activity_bonus = 10.0 if hand_label == "natural" else 0.0
engagement_raw = (eye_contact * 0.6) + (posture_score * 0.3) + activity_bonus
engagement = self._smooth_score(self._engagement_history, engagement_raw)
# Body language rating
avg_score = (confidence + engagement) / 2.0
if avg_score >= 80:
rating = "excellent"
elif avg_score >= 60:
rating = "good"
elif avg_score >= 40:
rating = "fair"
else:
rating = "poor"
# Eye contact label
if eye_contact >= 70:
eye_label = "strong"
elif eye_contact >= 45:
eye_label = "intermittent"
else:
eye_label = "weak"
return {
"confidence_index": round(confidence, 1),
"engagement_score": round(engagement, 1),
"body_language_rating": rating,
"posture": posture_label,
"eye_contact": eye_label,
"hand_activity": hand_label,
"eye_contact_score": round(eye_contact, 1),
"posture_score": round(posture_score, 1),
"hand_activity_score": round(hand_score, 1),
}
def get_state(self) -> dict:
"""
Returns body language state dict injected into LLM context each turn.
Falls back to cached state if no new pose data available.
"""
return self._latest_state.copy()
def _process_pose_sync(self, frame_array) -> tuple:
"""
Override parent to intercept pose_data and compute body language metrics.
"""
annotated_frame, pose_data = super()._process_pose_sync(frame_array)
persons = pose_data.get("persons", [])
if persons:
# Analyze first detected person (primary speaker)
kpts = persons[0].get("keypoints", [])
if kpts:
body_language = self._process_person_keypoints(kpts)
body_language["persons_detected"] = len(persons)
self._latest_state = body_language
logger.debug(f"Body language: {body_language}")
else:
self._latest_state["persons_detected"] = 0
return annotated_frame, pose_data
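To see the nose/ear heuristic in action, here is the core math of _score_eye_contact reduced to bare x-coordinates, run on synthetic values:

```python
def score_eye_contact(nose_x: float, left_ear_x: float, right_ear_x: float) -> float:
    """Same math as _score_eye_contact, on bare x-coordinates."""
    ear_width = abs(right_ear_x - left_ear_x)
    if ear_width < 1e-3:
        return 60.0  # degenerate span: fall back to neutral
    nose_ratio = (nose_x - left_ear_x) / ear_width
    center_deviation = abs(nose_ratio - 0.5)  # 0 = nose dead-centre
    # 0 deviation → 100, 0.5 deviation (nose at an ear) → 0
    return max(0.0, 100.0 - (center_deviation / 0.5) * 100.0)

# Facing the camera: nose midway between the ears
centered = score_eye_contact(50.0, 30.0, 70.0)  # → 100.0
# Looking away: nose drifts toward one ear
off_axis = score_eye_contact(40.0, 30.0, 70.0)  # → 50.0
```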
- `speech_processor.py`: A lightweight state object that continuously accepts transcript events from Deepgram (both partial and final) and computes running metrics (like the hesitation_index, derived from gaps between words).
"""
Speech Processing Pipeline
Measures WPM, filler words, pause duration, and hesitation index from Deepgram transcripts.
"""
import re
import time
import logging
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, Optional
logger = logging.getLogger(__name__)
# Filler words to detect
FILLER_PATTERNS = re.compile(
r"\b(um+|uh+|er+|ah+|like|you know|you know what i mean|"
r"basically|literally|actually|honestly|right\?|so|anyway|"
r"i mean|kind of|sort of|i guess)\b",
re.IGNORECASE,
)
# Ideal WPM range for most communication contexts
IDEAL_WPM_MIN = 120
IDEAL_WPM_MAX = 160
@dataclass
class TranscriptWindow:
"""Stores a rolling 30-second window of transcript data."""
text: str = ""
word_count: int = 0
filler_count: int = 0
start_time: float = field(default_factory=time.time)
end_time: Optional[float] = None
pause_durations_ms: list[float] = field(default_factory=list)
class SpeechProcessor:
"""
Measures real-time speech quality metrics from streaming transcripts.
Designed to be used alongside the Deepgram STT plugin. Subscribe to
STT transcript events and call `on_transcript` for each utterance.
Exposes `get_state()` for the vision-agent processor state injection.
"""
def __init__(self, window_seconds: int = 30):
self.window_seconds = window_seconds
self._current_window = TranscriptWindow()
self._completed_windows: Deque[TranscriptWindow] = deque(maxlen=100)
# Track all-session cumulative stats
self._session_start = time.time()
self._total_words = 0
self._total_fillers = 0
self._last_speech_time: Optional[float] = None
self._pause_threshold_ms = 500 # pauses > 500ms are "notable"
def on_transcript(self, text: str, is_final: bool = True) -> None:
"""
Process a new transcript segment from Deepgram.
Args:
text: The transcript text for this utterance.
is_final: Whether this is a final (vs partial) transcript.
"""
if not text or not text.strip():
return
now = time.time()
# Calculate pause since last speech
if self._last_speech_time is not None:
pause_ms = (now - self._last_speech_time) * 1000
if pause_ms > self._pause_threshold_ms:
self._current_window.pause_durations_ms.append(pause_ms)
self._last_speech_time = now
# Only count final transcripts for window metrics
if not is_final:
return
words = text.strip().split()
word_count = len(words)
fillers = FILLER_PATTERNS.findall(text)
filler_count = len(fillers)
# Update current window
self._current_window.text += " " + text
self._current_window.word_count += word_count
self._current_window.filler_count += filler_count
# Update cumulative session stats
self._total_words += word_count
self._total_fillers += filler_count
logger.debug(
f"SpeechProcessor: +{word_count} words, {filler_count} fillers detected"
)
# Rotate window if expired
if (now - self._current_window.start_time) >= self.window_seconds:
self._rotate_window(now)
def _rotate_window(self, now: float) -> None:
"""Archive the current window and start a fresh one."""
self._current_window.end_time = now
self._completed_windows.append(self._current_window)
self._current_window = TranscriptWindow(start_time=now)
logger.info("SpeechProcessor: 30s window rotated.")
def get_current_wpm(self) -> float:
"""Calculate WPM for the active window."""
elapsed = time.time() - self._current_window.start_time
if elapsed < 1.0 and self._current_window.word_count == 0 and self._completed_windows:
# Fallback to the last completed window if we just rotated
last_window = self._completed_windows[-1]
if last_window.end_time:
last_elapsed = last_window.end_time - last_window.start_time
if last_elapsed >= 1.0:
return (last_window.word_count / last_elapsed) * 60.0
if elapsed < 1.0:
return 0.0
return (self._current_window.word_count / elapsed) * 60.0
def get_session_wpm(self) -> float:
"""Calculate overall session WPM."""
elapsed = time.time() - self._session_start
if elapsed < 1.0:
return 0.0
return (self._total_words / elapsed) * 60.0
def get_avg_pause_duration_ms(self) -> float:
"""Average pause duration in current window (ms)."""
pauses = self._current_window.pause_durations_ms
if not pauses and self._completed_windows and (time.time() - self._current_window.start_time) < 1.0:
pauses = self._completed_windows[-1].pause_durations_ms
if not pauses:
return 0.0
return sum(pauses) / len(pauses)
def get_hesitation_index(self) -> float:
"""
Hesitation index (0-100). Combines filler rate and pause duration.
Higher = more hesitation.
"""
window = self._current_window
actual_elapsed = time.time() - window.start_time
filler_count = window.filler_count
# If the window just rotated, add the last completed window for calculations
elapsed = actual_elapsed
if actual_elapsed < 1.0 and self._completed_windows:
last_window = self._completed_windows[-1]
if last_window.end_time:
elapsed += max(1.0, last_window.end_time - last_window.start_time)
filler_count += last_window.filler_count
elapsed = max(1.0, elapsed)
# Filler rate component: normalize fillers per minute against 0-10 range
fillers_per_min = (filler_count / elapsed) * 60.0
filler_component = min(fillers_per_min / 10.0, 1.0) * 50.0 # 0-50 points
# Pause component: normalize avg pause 0-3000ms range
avg_pause = self.get_avg_pause_duration_ms()
pause_component = min(avg_pause / 3000.0, 1.0) * 50.0 # 0-50 points
return round(filler_component + pause_component, 1)
def get_state(self) -> dict:
"""
Returns the current speech metrics dict for injection into LLM context.
Called automatically by the vision-agents framework each turn.
"""
wpm = self.get_current_wpm()
filler_count = self._current_window.filler_count
avg_pause = self.get_avg_pause_duration_ms()
hesitation = self.get_hesitation_index()
elapsed = round(time.time() - self._current_window.start_time, 1)
window_text = self._current_window.text.strip()
return {
"wpm": round(wpm, 1),
"filler_count": filler_count,
"avg_pause_duration_ms": round(avg_pause, 1),
"hesitation_index": hesitation,
"window_elapsed_seconds": elapsed,
"window_text": window_text[:500] if window_text else "",
"session_wpm": round(self.get_session_wpm(), 1),
"session_total_words": self._total_words,
"session_total_fillers": self._total_fillers,
"wpm_status": (
"too_slow" if wpm < IDEAL_WPM_MIN and wpm > 0
else "too_fast" if wpm > IDEAL_WPM_MAX
else "ideal"
),
}
def get_window_text(self) -> str:
"""Get the transcript text from the current 30s window."""
return self._current_window.text.strip()
def reset_window(self) -> None:
"""Manually reset the current window (e.g., after analysis triggers)."""
now = time.time()
self._rotate_window(now)
def get_session_summary(self) -> dict:
"""Full session-level speech summary for the performance report."""
return {
"session_duration_seconds": round(time.time() - self._session_start, 1),
"total_words": self._total_words,
"total_fillers": self._total_fillers,
"average_wpm": round(self.get_session_wpm(), 1),
"filler_rate_per_minute": round(
(self._total_fillers / max(1, time.time() - self._session_start)) * 60, 2
),
"windows_completed": len(self._completed_windows),
}
Why this matters: The Vision Processor and the Speech Processor never talk directly to each other. They mind their own business, continuously updating their own internal speech_state and vision_state dictionaries, completely unaware of what the other is doing.
"You can have brilliant ideas, but if you can't communicate them, your ideas won't get you anywhere." — Lee Iacocca
- The Metronome and the Memory (`/session`)
If the processors are the senses, the /session folder is the brain's working memory.
session_manager.py acts as the system's metronome. Every 30 seconds, it steps in, interrupts the orchestrator, and says, "Give me the current state."
"""
Session Manager
Maintains per-call state, tracks 30-second coaching snapshots, and aggregates scores.
"""
import asyncio
import json
import logging
import os
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
from uuid import uuid4
logger = logging.getLogger(__name__)
REPORTS_DIR = Path("reports")
@dataclass
class CoachingSnapshot:
"""A single 30-second analysis snapshot."""
timestamp: float
elapsed_seconds: float
speech_state: dict
language_scores: dict
vision_state: dict
coaching_message: str = ""
def overall_score(self) -> float:
"""Compute weighted overall communication score."""
clarity = self.language_scores.get("clarity_score", 70)
structure = self.language_scores.get("structure_score", 70)
confidence_lang = self.language_scores.get("confidence_score", 70)
confidence_body = self.vision_state.get("confidence_index", 70)
engagement = self.vision_state.get("engagement_score", 70)
speech_penalty = min(30.0, self.speech_state.get("hesitation_index", 0) * 0.3)
raw = (
clarity * 0.20
+ structure * 0.20
+ confidence_lang * 0.15
+ confidence_body * 0.20
+ engagement * 0.25
) - speech_penalty
return round(max(0.0, min(100.0, raw)), 1)
def to_dict(self) -> dict:
return {
"timestamp": self.timestamp,
"elapsed_seconds": self.elapsed_seconds,
"overall_score": self.overall_score(),
"speech": self.speech_state,
"language": self.language_scores,
"vision": self.vision_state,
"coaching_message": self.coaching_message,
}
class SessionManager:
"""
Manages the coaching session lifecycle:
- Stores timestamped snapshots every 30 seconds
- Exposes aggregated score trends for the dashboard
- Triggers session report generation on call end
"""
def __init__(self, session_id: Optional[str] = None, mode: str = "interview"):
self.session_id = session_id or str(uuid4())
self.mode = mode
self.start_time = time.time()
self._snapshots: list[CoachingSnapshot] = []
self._analysis_interval = 30.0 # seconds
self._analysis_task: Optional[asyncio.Task] = None
self._on_analysis_callbacks: list = []
```python
        logger.info(f"SessionManager created: session_id={self.session_id}, mode={mode}")

    def register_analysis_callback(self, callback) -> None:
        """
        Register a coroutine to call every 30 seconds.
        Callback signature: async def callback() -> None
        """
        self._on_analysis_callbacks.append(callback)

    def start_analysis_loop(self) -> None:
        """Start the periodic 30-second analysis cycle."""
        self._analysis_task = asyncio.create_task(self._analysis_loop())
        logger.info("Analysis loop started (30s intervals)")

    async def _analysis_loop(self) -> None:
        """Fires registered callbacks every 30 seconds."""
        try:
            while True:
                await asyncio.sleep(self._analysis_interval)
                logger.info(
                    f"[{self.session_id}] 30s analysis cycle firing "
                    f"(elapsed: {self.elapsed_seconds():.0f}s)"
                )
                for callback in self._on_analysis_callbacks:
                    try:
                        await callback()
                    except Exception as e:
                        logger.error(f"Analysis callback error: {e}")
        except asyncio.CancelledError:
            logger.info("Analysis loop cancelled")

    def record_snapshot(
        self,
        speech_state: dict,
        language_scores: dict,
        vision_state: dict,
        coaching_message: str = "",
    ) -> CoachingSnapshot:
        """
        Store a coaching snapshot for the current window.
        Returns the snapshot.
        """
        snapshot = CoachingSnapshot(
            timestamp=time.time(),
            elapsed_seconds=self.elapsed_seconds(),
            speech_state=speech_state.copy(),
            language_scores=language_scores.copy(),
            vision_state=vision_state.copy(),
            coaching_message=coaching_message,
        )
        self._snapshots.append(snapshot)
        logger.info(
            f"Snapshot #{len(self._snapshots)} recorded: "
            f"overall={snapshot.overall_score()}"
        )
        return snapshot

    def elapsed_seconds(self) -> float:
        return round(time.time() - self.start_time, 1)

    def get_score_trend(self) -> list[dict]:
        """Return list of {elapsed, overall_score} for trend charts."""
        return [
            {"elapsed": s.elapsed_seconds, "score": s.overall_score()}
            for s in self._snapshots
        ]

    def get_latest_scores(self) -> dict:
        """Return the most recent snapshot's scores, or defaults."""
        if not self._snapshots:
            return {
                "overall_score": 70.0,
                "speech_score": 70.0,
                "body_language_score": 70.0,
                "structure_score": 70.0,
                "confidence_score": 70.0,
            }
        s = self._snapshots[-1]
        return {
            "overall_score": s.overall_score(),
            "speech_score": max(0, 100 - s.speech_state.get("hesitation_index", 0)),
            "body_language_score": s.vision_state.get("confidence_index", 70),
            "structure_score": s.language_scores.get("structure_score", 70),
            "confidence_score": s.language_scores.get("confidence_score", 70),
            "clarity_score": s.language_scores.get("clarity_score", 70),
            "engagement_score": s.vision_state.get("engagement_score", 70),
        }

    def get_leaderboard_entry(self) -> dict:
        """Compact entry for the GetStream leaderboard."""
        scores = self.get_latest_scores()
        return {
            "session_id": self.session_id,
            "mode": self.mode,
            "elapsed_seconds": self.elapsed_seconds(),
            "overall_score": scores["overall_score"],
            "snapshot_count": len(self._snapshots),
        }

    async def stop(self) -> None:
        """Stop the analysis loop."""
        if self._analysis_task:
            self._analysis_task.cancel()
            try:
                await self._analysis_task
            except asyncio.CancelledError:
                pass

    def get_all_snapshots(self) -> list[dict]:
        return [s.to_dict() for s in self._snapshots]
```
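The register/start/stop lifecycle is easy to exercise in isolation. Here is a trimmed-down stand-in (`MiniSessionManager` is illustrative, not Visions' real class, and the 30-second interval is shortened so the demo finishes in a fraction of a second):

```python
import asyncio
import time


class MiniSessionManager:
    """Trimmed-down sketch of the SessionManager lifecycle above."""

    def __init__(self, interval: float = 30.0):
        self._analysis_interval = interval
        self._on_analysis_callbacks = []
        self._analysis_task = None

    def register_analysis_callback(self, callback):
        self._on_analysis_callbacks.append(callback)

    def start_analysis_loop(self):
        self._analysis_task = asyncio.create_task(self._analysis_loop())

    async def _analysis_loop(self):
        try:
            while True:
                await asyncio.sleep(self._analysis_interval)
                for cb in self._on_analysis_callbacks:
                    await cb()
        except asyncio.CancelledError:
            pass  # normal shutdown path

    async def stop(self):
        if self._analysis_task:
            self._analysis_task.cancel()
            try:
                await self._analysis_task
            except asyncio.CancelledError:
                pass


async def main() -> int:
    mgr = MiniSessionManager(interval=0.05)  # shortened from 30s for the demo
    fired = []

    async def on_analysis():
        fired.append(time.monotonic())

    mgr.register_analysis_callback(on_analysis)
    mgr.start_analysis_loop()
    await asyncio.sleep(0.18)  # long enough for a few cycles
    await mgr.stop()
    return len(fired)


cycles = asyncio.run(main())
print(f"analysis callback fired {cycles} times")
```

The key design point is that cancellation is cooperative: `stop()` cancels the task and then awaits it, so the loop always exits through its `CancelledError` handler rather than being killed mid-callback.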
Every 30 seconds, the registered analysis callback reaches into both /pipelines/speech_processor and /pipelines/vision_processor, pulls their isolated data dictionaries together, and uses that combined state to build the contextual prompt that drives Gemini's response, voiced through ElevenLabs.
Then, the Session Manager takes a "Snapshot" of that combined data and saves it. Once you hang up the video call, performance_report.py loops over all of those 30-second snapshots and generates the formatted Markdown summary that opens in your browser.
By strictly enforcing a separation of concerns—where the orchestrator only handles connections, the pipelines only handle raw math inference, and the session manager handles timing and data storage—we ensure that the YOLO object detection never blocks the WebRTC audio streaming.
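One standard way to get that guarantee in an asyncio service is to push CPU-bound inference onto a worker thread so the event loop stays free for audio. This sketch is not Visions' actual code; `heavy_inference` and `audio_heartbeat` are stand-ins for a YOLO forward pass and the audio path:

```python
import asyncio
import time


def heavy_inference(frame):
    # Stand-in for a CPU-bound YOLO forward pass.
    time.sleep(0.05)
    return {"posture": "upright"}


async def audio_heartbeat(ticks):
    # Stand-in for the audio path: must keep ticking every 10 ms
    # regardless of what the vision pipeline is doing.
    for _ in range(10):
        await asyncio.sleep(0.01)
        ticks.append(time.monotonic())


async def main():
    ticks = []
    # asyncio.to_thread runs the blocking call in a thread pool,
    # so the event loop keeps servicing the audio coroutine.
    vision = asyncio.create_task(asyncio.to_thread(heavy_inference, "frame-1"))
    audio = asyncio.create_task(audio_heartbeat(ticks))
    state, _ = await asyncio.gather(vision, audio)
    return state, ticks


state, ticks = asyncio.run(main())
print(state, len(ticks))
```

If `heavy_inference` were awaited inline on the event loop instead, the heartbeat would stall for the full 50 ms of every frame, which is exactly the kind of audio glitch the decoupled design avoids.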
What Happens When the Internet Drops?
If you've ever been on a Zoom call, you know that WebRTC connections drop. Audio gets garbled, video frames freeze, and packets are lost.
If our Python agent threw a fatal error and crashed every time the user's Wi-Fi hiccuped, the AI Coach would be completely useless in the real world.
To solve this, Visions is built around the concept of Graceful Degradation.
1. Resilient Analytical Engines
Because the speech_processor.py and vision_processor.py scripts are completely decoupled, they are inherently fault-tolerant.
If the user's video feed freezes due to bad bandwidth, YOLO in our Vision Processor simply stops receiving new frames. It doesn't crash the server. It just holds onto the last known vision_state (e.g., "posture: upright").
Meanwhile, if the audio connection is still alive, Deepgram STT keeps transcribing text and the Speech Processor keeps counting words. When the 30-second loop fires, the AI Coach can still deliver spoken feedback based on the audio alone, even if the video feed is temporarily dead.
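The "hold the last known state" behavior can be sketched in a few lines. `VisionStateHolder` and its `stale` flag are illustrative, not the real `vision_processor.py` API:

```python
class VisionStateHolder:
    """Sketch: keep the last good vision_state when frames stop arriving."""

    def __init__(self, stale_after: float = 5.0):
        self.state = {"posture": "unknown"}
        self.last_update = 0.0
        self.stale_after = stale_after

    def on_frame(self, detections: dict, now: float) -> None:
        # Called whenever YOLO produces a fresh result.
        self.state = detections
        self.last_update = now

    def current(self, now: float) -> dict:
        # Even if the feed has frozen, return the last known state,
        # flagged so the coach can soften its confidence.
        stale = (now - self.last_update) > self.stale_after
        return {**self.state, "stale": stale}


holder = VisionStateHolder()
holder.on_frame({"posture": "upright"}, now=100.0)
print(holder.current(now=102.0))  # feed is fresh
print(holder.current(now=110.0))  # feed frozen: last state, marked stale
```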
2. The Unbreakable "Try/Except" Session Wrapper
But what happens if the connection completely drops and the user's browser tab crashes? Do they lose their entire 30-minute coaching session data?
No. If you look at the bottom of the main event loop in performance_agent.py, the entire live call is wrapped inside a massive try/except block:
```python
try:
    # Wait for the call to end (user hangs up or closes tab)
    await agent.finish()
    # ── Session ended cleanly: generate report ──────────────────
    await _generate_and_deliver_report()
except BaseException as e:
    # ── Session ended abruptly: still save the report ──────────
    logger.warning(f"Session ended unexpectedly ({e}). Saving report anyway...")
    try:
        await _generate_and_deliver_report()
    except Exception as report_err:
        logger.error(f"Failed to generate report after unexpected session end: {report_err}")
    raise
```
If GetStream's WebRTC layer throws an unexpected disconnection error, the catch-all except BaseException clause immediately traps it.
Instead of letting the collected data vanish, the agent still triggers _generate_and_deliver_report() on the way down, then re-raises the original exception.
The Session Manager grabs every 30-second snapshot it successfully saved before the internet dropped, compiles them, and still generates the final Markdown Session Performance Report (<session_id>.md) in the /reports folder.
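The compile step itself is straightforward. This sketch is illustrative only; `build_report` and the snapshot keys `elapsed`/`score` are stand-ins, not the real `performance_report.py` format:

```python
def build_report(session_id: str, snapshots: list[dict]) -> str:
    """Sketch: fold the saved 30-second snapshots into one Markdown report."""
    lines = [f"# Session Performance Report: {session_id}", ""]
    for i, snap in enumerate(snapshots, 1):
        lines.append(f"## Window {i} (t={snap['elapsed']}s)")
        lines.append(f"- Overall score: {snap['score']}")
        lines.append("")
    return "\n".join(lines)


# Whatever was recorded before the disconnect is all the report needs:
snaps = [{"elapsed": 30.0, "score": 72}, {"elapsed": 60.0, "score": 78}]
md = build_report("demo-session", snaps)
print(md.splitlines()[0])
```

Because each snapshot is persisted the moment it is recorded, the report generator never depends on the session having ended cleanly; it just renders whatever windows exist.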
The Result: Even if your laptop battery dies in the middle of a mock interview, the AI coach will gracefully save everything you've done up to that exact second, format your scores, and have your feedback waiting on your hard drive when you reboot.
"True communication is a whole-body act. It took a multimodal AI to finally measure it that way."
What's Next?
Building a real-time multimodal stack used to require a massive engineering team. Today, with tools like Gemini Realtime, YOLO edge inference, and GetStream, a single developer can build an agent that genuinely "coaches" humans.
In the future, I plan to expand the Vision architecture to include micro-expression tracking for deeper emotional resonance, and fine-tune exactly how the language intelligence maps out rhetorical structures on the fly.






