Introduction
Knowing the calorie count of the food you consume daily can help you maintain a healthy diet. But manual calorie counting can be tedious, especially for foods without labels. With computer vision, we can automate calorie estimation straight from the camera feed.
In this tutorial, we’ll build an agent that estimates and tracks your total calorie consumption throughout the day. We will leverage computer vision through the Vision Agents SDK for this. By the end, you’ll be able to point your camera at a portion of food, and the agent will estimate portion size and calories.
Vision Agents (explore the code on GitHub) is a framework that enables developers to build real-time voice and video AI applications using any LLM and video edge network. Under the hood, it combines several building blocks (processors, LLMs, a video edge network, and more) so you can assemble real-time AI applications in a few simple steps.
Let’s get started.
Here’s What We’re Building
In our calorie estimation app, the agent will:
- Analyze the food on camera and identify it.
- Estimate the portion size based on the area of the bounding box.
- Get the food’s calorie information via the USDA (United States Department of Agriculture) FoodData API and provide the user with the nutritional information.
- Store the calorie information in an Excel sheet. The user can later reference it to add up their total calorie consumption for the day.
At the end of the tutorial, here is what our calorie estimation app will look like:
You can find the full code on GitHub.
Prerequisites
To follow along with this tutorial, you will need:
- A free Stream account. We’ll use Stream as our video edge network.
- Some knowledge of Python.
- The uv package installer.
Setting Up Your Application
Let’s start by initializing our application using uv:
uv init
Add the packages that we need:
uv add "vision-agents[getstream, cartesia, deepgram, openai]" python-dotenv httpx ultralytics openpyxl
We’ll use Stream as our video edge network, Cartesia for text-to-speech, Deepgram for speech-to-text, OpenAI as our LLM, Ultralytics for object detection, httpx for HTTP requests, and openpyxl for Excel sheets.
Next, in our main.py file, let's import the packages we need for the project:
import asyncio
import logging
import sys
import os
from uuid import uuid4
from typing import Dict, List, Any
import httpx
from dotenv import load_dotenv
from vision_agents.plugins import getstream, openai, deepgram, cartesia
from vision_agents.core import User, Agent
from datetime import datetime
from openpyxl import Workbook, load_workbook
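main.py reads its API keys with os.getenv() and logs through a module-level logger, so right after the imports we load the .env file and set up basic logging (the log level here is our choice):
load_dotenv()  # pull API keys from .env into the environment

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)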
We’ll come back to this file shortly to implement our agent. In the meantime, let’s proceed with implementing the processor.
Processing Video Frames for Agent Analysis
For our agent to detect what food we are pointing the camera at, we need to implement a processor. Processors in Vision Agents analyze and transform audio and video streams in real-time and pass the frames to the agent for further use.
In our case, we want to process video frames as PIL images, extract food detections, and then use the detections in our agent to determine the nutritional information of the food.
Let’s create a food_detection.py file for this:
food_detection.py
from PIL import Image
from vision_agents.core.processors import AudioVideoProcessor, ImageProcessorMixin
from ultralytics import YOLO
import logging
import time
from typing import List, Dict, Any, Optional
logger = logging.getLogger(__name__)
class FoodDetectionProcessor(AudioVideoProcessor, ImageProcessorMixin):
"""
Detects food items in video frames using YOLO.
"""
def __init__(
self,
model_path: str = "yolov8n.pt",
conf_threshold: float = 0.2,
interval: int = 1,
detection_timeout: float = 3.0,
):
"""
Initialize food detection processor.
Args:
model_path: Path to YOLO model (default: yolov8n.pt)
conf_threshold: Confidence threshold for detections
interval: Process every N seconds (default: 1)
detection_timeout: Keep last valid detections for this many seconds (default: 3.0)
"""
super().__init__(
interval=interval,
receive_audio=False,
receive_video=True,
send_audio=False,
send_video=False,
)
logger.info(f"Initializing FoodDetectionProcessor")
logger.info(f" Model: {model_path}")
logger.info(f" Confidence threshold: {conf_threshold}")
logger.info(f" Processing interval: {interval}s")
logger.info(f" Detection timeout: {detection_timeout}s")
self.model = YOLO(model_path)
self.conf_threshold = conf_threshold
# COCO food-related classes
self.food_classes = [
40, # wine glass
41, # cup
42, # fork
43, # knife
44, # spoon
45, # bowl
46, # banana
47, # apple
48, # sandwich
49, # orange
50, # broccoli
51, # carrot
52, # hot dog
53, # pizza
54, # donut
55, # cake
58, # potted plant
60, # dining table
]
self.latest_detections: List[Dict[str, Any]] = []
self.last_detection_time: float = 0
self.detection_timeout = detection_timeout
self.frame_count = 0
logger.info("Food detection model loaded successfully")
async def process_image(
self,
image: Image.Image,
user_id: str,
metadata: Optional[dict] = None,
):
"""
Process individual video frames as PIL Images.
This method is called automatically by the agent for each video frame.
"""
if image.size[0] == 0 or image.size[1] == 0:
logger.warning("Skipping invalid frame (zero size)")
return
if not self.should_process():
logger.debug("Skipping frame due to processing interval")
return
self.frame_count += 1
logger.debug(f" Processing frame #{self.frame_count} from user {user_id} (size: {image.size})")
try:
# Run YOLO detection
results = self.model(
image,
conf=self.conf_threshold,
classes=self.food_classes,
verbose=False,
)
detections = self._extract_detections(results)
if detections:
self.latest_detections = detections
self.last_detection_time = time.time()
detected_items = [
f"{d['class_name']} ({d['confidence']:.2f})" for d in detections
]
logger.info(f" Detected {len(detections)} items: {', '.join(detected_items)}")
else:
logger.debug("No food items detected in this frame")
except Exception as e:
logger.error(f"Error processing frame: {e}", exc_info=True)
def _extract_detections(self, results) -> List[Dict[str, Any]]:
"""Extract detection information from YOLO results."""
detections = []
if not results or len(results) == 0:
return detections
result = results[0]
if result.boxes is None or len(result.boxes) == 0:
return detections
boxes = result.boxes
for i in range(len(boxes)):
box = boxes.xyxy[i].cpu().numpy()
conf = float(boxes.conf[i].cpu().numpy())
cls_id = int(boxes.cls[i].cpu().numpy())
class_name = result.names.get(cls_id, f"class_{cls_id}")
bbox = [float(x) for x in box]
width = bbox[2] - bbox[0]
height = bbox[3] - bbox[1]
area = width * height
detections.append({
"bbox": bbox,
"bbox_area": area,
"confidence": conf,
"class_id": cls_id,
"class_name": class_name,
})
return detections
def get_latest_detections(self) -> List[Dict[str, Any]]:
"""
Get the most recent food detections, but only if they are not stale.
"""
if time.time() - self.last_detection_time <= self.detection_timeout:
return self.latest_detections.copy()
return [] # Return empty if detections are stale
def reset(self):
"""Reset detection state at the start of a new session."""
self.latest_detections = []
self.last_detection_time = 0
self.frame_count = 0
logger.info(" FoodDetectionProcessor state reset")
Our FoodDetectionProcessor subclasses AudioVideoProcessor and ImageProcessorMixin to receive and process video frames in real time. The processor receives video frames from Stream, and its process_image() method handles each frame as a PIL image, sampling one frame per second (the interval we configured).
Our processor class runs the YOLOv8 object detection model and uses the food classes of the COCO dataset to detect food objects in the frames.
Note: The food-related classes in the COCO dataset are limited, so our processor can only detect a handful of food objects (e.g., apple, banana, sandwich, broccoli). For production purposes, you should choose a dataset with far more food-specific annotations than COCO.
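If you fine-tune YOLO on such a dataset, the processor accepts the new weights unchanged; the weights file below is hypothetical, and the food_classes filter in the constructor is COCO-specific, so it would need to match your custom model's class IDs:
food_processor = FoodDetectionProcessor(
    model_path="food_custom.pt",  # hypothetical fine-tuned YOLO weights
    conf_threshold=0.3,
)
# Remember to swap self.food_classes for your custom model's class IDs.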
Once food is detected, the bounding box coordinates, bounding box area, confidence scores, class IDs, and class names are extracted. We will need this data later for our agent.
get_latest_detections() returns recent detections if they are not stale, and reset() clears the detection history for a new session.
Now that we have our food detection processor working, let’s go back to the main.py file and build our nutrition analysis pipeline.
Building the Nutrition Analysis Pipeline
For our calorie estimation app to function well, we need to integrate nutrition analysis into our pipeline.
Once food is detected, we want to estimate the portion size and then search for calorie information from the USDA FoodData Central API.
Let’s go ahead and do that.
main.py
# USDA FoodData Central API configuration
USDA_API_KEY = os.getenv("USDA_API_KEY") # Get your free key at https://fdc.nal.usda.gov/api-key-signup.html
USDA_API_BASE = "https://api.nal.usda.gov/fdc/v1"
async def search_food_calories(food_name: str, portion_grams: float = 100.0) -> Dict[str, Any]:
"""
Search for food calorie information using the USDA FoodData Central API.
"""
try:
async with httpx.AsyncClient() as client:
search_url = f"{USDA_API_BASE}/foods/search"
params = {
"api_key": USDA_API_KEY,
"query": food_name,
"pageSize": 1,
"dataType": ["Survey (FNDDS)", "Foundation", "SR Legacy"]
}
response = await client.get(search_url, params=params, timeout=10.0)
response.raise_for_status()
data = response.json()
if not data.get("foods"):
logger.warning(f"No nutritional data found for: {food_name}")
return {
"food_name": food_name,
"calories": "Unknown",
"error": "Food not found in database"
}
food_item = data["foods"][0]
nutrients = {n["nutrientName"]: n["value"] for n in food_item.get("foodNutrients", [])}
calories_per_100g = nutrients.get("Energy", 0)
calories_for_portion = (calories_per_100g * portion_grams) / 100
result = {
"food_name": food_item.get("description", food_name),
"portion_grams": portion_grams,
"calories": round(calories_for_portion, 1),
"protein_g": round(nutrients.get("Protein", 0) * portion_grams / 100, 1),
"carbs_g": round(nutrients.get("Carbohydrate, by difference", 0) * portion_grams / 100, 1),
"fat_g": round(nutrients.get("Total lipid (fat)", 0) * portion_grams / 100, 1),
}
logger.info(f"Found nutritional data for {food_name}: {result}")
return result
except Exception as e:
logger.error(f"Error fetching calorie data for {food_name}: {e}")
return {
"food_name": food_name,
"calories": "Unknown",
"error": str(e)
}
def estimate_portion_size(bbox_area: float, reference_object: str = "plate") -> Dict[str, Any]:
"""
Estimate portion size based on bounding box area.
"""
portion_multiplier = bbox_area / 50000 # Normalized to standard plate
estimated_weight_grams = portion_multiplier * 150
estimated_volume_ml = portion_multiplier * 200
size_description = "small"
if portion_multiplier > 1.5:
size_description = "large"
elif portion_multiplier > 0.8:
size_description = "medium"
result = {
"estimated_weight_grams": round(estimated_weight_grams, 1),
"estimated_volume_ml": round(estimated_volume_ml, 1),
"size_description": size_description,
"reference": reference_object
}
logger.info(f"Estimated portion size: {result}")
return result
async def analyze_food_and_calories(food_detections: List[Dict[str, Any]]) -> str:
"""
Analyze detected foods and calculate total calories.
"""
if not food_detections:
return "No food items detected in the frame."
results = []
total_calories = 0
for detection in food_detections:
food_name = detection.get("class_name", "unknown food")
bbox = detection.get("bbox", [0, 0, 0, 0])
confidence = detection.get("confidence", 0)
bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
portion_info = estimate_portion_size(bbox_area)
calorie_info = await search_food_calories(food_name, portion_info["estimated_weight_grams"])
# log calorie information to excel spreadsheet
log_food_to_excel(calorie_info, user_id="demo_user")
if isinstance(calorie_info.get("calories"), (int, float)):
total_calories += calorie_info["calories"]
results.append({
"food": food_name,
"confidence": f"{confidence * 100:.1f}%",
"portion": portion_info,
"nutrition": calorie_info
})
response_parts = [f"I detected {len(food_detections)} food item(s):\n"]
for i, result in enumerate(results, 1):
nutrition = result["nutrition"]
portion = result["portion"]
response_parts.append(
f"{i}. {result['food'].title()} "
f"({portion['size_description']} portion, ~{portion['estimated_weight_grams']}g): "
)
if isinstance(nutrition.get("calories"), (int, float)):
response_parts.append(
f"{nutrition['calories']} calories, "
f"{nutrition.get('protein_g', 0)}g protein, "
f"{nutrition.get('carbs_g', 0)}g carbs, "
f"{nutrition.get('fat_g', 0)}g fat\n"
)
else:
response_parts.append("Nutritional data unavailable\n")
if total_calories > 0:
response_parts.append(f"\nTotal estimated calories: {round(total_calories)} kcal")
return "".join(response_parts)
In the above code, we have three functions that fetch nutritional data, estimate the food portion from its bounding box area, and tie the whole analysis pipeline together, respectively.
- search_food_calories() fetches nutritional data for the detected food from the USDA FoodData Central API and returns a structured dictionary with dietary values.
- estimate_portion_size() estimates the weight, volume, and portion size category of the detected food from its bounding box area (a quick sanity check follows this list).
- analyze_food_and_calories() calls both functions and returns a readable summary describing the detected food, estimated portion, and calorie information.
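To make the portion heuristic concrete, here is a quick check of estimate_portion_size() with a made-up bounding-box area, using the 50,000 px "standard plate" normalization from the code above:
sample = estimate_portion_size(bbox_area=75_000)
# portion_multiplier = 75000 / 50000 = 1.5
# -> ~225 g, ~300 ml, "medium" (1.5 is not strictly greater than 1.5)
print(sample)
Keep in mind this is a rough heuristic: pixel area depends heavily on how far the camera is from the plate, so the same apple can read as small or large depending on framing.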
Implementing Our Calorie Estimation Agent
Now that we have all the necessary functions and processor in place, it’s time to put everything together and bring our Agent to life.
Let’s set up the video application and implement our Agent.
main.py
from food_detection import FoodDetectionProcessor
async def start_agent() -> None:
"""Initialize and start the video AI agent with food detection."""
try:
logger.info("=" * 60)
logger.info("Starting Food Detection Agent")
logger.info("=" * 60)
food_processor = FoodDetectionProcessor(
model_path="yolov8n.pt",
conf_threshold=0.2,
interval=1
)
llm = openai.LLM(model="gpt-4o-mini")
@llm.register_function(
description="Analyzes food items currently visible in the video feed and provides detailed nutritional information including calories, portion size, and macronutrient"
)
async def analyze_food() -> str:
"""
Analyzes detected food items from the video feed.
Waits up to 1.5 seconds for a detection if none is immediately available.
"""
try:
for _ in range(15): # 15 * 0.1s = 1.5s max wait
detections = food_processor.get_latest_detections()
if detections:
items = [f"{d['class_name']} ({d['confidence']:.2f})" for d in detections]
logger.info(f" [TOOL] Detected: {', '.join(items)}")
result = await analyze_food_and_calories(detections)
logger.info(f"Analysis result: {result}")
return result
await asyncio.sleep(0.1)
logger.warning("[TOOL] No detections found after 1.5s wait")
return "I'm still not seeing any food clearly. Please point your camera at some food and ensure it's well-lit!"
except Exception as e:
logger.error(f"Error in analyze_food tool: {e}", exc_info=True)
return f"Sorry, I had trouble analyzing the food. Please try again."
agent = Agent(
edge=getstream.Edge(),
agent_user=User(name="Food Detection Assistant", id="agent"),
instructions=(
"You're a friendly food portion and calorie estimator. "
"When you detect food items in the video, use the analyze_food tool to get detailed "
"nutritional information including portion size and calories. "
"Present the information in a natural, conversational way. "
"Be encouraging and helpful. Keep responses concise and easy to understand."
),
llm=llm,
stt=deepgram.STT(),
tts=cartesia.TTS(),
processors=[food_processor]
)
if agent.image_processors:
logger.info(f" Image processor type: {type(agent.image_processors[0]).__name__}")
logger.info("=" * 60)
call_id = str(uuid4())
await agent.create_user()
call = agent.edge.client.video.call("default", call_id)
await agent.edge.open_demo(call)
await asyncio.sleep(5)
session = await agent.join(call)
with session:
food_processor.reset()
frames_seen = False
# Wait for first frame to be processed (up to 10 seconds)
for _ in range(50): # 50 * 0.2s = 10s max
if food_processor.frame_count > 0:
logger.info("Video stream confirmed. Ready for food detection!")
frames_seen = True
break
await asyncio.sleep(0.2)
if not frames_seen:
logger.warning(" No video frames after 10s. Proceeding anyway.")
await agent.llm.simple_response(
"Hello! I'm your food detection assistant. "
"Point your camera at some food and ask me what I see!"
)
await agent.finish()
except KeyboardInterrupt:
logger.info("Agent stopped by user")
except Exception as e:
logger.error(f"Error running agent: {e}", exc_info=True)
raise
finally:
logger.info("Agent shutting down...")
def main() -> None:
"""Main entry point."""
try:
asyncio.run(start_agent())
except KeyboardInterrupt:
logger.info("Application interrupted by user")
sys.exit(0)
except Exception as e:
logger.error(f"Application failed: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
We initialized a FoodDetectionProcessor instance to detect food items in video frames using YOLOv8. The analyze_food function is registered with the LLM as a tool. It waits up to 1.5 seconds for the processor to detect food items in the video feed, then calls analyze_food_and_calories() to compute the nutritional information.
Our agent is configured with the LLM, STT, TTS, and our FoodDetectionProcessor. Once the processor identifies a food item in the video, it passes the information to the agent, which then calls analyze_food to get the food's nutritional information. We're using Stream as our video edge network because its low-latency streaming delivers the high-quality video needed for real-time food detection.
This is what our .env file looks like:
# Stream credentials - Get yours at: https://getstream.io/dashboard/
STREAM_API_KEY=your-stream-key
STREAM_API_SECRET=your-stream-secret
OPENAI_API_KEY=your-openai-key
USDA_API_KEY=your-USDA-key
DEEPGRAM_API_KEY=your-deepgram-key
CARTESIA_API_KEY=your-cartesia-key
At this point, our Calorie Estimation App is nearly ready to go! 🎉 You can test it by running the command below:
uv run main.py
Next, let’s add one final touch. We will log the calorie information in an Excel file for easy reference by the user.
Logging Calorie Information in an Excel File
For record keeping and easy referencing, let’s log the food calorie information in an Excel file.
FOOD_LOG_FILE = "food_log.xlsx"  # log file path; pick any location you like

def log_food_to_excel(food_result: Dict[str, Any], user_id: str = "user"):
"""
Append the calorie analysis result to an Excel (.xlsx) log file.
"""
file_exists = os.path.isfile(FOOD_LOG_FILE)
if file_exists:
wb = load_workbook(FOOD_LOG_FILE)
ws = wb.active
else:
wb = Workbook()
ws = wb.active
ws.append([
"Timestamp",
"User ID",
"Food Name",
"Portion (g)",
"Calories",
"Protein (g)",
"Carbs (g)",
"Fat (g)",
"Error"
])
# Optional: auto-fit columns (basic)
for col in ws.columns:
max_length = 0
column = col[0].column_letter
for cell in col:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except Exception:
pass
adjusted_width = min(max_length + 2, 50)
ws.column_dimensions[column].width = adjusted_width
# Append data row
ws.append([
datetime.now().isoformat(),
user_id,
food_result.get("food_name", "Unknown"),
food_result.get("portion_grams", "N/A"),
food_result.get("calories", "Unknown"),
food_result.get("protein_g", "N/A"),
food_result.get("carbs_g", "N/A"),
food_result.get("fat_g", "N/A"),
food_result.get("error", "")
])
# Save file
wb.save(FOOD_LOG_FILE)
We are using openpyxl to create or update an Excel sheet with the calorie information of the food.
Let’s call the log_food_to_excel function inside analyze_food_and_calories().
async def analyze_food_and_calories(food_detections: List[Dict[str, Any]]) -> str:
calorie_info = await search_food_calories(food_name, portion_info["estimated_weight_grams"])
# log calorie information to excel spreadsheet
log_food_to_excel(calorie_info, user_id="demo_user")
For our demonstration purposes, openpyxl works perfectly fine. However, in a production environment, it is better to use a real database, such as Supabase, to store the calorie information and export to an Excel sheet when needed. This is because openpyxl does not support true concurrent writes, which can cause issues when multiple users attempt to log data simultaneously.
Here’s how you could implement this using Supabase:
In your Supabase SQL Editor, run the query below:
CREATE TABLE food_log (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
created_at TIMESTAMPTZ DEFAULT NOW(),
user_id TEXT NOT NULL,
food_name TEXT,
portion_grams REAL,
calories REAL,
protein_g REAL,
carbs_g REAL,
fat_g REAL,
error TEXT
);
This creates a food_log table with the necessary columns.
Next, install Supabase:
uv add supabase
Then, import it into your project:
from supabase import create_client, Client
Add your Supabase credentials to the .env file:
SUPABASE_URL=your-project-url.supabase.co
SUPABASE_ANON_KEY=your-anon-key
Add a function to save calorie information in Supabase:
def log_food_to_supabase(food_result: Dict[str, Any], user_id: str = "demo_user"):
"""
Log food analysis to Supabase database.
"""
try:
supabase: Client = create_client(
os.getenv("SUPABASE_URL"),
os.getenv("SUPABASE_ANON_KEY")
)
data = {
"user_id": user_id,
"food_name": food_result.get("food_name", "Unknown"),
"portion_grams": food_result.get("portion_grams"),
"calories": food_result.get("calories") if isinstance(food_result.get("calories"), (int, float)) else None,
"protein_g": food_result.get("protein_g"),
"carbs_g": food_result.get("carbs_g"),
"fat_g": food_result.get("fat_g"),
"error": food_result.get("error", "")
}
response = supabase.table("food_log").insert(data).execute()
logger.info(f"Food logged to Supabase: {response.data[0]['id']}")
except Exception as e:
logger.error(f" Failed to log to Supabase: {e}")
Call the function inside the analyze_food_and_calories function, right after calorie_info:
async def analyze_food_and_calories(food_detections: List[Dict[str, Any]]) -> str:
calorie_info = await search_food_calories(food_name, portion_info["estimated_weight_grams"])
# log calorie information to Supabase
log_food_to_supabase(calorie_info, user_id="demo_user") # use real user_id
You can then provide a download link on demand via a Supabase Edge Function or a client-side export.
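As a rough sketch of the client-side route (assuming the food_log table above and the same environment variables; export_food_log_to_excel is our own helper name, not part of any SDK):
def export_food_log_to_excel(user_id: str, filename: str = "food_log_export.xlsx"):
    """Fetch a user's rows from Supabase and write them to an Excel sheet."""
    supabase: Client = create_client(
        os.getenv("SUPABASE_URL"),
        os.getenv("SUPABASE_ANON_KEY")
    )
    rows = (
        supabase.table("food_log")
        .select("*")
        .eq("user_id", user_id)
        .order("created_at")
        .execute()
        .data
    )
    wb = Workbook()
    ws = wb.active
    ws.append(["Timestamp", "Food Name", "Portion (g)", "Calories",
               "Protein (g)", "Carbs (g)", "Fat (g)"])
    for row in rows:
        ws.append([
            row.get("created_at"),
            row.get("food_name"),
            row.get("portion_grams"),
            row.get("calories"),
            row.get("protein_g"),
            row.get("carbs_g"),
            row.get("fat_g"),
        ])
    wb.save(filename)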
Summary
In this tutorial, we built a calorie estimation and tracking app using Vision Agents. We started by implementing a processor that receives frames from the Stream video edge network, detects food objects, and passes the detections to the agent.
Next, we built our nutrition analysis pipeline so that once food is detected on camera, we can look up its calorie information from the USDA FoodData Central API. The agent retrieves this information by calling the function registered to it.
Finally, we added an Excel log to store the calorie information for reference.
🛠️ Want to keep building with Vision Agents? Explore the rest of the tutorials in the series.