Hi everyone,
Searching on OTT platforms is often frustrating. While metadata-based search works for titles, it fails when you search for specific moments — like describing a scene or asking, ‘When does the hero cry?’ You simply can’t get that level of detail from metadata alone.
My solution was to build an agent that actually ‘watches’ the movie. By analyzing video frames and transcriptions to create a semantic memory, we can achieve far greater accuracy and unlock entirely new ways to interact with video content.
AWS services I used:
- Amazon Bedrock for the Titan and Nova Premier models, and for agent creation
- Amazon OpenSearch Service for storing the video scene data
- Amazon Transcribe for generating subtitles
- AWS Lambda
- Amazon S3 for storing the raw video, the transcription output, and the per-minute scene documents
High-Level Data Flow:
Let me divide this article into different sections:
- Building the Ingestion layer
- Building the Memory layer
- Building the Reasoning layer
- Building the Interface layer
Building the Ingestion layer:
For this experiment, I downloaded the movie **Night of the Living Dead** (1968), which is in the public domain.
- Upload the movie to an S3 bucket
- Open the Amazon Transcribe console and create a transcription job
- Give the job a name and keep the General model selected
- Select the uploaded movie as the input
- Select SRT as the output format
- Click Next, keep the remaining options at their defaults, and create the job
- Once the job finishes, the output SRT file appears in the bucket you specified (if you'd rather script this step, see the boto3 sketch below)
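A minimal boto3 sketch of the same job, assuming an `en-US` MP4 source; the bucket, key, and job name are placeholders:

```python
import boto3

transcribe = boto3.client("transcribe", region_name="us-east-1")

# Start a transcription job that also writes an SRT subtitle file
# to the output bucket alongside the regular JSON transcript.
transcribe.start_transcription_job(
    TranscriptionJobName="notld-1968-srt",
    Media={"MediaFileUri": "s3://my-bucket/movies/night_of_the_living_dead.mp4"},
    MediaFormat="mp4",
    LanguageCode="en-US",
    OutputBucketName="my-bucket",
    Subtitles={"Formats": ["srt"]},
)
```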
Now that we have both the movie file and the transcription ready, let's analyze the video and store that information as JSON files in S3 for further processing.
import argparse
import io
import json
import math
import os
from datetime import timedelta
import boto3
from botocore.exceptions import ClientError
from moviepy import VideoFileClip
from PIL import Image
import srt
# -----------------------------
# S3 helpers
# -----------------------------
def parse_s3_uri(uri: str):
"""
Parse an s3://bucket/key URI into (bucket, key).
"""
if not uri.startswith("s3://"):
raise ValueError(f"Invalid S3 URI: {uri}")
without_scheme = uri[5:]
parts = without_scheme.split("/", 1)
if len(parts) != 2:
raise ValueError(f"Invalid S3 URI (missing key): {uri}")
bucket, key = parts
return bucket, key
def download_s3_object(s3_client, s3_uri: str, local_path: str):
bucket, key = parse_s3_uri(s3_uri)
os.makedirs(os.path.dirname(local_path), exist_ok=True)
print(f"Downloading {s3_uri} -> {local_path}")
s3_client.download_file(bucket, key, local_path)
return local_path
def upload_json_to_s3(s3_client, s3_uri_prefix: str, filename: str, data: dict):
"""
Upload a JSON dict as a file under a given s3://bucket/prefix.
"""
bucket, prefix = parse_s3_uri(s3_uri_prefix)
key = prefix.rstrip("/") + "/" + filename
body = json.dumps(data, ensure_ascii=False).encode("utf-8")
print(f"Uploading scene doc -> s3://{bucket}/{key}")
s3_client.put_object(Bucket=bucket, Key=key, Body=body)
# -----------------------------
# SRT helpers
# -----------------------------
def load_srt_subtitles(srt_path: str):
"""
Load SRT file and return a list of srt.Subtitle objects.
"""
with open(srt_path, "r", encoding="utf-8") as f:
content = f.read()
subtitles = list(srt.parse(content))
return subtitles
def get_text_for_range(subtitles, start_sec: float, end_sec: float) -> str:
"""
Get concatenated subtitle text for [start_sec, end_sec) range.
Includes any subtitle that overlaps this time range.
"""
texts = []
for sub in subtitles:
sub_start = sub.start.total_seconds()
sub_end = sub.end.total_seconds()
# Overlap check
if sub_end <= start_sec or sub_start >= end_sec:
continue
# Clean line breaks
texts.append(sub.content.replace("\n", " "))
return " ".join(texts).strip()
# -----------------------------
# Video frame extraction helpers
# -----------------------------
def extract_minute_frame_bytes(
clip: VideoFileClip,
minute_index: int,
frames_per_minute: int = 5,
image_format: str = "jpeg",
):
"""
For a given minute index, extract `frames_per_minute` frames
as raw image bytes (JPEG by default).
Returns a list of bytes objects.
"""
duration_sec = clip.duration
start = minute_index * 60.0
if start >= duration_sec:
return []
end = min(start + 60.0, duration_sec)
window = end - start
if window <= 0:
return []
# Sample timestamps evenly inside the minute window.
# Use (frames_per_minute + 1) so we don't hit exact edges.
step = window / (frames_per_minute + 1)
timestamps = [start + step * (i + 1) for i in range(frames_per_minute)]
images_bytes = []
for t in timestamps:
# Clip to duration just in case of rounding
t = min(t, duration_sec - 0.01)
frame = clip.get_frame(t) # numpy array (H, W, 3)
# Convert numpy array to JPEG bytes using Pillow
pil_img = Image.fromarray(frame)
buf = io.BytesIO()
pil_img.save(buf, format=image_format.upper())
buf.seek(0)
images_bytes.append(buf.read())
return images_bytes
def format_timestamp(seconds: float) -> str:
"""
Format seconds as HH:MM:SS (floor).
"""
return str(timedelta(seconds=int(seconds)))
# -----------------------------
# Bedrock (Nova Premier) helper
# -----------------------------
def call_nova_premier(
bedrock_client,
model_id: str,
scene_text: str,
frame_bytes_list,
minute_index: int,
):
"""
Call Amazon Nova Premier via the Bedrock Converse API with:
- scene_text (subtitles for this minute)
- up to N images (frames) as bytes
Returns a structured dict with:
scene_summary, characters, locations, emotions,
relationships, topics, visual_tags, important_events
"""
system_prompt = (
"You are a precise video scene analyst. "
"You receive up to 5 frames from a one-minute video segment "
"plus the dialogue/subtitles text for the same time range.\n"
"Your task is to return a STRICT JSON object with this exact schema:\n\n"
"{\n"
' \"scene_summary\": \"...\",\n'
" \"characters\": [\"...\"],\n"
" \"locations\": [\"...\"],\n"
" \"emotions\": [\"...\"],\n"
" \"relationships\": [\"...\"],\n"
" \"topics\": [\"...\"],\n"
" \"visual_tags\": [\"...\"],\n"
" \"important_events\": [\"...\"]\n"
"}\n\n"
"Rules:\n"
"- Only output JSON, nothing else.\n"
"- If a field is unknown, use an empty list [] or a short generic summary.\n"
"- Keep lists reasonably short and focused."
)
user_text = (
f"This is minute {minute_index} of the video.\n\n"
f"Subtitles for this minute:\n{scene_text or '[No subtitles in this range]'}\n\n"
"Use the attached frames and text together to analyze this specific one-minute scene.\n"
"Return ONLY the JSON object as specified."
)
# Build message content: first text, then each image as a separate block
content_blocks = [{"text": user_text}]
for img_bytes in frame_bytes_list:
content_blocks.append(
{
"image": {
"format": "jpeg",
"source": {
"bytes": img_bytes
},
}
}
)
messages = [
{
"role": "user",
"content": content_blocks,
}
]
try:
response = bedrock_client.converse(
modelId=model_id,
system=[{"text": system_prompt}],
messages=messages,
inferenceConfig={
"maxTokens": 512,
"temperature": 0.2,
"topP": 0.9,
},
)
output_message = response["output"]["message"]
raw_text = output_message["content"][0]["text"]
# Sometimes models may wrap JSON with text; try to extract JSON substring
raw_text = raw_text.strip()
try:
# Try direct parse first
scene_info = json.loads(raw_text)
except json.JSONDecodeError:
# Fallback: find first '{' and last '}' and parse that
start = raw_text.find("{")
end = raw_text.rfind("}")
if start != -1 and end != -1 and end > start:
json_str = raw_text[start : end + 1]
scene_info = json.loads(json_str)
else:
raise
# Ensure all expected keys exist
default_scene = {
"scene_summary": "",
"characters": [],
"locations": [],
"emotions": [],
"relationships": [],
"topics": [],
"visual_tags": [],
"important_events": [],
}
default_scene.update(scene_info or {})
return default_scene
except Exception as e:
print(f"[ERROR] Bedrock call or JSON parsing failed for minute {minute_index}: {e}")
return {
"scene_summary": "",
"characters": [],
"locations": [],
"emotions": [],
"relationships": [],
"topics": [],
"visual_tags": [],
"important_events": [],
}
# -----------------------------
# Main orchestration
# -----------------------------
def analyze_video_from_s3(
video_s3_uri: str,
srt_s3_uri: str,
region: str,
model_id: str,
frames_per_minute: int,
output_s3_prefix: str,
video_id: str,
episode_title: str = "",
season: int | None = None,
episode: int | None = None,
):
"""
1. Download video and SRT from S3.
2. Parse duration and subtitles.
3. For each minute:
- sample frames
- gather subtitle text
- call Nova Premier for structured analysis
- build scene_doc and upload to S3
"""
session = boto3.Session(region_name=region)
s3_client = session.client("s3")
bedrock_client = session.client("bedrock-runtime")
# Local temp paths
tmp_dir = "./tmp_video_analysis"
os.makedirs(tmp_dir, exist_ok=True)
video_path = os.path.join(tmp_dir, "video_input.mp4")
srt_path = os.path.join(tmp_dir, "subtitles.srt")
# Download from S3
download_s3_object(s3_client, video_s3_uri, video_path)
download_s3_object(s3_client, srt_s3_uri, srt_path)
# Load subtitles
subtitles = load_srt_subtitles(srt_path)
# Load video and get duration
clip = VideoFileClip(video_path)
duration_sec = clip.duration
num_minutes = math.ceil(duration_sec / 60.0)
print(f"Video duration: {duration_sec:.2f} seconds (~{num_minutes} minutes)\n")
try:
for minute_index in range(num_minutes):
start_sec = minute_index * 60.0
end_sec = min((minute_index + 1) * 60.0, duration_sec)
print(f"Processing minute {minute_index} [{start_sec:.1f}s - {end_sec:.1f}s]")
# 1) Extract frames for this minute
frames = extract_minute_frame_bytes(
clip,
minute_index,
frames_per_minute=frames_per_minute,
image_format="jpeg",
)
if not frames:
print(f" No frames extracted for minute {minute_index}, skipping scene doc.")
continue
# 2) Extract subtitles text for this minute
scene_text = get_text_for_range(subtitles, start_sec, end_sec)
# 3) Call Nova Premier for structured scene info
scene_info = call_nova_premier(
bedrock_client=bedrock_client,
model_id=model_id,
scene_text=scene_text,
frame_bytes_list=frames,
minute_index=minute_index,
)
# 4) Build scene_doc
scene_id = f"{video_id}_m{minute_index:04d}"
scene_doc = {
"video_id": video_id,
"episode_title": episode_title,
"season": season,
"episode": episode,
"scene_id": scene_id,
"start_sec": start_sec,
"end_sec": end_sec,
"timestamp_label": f"{format_timestamp(start_sec)} - {format_timestamp(end_sec)}",
"transcript": scene_text,
"nova_scene_summary": scene_info.get("scene_summary", ""),
"characters": scene_info.get("characters", []),
"locations": scene_info.get("locations", []),
"emotions": scene_info.get("emotions", []),
"relationships": scene_info.get("relationships", []),
"topics": scene_info.get("topics", []),
"visual_tags": scene_info.get("visual_tags", []),
"important_events": scene_info.get("important_events", []),
}
# 5) Upload scene_doc JSON to S3
filename = f"{scene_id}.json"
upload_json_to_s3(s3_client, output_s3_prefix, filename, scene_doc)
print(f" Scene doc created: {scene_id}")
finally:
clip.close()
def main():
parser = argparse.ArgumentParser(
description=(
"Download a video from S3, sample frames per minute, "
"combine with SRT text, analyze with Amazon Nova Premier, "
"and upload per-minute scene docs to S3."
)
)
parser.add_argument(
"--video-s3",
required=True,
help="S3 URI of the video file. Example: s3://my-bucket/path/video.mp4",
)
parser.add_argument(
"--srt-s3",
required=True,
help="S3 URI of the SRT subtitle file. Example: s3://my-bucket/path/video.srt",
)
parser.add_argument(
"--output-s3-prefix",
required=True,
help=(
"S3 URI prefix where scene docs will be stored. "
"Example: s3://my-bucket/3netra/FRIENDS_S01E01"
),
)
parser.add_argument(
"--video-id",
required=True,
help="Logical video ID (e.g., FRIENDS_S01E01). Used in scene_id and metadata.",
)
parser.add_argument(
"--episode-title",
default="",
help="Optional episode title for metadata.",
)
parser.add_argument(
"--season",
type=int,
default=None,
help="Optional season number for metadata.",
)
parser.add_argument(
"--episode",
type=int,
default=None,
help="Optional episode number for metadata.",
)
parser.add_argument(
"--region",
default="us-east-1",
help="AWS Region where Bedrock is available (default: us-east-1).",
)
parser.add_argument(
"--model-id",
default="amazon.nova-premier-v1:0",
help="Bedrock model ID for Nova Premier (default: amazon.nova-premier-v1:0).",
)
parser.add_argument(
"--frames-per-minute",
type=int,
default=5,
help="How many frames to sample per minute window (default: 5).",
)
args = parser.parse_args()
analyze_video_from_s3(
video_s3_uri=args.video_s3,
srt_s3_uri=args.srt_s3,
region=args.region,
model_id=args.model_id,
frames_per_minute=args.frames_per_minute,
output_s3_prefix=args.output_s3_prefix,
video_id=args.video_id,
episode_title=args.episode_title,
season=args.season,
episode=args.episode,
)
if __name__ == "__main__":
main()
This script downloads the video from S3, extracts frames for every minute, combines those frames with the dialogue from the SRT file for the same time window, and sends them to the Nova Premier model for analysis.
The resulting JSON scene documents are written back to the S3 bucket.
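For reference, here's roughly how I invoke the entry point; every S3 URI and the video ID are placeholders (you can equally run the script from the command line via the argparse flags defined in main()):

```python
# Hypothetical invocation -- all S3 URIs and the video ID are placeholders.
analyze_video_from_s3(
    video_s3_uri="s3://my-bucket/movies/night_of_the_living_dead.mp4",
    srt_s3_uri="s3://my-bucket/transcripts/night_of_the_living_dead.srt",
    region="us-east-1",
    model_id="amazon.nova-premier-v1:0",
    frames_per_minute=5,
    output_s3_prefix="s3://my-bucket/3netra/NOTLD_1968",
    video_id="NOTLD_1968",
)
```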
That completes the ingestion layer. Let's move on to the memory layer.
Building the Memory layer:
- Open the Amazon OpenSearch Service console
- Create a domain for the dev/test environment with 1 AZ, no standby, and a t3.large instance
- Once the domain is up and running, open OpenSearch Dashboards, go to Dev Tools from the side menu, and create an index with this mapping
PUT video_scenes
{
"settings": {
"index": {
"knn": true
}
},
"mappings": {
"properties": {
"video_id": { "type": "keyword" },
"scene_id": { "type": "keyword" },
"episode_title": { "type": "text" },
"season": { "type": "integer" },
"episode": { "type": "integer" },
"start_sec": { "type": "integer" },
"end_sec": { "type": "integer" },
"timestamp_label": { "type": "keyword" },
"transcript": { "type": "text" },
"nova_scene_summary": { "type": "text" },
"characters": { "type": "keyword" },
"locations": { "type": "keyword" },
"emotions": { "type": "keyword" },
"relationships": { "type": "text" },
"topics": { "type": "keyword" },
"visual_tags": { "type": "keyword" },
"important_events":{ "type": "text" },
"embedding": {
"type": "knn_vector",
"dimension": 1024,
"method": {
"name": "hnsw",
"space_type": "cosinesimil",
"engine": "lucene"
}
}
}
}
}
- Use the script below to generate embeddings for the JSON files and dump them into this index
import argparse
import json
import os
import boto3
from botocore.exceptions import ClientError
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
# -------------- S3 helpers --------------
def parse_s3_uri(uri: str):
if not uri.startswith("s3://"):
raise ValueError(f"Invalid S3 URI: {uri}")
without = uri[5:]
parts = without.split("/", 1)
if len(parts) != 2:
raise ValueError(f"Invalid S3 URI (missing key/prefix): {uri}")
return parts[0], parts[1]
def list_s3_json_objects(s3_client, s3_prefix_uri: str):
"""
List all JSON objects under the given s3://bucket/prefix
(this is where your scene_docs are stored).
"""
bucket, prefix = parse_s3_uri(s3_prefix_uri)
paginator = s3_client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=prefix.rstrip("/") + "/"):
for obj in page.get("Contents", []):
key = obj["Key"]
if key.lower().endswith(".json"):
yield bucket, key
def get_s3_json(s3_client, bucket: str, key: str) -> dict:
resp = s3_client.get_object(Bucket=bucket, Key=key)
body = resp["Body"].read().decode("utf-8")
return json.loads(body)
# -------------- Bedrock (Titan Embeddings) --------------
def get_titan_embedding(bedrock_client, model_id: str, text: str, dimensions: int = 1024):
    """
    Call Titan Text Embeddings v2 (or G1) to get an embedding vector.
    The request body takes inputText, plus an optional dimensions field on v2.
    """
    if not text:
        text = " "  # Titan requires a non-empty input string
    request = {"inputText": text}
    if "v2" in model_id:
        # Only Titan Text Embeddings v2 accepts the dimensions parameter.
        request["dimensions"] = dimensions
    body = json.dumps(request)
resp = bedrock_client.invoke_model(
modelId=model_id,
body=body,
contentType="application/json",
accept="application/json",
)
resp_body = json.loads(resp["body"].read())
# titan-embed-text-v2 response: {"embedding": [...], "inputTextTokenCount": ...}
embedding = resp_body["embedding"]
return embedding
def build_embedding_text(scene_doc: dict) -> str:
"""
Concatenate important fields into a single text for embeddings.
"""
parts = []
summary = scene_doc.get("nova_scene_summary") or scene_doc.get("scene_summary") or ""
transcript = scene_doc.get("transcript", "")
chars = ", ".join(scene_doc.get("characters", []))
rels = "; ".join(scene_doc.get("relationships", []))
topics = ", ".join(scene_doc.get("topics", []))
emotions = ", ".join(scene_doc.get("emotions", []))
visual_tags = ", ".join(scene_doc.get("visual_tags", []))
if summary:
parts.append("[Summary] " + summary)
if transcript:
parts.append("[Transcript] " + transcript)
if chars:
parts.append("[Characters] " + chars)
if rels:
parts.append("[Relationships] " + rels)
if topics:
parts.append("[Topics] " + topics)
if emotions:
parts.append("[Emotions] " + emotions)
if visual_tags:
parts.append("[Visual tags] " + visual_tags)
return "\n".join(parts)
# -------------- OpenSearch client --------------
def create_opensearch_client(
region: str,
endpoint: str,
service: str = "aoss",
username: str | None = None,
password: str | None = None,
):
"""
Create an OpenSearch client.
Authentication modes:
- If `username` and `password` are provided, use HTTP Basic auth.
- Otherwise, fall back to SigV4 (AWS) auth for Serverless or classic domains.
service:
- 'aoss' for OpenSearch Serverless
- 'es' for classic OpenSearch domains
"""
host = endpoint.replace("https://", "").replace("http://", "")
if username is not None and password is not None:
# Use HTTP basic auth (username/password)
http_auth = (username, password)
else:
# Fall back to AWS SigV4 auth
session = boto3.Session(region_name=region)
credentials = session.get_credentials()
awsauth = AWS4Auth(
credentials.access_key,
credentials.secret_key,
region,
service,
session_token=credentials.token,
)
http_auth = awsauth
client = OpenSearch(
hosts=[{"host": host, "port": 443}],
http_auth=http_auth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
)
return client
def index_scene_doc(os_client, index_name: str, doc: dict):
"""
Index a single scene_doc into OpenSearch.
"""
scene_id = doc["scene_id"]
resp = os_client.index(index=index_name, id=scene_id, body=doc, refresh=False)
return resp
# -------------- Main indexing flow --------------
def index_scenes_to_opensearch(
scene_s3_prefix: str,
region: str,
embed_model_id: str,
os_endpoint: str,
os_index: str,
os_service: str = "aoss",
os_username: str | None = None,
os_password: str | None = None,
embedding_dim: int = 1024,
):
session = boto3.Session(region_name=region)
s3_client = session.client("s3")
bedrock_client = session.client("bedrock-runtime")
os_client = create_opensearch_client(
region, os_endpoint, service=os_service, username=os_username, password=os_password
)
for bucket, key in list_s3_json_objects(s3_client, scene_s3_prefix):
print(f"Processing s3://{bucket}/{key}")
scene_doc = get_s3_json(s3_client, bucket, key)
# Build text and embedding
embed_text = build_embedding_text(scene_doc)
embedding = get_titan_embedding(
bedrock_client, embed_model_id, embed_text, dimensions=embedding_dim
)
# Attach embedding field
scene_doc["embedding"] = embedding
# Index into OpenSearch
resp = index_scene_doc(os_client, os_index, scene_doc)
result = resp.get("result", "unknown")
print(f" Indexed scene_id={scene_doc.get('scene_id')} result={result}")
def main():
parser = argparse.ArgumentParser(
description=(
"Index 3Netra scene docs from S3 into OpenSearch using Titan embeddings."
)
)
parser.add_argument(
"--scene-s3-prefix",
required=True,
help=(
"S3 URI prefix where scene JSONs are stored. "
"Example: s3://my-bucket/3netra/FRIENDS_S01E01"
),
)
parser.add_argument(
"--region",
default="us-east-1",
help="AWS Region for Bedrock, S3, and OpenSearch (default: us-east-1).",
)
parser.add_argument(
"--embed-model-id",
default="amazon.titan-embed-text-v2:0",
help="Titan embeddings model ID (default: amazon.titan-embed-text-v2:0).",
)
parser.add_argument(
"--os-endpoint",
required=True,
help=(
"OpenSearch HTTPS endpoint (no index). "
"Example: https://abc123.us-east-1.aoss.amazonaws.com"
),
)
parser.add_argument(
"--os-index",
default="video_scenes",
help="OpenSearch index name (default: video_scenes).",
)
parser.add_argument(
"--os-service",
default="aoss",
help="SigV4 service name: 'aoss' for Serverless, 'es' for domains (default: aoss).",
)
parser.add_argument(
"--os-username",
help=(
"OpenSearch basic auth username (optional). "
"If not provided, the script will read `OS_USERNAME` or `OPENSEARCH_USERNAME` env vars."
),
)
parser.add_argument(
"--os-password",
help=(
"OpenSearch basic auth password (optional). "
"If not provided, the script will read `OS_PASSWORD` or `OPENSEARCH_PASSWORD` env vars."
),
)
parser.add_argument(
"--embedding-dim",
type=int,
default=1024,
help="Embedding dimension (must match index mapping, default: 1024).",
)
args = parser.parse_args()
# Accept username/password from CLI args or environment variables.
os_username = (
args.os_username
or os.environ.get("OS_USERNAME")
or os.environ.get("OPENSEARCH_USERNAME")
)
os_password = (
args.os_password
or os.environ.get("OS_PASSWORD")
or os.environ.get("OPENSEARCH_PASSWORD")
)
index_scenes_to_opensearch(
scene_s3_prefix=args.scene_s3_prefix,
region=args.region,
embed_model_id=args.embed_model_id,
os_endpoint=args.os_endpoint,
os_index=args.os_index,
os_service=args.os_service,
os_username=os_username,
os_password=os_password,
embedding_dim=args.embedding_dim,
)
if __name__ == "__main__":
main()
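Here's roughly how I call the indexing flow for a classic domain with basic auth; the endpoint, credentials, and S3 prefix are placeholders (the CLI flags in main() do the same thing):

```python
# Hypothetical invocation -- endpoint, credentials, and S3 prefix are placeholders.
index_scenes_to_opensearch(
    scene_s3_prefix="s3://my-bucket/3netra/NOTLD_1968",
    region="us-east-1",
    embed_model_id="amazon.titan-embed-text-v2:0",
    os_endpoint="https://your-domain.us-east-1.es.amazonaws.com",
    os_index="video_scenes",
    os_service="es",  # classic domain; use "aoss" for OpenSearch Serverless
    os_username="admin",
    os_password="your-password",
    embedding_dim=1024,
)
```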
- You can verify the indexed data through the Discover section or the Query Workbench in the OpenSearch Dashboards side menu, or with a quick k-NN query like the one sketched below.
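A sanity-check sketch that reuses create_opensearch_client and get_titan_embedding from the indexing script above; the endpoint, credentials, and the test question are placeholders:

```python
import boto3

session = boto3.Session(region_name="us-east-1")
bedrock = session.client("bedrock-runtime")

# Placeholder endpoint and credentials for a classic domain with basic auth.
os_client = create_opensearch_client(
    region="us-east-1",
    endpoint="https://your-domain.us-east-1.es.amazonaws.com",
    service="es",
    username="admin",
    password="your-password",
)

# Embed a test question and run a k-NN search against the video_scenes index.
vector = get_titan_embedding(bedrock, "amazon.titan-embed-text-v2:0", "zombies attack the farmhouse")
resp = os_client.search(
    index="video_scenes",
    body={
        "size": 3,
        "_source": ["timestamp_label", "nova_scene_summary"],
        "query": {"knn": {"embedding": {"vector": vector, "k": 3}}},
    },
)
for hit in resp["hits"]["hits"]:
    print(hit["_source"]["timestamp_label"], "-", hit["_source"]["nova_scene_summary"])
```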
Building the Reasoning layer:
- Create a Lambda function with the following code
import json
import boto3
import os
from opensearchpy import OpenSearch, RequestsHttpConnection
# --- Configuration ---
# Read these from Lambda environment variables rather than hardcoding them
OPENSEARCH_HOST = os.environ.get("OPENSEARCH_HOST", "")
OPENSEARCH_USER = os.environ.get("OPENSEARCH_USER", "")
OPENSEARCH_PASS = os.environ.get("OPENSEARCH_PASS", "")
REGION = os.environ.get('AWS_REGION', 'us-west-2')
# --- Clients ---
bedrock_runtime = boto3.client('bedrock-runtime', region_name=REGION)
os_client = OpenSearch(
hosts=[{'host': OPENSEARCH_HOST, 'port': 443}],
http_auth=(OPENSEARCH_USER, OPENSEARCH_PASS),
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
def get_embedding(text):
"""Generates vector embedding using Titan v2"""
body = json.dumps({"inputText": text})
response = bedrock_runtime.invoke_model(
modelId="amazon.titan-embed-text-v2:0",
body=body,
accept="application/json",
contentType="application/json"
)
response_body = json.loads(response['body'].read())
return response_body['embedding']
def search_opensearch(vector, video_id, k=5):
"""Performs k-NN search on OpenSearch"""
query = {
"size": k,
"_source": ["timestamp_label", "nova_scene_summary", "characters", "emotions"],
"query": {
"knn": {
"embedding": { # Ensure this field name matches your index mapping
"vector": vector,
"k": k
}
}
}
}
# In production, use an alias or specific index logic
index_name = "video_scenes"
response = os_client.search(index=index_name, body=query)
return [hit['_source'] for hit in response['hits']['hits']]
def parse_timestamp(ts_str):
"""Helper to convert timestamp to seconds for sorting"""
try:
start = ts_str.split('-')[0].strip()
parts = list(map(int, start.split(':')))
if len(parts) == 3: return parts[0]*3600 + parts[1]*60 + parts[2]
if len(parts) == 2: return parts[0]*60 + parts[1]
return 0
except:
return 0
def lambda_handler(event, context):
print(f"Received Event: {event}")
# 1. Initialize Response Info
# We must echo back the same identifiers Bedrock sent us
action_group = event.get('actionGroup', '')
api_path = event.get('apiPath')
http_method = event.get('httpMethod')
function_name = event.get('function') # Fallback for different agent types
# 2. Extract Parameters
# Bedrock sends parameters in a list: [{'name': 'query', 'value': '...'}, ...]
params = {}
if 'parameters' in event:
for p in event['parameters']:
params[p['name']] = p['value']
# Also check 'requestBody' if parameters aren't found (common in POST requests)
if not params and 'requestBody' in event:
try:
body_content = event['requestBody']['content']['application/json']['properties']
for prop in body_content:
params[prop['name']] = prop['value']
except:
pass
user_query = params.get('query', '')
video_id = params.get('video_id', 'default_video')
# 3. Validation
if not user_query:
result_text = "Error: No query provided in parameters."
else:
# 4. Perform Search (Your existing logic)
try:
print(f"Embedding query: {user_query}")
vector = get_embedding(user_query)
print(f"Searching OpenSearch for video: {video_id}")
raw_hits = search_opensearch(vector, video_id)
# Sort by timestamp
sorted_hits = sorted(raw_hits, key=lambda x: parse_timestamp(x.get('timestamp_label', '0:00')))
# Format Context
result_text = "RELEVANT VIDEO SCENES (Chronological):\n"
if not sorted_hits:
result_text += "No relevant scenes found in memory."
else:
for hit in sorted_hits:
# Robust field access
time_lbl = hit.get('timestamp_label', 'Unknown Time')
summary = hit.get('nova_scene_summary', 'No summary')
emotions = hit.get('emotions', [])
if isinstance(emotions, list): emotions = ", ".join(emotions)
result_text += f"[Time: {time_lbl}] {summary} (Emotions: {emotions})\n"
except Exception as e:
print(f"Processing Error: {str(e)}")
result_text = f"System Error during search: {str(e)}"
# 5. Construct Response (Dynamic based on Input Type)
response_body = {
"application/json": {
"body": result_text
}
}
response = {
"messageVersion": "1.0",
"response": {
"actionGroup": action_group,
"responseBody": response_body
}
}
# If it was an API Path call (OpenAPI), add these keys:
if api_path:
response['response']['apiPath'] = api_path
response['response']['httpMethod'] = http_method
response['response']['httpStatusCode'] = 200
# If it was a Function call, add this key:
elif function_name:
response['response']['function'] = function_name
print(f"Returning Response: {response}")
return response
"""
The Entry Point for the Bedrock Agent.
Bedrock sends parameters inside 'parameters' or 'requestBody'.
"""
print(f"Received Event: {event}")
# 1. Parse Input from Bedrock Agent
# Bedrock Agents structure events differently. We usually define an Action Group.
# We will assume we extract 'query' and 'video_id' from the function parameters.
agent_params = {}
if 'parameters' in event:
# Standard Agent format
for param in event['parameters']:
agent_params[param['name']] = param['value']
user_query = agent_params.get('query', '')
video_id = agent_params.get('video_id', 'default_video') # Fallback if needed
if not user_query:
return {
"messageVersion": "1.0",
"response": {
"actionGroup": event.get('actionGroup', ''),
"function": event.get('function', ''),
"functionResponse": {
"responseBody": {
"TEXT": {"body": "Error: No query provided."}
}
}
}
}
# 2. Logic: Embed -> Search -> Sort
try:
vector = get_embedding(user_query)
raw_hits = search_opensearch(vector, video_id)
# Sort by timestamp (Layer 3 Logic)
sorted_hits = sorted(raw_hits, key=lambda x: parse_timestamp(x.get('timestamp_label', '0:00')))
# 3. Format Output (The Context String)
context_str = "RELEVANT VIDEO SCENES:\n"
for hit in sorted_hits:
context_str += f"[Time: {hit.get('timestamp_label')}] {hit.get('nova_scene_summary')} (Emotions: {hit.get('emotions')})\n"
# 4. Return to Bedrock Agent
# The response structure MUST match what Bedrock expects
response_body = {
"TEXT": {
"body": context_str
}
}
action_response = {
"actionGroup": event['actionGroup'],
"function": event['function'],
"functionResponse": {
"responseBody": response_body
}
}
return {
"messageVersion": "1.0",
"response": action_response
}
except Exception as e:
print(f"Error: {e}")
return {
"messageVersion": "1.0",
"response": {
"actionGroup": event.get('actionGroup', ''),
"function": event.get('function', ''),
"functionResponse": {
"responseBody": {
"TEXT": {"body": f"System Error: {str(e)}"}
}
}
}
}
- The code first acts as an adapter. It accepts the JSON event from the Bedrock Agent (which contains the user's natural-language query) and extracts the core question (e.g., "Why is he crying?").
- It converts the user's text query into a vector embedding using the Titan Text Embeddings v2 model.
- It sends this vector to OpenSearch to find the top k most semantically similar scenes. This finds the "right content" regardless of where it occurs in the video.
- Then comes the most critical step for reasoning: the code takes the search results (which come back sorted by relevance score) and re-sorts them by timestamp. This reconstructs the narrative timeline and ensures the LLM reads the "Cause" (minute 10) before the "Effect" (minute 50), preventing it from hallucinating a backwards story.
- Finally, the code strips away the complex JSON syntax and formats the data into a clean, human-readable text block, labelling every scene [Time: MM:SS] so the LLM can cite its sources in the final answer.
Agent Creation:
- Visit the Agents section in the Bedrock console's side panel
- Click the Create Agent button
- Give the agent a name and description
- Select the model you want to use and give it proper instructions for our use case
- The instructions I gave the agent are:
You are 3Netra, an expert Video Intelligence AI.
You have access to a tool called "search_video_memory" that retrieves scene details from the video.
YOUR RULES:
1. ALWAYS use the "search_video_memory" tool when the user asks about the video content.
2. The tool returns a list of scenes with timestamps.
3. Answer the user's question using ONLY that information.
4. If the tool returns no relevant info, say "I cannot find that in the video memory."
5. CITE TIMESTAMPS in your answer like this: [05:00].
Save your progress so far, then click Add action group.
- Give it a name, select Define with API schemas, and select the Lambda function we created earlier
- In the action group schema, select Define via in-line schema editor
- Paste this schema there:
{
"openapi": "3.0.0",
"info": {
"title": "Video Memory Search API",
"version": "1.0.0",
"description": "API for searching semantic events and scenes within a video."
},
"paths": {
"/search_video_memory": {
"post": {
"summary": "Searches the video memory for relevant scenes based on a user query.",
"description": "Use this function whenever the user asks a question about the content, plot, characters, or events in the video.",
"operationId": "search_video_memory",
"parameters": [
{
"name": "query",
"in": "query",
"description": "The natural language question or topic the user is asking about (e.g., 'Why is he crying?').",
"required": true,
"schema": {
"type": "string"
}
},
{
"name": "video_id",
"in": "query",
"description": "The identifier of the video (e.g., 'movie_1'). Default to 'current_video' if not specified.",
"required": true,
"schema": {
"type": "string"
}
}
],
"responses": {
"200": {
"description": "Successfully retrieved scene context",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"body": {
"type": "string",
"description": "The text containing relevant scene summaries and timestamps."
}
}
}
}
}
}
}
}
}
}
}
Save and exit. Then prepare the agent. You can test it with your own inputs in the panel on the right.
Make sure the agent has permission to invoke the Lambda function.
If there are any errors, follow the trace to resolve the issue.
Once you confirm the agent is working fine, create an alias so you can use it from the interface layer.
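If you prefer to script alias creation, a minimal sketch with the bedrock-agent control-plane client looks like this (the agent ID and alias name are placeholders):

```python
import boto3

# "bedrock-agent" is the control-plane client, distinct from the
# "bedrock-agent-runtime" client the Streamlit app below uses to invoke the agent.
bedrock_agent = boto3.client("bedrock-agent", region_name="us-east-1")

alias = bedrock_agent.create_agent_alias(
    agentId="YOUR_AGENT_ID",      # placeholder
    agentAliasName="demo-alias",  # placeholder
)
print(alias["agentAlias"]["agentAliasId"])  # use this ID in the interface layer
```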
Building the Interface layer:
For the interface layer, we'll use the Streamlit library. Install it with pip.
Copy the agent ID and alias ID to use in this script.
import streamlit as st
import boto3
import uuid
import json
# --- CONFIGURATION ---
# Replace these with your actual IDs from Step 1
AGENT_ID = "" # e.g., "X7W3J9..."
AGENT_ALIAS_ID = "" # e.g., "TSTALIAS..."
SESSION_ID = str(uuid.uuid4()) # Unique ID for this chat session
REGION = "us-west-2" # Ensure this matches your Agent's region
# --- CLIENT SETUP ---
client = boto3.client("bedrock-agent-runtime", region_name=REGION)
st.set_page_config(page_title="3Netra Video Agent", layout="wide")
st.title("👁️ 3Netra: Video Memory Agent")
# --- SESSION STATE (Memory) ---
if "messages" not in st.session_state:
st.session_state.messages = []
# --- UI LOGIC ---
# 1. Display Chat History
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# 2. Handle User Input
if prompt := st.chat_input("Ask about the video..."):
# Add user message to history
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
# 3. Call Bedrock Agent
with st.chat_message("assistant"):
message_placeholder = st.empty()
full_response = ""
try:
# The invoke_agent API is streaming (it comes back in chunks)
response = client.invoke_agent(
agentId=AGENT_ID,
agentAliasId=AGENT_ALIAS_ID,
sessionId=SESSION_ID,
inputText=prompt
)
# Parse the event stream
event_stream = response.get("completion")
for event in event_stream:
if 'chunk' in event:
chunk = event['chunk']
if 'bytes' in chunk:
text_chunk = chunk['bytes'].decode('utf-8')
full_response += text_chunk
message_placeholder.markdown(full_response + "▌")
message_placeholder.markdown(full_response)
# Add assistant response to history
st.session_state.messages.append({"role": "assistant", "content": full_response})
except Exception as e:
st.error(f"Error invoking agent: {e}")
Once the script is ready, run it with this command:
streamlit run app.py
It will open the chat interface in the browser. Below are the questions I asked and the responses from the agent.
- The responses are not bad. We can refine them further by improving the ingestion layer to get more accurate results.
That's it. We built an agent that can answer questions from its video memory. It has plenty of other applications as well, which I'll explore in upcoming articles.
Thanks for reading. Feel free to drop comments and suggestions.






