Salam Shaik for AWS Community Builders

Posted on • Originally published at builder.aws.com

How I Built a Video Memory Agent using AWS Bedrock and OpenSearch

Hi everyone,

Searching on OTT platforms is often frustrating. While metadata-based search works for titles, it fails when you search for specific moments — like describing a scene or asking, ‘When does the hero cry?’ You simply can’t get that level of detail from metadata alone.

My solution was to build an agent that actually ‘watches’ the movie. By analyzing video frames and transcriptions to create a semantic memory, we can achieve far greater accuracy and unlock entirely new ways to interact with video content.

AWS Services I used:

  • Bedrock for the Titan and Nova Premier models, and for agent creation

  • Amazon OpenSearch Service for storing the video data

  • Amazon Transcribe for generating the transcription

  • Lambda

  • S3 for storing raw video and video frames.

High-Level Data Flow:

Let me divide this article into different sections:

  • Building the Ingestion layer

  • Building the Memory layer

  • Building the Reasoning layer

  • Building the Interface layer

Building the Ingestion layer:

  • For this experiment, I downloaded the movie **Night of the Living Dead** (1968), which is in the public domain

  • Upload the movie to an S3 bucket

  • Visit the Amazon Transcribe service and create a transcription job (a boto3 sketch of the same job appears after this list)

  • Give the job a name and keep the General model selected

  • Select the stored movie as input

  • Select .srt as output format

  • Click Next, keep the other options as they are, and create the job

  • Once the job is finished, you can find the output SRT file in the bucket you specified

  • Now that we have both the movie file and the transcription ready, let’s analyze the video and store that information as JSON files in S3 for further processing
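  • If you prefer to script the Transcribe step above instead of clicking through the console, here is a minimal boto3 sketch; the job name, bucket, and key are placeholders, not values from this project:

    import boto3

    transcribe = boto3.client("transcribe", region_name="us-east-1")

    transcribe.start_transcription_job(
        TranscriptionJobName="notld-1968-srt",                            # placeholder job name
        Media={"MediaFileUri": "s3://my-bucket/movies/notld_1968.mp4"},   # placeholder input URI
        MediaFormat="mp4",
        LanguageCode="en-US",
        Subtitles={"Formats": ["srt"]},   # ask Transcribe to emit an .srt file alongside the transcript
        OutputBucketName="my-bucket",     # placeholder output bucket
    )

  • With the movie and its SRT file in S3, the script below handles the per-minute analysis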

    import argparse
    import io
    import json
    import math
    import os
    from datetime import timedelta

    import boto3
    from botocore.exceptions import ClientError
    from moviepy import VideoFileClip
    from PIL import Image
    import srt


    # -----------------------------
    # S3 helpers
    # -----------------------------
    def parse_s3_uri(uri: str):
        """
        Parse an s3://bucket/key URI into (bucket, key).
        """
        if not uri.startswith("s3://"):
            raise ValueError(f"Invalid S3 URI: {uri}")
        without_scheme = uri[5:]
        parts = without_scheme.split("/", 1)
        if len(parts) != 2:
            raise ValueError(f"Invalid S3 URI (missing key): {uri}")
        bucket, key = parts
        return bucket, key


    def download_s3_object(s3_client, s3_uri: str, local_path: str):
        bucket, key = parse_s3_uri(s3_uri)
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        print(f"Downloading {s3_uri} -> {local_path}")
        s3_client.download_file(bucket, key, local_path)
        return local_path


    def upload_json_to_s3(s3_client, s3_uri_prefix: str, filename: str, data: dict):
        """
        Upload a JSON dict as a file under a given s3://bucket/prefix.
        """
        bucket, prefix = parse_s3_uri(s3_uri_prefix)
        key = prefix.rstrip("/") + "/" + filename
        body = json.dumps(data, ensure_ascii=False).encode("utf-8")
        print(f"Uploading scene doc -> s3://{bucket}/{key}")
        s3_client.put_object(Bucket=bucket, Key=key, Body=body)


    # -----------------------------
    # SRT helpers
    # -----------------------------
    def load_srt_subtitles(srt_path: str):
        """
        Load SRT file and return a list of srt.Subtitle objects.
        """
        with open(srt_path, "r", encoding="utf-8") as f:
            content = f.read()
        subtitles = list(srt.parse(content))
        return subtitles


    def get_text_for_range(subtitles, start_sec: float, end_sec: float) -> str:
        """
        Get concatenated subtitle text for [start_sec, end_sec) range.
        Includes any subtitle that overlaps this time range.
        """
        texts = []
        for sub in subtitles:
            sub_start = sub.start.total_seconds()
            sub_end = sub.end.total_seconds()
            # Overlap check
            if sub_end <= start_sec or sub_start >= end_sec:
                continue
            # Clean line breaks
            texts.append(sub.content.replace("\n", " "))
        return " ".join(texts).strip()


    # -----------------------------
    # Video frame extraction helpers
    # -----------------------------
    def extract_minute_frame_bytes(
        clip: VideoFileClip,
        minute_index: int,
        frames_per_minute: int = 5,
        image_format: str = "jpeg",
    ):
        """
        For a given minute index, extract `frames_per_minute` frames
        as raw image bytes (JPEG by default).
        Returns a list of bytes objects.
        """
        duration_sec = clip.duration
        start = minute_index * 60.0
        if start >= duration_sec:
            return []

        end = min(start + 60.0, duration_sec)
        window = end - start
        if window <= 0:
            return []

        # Sample timestamps evenly inside the minute window.
        # Use (frames_per_minute + 1) so we don't hit exact edges.
        step = window / (frames_per_minute + 1)
        timestamps = [start + step * (i + 1) for i in range(frames_per_minute)]

        images_bytes = []
        for t in timestamps:
            # Clip to duration just in case of rounding
            t = min(t, duration_sec - 0.01)
            frame = clip.get_frame(t)  # numpy array (H, W, 3)

            # Convert numpy array to JPEG bytes using Pillow
            pil_img = Image.fromarray(frame)
            buf = io.BytesIO()
            pil_img.save(buf, format=image_format.upper())
            buf.seek(0)
            images_bytes.append(buf.read())

        return images_bytes


    def format_timestamp(seconds: float) -> str:
        """
        Format seconds as HH:MM:SS (floor).
        """
        return str(timedelta(seconds=int(seconds)))


    # -----------------------------
    # Bedrock (Nova Premier) helper
    # -----------------------------
    def call_nova_premier(
        bedrock_client,
        model_id: str,
        scene_text: str,
        frame_bytes_list,
        minute_index: int,
    ):
        """
        Call Amazon Nova Premier via the Bedrock Converse API with:
          - scene_text (subtitles for this minute)
          - up to N images (frames) as bytes

        Returns a structured dict with:
          scene_summary, characters, locations, emotions,
          relationships, topics, visual_tags, important_events
        """

        system_prompt = (
            "You are a precise video scene analyst. "
            "You receive up to 5 frames from a one-minute video segment "
            "plus the dialogue/subtitles text for the same time range.\n"
            "Your task is to return a STRICT JSON object with this exact schema:\n\n"
            "{\n"
            '  \"scene_summary\": \"...\",\n'
            "  \"characters\": [\"...\"],\n"
            "  \"locations\": [\"...\"],\n"
            "  \"emotions\": [\"...\"],\n"
            "  \"relationships\": [\"...\"],\n"
            "  \"topics\": [\"...\"],\n"
            "  \"visual_tags\": [\"...\"],\n"
            "  \"important_events\": [\"...\"]\n"
            "}\n\n"
            "Rules:\n"
            "- Only output JSON, nothing else.\n"
            "- If a field is unknown, use an empty list [] or a short generic summary.\n"
            "- Keep lists reasonably short and focused."
        )

        user_text = (
            f"This is minute {minute_index} of the video.\n\n"
            f"Subtitles for this minute:\n{scene_text or '[No subtitles in this range]'}\n\n"
            "Use the attached frames and text together to analyze this specific one-minute scene.\n"
            "Return ONLY the JSON object as specified."
        )

        # Build message content: first text, then each image as a separate block
        content_blocks = [{"text": user_text}]

        for img_bytes in frame_bytes_list:
            content_blocks.append(
                {
                    "image": {
                        "format": "jpeg",
                        "source": {
                            "bytes": img_bytes
                        },
                    }
                }
            )

        messages = [
            {
                "role": "user",
                "content": content_blocks,
            }
        ]

        try:
            response = bedrock_client.converse(
                modelId=model_id,
                system=[{"text": system_prompt}],
                messages=messages,
                inferenceConfig={
                    "maxTokens": 512,
                    "temperature": 0.2,
                    "topP": 0.9,
                },
            )

            output_message = response["output"]["message"]
            raw_text = output_message["content"][0]["text"]

            # Sometimes models may wrap JSON with text; try to extract JSON substring
            raw_text = raw_text.strip()
            try:
                # Try direct parse first
                scene_info = json.loads(raw_text)
            except json.JSONDecodeError:
                # Fallback: find first '{' and last '}' and parse that
                start = raw_text.find("{")
                end = raw_text.rfind("}")
                if start != -1 and end != -1 and end > start:
                    json_str = raw_text[start : end + 1]
                    scene_info = json.loads(json_str)
                else:
                    raise

            # Ensure all expected keys exist
            default_scene = {
                "scene_summary": "",
                "characters": [],
                "locations": [],
                "emotions": [],
                "relationships": [],
                "topics": [],
                "visual_tags": [],
                "important_events": [],
            }
            default_scene.update(scene_info or {})
            return default_scene

        except Exception as e:
            print(f"[ERROR] Bedrock call or JSON parsing failed for minute {minute_index}: {e}")
            return {
                "scene_summary": "",
                "characters": [],
                "locations": [],
                "emotions": [],
                "relationships": [],
                "topics": [],
                "visual_tags": [],
                "important_events": [],
            }


    # -----------------------------
    # Main orchestration
    # -----------------------------
    def analyze_video_from_s3(
        video_s3_uri: str,
        srt_s3_uri: str,
        region: str,
        model_id: str,
        frames_per_minute: int,
        output_s3_prefix: str,
        video_id: str,
        episode_title: str = "",
        season: int | None = None,
        episode: int | None = None,
    ):
        """
        1. Download video and SRT from S3.
        2. Parse duration and subtitles.
        3. For each minute:
           - sample frames
           - gather subtitle text
           - call Nova Premier for structured analysis
           - build scene_doc and upload to S3
        """

        session = boto3.Session(region_name=region)
        s3_client = session.client("s3")
        bedrock_client = session.client("bedrock-runtime")

        # Local temp paths
        tmp_dir = "./tmp_video_analysis"
        os.makedirs(tmp_dir, exist_ok=True)
        video_path = os.path.join(tmp_dir, "video_input.mp4")
        srt_path = os.path.join(tmp_dir, "subtitles.srt")

        # Download from S3
        download_s3_object(s3_client, video_s3_uri, video_path)
        download_s3_object(s3_client, srt_s3_uri, srt_path)

        # Load subtitles
        subtitles = load_srt_subtitles(srt_path)

        # Load video and get duration
        clip = VideoFileClip(video_path)
        duration_sec = clip.duration
        num_minutes = math.ceil(duration_sec / 60.0)

        print(f"Video duration: {duration_sec:.2f} seconds (~{num_minutes} minutes)\n")

        try:
            for minute_index in range(num_minutes):
                start_sec = minute_index * 60.0
                end_sec = min((minute_index + 1) * 60.0, duration_sec)

                print(f"Processing minute {minute_index} [{start_sec:.1f}s - {end_sec:.1f}s]")

                # 1) Extract frames for this minute
                frames = extract_minute_frame_bytes(
                    clip,
                    minute_index,
                    frames_per_minute=frames_per_minute,
                    image_format="jpeg",
                )

                if not frames:
                    print(f"  No frames extracted for minute {minute_index}, skipping scene doc.")
                    continue

                # 2) Extract subtitles text for this minute
                scene_text = get_text_for_range(subtitles, start_sec, end_sec)

                # 3) Call Nova Premier for structured scene info
                scene_info = call_nova_premier(
                    bedrock_client=bedrock_client,
                    model_id=model_id,
                    scene_text=scene_text,
                    frame_bytes_list=frames,
                    minute_index=minute_index,
                )

                # 4) Build scene_doc
                scene_id = f"{video_id}_m{minute_index:04d}"
                scene_doc = {
                    "video_id": video_id,
                    "episode_title": episode_title,
                    "season": season,
                    "episode": episode,
                    "scene_id": scene_id,
                    "start_sec": start_sec,
                    "end_sec": end_sec,
                    "timestamp_label": f"{format_timestamp(start_sec)} - {format_timestamp(end_sec)}",
                    "transcript": scene_text,
                    "nova_scene_summary": scene_info.get("scene_summary", ""),
                    "characters": scene_info.get("characters", []),
                    "locations": scene_info.get("locations", []),
                    "emotions": scene_info.get("emotions", []),
                    "relationships": scene_info.get("relationships", []),
                    "topics": scene_info.get("topics", []),
                    "visual_tags": scene_info.get("visual_tags", []),
                    "important_events": scene_info.get("important_events", []),
                }

                # 5) Upload scene_doc JSON to S3
                filename = f"{scene_id}.json"
                upload_json_to_s3(s3_client, output_s3_prefix, filename, scene_doc)

                print(f"  Scene doc created: {scene_id}")

        finally:
            clip.close()


    def main():
        parser = argparse.ArgumentParser(
            description=(
                "Download a video from S3, sample frames per minute, "
                "combine with SRT text, analyze with Amazon Nova Premier, "
                "and upload per-minute scene docs to S3."
            )
        )
        parser.add_argument(
            "--video-s3",
            required=True,
            help="S3 URI of the video file. Example: s3://my-bucket/path/video.mp4",
        )
        parser.add_argument(
            "--srt-s3",
            required=True,
            help="S3 URI of the SRT subtitle file. Example: s3://my-bucket/path/video.srt",
        )
        parser.add_argument(
            "--output-s3-prefix",
            required=True,
            help=(
                "S3 URI prefix where scene docs will be stored. "
                "Example: s3://my-bucket/3netra/FRIENDS_S01E01"
            ),
        )
        parser.add_argument(
            "--video-id",
            required=True,
            help="Logical video ID (e.g., FRIENDS_S01E01). Used in scene_id and metadata.",
        )
        parser.add_argument(
            "--episode-title",
            default="",
            help="Optional episode title for metadata.",
        )
        parser.add_argument(
            "--season",
            type=int,
            default=None,
            help="Optional season number for metadata.",
        )
        parser.add_argument(
            "--episode",
            type=int,
            default=None,
            help="Optional episode number for metadata.",
        )
        parser.add_argument(
            "--region",
            default="us-east-1",
            help="AWS Region where Bedrock is available (default: us-east-1).",
        )
        parser.add_argument(
            "--model-id",
            default="amazon.nova-premier-v1:0",
            help="Bedrock model ID for Nova Premier (default: amazon.nova-premier-v1:0).",
        )
        parser.add_argument(
            "--frames-per-minute",
            type=int,
            default=5,
            help="How many frames to sample per minute window (default: 5).",
        )

        args = parser.parse_args()

        analyze_video_from_s3(
            video_s3_uri=args.video_s3,
            srt_s3_uri=args.srt_s3,
            region=args.region,
            model_id=args.model_id,
            frames_per_minute=args.frames_per_minute,
            output_s3_prefix=args.output_s3_prefix,
            video_id=args.video_id,
            episode_title=args.episode_title,
            season=args.season,
            episode=args.episode,
        )


    if __name__ == "__main__":
        main()
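  • For reference, here is a hypothetical invocation of the script above (the file name and S3 paths are placeholders):

    python analyze_video.py \
        --video-s3 s3://my-bucket/movies/notld_1968.mp4 \
        --srt-s3 s3://my-bucket/transcripts/notld_1968.srt \
        --output-s3-prefix s3://my-bucket/scenes/NOTLD_1968 \
        --video-id NOTLD_1968 \
        --region us-east-1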
  • This script downloads the video from S3, extracts frames for every minute, combines those frames with the dialogue from the SRT file for that window, and sends them to the Nova Premier model for analysis

  • The resulting JSON files are written to the output S3 prefix.

  • This completes the ingestion layer. Let’s move on to the memory layer

Building the Memory layer:

  • Visit Amazon OpenSearch Service from the AWS console

  • Create a domain with 1 AZ, without standby, and a t3.large instance for the dev/test environment

  • Once the domain is up and running, open OpenSearch Dashboards, go to Dev Tools in the side menu, and create an index with this schema

    PUT video_scenes
    {
      "settings": {
        "index": {
          "knn": true
        }
      },
      "mappings": {
        "properties": {
          "video_id":        { "type": "keyword" },
          "scene_id":        { "type": "keyword" },
          "episode_title":   { "type": "text" },
          "season":          { "type": "integer" },
          "episode":         { "type": "integer" },

          "start_sec":       { "type": "integer" },
          "end_sec":         { "type": "integer" },
          "timestamp_label": { "type": "keyword" },

          "transcript":       { "type": "text" },
          "nova_scene_summary": { "type": "text" },

          "characters":      { "type": "keyword" },
          "locations":       { "type": "keyword" },
          "emotions":        { "type": "keyword" },
          "relationships":   { "type": "text" },
          "topics":          { "type": "keyword" },
          "visual_tags":     { "type": "keyword" },
          "important_events":{ "type": "text" },

          "embedding": {
            "type": "knn_vector",
            "dimension": 1024,
            "method": {
              "name": "hnsw",
              "space_type": "cosinesimil",
              "engine": "lucene"
            }
          }
        }
      }
    }
  • Use the script below to generate embeddings for the JSON files and dump them into this index
    import argparse
    import json
    import os

    import boto3
    from botocore.exceptions import ClientError
    from opensearchpy import OpenSearch, RequestsHttpConnection
    from requests_aws4auth import AWS4Auth


    # -------------- S3 helpers --------------
    def parse_s3_uri(uri: str):
        if not uri.startswith("s3://"):
            raise ValueError(f"Invalid S3 URI: {uri}")
        without = uri[5:]
        parts = without.split("/", 1)
        if len(parts) != 2:
            raise ValueError(f"Invalid S3 URI (missing key/prefix): {uri}")
        return parts[0], parts[1]


    def list_s3_json_objects(s3_client, s3_prefix_uri: str):
        """
        List all JSON objects under the given s3://bucket/prefix
        (this is where your scene_docs are stored).
        """
        bucket, prefix = parse_s3_uri(s3_prefix_uri)
        paginator = s3_client.get_paginator("list_objects_v2")

        for page in paginator.paginate(Bucket=bucket, Prefix=prefix.rstrip("/") + "/"):
            for obj in page.get("Contents", []):
                key = obj["Key"]
                if key.lower().endswith(".json"):
                    yield bucket, key


    def get_s3_json(s3_client, bucket: str, key: str) -> dict:
        resp = s3_client.get_object(Bucket=bucket, Key=key)
        body = resp["Body"].read().decode("utf-8")
        return json.loads(body)


    # -------------- Bedrock (Titan Embeddings) --------------
    def get_titan_embedding(bedrock_client, model_id: str, text: str, dimensions: int = 1024):
        """
        Call Titan Text Embeddings v2 to get an embedding vector.
        The request body takes `inputText` plus an optional `dimensions` field (v2 only).
        """
        if not text:
            text = " "  # Titan requires a non-empty input string

        body = json.dumps(
            {
                "inputText": text,
                "dimensions": dimensions,
            }
        )

        resp = bedrock_client.invoke_model(
            modelId=model_id,
            body=body,
            contentType="application/json",
            accept="application/json",
        )
        resp_body = json.loads(resp["body"].read())
        # titan-embed-text-v2 response: {"embedding": [...], "inputTextTokenCount": ...}
        embedding = resp_body["embedding"]
        return embedding


    def build_embedding_text(scene_doc: dict) -> str:
        """
        Concatenate important fields into a single text for embeddings.
        """
        parts = []

        summary = scene_doc.get("nova_scene_summary") or scene_doc.get("scene_summary") or ""
        transcript = scene_doc.get("transcript", "")
        chars = ", ".join(scene_doc.get("characters", []))
        rels = "; ".join(scene_doc.get("relationships", []))
        topics = ", ".join(scene_doc.get("topics", []))
        emotions = ", ".join(scene_doc.get("emotions", []))
        visual_tags = ", ".join(scene_doc.get("visual_tags", []))

        if summary:
            parts.append("[Summary] " + summary)
        if transcript:
            parts.append("[Transcript] " + transcript)
        if chars:
            parts.append("[Characters] " + chars)
        if rels:
            parts.append("[Relationships] " + rels)
        if topics:
            parts.append("[Topics] " + topics)
        if emotions:
            parts.append("[Emotions] " + emotions)
        if visual_tags:
            parts.append("[Visual tags] " + visual_tags)

        return "\n".join(parts)


    # -------------- OpenSearch client --------------
    def create_opensearch_client(
        region: str,
        endpoint: str,
        service: str = "aoss",
        username: str | None = None,
        password: str | None = None,
    ):
        """
        Create an OpenSearch client.

        Authentication modes:
          - If `username` and `password` are provided, use HTTP Basic auth.
          - Otherwise, fall back to SigV4 (AWS) auth for Serverless or classic domains.

        service:
          - 'aoss' for OpenSearch Serverless
          - 'es'   for classic OpenSearch domains
        """
        host = endpoint.replace("https://", "").replace("http://", "")

        if username is not None and password is not None:
            # Use HTTP basic auth (username/password)
            http_auth = (username, password)
        else:
            # Fall back to AWS SigV4 auth
            session = boto3.Session(region_name=region)
            credentials = session.get_credentials()
            awsauth = AWS4Auth(
                credentials.access_key,
                credentials.secret_key,
                region,
                service,
                session_token=credentials.token,
            )
            http_auth = awsauth

        client = OpenSearch(
            hosts=[{"host": host, "port": 443}],
            http_auth=http_auth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
        )
        return client


    def index_scene_doc(os_client, index_name: str, doc: dict):
        """
        Index a single scene_doc into OpenSearch.
        """
        scene_id = doc["scene_id"]
        resp = os_client.index(index=index_name, id=scene_id, body=doc, refresh=False)
        return resp


    # -------------- Main indexing flow --------------
    def index_scenes_to_opensearch(
        scene_s3_prefix: str,
        region: str,
        embed_model_id: str,
        os_endpoint: str,
        os_index: str,
        os_service: str = "aoss",
        os_username: str | None = None,
        os_password: str | None = None,
        embedding_dim: int = 1024,
    ):
        session = boto3.Session(region_name=region)
        s3_client = session.client("s3")
        bedrock_client = session.client("bedrock-runtime")
        os_client = create_opensearch_client(
            region, os_endpoint, service=os_service, username=os_username, password=os_password
        )

        for bucket, key in list_s3_json_objects(s3_client, scene_s3_prefix):
            print(f"Processing s3://{bucket}/{key}")
            scene_doc = get_s3_json(s3_client, bucket, key)

            # Build text and embedding
            embed_text = build_embedding_text(scene_doc)
            embedding = get_titan_embedding(
                bedrock_client, embed_model_id, embed_text, dimensions=embedding_dim
            )

            # Attach embedding field
            scene_doc["embedding"] = embedding

            # Index into OpenSearch
            resp = index_scene_doc(os_client, os_index, scene_doc)
            result = resp.get("result", "unknown")
            print(f"  Indexed scene_id={scene_doc.get('scene_id')} result={result}")


    def main():
        parser = argparse.ArgumentParser(
            description=(
                "Index 3Netra scene docs from S3 into OpenSearch using Titan embeddings."
            )
        )
        parser.add_argument(
            "--scene-s3-prefix",
            required=True,
            help=(
                "S3 URI prefix where scene JSONs are stored. "
                "Example: s3://my-bucket/3netra/FRIENDS_S01E01"
            ),
        )
        parser.add_argument(
            "--region",
            default="us-east-1",
            help="AWS Region for Bedrock, S3, and OpenSearch (default: us-east-1).",
        )
        parser.add_argument(
            "--embed-model-id",
            default="amazon.titan-embed-text-v2:0",
            help="Titan embeddings model ID (default: amazon.titan-embed-text-v2:0).",
        )
        parser.add_argument(
            "--os-endpoint",
            required=True,
            help=(
                "OpenSearch HTTPS endpoint (no index). "
                "Example: https://abc123.us-east-1.aoss.amazonaws.com"
            ),
        )
        parser.add_argument(
            "--os-index",
            default="video_scenes",
            help="OpenSearch index name (default: video_scenes).",
        )
        parser.add_argument(
            "--os-service",
            default="aoss",
            help="SigV4 service name: 'aoss' for Serverless, 'es' for domains (default: aoss).",
        )
        parser.add_argument(
            "--os-username",
            help=(
                "OpenSearch basic auth username (optional). "
                "If not provided, the script will read `OS_USERNAME` or `OPENSEARCH_USERNAME` env vars."
            ),
        )
        parser.add_argument(
            "--os-password",
            help=(
                "OpenSearch basic auth password (optional). "
                "If not provided, the script will read `OS_PASSWORD` or `OPENSEARCH_PASSWORD` env vars."
            ),
        )
        parser.add_argument(
            "--embedding-dim",
            type=int,
            default=1024,
            help="Embedding dimension (must match index mapping, default: 1024).",
        )

        args = parser.parse_args()

        # Accept username/password from CLI args or environment variables.
        os_username = (
            args.os_username
            or os.environ.get("OS_USERNAME")
            or os.environ.get("OPENSEARCH_USERNAME")
        )
        os_password = (
            args.os_password
            or os.environ.get("OS_PASSWORD")
            or os.environ.get("OPENSEARCH_PASSWORD")
        )

        index_scenes_to_opensearch(
            scene_s3_prefix=args.scene_s3_prefix,
            region=args.region,
            embed_model_id=args.embed_model_id,
            os_endpoint=args.os_endpoint,
            os_index=args.os_index,
            os_service=args.os_service,
            os_username=os_username,
            os_password=os_password,
            embedding_dim=args.embedding_dim,
        )


    if __name__ == "__main__":
        main()
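  • A hypothetical invocation of the indexing script above (file name, prefix, domain endpoint, and credentials are placeholders; the password can also come from the OS_PASSWORD environment variable):

    python index_scenes.py \
        --scene-s3-prefix s3://my-bucket/scenes/NOTLD_1968 \
        --os-endpoint https://search-my-domain.us-east-1.es.amazonaws.com \
        --os-index video_scenes \
        --os-service es \
        --os-username admin \
        --region us-east-1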
  • You can verify the indexed data through the Discover section or the Query Workbench in the OpenSearch Dashboards side menu (see the sample Dev Tools queries below)
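  • A couple of quick Dev Tools checks to confirm the documents landed in the index:

    GET video_scenes/_count

    GET video_scenes/_search
    {
      "size": 1,
      "_source": ["scene_id", "timestamp_label", "nova_scene_summary"],
      "query": { "match_all": {} }
    }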

Building the Reasoning layer:

  • Create a Lambda function with the code below
    import json
    import boto3
    import os
    from opensearchpy import OpenSearch, RequestsHttpConnection

    # --- Configuration ---
    # Read these from Lambda environment variables rather than hardcoding them
    OPENSEARCH_HOST = os.environ.get("OPENSEARCH_HOST", "")
    OPENSEARCH_USER = os.environ.get("OPENSEARCH_USER", "")
    OPENSEARCH_PASS = os.environ.get("OPENSEARCH_PASS", "")
    REGION = os.environ.get('AWS_REGION', 'us-west-2')

    # --- Clients ---
    bedrock_runtime = boto3.client('bedrock-runtime', region_name=REGION)

    os_client = OpenSearch(
        hosts=[{'host': OPENSEARCH_HOST, 'port': 443}],
        http_auth=(OPENSEARCH_USER, OPENSEARCH_PASS),
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection
    )

    def get_embedding(text):
        """Generates vector embedding using Titan v2"""
        body = json.dumps({"inputText": text})
        response = bedrock_runtime.invoke_model(
            modelId="amazon.titan-embed-text-v2:0",
            body=body,
            accept="application/json",
            contentType="application/json"
        )
        response_body = json.loads(response['body'].read())
        return response_body['embedding']

    def search_opensearch(vector, video_id, k=5):
        """Performs k-NN search on OpenSearch"""
        query = {
            "size": k,
            "_source": ["timestamp_label", "nova_scene_summary", "characters", "emotions"],
            "query": {
                "knn": {
                    "embedding": {  # Ensure this field name matches your index mapping
                        "vector": vector,
                        "k": k
                    }
                }
            }
        }
        # In production, use an alias or specific index logic
        index_name = "video_scenes"
        response = os_client.search(index=index_name, body=query)
        return [hit['_source'] for hit in response['hits']['hits']]

    def parse_timestamp(ts_str):
        """Helper to convert timestamp to seconds for sorting"""
        try:
            start = ts_str.split('-')[0].strip()
            parts = list(map(int, start.split(':')))
            if len(parts) == 3: return parts[0]*3600 + parts[1]*60 + parts[2]
            if len(parts) == 2: return parts[0]*60 + parts[1]
            return 0
        except:
            return 0

    def lambda_handler(event, context):
        print(f"Received Event: {event}")

        # 1. Initialize Response Info
        # We must echo back the same identifiers Bedrock sent us
        action_group = event.get('actionGroup', '')
        api_path = event.get('apiPath')
        http_method = event.get('httpMethod')
        function_name = event.get('function') # Fallback for different agent types

        # 2. Extract Parameters
        # Bedrock sends parameters in a list: [{'name': 'query', 'value': '...'}, ...]
        params = {}
        if 'parameters' in event:
            for p in event['parameters']:
                params[p['name']] = p['value']

        # Also check 'requestBody' if parameters aren't found (common in POST requests)
        if not params and 'requestBody' in event:
            try:
                body_content = event['requestBody']['content']['application/json']['properties']
                for prop in body_content:
                    params[prop['name']] = prop['value']
            except:
                pass

        user_query = params.get('query', '')
        video_id = params.get('video_id', 'default_video')

        # 3. Validation
        if not user_query:
            result_text = "Error: No query provided in parameters."
        else:
            # 4. Perform Search (Your existing logic)
            try:
                print(f"Embedding query: {user_query}")
                vector = get_embedding(user_query)

                print(f"Searching OpenSearch for video: {video_id}")
                raw_hits = search_opensearch(vector, video_id)

                # Sort by timestamp
                sorted_hits = sorted(raw_hits, key=lambda x: parse_timestamp(x.get('timestamp_label', '0:00')))

                # Format Context
                result_text = "RELEVANT VIDEO SCENES (Chronological):\n"
                if not sorted_hits:
                    result_text += "No relevant scenes found in memory."
                else:
                    for hit in sorted_hits:
                        # Robust field access
                        time_lbl = hit.get('timestamp_label', 'Unknown Time')
                        summary = hit.get('nova_scene_summary', 'No summary')
                        emotions = hit.get('emotions', [])
                        if isinstance(emotions, list): emotions = ", ".join(emotions)

                        result_text += f"[Time: {time_lbl}] {summary} (Emotions: {emotions})\n"

            except Exception as e:
                print(f"Processing Error: {str(e)}")
                result_text = f"System Error during search: {str(e)}"

        # 5. Construct Response (Dynamic based on Input Type)
        response_body = {
            "application/json": {
                "body": result_text
            }
        }

        response = {
            "messageVersion": "1.0",
            "response": {
                "actionGroup": action_group,
                "responseBody": response_body
            }
        }

        # If it was an API Path call (OpenAPI), add these keys:
        if api_path:
            response['response']['apiPath'] = api_path
            response['response']['httpMethod'] = http_method
            response['response']['httpStatusCode'] = 200
        # If it was a Function call, add this key:
        elif function_name:
            response['response']['function'] = function_name

        print(f"Returning Response: {response}")
        return response
        """
        The Entry Point for the Bedrock Agent.
        Bedrock sends parameters inside 'parameters' or 'requestBody'.
        """
        print(f"Received Event: {event}")

        # 1. Parse Input from Bedrock Agent
        # Bedrock Agents structure events differently. We usually define an Action Group.
        # We will assume we extract 'query' and 'video_id' from the function parameters.

        agent_params = {}
        if 'parameters' in event:
            # Standard Agent format
            for param in event['parameters']:
                agent_params[param['name']] = param['value']

        user_query = agent_params.get('query', '')
        video_id = agent_params.get('video_id', 'default_video') # Fallback if needed

        if not user_query:
            return {
                "messageVersion": "1.0",
                "response": {
                    "actionGroup": event.get('actionGroup', ''),
                    "function": event.get('function', ''),
                    "functionResponse": {
                        "responseBody": {
                            "TEXT": {"body": "Error: No query provided."}
                        }
                    }
                }
            }

        # 2. Logic: Embed -> Search -> Sort
        try:
            vector = get_embedding(user_query)
            raw_hits = search_opensearch(vector, video_id)

            # Sort by timestamp (Layer 3 Logic)
            sorted_hits = sorted(raw_hits, key=lambda x: parse_timestamp(x.get('timestamp_label', '0:00')))

            # 3. Format Output (The Context String)
            context_str = "RELEVANT VIDEO SCENES:\n"
            for hit in sorted_hits:
                context_str += f"[Time: {hit.get('timestamp_label')}] {hit.get('nova_scene_summary')} (Emotions: {hit.get('emotions')})\n"

            # 4. Return to Bedrock Agent
            # The response structure MUST match what Bedrock expects
            response_body = {
                "TEXT": {
                    "body": context_str
                }
            }

            action_response = {
                "actionGroup": event['actionGroup'],
                "function": event['function'],
                "functionResponse": {
                    "responseBody": response_body
                }
            }

            return {
                "messageVersion": "1.0",
                "response": action_response
            }

        except Exception as e:
            print(f"Error: {e}")
            return {
                "messageVersion": "1.0",
                "response": {
                    "actionGroup": event.get('actionGroup', ''),
                    "function": event.get('function', ''),
                    "functionResponse": {
                        "responseBody": {
                            "TEXT": {"body": f"System Error: {str(e)}"}
                        }
                    }
                }
            }
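  • Note that opensearch-py (and the requests package it needs for RequestsHttpConnection) is not included in the Lambda Python runtime, so package it with the function or attach it as a layer. A rough sketch of building such a layer (layer name and runtime version are placeholders):

    mkdir -p layer/python
    pip install opensearch-py requests -t layer/python
    (cd layer && zip -r ../opensearch-layer.zip python)
    aws lambda publish-layer-version \
        --layer-name opensearch-deps \
        --zip-file fileb://opensearch-layer.zip \
        --compatible-runtimes python3.12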
  • The code first acts as an adapter. It accepts the JSON event from the Bedrock Agent (which contains the user’s natural language query) and extracts the core question (e.g., “Why is he crying?”)

  • It converts the user’s text query into a vector embedding using the Titan Text v2 model.

  • It sends this vector to OpenSearch to find the top-k most semantically similar scenes. This finds the “right content” regardless of where it is in the video.

  • This is the most critical step for reasoning. The code takes the search results (which come back sorted by relevance score) and re-sorts them by timestamp.

  • Why: This reconstructs the narrative timeline. It ensures the LLM reads the “Cause” (Minute 10) before the “Effect” (Minute 50), preventing it from hallucinating a backwards story (see the short illustration after this list).

  • Finally, the code strips away complex JSON syntax and formats the data into a clean, human-readable text block.

  • It labels every scene [Time: MM:SS] so the LLM can cite its sources in the final answer.
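  • To make the re-sorting step concrete, here is a small standalone illustration of what the Lambda does with the hits (the scene data is invented for the example):

    def parse_timestamp(ts_str):
        """Same helper as in the Lambda: convert 'H:MM:SS - H:MM:SS' to start seconds."""
        start = ts_str.split("-")[0].strip()
        parts = list(map(int, start.split(":")))
        return parts[0] * 3600 + parts[1] * 60 + parts[2] if len(parts) == 3 else 0

    # k-NN returns hits in relevance order; the "effect" scene may come back first
    hits = [
        {"timestamp_label": "0:50:00 - 0:51:00", "nova_scene_summary": "He breaks down crying."},
        {"timestamp_label": "0:10:00 - 0:11:00", "nova_scene_summary": "He learns the bad news."},
    ]

    # Re-sort chronologically so the LLM reads cause (minute 10) before effect (minute 50)
    for hit in sorted(hits, key=lambda h: parse_timestamp(h["timestamp_label"])):
        print(f"[Time: {hit['timestamp_label']}] {hit['nova_scene_summary']}")
    # [Time: 0:10:00 - 0:11:00] He learns the bad news.
    # [Time: 0:50:00 - 0:51:00] He breaks down crying.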

Agent Creation:

  • Visit the Agents section from the Bedrock side panel

  • Click on the Create Agent button

  • Give a name and description for the agent

  • Select the model you want to use and give it instructions appropriate to our use case

  • The instructions I gave for the model are
You are 3Netra, an expert Video Intelligence AI. 
    You have access to a tool called "search_video_memory" that retrieves scene details from the video.

    YOUR RULES:
    1. ALWAYS use the "search_video_memory" tool when the user asks about the video content.
    2. The tool returns a list of scenes with timestamps.
    3. Answer the user's question using ONLY that information.
    4. If the tool returns no relevant info, say "I cannot find that in the video memory."
    5. CITE TIMESTAMPS in your answer like this: [05:00].
  • Save your progress up to this point, then click Add action group.

  • Give the action group a name, select Define with API schemas, and choose the Lambda function we created earlier

  • In the action group schema, select Define via in-line Schema Editor

  • Paste this schema there
{
      "openapi": "3.0.0",
      "info": {
        "title": "Video Memory Search API",
        "version": "1.0.0",
        "description": "API for searching semantic events and scenes within a video."
      },
      "paths": {
        "/search_video_memory": {
          "post": {
            "summary": "Searches the video memory for relevant scenes based on a user query.",
            "description": "Use this function whenever the user asks a question about the content, plot, characters, or events in the video.",
            "operationId": "search_video_memory",
            "parameters": [
              {
                "name": "query",
                "in": "query",
                "description": "The natural language question or topic the user is asking about (e.g., 'Why is he crying?').",
                "required": true,
                "schema": {
                  "type": "string"
                }
              },
              {
                "name": "video_id",
                "in": "query",
                "description": "The identifier of the video (e.g., 'movie_1'). Default to 'current_video' if not specified.",
                "required": true,
                "schema": {
                  "type": "string"
                }
              }
            ],
            "responses": {
              "200": {
                "description": "Successfully retrieved scene context",
                "content": {
                  "application/json": {
                    "schema": {
                      "type": "object",
                      "properties": {
                        "body": {
                          "type": "string",
                          "description": "The text containing relevant scene summaries and timestamps."
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
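  • With this schema in place, the event Bedrock sends to the Lambda looks roughly like this (abridged and illustrative; the exact fields can vary):

    {
      "messageVersion": "1.0",
      "sessionId": "1234567890",
      "inputText": "Why is he crying?",
      "actionGroup": "search_video_memory",
      "apiPath": "/search_video_memory",
      "httpMethod": "POST",
      "parameters": [
        { "name": "query", "type": "string", "value": "Why is he crying?" },
        { "name": "video_id", "type": "string", "value": "NOTLD_1968" }
      ]
    }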
  • Save and exit, then prepare the agent with these settings. On the right side, you can test the agent with your own inputs.

  • Make sure the agent has permission to invoke the Lambda function (a CLI sketch for adding that permission follows this list)

  • If there are any errors, follow the trace to resolve the issue

  • Once you confirm the agent is working fine, create an alias to use it at the interface layer
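  • The Lambda needs a resource-based policy that lets the Bedrock agent invoke it. If the console doesn’t add it for you, a sketch like this should work (function name, region, account ID, and agent ID are placeholders):

    aws lambda add-permission \
        --function-name video-memory-search \
        --statement-id allow-bedrock-agent \
        --action lambda:InvokeFunction \
        --principal bedrock.amazonaws.com \
        --source-arn arn:aws:bedrock:us-west-2:123456789012:agent/AGENT_ID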

Building the Interface layer:

  • For building the interface layer, we’ll use the Streamlit library. Install it with pip (`pip install streamlit`)

  • Copy the agent ID and agent alias ID to use in this script

    import streamlit as st
    import boto3
    import uuid
    import json

    # --- CONFIGURATION ---
    # Replace these with your actual IDs from Step 1
    AGENT_ID = ""      # e.g., "X7W3J9..."
    AGENT_ALIAS_ID = "" # e.g., "TSTALIAS..."
    # SESSION_ID is defined below from st.session_state so it survives Streamlit reruns
    REGION = "us-west-2"                     # Ensure this matches your Agent's region

    # --- CLIENT SETUP ---
    client = boto3.client("bedrock-agent-runtime", region_name=REGION)

    st.set_page_config(page_title="3Netra Video Agent", layout="wide")
    st.title("👁️ 3Netra: Video Memory Agent")

    # --- SESSION STATE (Memory) ---
    if "messages" not in st.session_state:
        st.session_state.messages = []
    if "session_id" not in st.session_state:
        st.session_state.session_id = str(uuid.uuid4())  # one ID per chat session, stable across reruns
    SESSION_ID = st.session_state.session_id

    # --- UI LOGIC ---
    # 1. Display Chat History
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # 2. Handle User Input
    if prompt := st.chat_input("Ask about the video..."):
        # Add user message to history
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        # 3. Call Bedrock Agent
        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            full_response = ""

            try:
                # The invoke_agent API is streaming (it comes back in chunks)
                response = client.invoke_agent(
                    agentId=AGENT_ID,
                    agentAliasId=AGENT_ALIAS_ID,
                    sessionId=SESSION_ID,
                    inputText=prompt
                )

                # Parse the event stream
                event_stream = response.get("completion")
                for event in event_stream:
                    if 'chunk' in event:
                        chunk = event['chunk']
                        if 'bytes' in chunk:
                            text_chunk = chunk['bytes'].decode('utf-8')
                            full_response += text_chunk
                            message_placeholder.markdown(full_response + "▌")

                message_placeholder.markdown(full_response)

                # Add assistant response to history
                st.session_state.messages.append({"role": "assistant", "content": full_response})

            except Exception as e:
                st.error(f"Error invoking agent: {e}")

  • Once the script is ready, run it with this command:

    streamlit run app.py

  • It will open a chat interface in the browser. These are the questions I asked; you can see the responses from the model

  • The responses are not bad. We can refine them further by improving the ingestion layer for more accurate results.

That’s it. We built an agent that can answer questions from its video memory. It has many other applications as well, which I’ll explore in upcoming articles.

Thanks. Feel free to drop comments and suggestions.
