Hi everyone,
Imagine typing 'Show me the car chase scene' and, within seconds, getting a perfectly (okay, almost perfectly) cut video delivered to your screen.
I call it Text-To-Clip. It's an automated video editing engine that combines computer vision (to see), vector search (to remember), and FFmpeg (to act).
In this post, I break down the exact AWS architecture I used to build it, from the distributed 'Map' state in Step Functions to Dockerized computer vision containers running on Lambda.
Services I used:
- Amazon Bedrock
- Lambda
- Step Functions
- OpenSearch Service
- Amazon Rekognition
- Amazon Transcribe
- DynamoDB
- S3
Architecture:
Implementation steps:
Identifying shots in the video: Whenever the user uploads a video, it is sent to the Amazon Rekognition service for shot detection, and a transcription job is started with the Amazon Transcribe service.
Analyzing shots: From each shot, a fixed number of frames is sampled, along with the transcript covering that time range. These are sent to the Bedrock Nova model for scene analysis.
Storing analyzed data: The model's analysis is sent to the embedding model, and the generated embeddings are stored in an OpenSearch index.
Querying and generating the video: The user query is converted to an embedding and compared against the shot embeddings. The timestamps of the matching shots are passed to FFmpeg to cut and stitch the clip.
Load balancing: When I uploaded a movie of about 1 hour 30 minutes, more than 800 shots were returned, and analyzing them one after another took over 40 minutes. So a parallel processing mechanism was implemented with Step Functions, where multiple Lambdas run at the same time with the load distributed across them.
Let's start the implementation
Folder Structure:
Module Files:
- The requirements file contains all the Python libraries
boto3
opencv-python-headless
numpy<2.0.0
opensearch-py
requests
AWS clients file for initializing the boto3 clients:
import boto3
import config
class AWSManager:
def __init__(self):
self.session = boto3.Session(region_name=config.AWS_REGION)
self.s3_client = self.session.client('s3')
self.rekognition_client = self.session.client('rekognition')
self.bedrock_client = self.session.client('bedrock-runtime')
self.transcribe = self.session.client('transcribe')
aws = AWSManager()
Config file with the project settings:
AWS_REGION = "us-west-2"
S3_BUCKET_NAME = "bucket"
BEDROCK_EMBEDDING_MODEL = "amazon.titan-embed-image-v1"
VIDEO_FILENAME = "NightOfTheLivingDead.mp4"
LOCAL_VIDEO_PATH = "NightOfTheLivingDead.mp4"
OUTPUT_DB_FILE = "vector_index.json"
OPENSEARCH_HOST = "open search end point without https"
Video tools file: uses the Rekognition service for shot detection and OpenCV for frame extraction:
import time
import cv2
import os
from .aws_clients import aws
def start_shot_detection(bucket, video_key):
print(f" Requesting Shot Detection for: {video_key}...")
response = aws.rekognition_client.start_segment_detection(
Video={'S3Object': {'Bucket': bucket, 'Name': video_key}},
SegmentTypes=['SHOT']
)
return response['JobId']
def wait_for_job(job_id):
print(" Waiting for shot detection to complete...")
while True:
status = aws.rekognition_client.get_segment_detection(JobId=job_id)
s = status['JobStatus']
if s in ['SUCCEEDED', 'FAILED']:
return status
print(" ...still processing...")
time.sleep(5)
def extract_frames_for_shot(video_path, start_ms, end_ms, max_frames=5):
"""
Extracts up to 'max_frames' evenly spaced across the shot duration.
Returns a list of image bytes.
"""
duration = end_ms - start_ms
if duration < 1000:
timestamps = [start_ms + (duration / 2)]
else:
step = duration / (max_frames + 1)
timestamps = [start_ms + step * i for i in range(1, max_frames + 1)]
frames = []
cap = cv2.VideoCapture(video_path)
for ts in timestamps:
cap.set(cv2.CAP_PROP_POS_MSEC, ts)
success, frame = cap.read()
if success:
_, buffer = cv2.imencode('.jpg', frame)
frames.append(buffer.tobytes())
cap.release()
return frames
Transcriber file: starts the transcription job and picks out the words spoken in a given time range:
# modules/transcriber.py
import time
import json
import urllib.request
from .aws_clients import aws
def start_transcription_job(bucket, video_key, job_name):
"""
Starts an AWS Transcribe job for the video in S3.
"""
file_uri = f"s3://{bucket}/{video_key}"
print(f" Starting Transcription for: {file_uri}")
try:
aws.transcribe.start_transcription_job(
TranscriptionJobName=job_name,
Media={'MediaFileUri': file_uri},
MediaFormat='mp4',
LanguageCode='en-US',
Settings={'ShowSpeakerLabels': False}
)
return job_name
except aws.transcribe.exceptions.ConflictException:
print(f" Job {job_name} already exists. Using existing job.")
return job_name
def wait_for_job(job_name):
"""
Polls AWS Transcribe until the job is done.
"""
print(f" Waiting for Transcription (Job: {job_name})...")
while True:
status = aws.transcribe.get_transcription_job(TranscriptionJobName=job_name)
s = status['TranscriptionJob']['TranscriptionJobStatus']
if s in ['COMPLETED', 'FAILED']:
return status['TranscriptionJob']
print("... transcribing audio ...")
time.sleep(10)
def get_transcript_text(transcript_uri):
"""
Downloads the JSON result from AWS and parses it into a clean list of segments.
"""
print("Downloading Transcript JSON...")
with urllib.request.urlopen(transcript_uri) as response:
data = json.loads(response.read().decode())
items = data['results']['items']
clean_segments = []
current_sentence = []
start_time = 0.0
for item in items:
content = item['alternatives'][0]['content']
type = item.get('type')
if type == 'pronunciation':
if not current_sentence:
start_time = float(item['start_time'])
current_sentence.append(content)
elif type == 'punctuation':
if current_sentence:
current_sentence[-1] += content
if content in ['.', '?', '!']:
end_time = start_time
full_text = " ".join(current_sentence)
clean_segments.append({
"text": full_text,
"start": start_time,
})
current_sentence = []
print(f" Parsed {len(clean_segments)} sentences from audio.")
return clean_segments
def get_text_in_range(segments, start_sec, end_sec):
"""
Helper to find all text spoken between start_sec and end_sec.
"""
matched_text = []
for seg in segments:
if seg['start'] >= start_sec and seg['start'] < end_sec:
matched_text.append(seg['text'])
return " ".join(matched_text)
Analyzer file for sending the shot frames and transcript context to the model for analysis:
# modules/analyzer.py
import json
import boto3
from .aws_clients import aws
MODEL_ID = "arn:aws:bedrock:us-west-2:accountid:inference-profile/us.amazon.nova-pro-v1:0"
def analyze_shot(frames, transcript_text):
"""
Sends multiple images and context text to the LLM.
Returns a rich text description.
"""
content_blocks = []
if transcript_text:
content_blocks.append({
"text": f"TRANSCRIPT OF AUDIO IN THIS SCENE:\n'{transcript_text}'\n\n"
})
else:
content_blocks.append({
"text": "AUDIO TRANSCRIPT: [No dialogue detected]\n\n"
})
for i, img_bytes in enumerate(frames):
content_blocks.append({
"text": f"Image {i+1}:"
})
content_blocks.append({
"image": {
"format": "jpeg",
"source": {"bytes": img_bytes}
}
})
prompt = """
TASK: Analyze this sequence of frames and the audio transcript.
OUTPUT: A detailed visual and narrative description.
GUIDELINES:
1. Describe the ACTION: What is physically happening? (e.g., chasing, fighting, kissing).
2. Describe the EMOTION: What is the mood? (e.g., tense, joyful).
3. Incorporate the DIALOGUE: Explain how the words relate to the visual.
Return ONLY the description paragraph. Do not add headers like "Here is the analysis".
"""
content_blocks.append({"text": prompt})
try:
response = aws.bedrock_client.converse(
modelId=MODEL_ID,
messages=[{"role": "user", "content": content_blocks}],
inferenceConfig={"temperature": 0.1, "maxTokens": 500}
)
return response['output']['message']['content'][0]['text']
except Exception as e:
print(f" Analysis Error: {e}")
return "Analysis failed."
Embedder file for embedding generation:
import json
import base64
import config
from modules.aws_clients import aws
def generate_vector_from_text(text_input):
"""
Generates a vector for the text description using Titan Multimodal.
"""
if not text_input: return None
payload = {
"inputText": text_input
}
try:
response = aws.bedrock_client.invoke_model(
modelId=config.BEDROCK_EMBEDDING_MODEL,
contentType="application/json",
accept="application/json",
body=json.dumps(payload)
)
response_body = json.loads(response.get('body').read())
return response_body.get('embedding')
except Exception as e:
print(f" Embedding Error: {e}")
return None
Indexer file for pushing analyzed data to OpenSearch:
# modules/indexer.py
from opensearchpy import OpenSearch, RequestsHttpConnection
import config
# Basic auth against the domain's internal user database (use your own credentials)
auth = ("username", "password")
client = OpenSearch(
hosts=[{'host': config.OPENSEARCH_HOST, 'port': 443}],
http_auth=auth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
def index_shot(video_key, shot_data):
"""
Pushes a single analyzed shot to OpenSearch.
"""
document = {
"video_id": video_key,
"shot_id": shot_data['shot_id'],
"start_time": shot_data['start_ms'] / 1000.0,
"end_time": shot_data['end_ms'] / 1000.0,
"description": shot_data['description'],
"vector_embedding": shot_data['vector']
}
try:
client.index(index="reelsmith-index", body=document)
print(f" Indexed Shot {shot_data['shot_id']} to Cloud.")
except Exception as e:
print(f" Indexing Failed: {e}")
Now that we have the tools ready, let's start building the flow
- Create an S3 bucket for storing raw video files and generated video clips
- Create a DynamoDB table for storing the processing status updates (a boto3 sketch for both resources follows this list)
- Let's build the Dispatcher, Worker, and Finalizer Python files. These will be deployed to Lambda later
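If you prefer to provision these two resources with a script instead of the console, here is a minimal boto3 sketch. The bucket and table names (reel-smith-ai, ReelSmith_Jobs) match the ones used later in this post; the region and billing mode are assumptions you can adjust. Enabling EventBridge notifications on the bucket is what lets the rule we create later actually fire.

# Hedged sketch: provisioning the bucket and the DynamoDB status table with boto3.
import boto3

REGION = "us-west-2"
s3 = boto3.client("s3", region_name=REGION)
dynamodb = boto3.client("dynamodb", region_name=REGION)

# Bucket for raw uploads ("raw/" prefix) and generated clips ("processed/" prefix)
s3.create_bucket(
    Bucket="reel-smith-ai",
    CreateBucketConfiguration={"LocationConstraint": REGION},
)
# EventBridge must be enabled on the bucket for the "Object Created" rule used later
s3.put_bucket_notification_configuration(
    Bucket="reel-smith-ai",
    NotificationConfiguration={"EventBridgeConfiguration": {}},
)

# Status table keyed by video_id; the Dispatcher and Finalizer update it
dynamodb.create_table(
    TableName="ReelSmith_Jobs",
    AttributeDefinitions=[{"AttributeName": "video_id", "AttributeType": "S"}],
    KeySchema=[{"AttributeName": "video_id", "KeyType": "HASH"}],
    BillingMode="PAY_PER_REQUEST",
)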
Dispatcher file:
This file kicks off shot detection and transcription, then returns the list of detected shots (split into batches) along with the video ID.
import json
import boto3
import math
import os
import config
from modules import video_tools, transcriber
s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('ReelSmith_Jobs')
SHOTS_PER_WORKER = 50
def lambda_handler(event, context):
bucket = ""
key = ""
if 'detail' in event and 'bucket' in event['detail']:
bucket = event['detail']['bucket']['name']
key = event['detail']['object']['key']
elif 'Records' in event:
bucket = event['Records'][0]['s3']['bucket']['name']
key = event['Records'][0]['s3']['object']['key']
else:
bucket = config.S3_BUCKET_NAME
key = event.get('key', '')
print(f" Received Event for: {key}")
if not key.startswith("raw/"):
print(f"SAFETY STOP: File '{key}' is not in 'raw/' folder. Ignoring.")
return {"status": "Ignored", "reason": "Wrong Folder"}
video_id = os.path.basename(key)
print(f"Validation Passed. Processing Video ID: {video_id}")
table.put_item(Item={
'video_id': video_id,
'status': 'ANALYZING_STRUCTURE',
'timestamp': str(context.aws_request_id)
})
job_id = video_tools.start_shot_detection(bucket, key)
shot_result = video_tools.wait_for_job(job_id)
shots = [s for s in shot_result['Segments'] if s['Type'] == 'SHOT']
job_name = f"reelsmith_parallel_{video_id[:10]}"
transcriber.start_transcription_job(bucket, key, job_name)
transcribe_result = transcriber.wait_for_job(job_name)
transcript_uri = transcribe_result['Transcript']['TranscriptFileUri']
total_shots = len(shots)
print(f" Found {total_shots} shots. Splitting...")
batches = []
num_batches = math.ceil(total_shots / SHOTS_PER_WORKER)
for i in range(num_batches):
start = i * SHOTS_PER_WORKER
end = start + SHOTS_PER_WORKER
batch_shots = shots[start:end]
batches.append({
"bucket": bucket,
"key": key,
"batch_id": i,
"total_batches": num_batches,
"transcript_uri": transcript_uri,
"shots": batch_shots
})
table.update_item(
Key={'video_id': video_id},
UpdateExpression="set #s = :s",
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':s': 'PROCESSING_PARALLEL'}
)
return {"batches": batches, "video_id": video_id}
Worker file:
This file extracts frames from each shot, combines them with the transcript text for that time range, and sends them to the model for analysis.
import json
import os
import boto3
from modules import video_tools, transcriber, analyzer, embedder, indexer
s3 = boto3.client('s3')
def lambda_handler(event, context):
bucket = event['bucket']
key = event['key']
batch_id = event['batch_id']
shots = event['shots']
transcript_uri = event['transcript_uri']
print(f" Worker {batch_id}: Processing {len(shots)} shots...")
local_path = f"/tmp/{os.path.basename(key)}"
if not os.path.exists(local_path):
s3.download_file(bucket, key, local_path)
all_sentences = transcriber.get_transcript_text(transcript_uri)
for shot in shots:
start_ms = shot['StartTimestampMillis']
end_ms = shot['EndTimestampMillis']
frames = video_tools.extract_frames_for_shot(local_path, start_ms, end_ms, max_frames=3)
if not frames: continue
text_context = transcriber.get_text_in_range(all_sentences, start_ms/1000, end_ms/1000)
description = analyzer.analyze_shot(frames, text_context)
vector = embedder.generate_vector_from_text(description)
shot_data = {
"shot_id": f"{batch_id}_{start_ms}",
"start_ms": start_ms,
"end_ms": end_ms,
"description": description,
"vector": vector
}
indexer.index_shot(key, shot_data)
return {"status": "success", "batch_id": batch_id}
Finalizer file:
This file will update the process state in DynamoDB after the analysis process is completed.
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('ReelSmith_Jobs')
def lambda_handler(event, context):
video_id = event.get('video_id')
if video_id:
table.update_item(
Key={'video_id': video_id},
UpdateExpression="set #s = :s",
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':s': 'COMPLETED'}
)
return {"status": "Job Completed"}
Director file:
This file generates an embedding for the user query, compares it against the indexed shot embeddings, and cuts the video based on the returned timestamps.
import json
import os
import boto3
import subprocess
from botocore.exceptions import ClientError
from opensearchpy import OpenSearch, RequestsHttpConnection
import config
from modules import embedder
s3 = boto3.client('s3')
# Basic auth against the domain's internal user database (use your own credentials)
auth = ("username", "password")
client = OpenSearch(
hosts=[{'host': config.OPENSEARCH_HOST, 'port': 443}],
http_auth=auth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
def check_file_exists(bucket, key):
"""Helper to check if file exists in S3 before downloading"""
try:
s3.head_object(Bucket=bucket, Key=key)
return True
except ClientError:
return False
def cut_video_ffmpeg(bucket, video_key, start_s, end_s):
"""
Downloads video, cuts it, uploads clip, returns Signed URL.
"""
final_key = video_key
if not check_file_exists(bucket, final_key):
print(f" Warning: '{final_key}' not found. Trying 'raw/' prefix...")
alt_key = f"raw/{video_key}" if not video_key.startswith("raw/") else video_key
if check_file_exists(bucket, alt_key):
print(f" Found file at: {alt_key}")
final_key = alt_key
else:
alt_key_2 = video_key.replace("raw/", "")
if check_file_exists(bucket, alt_key_2):
final_key = alt_key_2
else:
raise FileNotFoundError(f"Critical: Could not find video '{video_key}' in bucket '{bucket}'")
filename = os.path.basename(final_key)
local_input = f"/tmp/{filename}"
local_output = f"/tmp/clip_{filename}"
if not os.path.exists(local_input):
print(f" Downloading source: {final_key}...")
s3.download_file(bucket, final_key, local_input)
duration = end_s - start_s
print(f" Cutting {duration}s from {start_s}s...")
cmd = [
"ffmpeg", "-y",
"-ss", str(start_s),
"-i", local_input,
"-t", str(duration),
"-c", "copy",
local_output
]
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
clip_key = f"processed/clip_{int(start_s)}_{filename}"
s3.upload_file(local_output, bucket, clip_key)
url = s3.generate_presigned_url(
'get_object',
Params={'Bucket': bucket, 'Key': clip_key},
ExpiresIn=3600
)
if os.path.exists(local_output): os.remove(local_output)
return url
def lambda_handler(event, context):
print(" Director Agent Started...")
body = event
if 'body' in event:
body = json.loads(event['body'])
query = body.get('query', '')
threshold = body.get('threshold', 0.6)
print(f" Query: '{query}' (Threshold: {threshold})")
query_vector = embedder.generate_vector_from_text(query)
search_query = {
"size": 3,
"query": {
"knn": {
"vector_embedding": {
"vector": query_vector,
"k": 3
}
}
}
}
response = client.search(index="reelsmith-index", body=search_query)
hits = response['hits']['hits']
if not hits:
return {"statusCode": 404, "body": json.dumps("No index data found.")}
best_hit = hits[0]
score = best_hit['_score']
source = best_hit['_source']
print(f" Best Match: {score:.4f} (Desc: {source['description'][:50]}...)")
if score < threshold:
msg = f" Confidence Low ({score:.2f} < {threshold}). No scene found."
print(msg)
return {
"statusCode": 200,
"body": json.dumps({"found": False, "message": msg})
}
print(" Confidence High. Generating Video...")
video_key = source.get('video_id') or source.get('video_key')
try:
clip_url = cut_video_ffmpeg(
config.S3_BUCKET_NAME,
video_key,
source['start_time'],
source['end_time']
)
return {
"statusCode": 200,
"body": json.dumps({
"found": True,
"confidence": score,
"description": source['description'],
"video_url": clip_url
})
}
except Exception as e:
print(f" Director Error: {str(e)}")
return {
"statusCode": 500,
"body": json.dumps({"found": False, "message": f"Processing Error: {str(e)}"})
}
Dockerfile:
# Use AWS Lambda Python 3.11 Base Image
FROM public.ecr.aws/lambda/python:3.11
# Update and Install System Dependencies (gcc needed for compiling)
RUN yum update -y && \
yum install -y mesa-libGL gcc gcc-c++ python3-devel
# Copy Requirements
COPY requirements.txt ${LAMBDA_TASK_ROOT}
# Install Python Packages
RUN pip install -r requirements.txt
# Copy Application Code
COPY config.py ${LAMBDA_TASK_ROOT}
COPY lambda_dispatcher.py ${LAMBDA_TASK_ROOT}
COPY lambda_worker.py ${LAMBDA_TASK_ROOT}
COPY lambda_finalizer.py ${LAMBDA_TASK_ROOT}
COPY lambda_director.py ${LAMBDA_TASK_ROOT}
COPY modules/ ${LAMBDA_TASK_ROOT}/modules/
# Default CMD (Overridden in Console)
CMD [ "lambda_dispatcher.lambda_handler" ]
- Build the Docker image using this Dockerfile and push it to ECR. Keep in mind that ffmpeg itself is not part of the Lambda Python base image, so bundle a static ffmpeg binary (or attach an ffmpeg layer) for the Director function to work
- Create the 4 Lambdas from the same image and just override the image CMD per function, like this (a boto3 sketch follows the list)
lambda_dispatcher.lambda_handler
lambda_worker.lambda_handler
lambda_finalizer.lambda_handler
lambda_director.lambda_handler
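Rather than clicking through the console four times, this step can be scripted too. Below is a rough boto3 sketch; the image URI, the execution role ARN, and the memory/timeout/ephemeral-storage values are placeholders and assumptions, not the exact settings I used. The function names match the ARNs referenced in the state machine and the Streamlit app.

# Sketch: four Lambda functions from one container image, each overriding the image CMD.
import boto3

lambda_client = boto3.client("lambda", region_name="us-west-2")

IMAGE_URI = "accountid.dkr.ecr.us-west-2.amazonaws.com/reelsmith:latest"  # placeholder
ROLE_ARN = "arn:aws:iam::accountid:role/reelsmith-lambda-role"            # placeholder

handlers = {
    "lambda-Dispatcher": "lambda_dispatcher.lambda_handler",
    "lambda-Worker": "lambda_worker.lambda_handler",
    "lambda-Finalizer": "lambda_finalizer.lambda_handler",
    "ReelSmith-Director": "lambda_director.lambda_handler",
}

for name, handler in handlers.items():
    lambda_client.create_function(
        FunctionName=name,
        PackageType="Image",
        Code={"ImageUri": IMAGE_URI},
        ImageConfig={"Command": [handler]},  # overrides the Dockerfile CMD
        Role=ROLE_ARN,
        Timeout=900,                      # shot analysis and FFmpeg work take a while
        MemorySize=2048,
        EphemeralStorage={"Size": 4096},  # /tmp space for the downloaded video
    )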
Now that we have everything ready, let's connect all the functions using Step Functions
- Visit the Step Functions service and create a state machine with the following definition
{
"Comment": "Parallel Video Processor",
"StartAt": "Dispatcher",
"States": {
"Dispatcher": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-west-2:acccountid:function:lambda-Dispatcher",
"Next": "ParallelProcessing"
},
"ParallelProcessing": {
"Type": "Map",
"ItemsPath": "$.batches",
"MaxConcurrency": 20,
"Iterator": {
"StartAt": "ProcessBatch",
"States": {
"ProcessBatch": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-west-2:accountid:function:lambda-Worker",
"End": true
}
}
},
"ResultPath": "$.workerResults",
"Next": "MarkComplete"
},
"MarkComplete": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-west-2:accountid:function:lambda-Finalizer",
"End": true
}
}
}
- It will look like this
- We have to connect this state machine with the S3 file upload
- Visit the EventBridge service and create a rule that triggers the state machine whenever a file is uploaded to the bucket (make sure EventBridge notifications are enabled on the bucket, as in the provisioning sketch earlier)
Rule Event Pattern:
{
"source": ["aws.s3"],
"detail-type": ["Object Created"],
"detail": {
"bucket": {
"name": ["reel-smith-ai"]
},
"object": {
"key": [{
"prefix": "raw/"
}]
}
}
}
- Select the state machine as the rule's target so it receives the file upload notification
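For completeness, here is roughly what that rule looks like when created with boto3 instead of the console. The rule name, the state machine ARN, and the IAM role that lets EventBridge start executions are placeholders for whatever you named yours.

# Sketch: EventBridge rule for "raw/" uploads, targeting the Step Functions state machine.
import json
import boto3

events = boto3.client("events", region_name="us-west-2")

event_pattern = {
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {
        "bucket": {"name": ["reel-smith-ai"]},
        "object": {"key": [{"prefix": "raw/"}]},
    },
}

events.put_rule(
    Name="reelsmith-raw-upload",
    EventPattern=json.dumps(event_pattern),
    State="ENABLED",
)

# The role must allow events.amazonaws.com to call states:StartExecution on this state machine
events.put_targets(
    Rule="reelsmith-raw-upload",
    Targets=[{
        "Id": "reelsmith-state-machine",
        "Arn": "arn:aws:states:us-west-2:accountid:stateMachine:ReelSmithPipeline",
        "RoleArn": "arn:aws:iam::accountid:role/reelsmith-events-to-sfn",
    }],
)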
- Create a domain in the OpenSearch Service and create an index using this file (a boto3 sketch for creating the domain itself follows the index script)
from opensearchpy import OpenSearch, RequestsHttpConnection
# --- CONFIG ---
HOST = "endpoint"  # domain endpoint without https://
REGION = "us-west-2"
username = 'username'
password = 'password'
auth = (username, password)
client = OpenSearch(
hosts=[{'host': HOST, 'port': 443}],
http_auth=auth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
pool_maxsize=20
)
index_name = "reelsmith-index"
def create_index():
index_body = {
"settings": {
"index": {
"knn": True # Enable Vector Search
}
},
"mappings": {
"properties": {
"video_id": {"type": "keyword"},
"shot_id": {"type": "integer"},
"start_time": {"type": "float"},
"end_time": {"type": "float"},
"description": {"type": "text"},
"vector_embedding": {
"type": "knn_vector",
"dimension": 1024 # Titan Multimodal Dimension
}
}
}
}
if not client.indices.exists(index=index_name):
response = client.indices.create(index=index_name, body=index_body)
print("Index created:", response)
else:
print("Index already exists.")
if __name__ == "__main__":
create_index()
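The domain itself can also be created with boto3 instead of the console. This is a minimal single-node sketch under assumptions (engine version, instance size, placeholder master-user credentials); fine-grained access control is enabled because the indexer and director authenticate with a username and password. Run it, wait for the domain to come up, then run the index script above.

# Sketch: a small OpenSearch domain with the internal user database enabled.
import boto3

opensearch = boto3.client("opensearch", region_name="us-west-2")

opensearch.create_domain(
    DomainName="reelsmith",
    EngineVersion="OpenSearch_2.11",  # assumption
    ClusterConfig={"InstanceType": "t3.small.search", "InstanceCount": 1},
    EBSOptions={"EBSEnabled": True, "VolumeType": "gp3", "VolumeSize": 20},
    # Fine-grained access control requires encryption and HTTPS
    NodeToNodeEncryptionOptions={"Enabled": True},
    EncryptionAtRestOptions={"Enabled": True},
    DomainEndpointOptions={"EnforceHTTPS": True},
    AdvancedSecurityOptions={
        "Enabled": True,
        "InternalUserDatabaseEnabled": True,
        "MasterUserOptions": {
            "MasterUserName": "username",         # placeholder
            "MasterUserPassword": "ChangeMe123!", # placeholder
        },
    },
)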
Now that we have everything ready, let's build our front end using Streamlit, and we will start testing our setup
Streamlit code:
import streamlit as st
import boto3
import json
import time
import os
import config
# --- CONFIGURATION ---
# Ensure these match your actual AWS setup details in config.py
DIRECTOR_FUNCTION_NAME = "ReelSmith-Director"
BUCKET_NAME = config.S3_BUCKET_NAME
TABLE_NAME = "ReelSmith_Jobs"
# --- AWS CLIENTS ---
# We use boto3 to talk to the Cloud Backends
s3 = boto3.client('s3', region_name=config.AWS_REGION)
lambda_client = boto3.client('lambda', region_name=config.AWS_REGION)
dynamodb = boto3.resource('dynamodb', region_name=config.AWS_REGION)
table = dynamodb.Table(TABLE_NAME)
# --- PAGE SETUP ---
st.set_page_config(page_title="ReelSmith Cloud", layout="wide", page_icon="🎬")
st.title("ReelSmith: Cloud AI Director")
st.markdown("""
**Serverless Video Intelligence Platform** *Powered by AWS Step Functions, Amazon Nova, and OpenSearch.*
""")
# Create Tabs for the two Agents
tab1, tab2 = st.tabs(["Ask the Director", "Upload New Footage"])
# ==========================================
# TAB 1: THE DIRECTOR AGENT (Retrieval)
# ==========================================
with tab1:
st.header("Ask the Director")
st.caption("Agent 2: Performs Semantic Search & Real-time Editing")
col1, col2 = st.columns([3, 1])
with col1:
query = st.text_input("Describe the scene:", placeholder="e.g., A truck exploding in flames")
with col2:
threshold = st.slider("Confidence Threshold", 0.0, 1.0, 0.60, help="Only show results if the AI is this confident.")
if st.button("π¬ Action!", type="primary"):
if not query:
st.warning("Please enter a scene description.")
else:
# 1. VISUAL FEEDBACK
with st.status("Agent is thinking...", expanded=True) as status:
st.write("Contacting AWS Lambda Director...")
# 2. INVOKE LAMBDA (Synchronous)
payload = {"query": query, "threshold": threshold}
try:
response = lambda_client.invoke(
FunctionName=DIRECTOR_FUNCTION_NAME,
InvocationType='RequestResponse',
Payload=json.dumps(payload)
)
# 3. PARSE RESPONSE
response_payload = json.loads(response['Payload'].read())
# Handle Lambda System Errors (500s)
if 'body' not in response_payload:
st.error(f"Lambda System Error: {response_payload}")
status.update(label="System Error", state="error")
else:
body = json.loads(response_payload['body'])
if body.get("found"):
# SUCCESS PATH
st.write(f"β
**Match Found!** (Confidence: {body['confidence']:.2f})")
st.write("βοΈ Cutting video in the cloud (FFmpeg)...")
# Update Status
status.update(label="Video Ready!", state="complete", expanded=False)
# 4. DISPLAY RESULTS
st.divider()
st.success(f"**Scene Context:** {body['description']}")
st.video(body['video_url'])
else:
# FAILURE PATH (Confidence Low or No Match)
status.update(label="No Scene Found", state="error", expanded=False)
st.error(f"β Agent Response: {body.get('message', 'Unknown error')}")
except Exception as e:
status.update(label="Connection Failed", state="error")
st.error(f"Client Error: {e}")
# ==========================================
# TAB 2: THE ANALYST AGENT (Ingestion)
# ==========================================
with tab2:
st.header("Ingest New Footage")
st.caption("Agent 1: Watches, Transcribes, and Indexes Video into OpenSearch")
uploaded_file = st.file_uploader("Choose an MP4 file", type=["mp4"])
if uploaded_file is not None:
if st.button("π Upload & Analyze"):
video_id = uploaded_file.name
# 1. UPLOAD TO S3 (The Trigger)
with st.spinner("Uploading to S3..."):
# CRITICAL: We upload to 'raw/' folder to trigger EventBridge
s3_key = f"raw/{video_id}"
# Save to temp locally
temp_path = f"temp_{video_id}"
with open(temp_path, "wb") as f:
f.write(uploaded_file.getbuffer())
# Upload
s3.upload_file(temp_path, BUCKET_NAME, s3_key)
# Cleanup
os.remove(temp_path)
st.success("Upload Complete! EventBridge has triggered the Step Function.")
# 2. POLL DYNAMODB FOR STATUS
st.markdown("### π‘ Job Status")
progress_bar = st.progress(0)
status_text = st.empty()
# Polling Loop
while True:
try:
# Get Item from DynamoDB
response = table.get_item(Key={'video_id': video_id})
except Exception as e:
st.error(f"DB Error: {e}")
break
# Case A: Job hasn't started yet (EventBridge lag)
if 'Item' not in response:
status_text.info("β³ Queued... Waiting for Dispatcher...")
time.sleep(3)
continue
# Case B: Check Status
job = response['Item']
status = job.get('status', 'UNKNOWN')
if status == 'COMPLETED':
progress_bar.progress(100)
status_text.success("β
Analysis Complete! You can now search for this video in Tab 1.")
st.balloons()
break
elif status == 'ANALYZING_STRUCTURE':
progress_bar.progress(10)
status_text.text("π Phase 1: Detecting shots and transcribing audio...")
elif status == 'PROCESSING_PARALLEL':
progress_bar.progress(50)
status_text.text("β‘ Phase 2: Parallel Agents analyzing shots (Distributed Map)...")
elif status == 'FAILED':
status_text.error("β Analysis Failed. Check CloudWatch logs.")
break
# Wait before next poll
time.sleep(5)
- Run the Streamlit app using this command
streamlit run app.py
I downloaded a movie clip from YouTube and tested the process
Uploading Process:
Output:
Based on the input video, I gave this prompt. It worked. Not perfectly, but it worked. Accuracy can always be improved by tweaking the ingestion and analysis steps.
Here is the output video
In case the GIF doesn't render fully, visit this Google Drive link to see the output
output.gif
I am still working on improving accuracy and reducing hallucinations, and I am open to suggestions. Please share your ideas or inputs in the comments.
Thanks for reading.