
Text-to-Clip: Building a Serverless AI Engine that Edits Video from Descriptions

Hi everyone,

Imagine typing 'Show me the car chase scene' and, within seconds, getting a perfectly (well, almost perfectly πŸ˜…) cut video delivered to your screen.

I call it Text-to-Clip. It's an automated video editing engine that combines computer vision (to see), vector search (to remember), and FFmpeg (to act).

In this post, I'm breaking down the exact AWS architecture I used to build it, from the distributed 'Map' states in Step Functions to Dockerized computer vision containers running on Lambda.

Services I used:

  • Amazon Bedrock
  • Lambda
  • Step Functions
  • OpenSearch Service
  • Amazon Rekognition
  • Amazon Transcribe
  • DynamoDB
  • EventBridge
  • S3

Architecture:

Implementation steps:
Identifying shots in the video: Whenever the user uploads a video, it is sent to Amazon Rekognition for shot detection, and a transcription job is started with Amazon Transcribe.

Analyzing shots: From each shot, a few frames are picked along with the transcript for that time range. These are sent to the Bedrock Nova model for scene analysis.

Storing analyzed data: The model's analysis is sent to the embedding model, and the generated embeddings are stored in an OpenSearch index.

Querying and generating video: The user query is converted to an embedding and compared against the shot embeddings. The timestamps of the matching shots are passed to FFmpeg to cut and stitch the video.

Load balancing: When I uploaded a movie of about 1 hr 30 min, more than 800 shots were returned, and analyzing them one after another took over 40 minutes. So I implemented a parallel processing mechanism using Step Functions, where multiple Lambdas run at the same time with the load distributed across them (the dispatcher later splits the shots into batches of 50, so ~800 shots become ~16 parallel workers).

Let's start the implementation

Folder Structure:
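Roughly, based on the files referenced throughout this post, the project layout looks like this (the __init__.py and create_index.py names are my assumptions; the rest come from the Dockerfile and the imports):

.
β”œβ”€β”€ Dockerfile
β”œβ”€β”€ requirements.txt
β”œβ”€β”€ config.py
β”œβ”€β”€ lambda_dispatcher.py
β”œβ”€β”€ lambda_worker.py
β”œβ”€β”€ lambda_finalizer.py
β”œβ”€β”€ lambda_director.py
β”œβ”€β”€ create_index.py        # one-off script to create the OpenSearch index
β”œβ”€β”€ app.py                 # Streamlit front end
└── modules/
    β”œβ”€β”€ __init__.py
    β”œβ”€β”€ aws_clients.py
    β”œβ”€β”€ video_tools.py
    β”œβ”€β”€ transcriber.py
    β”œβ”€β”€ analyzer.py
    β”œβ”€β”€ embedder.py
    └── indexer.py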

Module Files:

  • The requirements file contains all the Python libraries
boto3
opencv-python-headless
numpy<2.0.0
opensearch-py
requests

AWS clients file (modules/aws_clients.py) for initializing the boto3 clients:

import boto3
import config

class AWSManager:
    def __init__(self):
        self.session = boto3.Session(region_name=config.AWS_REGION)
        self.s3_client = self.session.client('s3')
        self.rekognition_client = self.session.client('rekognition')
        self.bedrock_client = self.session.client('bedrock-runtime')
        self.transcribe = self.session.client('transcribe')

aws = AWSManager()

Config file for project-wide settings:

AWS_REGION = "us-west-2"
S3_BUCKET_NAME = "bucket"  

BEDROCK_EMBEDDING_MODEL = "amazon.titan-embed-image-v1"

VIDEO_FILENAME = "NightOfTheLivingDead.mp4"        
LOCAL_VIDEO_PATH = "NightOfTheLivingDead.mp4"      
OUTPUT_DB_FILE = "vector_index.json"
OPENSEARCH_HOST = "open search end point without https"

Video tools file for Rekognition shot detection and frame extraction:

import time
import cv2
import os
from .aws_clients import aws

def start_shot_detection(bucket, video_key):
    print(f"  Requesting Shot Detection for: {video_key}...")
    response = aws.rekognition_client.start_segment_detection(
        Video={'S3Object': {'Bucket': bucket, 'Name': video_key}},
        SegmentTypes=['SHOT']
    )
    return response['JobId']

def wait_for_job(job_id):
    print(" Waiting for shot detection to complete...")
    while True:
        status = aws.rekognition_client.get_segment_detection(JobId=job_id)
        s = status['JobStatus']
        if s in ['SUCCEEDED', 'FAILED']:
            return status
        print("  ...still processing...")
        time.sleep(5)

def extract_frames_for_shot(video_path, start_ms, end_ms, max_frames=5):
    """
    Extracts up to 'max_frames' evenly spaced across the shot duration.
    Returns a list of image bytes.
    """
    duration = end_ms - start_ms
    if duration < 1000:
        timestamps = [start_ms + (duration / 2)]
    else:
        step = duration / (max_frames + 1)
        timestamps = [start_ms + step * i for i in range(1, max_frames + 1)]

    frames = []
    cap = cv2.VideoCapture(video_path)

    for ts in timestamps:
        cap.set(cv2.CAP_PROP_POS_MSEC, ts)
        success, frame = cap.read()
        if success:
            _, buffer = cv2.imencode('.jpg', frame)
            frames.append(buffer.tobytes())

    cap.release()
    return frames

Transcriber file for starting the transcription job and picking out the words spoken in a given time range:

# modules/transcriber.py
import time
import json
import urllib.request
from .aws_clients import aws

def start_transcription_job(bucket, video_key, job_name):
    """
    Starts an AWS Transcribe job for the video in S3.
    """
    file_uri = f"s3://{bucket}/{video_key}"
    print(f" Starting Transcription for: {file_uri}")

    try:
        aws.transcribe.start_transcription_job(
            TranscriptionJobName=job_name,
            Media={'MediaFileUri': file_uri},
            MediaFormat='mp4',
            LanguageCode='en-US',
            Settings={'ShowSpeakerLabels': False} 
        )
        return job_name
    except aws.transcribe.exceptions.ConflictException:
        print(f" Job {job_name} already exists. Using existing job.")
        return job_name

def wait_for_job(job_name):
    """
    Polls AWS Transcribe until the job is done.
    """
    print(f" Waiting for Transcription (Job: {job_name})...")
    while True:
        status = aws.transcribe.get_transcription_job(TranscriptionJobName=job_name)
        s = status['TranscriptionJob']['TranscriptionJobStatus']

        if s in ['COMPLETED', 'FAILED']:
            return status['TranscriptionJob']

        print("... transcribing audio ...")
        time.sleep(10)

def get_transcript_text(transcript_uri):
    """
    Downloads the JSON result from AWS and parses it into a clean list of segments.
    """
    print("Downloading Transcript JSON...")
    with urllib.request.urlopen(transcript_uri) as response:
        data = json.loads(response.read().decode())

    items = data['results']['items']

    clean_segments = []
    current_sentence = []
    start_time = 0.0

    for item in items:
        content = item['alternatives'][0]['content']
        item_type = item.get('type')

        if item_type == 'pronunciation':
            if not current_sentence:
                start_time = float(item['start_time'])
            current_sentence.append(content)

        elif item_type == 'punctuation':
            if current_sentence:
                current_sentence[-1] += content

            # End of a sentence: store the accumulated words with their start time
            if content in ['.', '?', '!']:
                full_text = " ".join(current_sentence)
                clean_segments.append({
                    "text": full_text,
                    "start": start_time
                })
                current_sentence = []

    print(f" Parsed {len(clean_segments)} sentences from audio.")
    return clean_segments

def get_text_in_range(segments, start_sec, end_sec):
    """
    Helper to find all text spoken between start_sec and end_sec.
    """
    matched_text = []
    for seg in segments:
        if seg['start'] >= start_sec and seg['start'] < end_sec:
            matched_text.append(seg['text'])

    return " ".join(matched_text)

Analyzer file for sending shot frames and transcript text to the model for analysis:

# modules/analyzer.py
import json
import boto3
from .aws_clients import aws

MODEL_ID = "arn:aws:bedrock:us-west-2:accountid:inference-profile/us.amazon.nova-pro-v1:0" 

def analyze_shot(frames, transcript_text):
    """
    Sends multiple images and context text to the LLM.
    Returns a rich text description.
    """

    content_blocks = []

    if transcript_text:
        content_blocks.append({
            "text": f"TRANSCRIPT OF AUDIO IN THIS SCENE:\n'{transcript_text}'\n\n"
        })
    else:
        content_blocks.append({
            "text": "AUDIO TRANSCRIPT: [No dialogue detected]\n\n"
        })

    for i, img_bytes in enumerate(frames):
        content_blocks.append({
            "text": f"Image {i+1}:" 
        })
        content_blocks.append({
            "image": {
                "format": "jpeg", 
                "source": {"bytes": img_bytes}
            }
        })

    prompt = """
    TASK: Analyze this sequence of frames and the audio transcript.
    OUTPUT: A detailed visual and narrative description.

    GUIDELINES:
    1. Describe the ACTION: What is physically happening? (e.g., chasing, fighting, kissing).
    2. Describe the EMOTION: What is the mood? (e.g., tense, joyful).
    3. Incorporate the DIALOGUE: Explain how the words relate to the visual.

    Return ONLY the description paragraph. Do not add headers like "Here is the analysis".
    """
    content_blocks.append({"text": prompt})

    try:
        response = aws.bedrock_client.converse(
            modelId=MODEL_ID,
            messages=[{"role": "user", "content": content_blocks}],
            inferenceConfig={"temperature": 0.1, "maxTokens": 500}
        )
        return response['output']['message']['content'][0]['text']

    except Exception as e:
        print(f" Analysis Error: {e}")
        return "Analysis failed."

Embedder file for generating embeddings:

import json
import base64
import config
from modules.aws_clients import aws

def generate_vector_from_text(text_input):
    """
    Generates a vector for the text description using Titan Multimodal.
    """
    if not text_input: return None

    payload = {
        "inputText": text_input
    }

    try:
        response = aws.bedrock_client.invoke_model(
            modelId=config.BEDROCK_EMBEDDING_MODEL,
            contentType="application/json",
            accept="application/json",
            body=json.dumps(payload)
        )
        response_body = json.loads(response.get('body').read())
        return response_body.get('embedding')
    except Exception as e:
        print(f" Embedding Error: {e}")
        return None
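A quick sanity check worth doing: the length of the returned vector must match the knn_vector dimension configured in the OpenSearch index (1024 for Titan Multimodal, as used in the mapping later). A minimal sketch, assuming Bedrock access is configured locally and you run it from the project root:

from modules.embedder import generate_vector_from_text

vec = generate_vector_from_text("a car chase at night")
print(len(vec))  # expect 1024, matching the index mapping's "dimension"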

Indexer file for pushing analyzed data to OpenSearch:

# modules/indexer.py
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import config

credentials = boto3.Session().get_credentials()
auth = ("username", "password")

client = OpenSearch(
    hosts=[{'host': config.OPENSEARCH_HOST, 'port': 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

def index_shot(video_key, shot_data):
    """
    Pushes a single analyzed shot to OpenSearch.
    """
    document = {
        "video_id": video_key,
        "shot_id": shot_data['shot_id'],
        "start_time": shot_data['start_ms'] / 1000.0, 
        "end_time": shot_data['end_ms'] / 1000.0,
        "description": shot_data['description'],
        "vector_embedding": shot_data['vector']
    }

    try:
        client.index(index="reelsmith-index", body=document)
        print(f"   Indexed Shot {shot_data['shot_id']} to Cloud.")
    except Exception as e:
        print(f"   Indexing Failed: {e}")

Now that we have the tools ready, let's start building the flow

  • Create an S3 bucket for storing raw video files and generated video clips
  • Create a DynamoDB table for storing processing status updates (a minimal creation sketch follows this list)
  • Build the Dispatcher, Worker, and Finalizer Python files. These will be deployed to Lambda later
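A minimal sketch of the table creation (the ReelSmith_Jobs name and the video_id key match what the Lambda code below expects; on-demand billing is my assumption):

import boto3

dynamodb = boto3.client('dynamodb', region_name='us-west-2')

# Partition key 'video_id' is what the Dispatcher and Finalizer use in put_item/update_item
dynamodb.create_table(
    TableName='ReelSmith_Jobs',
    AttributeDefinitions=[{'AttributeName': 'video_id', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'video_id', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST'
)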

Dispatcher file:

This file kicks off shot detection and the transcription job, then returns the batched shot list along with the video ID.

import json
import boto3
import math
import os
import config
from modules import video_tools, transcriber

s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('ReelSmith_Jobs')

SHOTS_PER_WORKER = 50 

def lambda_handler(event, context):

    bucket = ""
    key = ""

    if 'detail' in event and 'bucket' in event['detail']:
        bucket = event['detail']['bucket']['name']
        key = event['detail']['object']['key']

    elif 'Records' in event:
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = event['Records'][0]['s3']['object']['key']

    else:
        bucket = config.S3_BUCKET_NAME
        key = event.get('key', '')

    print(f" Received Event for: {key}")

    if not key.startswith("raw/"):
        print(f"SAFETY STOP: File '{key}' is not in 'raw/' folder. Ignoring.")
        return {"status": "Ignored", "reason": "Wrong Folder"}

    video_id = os.path.basename(key)

    print(f"Validation Passed. Processing Video ID: {video_id}")

    table.put_item(Item={
        'video_id': video_id,
        'status': 'ANALYZING_STRUCTURE',
        'timestamp': str(context.aws_request_id)
    })

    job_id = video_tools.start_shot_detection(bucket, key)
    shot_result = video_tools.wait_for_job(job_id)
    shots = [s for s in shot_result['Segments'] if s['Type'] == 'SHOT']

    job_name = f"reelsmith_parallel_{video_id[:10]}"
    transcriber.start_transcription_job(bucket, key, job_name)
    transcribe_result = transcriber.wait_for_job(job_name)
    transcript_uri = transcribe_result['Transcript']['TranscriptFileUri']

    total_shots = len(shots)
    print(f" Found {total_shots} shots. Splitting...")

    batches = []
    num_batches = math.ceil(total_shots / SHOTS_PER_WORKER)

    for i in range(num_batches):
        start = i * SHOTS_PER_WORKER
        end = start + SHOTS_PER_WORKER
        batch_shots = shots[start:end]

        batches.append({
            "bucket": bucket,
            "key": key,
            "batch_id": i,
            "total_batches": num_batches,
            "transcript_uri": transcript_uri,
            "shots": batch_shots
        })

    table.update_item(
        Key={'video_id': video_id},
        UpdateExpression="set #s = :s",
        ExpressionAttributeNames={'#s': 'status'},
        ExpressionAttributeValues={':s': 'PROCESSING_PARALLEL'}
    )

    return {"batches": batches, "video_id": video_id}
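For reference, each element of the returned batches list looks roughly like this (values are illustrative); this is what the Map state iterates over via ItemsPath "$.batches":

{
  "bucket": "reel-smith-ai",
  "key": "raw/NightOfTheLivingDead.mp4",
  "batch_id": 0,
  "total_batches": 16,
  "transcript_uri": "https://...",
  "shots": [
    {"Type": "SHOT", "StartTimestampMillis": 0, "EndTimestampMillis": 4200}
  ]
}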

Worker file:
This file extracts frames for each shot in its batch, combines them with the matching transcript text, sends them to the model for analysis, and indexes the result.

import json
import os
import boto3
from modules import video_tools, transcriber, analyzer, embedder, indexer

s3 = boto3.client('s3')

def lambda_handler(event, context):

    bucket = event['bucket']
    key = event['key']
    batch_id = event['batch_id']
    shots = event['shots']
    transcript_uri = event['transcript_uri']

    print(f" Worker {batch_id}: Processing {len(shots)} shots...")

    local_path = f"/tmp/{os.path.basename(key)}"
    if not os.path.exists(local_path):
        s3.download_file(bucket, key, local_path)

    all_sentences = transcriber.get_transcript_text(transcript_uri)

    for shot in shots:
        start_ms = shot['StartTimestampMillis']
        end_ms = shot['EndTimestampMillis']

        frames = video_tools.extract_frames_for_shot(local_path, start_ms, end_ms, max_frames=3)
        if not frames: continue

        text_context = transcriber.get_text_in_range(all_sentences, start_ms/1000, end_ms/1000)

        description = analyzer.analyze_shot(frames, text_context)

        vector = embedder.generate_vector_from_text(description)

        shot_data = {
            "shot_id": f"{batch_id}_{start_ms}",
            "start_ms": start_ms,
            "end_ms": end_ms,
            "description": description,
            "vector": vector
        }
        indexer.index_shot(key, shot_data)

    return {"status": "success", "batch_id": batch_id}

Finalizer file:
This file updates the job status in DynamoDB once the analysis is complete.

import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('ReelSmith_Jobs')

def lambda_handler(event, context):

    video_id = event.get('video_id') 

    if video_id:
        table.update_item(
            Key={'video_id': video_id},
            UpdateExpression="set #s = :s",
            ExpressionAttributeNames={'#s': 'status'},
            ExpressionAttributeValues={':s': 'COMPLETED'}
        )
    return {"status": "Job Completed"}

Director file:

This file generates an embedding for the user query, compares it against the indexed shot embeddings, and cuts the video based on the returned timestamps.

import json
import os
import boto3
import subprocess
from botocore.exceptions import ClientError
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import config
from modules import embedder

s3 = boto3.client('s3')

# Master user credentials for the OpenSearch domain (use your own; avoid hardcoding
# secrets in source, e.g. read them from environment variables or Secrets Manager)
auth = ("username", "password")

client = OpenSearch(
    hosts=[{'host': config.OPENSEARCH_HOST, 'port': 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

def check_file_exists(bucket, key):
    """Helper to check if file exists in S3 before downloading"""
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError:
        return False

def cut_video_ffmpeg(bucket, video_key, start_s, end_s):
    """
    Downloads video, cuts it, uploads clip, returns Signed URL.
    """
    final_key = video_key

    if not check_file_exists(bucket, final_key):
        print(f" Warning: '{final_key}' not found. Trying 'raw/' prefix...")
        alt_key = f"raw/{video_key}" if not video_key.startswith("raw/") else video_key

        if check_file_exists(bucket, alt_key):
            print(f" Found file at: {alt_key}")
            final_key = alt_key
        else:
            alt_key_2 = video_key.replace("raw/", "")
            if check_file_exists(bucket, alt_key_2):
                final_key = alt_key_2
            else:
                raise FileNotFoundError(f"❌ Critical: Could not find video '{video_key}' in bucket '{bucket}'")

    filename = os.path.basename(final_key)
    local_input = f"/tmp/{filename}"
    local_output = f"/tmp/clip_{filename}"

    if not os.path.exists(local_input): 
        print(f" Downloading source: {final_key}...")
        s3.download_file(bucket, final_key, local_input)

    duration = end_s - start_s
    print(f" Cutting {duration}s from {start_s}s...")

    cmd = [
        "ffmpeg", "-y",
        "-ss", str(start_s),
        "-i", local_input,
        "-t", str(duration),
        "-c", "copy", 
        local_output
    ]
    subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    clip_key = f"processed/clip_{int(start_s)}_{filename}"
    s3.upload_file(local_output, bucket, clip_key)

    url = s3.generate_presigned_url(
        'get_object',
        Params={'Bucket': bucket, 'Key': clip_key},
        ExpiresIn=3600
    )

    if os.path.exists(local_output): os.remove(local_output)

    return url

def lambda_handler(event, context):
    print(" Director Agent Started...")

    body = event
    if 'body' in event:
        body = json.loads(event['body'])

    query = body.get('query', '')
    threshold = body.get('threshold', 0.6)

    print(f" Query: '{query}' (Threshold: {threshold})")

    query_vector = embedder.generate_vector_from_text(query)

    search_query = {
        "size": 3,
        "query": {
            "knn": {
                "vector_embedding": {
                    "vector": query_vector,
                    "k": 3
                }
            }
        }
    }

    response = client.search(index="reelsmith-index", body=search_query)
    hits = response['hits']['hits']

    if not hits:
        return {"statusCode": 404, "body": json.dumps("No index data found.")}

    best_hit = hits[0]
    score = best_hit['_score']
    source = best_hit['_source']

    print(f" Best Match: {score:.4f} (Desc: {source['description'][:50]}...)")

    if score < threshold:
        msg = f" Confidence Low ({score:.2f} < {threshold}). No scene found."
        print(msg)
        return {
            "statusCode": 200, 
            "body": json.dumps({"found": False, "message": msg})
        }

    print(" Confidence High. Generating Video...")

    video_key = source.get('video_id') or source.get('video_key')

    try:
        clip_url = cut_video_ffmpeg(
            config.S3_BUCKET_NAME, 
            video_key, 
            source['start_time'], 
            source['end_time']
        )

        return {
            "statusCode": 200,
            "body": json.dumps({
                "found": True,
                "confidence": score,
                "description": source['description'],
                "video_url": clip_url
            })
        }
    except Exception as e:
        print(f" Director Error: {str(e)}")
        return {
            "statusCode": 500,
            "body": json.dumps({"found": False, "message": f"Processing Error: {str(e)}"})
        }
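One caveat with the command above: "-c", "copy" only cuts on keyframes, so clip boundaries can be off by a second or two. If frame-accurate cuts matter more than speed, a re-encoding variant (a sketch, not the command used above) would look like:

cmd = [
    "ffmpeg", "-y",
    "-ss", str(start_s),
    "-i", local_input,
    "-t", str(duration),
    "-c:v", "libx264", "-preset", "veryfast",  # re-encode video for frame-accurate cuts
    "-c:a", "aac",                             # re-encode audio
    local_output
]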

Dockerfile:

# Use AWS Lambda Python 3.11 Base Image
FROM public.ecr.aws/lambda/python:3.11

# Update and Install System Dependencies (gcc needed for compiling)
RUN yum update -y && \
    yum install -y mesa-libGL gcc gcc-c++ python3-devel

# Copy Requirements
COPY requirements.txt ${LAMBDA_TASK_ROOT}

# Install Python Packages
RUN pip install -r requirements.txt

# Copy Application Code
COPY config.py ${LAMBDA_TASK_ROOT}
COPY lambda_dispatcher.py ${LAMBDA_TASK_ROOT}
COPY lambda_worker.py ${LAMBDA_TASK_ROOT}     
COPY lambda_finalizer.py ${LAMBDA_TASK_ROOT}   
COPY lambda_director.py ${LAMBDA_TASK_ROOT}    
COPY modules/ ${LAMBDA_TASK_ROOT}/modules/

# Default CMD (Overridden in Console)
CMD [ "lambda_dispatcher.lambda_handler" ]
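One thing to note: the Director Lambda shells out to ffmpeg, which the Lambda Python base image doesn't include, so the binary has to be bundled into the image (or provided via a layer). A sketch of one approach you could add to the Dockerfile, with an assumed static-build URL:

# Bundle a static ffmpeg binary so subprocess calls to "ffmpeg" work inside Lambda.
# The download URL is an assumption; use a build you trust and pin a version.
RUN yum install -y tar xz curl && \
    curl -L https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz -o /tmp/ffmpeg.tar.xz && \
    tar -xJf /tmp/ffmpeg.tar.xz -C /tmp && \
    cp /tmp/ffmpeg-*-amd64-static/ffmpeg /usr/local/bin/ && \
    rm -rf /tmp/ffmpeg*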
  • Build the Docker image using this Dockerfile and push it to ECR (a sketch of the commands follows after the handler list below)
  • Create the four Lambda functions from the same image, overriding the image CMD for each one:
lambda_dispatcher.lambda_handler
lambda_worker.lambda_handler
lambda_finalizer.lambda_handler
lambda_director.lambda_handler

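The build-and-push step looks roughly like this (the account ID, region, and the reelsmith repository name are placeholders):

aws ecr create-repository --repository-name reelsmith --region us-west-2
aws ecr get-login-password --region us-west-2 | docker login --username AWS --password-stdin <accountid>.dkr.ecr.us-west-2.amazonaws.com
docker build -t reelsmith .
docker tag reelsmith:latest <accountid>.dkr.ecr.us-west-2.amazonaws.com/reelsmith:latest
docker push <accountid>.dkr.ecr.us-west-2.amazonaws.com/reelsmith:latest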

Now that we have everything ready, let's connect all the functions using Step Functions.

  • Visit the Step Functions console and create a state machine with the following definition:
{
  "Comment": "Parallel Video Processor",
  "StartAt": "Dispatcher",
  "States": {
    "Dispatcher": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-west-2:acccountid:function:lambda-Dispatcher",
      "Next": "ParallelProcessing"
    },
    "ParallelProcessing": {
      "Type": "Map",
      "ItemsPath": "$.batches",
      "MaxConcurrency": 20,
      "Iterator": {
        "StartAt": "ProcessBatch",
        "States": {
          "ProcessBatch": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-west-2:accountid:function:lambda-Worker",
            "End": true
          }
        }
      },
      "ResultPath": "$.workerResults",
      "Next": "MarkComplete"
    },
    "MarkComplete": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-west-2:accountid:function:lambda-Finalizer",
      "End": true
    }
  }
}
  • It will look like this

  • We have to connect this state machine to the S3 file upload
  • Visit the EventBridge console and create a rule that triggers the state machine whenever a file is uploaded to the bucket's raw/ prefix

Rule Event Pattern:

{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["reel-smith-ai"]
    },
    "object": {
      "key": [{
        "prefix": "raw/"
      }]
    }
  }
}
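For this rule to fire, the bucket must have EventBridge notifications turned on (S3 does not send events to EventBridge by default). A minimal sketch using boto3, with the bucket name from the pattern above:

import boto3

s3 = boto3.client('s3', region_name='us-west-2')

# Equivalent to enabling "Send notifications to Amazon EventBridge" in the S3 console
s3.put_bucket_notification_configuration(
    Bucket='reel-smith-ai',
    NotificationConfiguration={'EventBridgeConfiguration': {}}
)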
  • Select the state machine as the rule's target so it receives the file upload event

  • Create a domain in the OpenSearch Service and create an index using this script
from opensearchpy import OpenSearch, RequestsHttpConnection

# --- CONFIG ---
HOST = "endpoint"  # e.g., search-mydomain-xxxxxxxxxx.us-west-2.es.amazonaws.com (no https://)
REGION = "us-west-2"

# Master user credentials for the OpenSearch domain (fine-grained access control)
username = 'username'
password = 'password'

auth = (username, password)

client = OpenSearch(
    hosts=[{'host': HOST, 'port': 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    pool_maxsize=20
)

index_name = "reelsmith-index"

def create_index():
    index_body = {
        "settings": {
            "index": {
                "knn": True  # Enable Vector Search
            }
        },
        "mappings": {
            "properties": {
                "video_id": {"type": "keyword"},
                "shot_id": {"type": "integer"},
                "start_time": {"type": "float"},
                "end_time": {"type": "float"},
                "description": {"type": "text"},
                "vector_embedding": {
                    "type": "knn_vector",
                    "dimension": 1024  # Titan Multimodal Dimension
                }
            }
        }
    }

    if not client.indices.exists(index=index_name):
        response = client.indices.create(index=index_name, body=index_body)
        print("Index created:", response)
    else:
        print("Index already exists.")

if __name__ == "__main__":
    create_index()

Now that we have everything ready, let's build the front end with Streamlit and start testing the setup.

Streamlit code:

import streamlit as st
import boto3
import json
import time
import os
import config

# --- CONFIGURATION ---
# Ensure these match your actual AWS setup details in config.py
DIRECTOR_FUNCTION_NAME = "ReelSmith-Director"
BUCKET_NAME = config.S3_BUCKET_NAME
TABLE_NAME = "ReelSmith_Jobs"

# --- AWS CLIENTS ---
# We use boto3 to talk to the Cloud Backends
s3 = boto3.client('s3', region_name=config.AWS_REGION)
lambda_client = boto3.client('lambda', region_name=config.AWS_REGION)
dynamodb = boto3.resource('dynamodb', region_name=config.AWS_REGION)
table = dynamodb.Table(TABLE_NAME)

# --- PAGE SETUP ---
st.set_page_config(page_title="ReelSmith Cloud", layout="wide", page_icon="🎬")

st.title("🎬 ReelSmith: Cloud AI Director")
st.markdown("""
**Serverless Video Intelligence Platform** *Powered by AWS Step Functions, Amazon Nova, and OpenSearch.*
""")

# Create Tabs for the two Agents
tab1, tab2 = st.tabs(["πŸ”Ž Ask the Director", "πŸ“€ Upload New Footage"])

# ==========================================
# TAB 1: THE DIRECTOR AGENT (Retrieval)
# ==========================================
with tab1:
    st.header("Ask the Director")
    st.caption("Agent 2: Performs Semantic Search & Real-time Editing")

    col1, col2 = st.columns([3, 1])
    with col1:
        query = st.text_input("Describe the scene:", placeholder="e.g., A truck exploding in flames")
    with col2:
        threshold = st.slider("Confidence Threshold", 0.0, 1.0, 0.60, help="Only show results if the AI is this confident.")

    if st.button("🎬 Action!", type="primary"):
        if not query:
            st.warning("Please enter a scene description.")
        else:
            # 1. VISUAL FEEDBACK
            with st.status("🧠 Agent is thinking...", expanded=True) as status:
                st.write("πŸ“‘ Contacting AWS Lambda Director...")

                # 2. INVOKE LAMBDA (Synchronous)
                payload = {"query": query, "threshold": threshold}

                try:
                    response = lambda_client.invoke(
                        FunctionName=DIRECTOR_FUNCTION_NAME,
                        InvocationType='RequestResponse',
                        Payload=json.dumps(payload)
                    )

                    # 3. PARSE RESPONSE
                    response_payload = json.loads(response['Payload'].read())

                    # Handle Lambda System Errors (500s)
                    if 'body' not in response_payload:
                        st.error(f"Lambda System Error: {response_payload}")
                        status.update(label="System Error", state="error")
                    else:
                        body = json.loads(response_payload['body'])

                        if body.get("found"):
                            # SUCCESS PATH
                            st.write(f"βœ… **Match Found!** (Confidence: {body['confidence']:.2f})")
                            st.write("βœ‚οΈ  Cutting video in the cloud (FFmpeg)...")

                            # Update Status
                            status.update(label="Video Ready!", state="complete", expanded=False)

                            # 4. DISPLAY RESULTS
                            st.divider()
                            st.success(f"**Scene Context:** {body['description']}")
                            st.video(body['video_url'])

                        else:
                            # FAILURE PATH (Confidence Low or No Match)
                            status.update(label="No Scene Found", state="error", expanded=False)
                            st.error(f"β›” Agent Response: {body.get('message', 'Unknown error')}")

                except Exception as e:
                    status.update(label="Connection Failed", state="error")
                    st.error(f"Client Error: {e}")

# ==========================================
# TAB 2: THE ANALYST AGENT (Ingestion)
# ==========================================
with tab2:
    st.header("Ingest New Footage")
    st.caption("Agent 1: Watches, Transcribes, and Indexes Video into OpenSearch")

    uploaded_file = st.file_uploader("Choose an MP4 file", type=["mp4"])

    if uploaded_file is not None:
        if st.button("πŸš€ Upload & Analyze"):
            video_id = uploaded_file.name

            # 1. UPLOAD TO S3 (The Trigger)
            with st.spinner("Uploading to S3..."):
                # CRITICAL: We upload to 'raw/' folder to trigger EventBridge
                s3_key = f"raw/{video_id}"

                # Save to temp locally
                temp_path = f"temp_{video_id}"
                with open(temp_path, "wb") as f:
                    f.write(uploaded_file.getbuffer())

                # Upload
                s3.upload_file(temp_path, BUCKET_NAME, s3_key)

                # Cleanup
                os.remove(temp_path)

            st.success("Upload Complete! EventBridge has triggered the Step Function.")

            # 2. POLL DYNAMODB FOR STATUS
            st.markdown("### πŸ“‘ Job Status")
            progress_bar = st.progress(0)
            status_text = st.empty()

            # Polling Loop
            while True:
                try:
                    # Get Item from DynamoDB
                    response = table.get_item(Key={'video_id': video_id})
                except Exception as e:
                    st.error(f"DB Error: {e}")
                    break

                # Case A: Job hasn't started yet (EventBridge lag)
                if 'Item' not in response:
                    status_text.info("⏳ Queued... Waiting for Dispatcher...")
                    time.sleep(3)
                    continue

                # Case B: Check Status
                job = response['Item']
                status = job.get('status', 'UNKNOWN')

                if status == 'COMPLETED':
                    progress_bar.progress(100)
                    status_text.success("βœ… Analysis Complete! You can now search for this video in Tab 1.")
                    st.balloons()
                    break

                elif status == 'ANALYZING_STRUCTURE':
                    progress_bar.progress(10)
                    status_text.text("πŸ‘€ Phase 1: Detecting shots and transcribing audio...")

                elif status == 'PROCESSING_PARALLEL':
                    progress_bar.progress(50)
                    status_text.text("⚑ Phase 2: Parallel Agents analyzing shots (Distributed Map)...")

                elif status == 'FAILED':
                    status_text.error("❌ Analysis Failed. Check CloudWatch logs.")
                    break

                # Wait before next poll
                time.sleep(5)
  • Run the Streamlit app using this command
streamlit run app.py

I downloaded a movie clip from YouTube and tested the process

Uploading Process:

Output:

Based on the input video, I gave this prompt. It worked. Not perfectly, but it worked. We can always improve accuracy by tuning the ingestion and analysis steps.

Here is the output video

(output GIF)

In case the GIF doesn't render fully, visit this Google Drive link to see the output:
output.gif

I am still working on improving accuracy and reducing hallucinations. If you have suggestions or ideas, please share them in the comments.

Thanks for reading.
