Emmanuel Aiyenigba
Build a Calorie Estimation App With Vision Agents

Introduction

Knowing the calorie count of the food you consume daily can help you maintain a healthy diet. But manual calorie counting is tedious, especially for foods without labels. With computer vision, we can automate calorie estimation for almost any food.

In this tutorial, we’ll build an agent that estimates and tracks your total calorie consumption throughout the day. We will leverage computer vision through the Vision Agents SDK for this. By the end, you’ll be able to point your camera at a portion of food, and the agent will estimate portion size and calories.

Vision Agents (explore the code on GitHub) is a framework that enables developers to build real-time voice and video AI applications using any LLM and video edge network. Under the hood, it combines several components (processors, LLMs, a video edge network, and more) so you can build real-time AI applications in a few simple steps.

Let’s get started.

Here’s What We’re Building

In our calorie estimation app, the agent will:

  • Analyze the food on camera and identify it.
  • Estimate the portion size based on the area of the bounding box.
  • Get the food’s calorie information via the USDA (United States Department of Agriculture) FoodData Central API and provide the user with the nutritional information.
  • Store the calorie information in an Excel sheet. The user can later reference it to add up their total calorie consumption for the day.

By the end of the tutorial, our calorie estimation app will analyze food on camera and report portion sizes and calorie estimates.

You can reference GitHub for the full code.

Prerequisites

To follow along with this tutorial, you will need:

  • A free Stream account. We’ll use Stream as our video edge network.
  • Some knowledge of Python.
  • uv package installer.

Setting Up Your Application

Let’s start by initializing our application using uv:

uv init

Add the packages that we need:

uv add "vision-agents[getstream, cartesia, deepgram, openai]" python-dotenv httpx ultralytics openpyxl

We’ll use Stream as our video edge network, Cartesia for text-to-speech, Deepgram for speech-to-text, OpenAI as our LLM, Ultralytics for object detection, httpx for HTTP requests, and openpyxl for Excel sheets.

Next, in our main.py file, let’s import the packages we need and set up environment loading and logging:

import asyncio
import logging
import sys
import os
from uuid import uuid4
from typing import Dict, List, Any
import httpx
from dotenv import load_dotenv
from vision_agents.plugins import getstream, openai, deepgram, cartesia
from vision_agents.core import User, Agent
from datetime import datetime
from openpyxl import Workbook, load_workbook

# Load API keys from .env and configure a module-level logger
load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

We’ll come back to this file shortly to implement our agent. In the meantime, let’s proceed with implementing the processor.

Processing Video Frames for Agent Analysis

For our agent to detect what food we are pointing the camera at, we need to implement a processor. Processors in Vision Agents analyze and transform audio and video streams in real time and pass the frames to the agent for further use.

In our case, we want to process video frames as PIL images, extract food detections, and then use the detections in our agent to determine the nutritional information of the food.

Let’s create a food_detection.py file for this:

food_detection.py

from PIL import Image
from vision_agents.core.processors import AudioVideoProcessor, ImageProcessorMixin
from ultralytics import YOLO
import logging
import time
from typing import List, Dict, Any, Optional

logger = logging.getLogger(__name__)


class FoodDetectionProcessor(AudioVideoProcessor, ImageProcessorMixin):
    """
    Detects food items in video frames using YOLO.
    """

    def __init__(
        self,
        model_path: str = "yolov8n.pt",
        conf_threshold: float = 0.2,
        interval: int = 1,
        detection_timeout: float = 3.0,
    ):
        """
        Initialize food detection processor.

        Args:
            model_path: Path to YOLO model (default: yolov8n.pt)
            conf_threshold: Confidence threshold for detections
            interval: Process every N seconds (default: 1)
            detection_timeout: Keep last valid detections for this many seconds (default: 3.0)
        """
        super().__init__(
            interval=interval,
            receive_audio=False,
            receive_video=True,
            send_audio=False,
            send_video=False,
        )

        logger.info(f"Initializing FoodDetectionProcessor")
        logger.info(f"  Model: {model_path}")
        logger.info(f"  Confidence threshold: {conf_threshold}")
        logger.info(f"  Processing interval: {interval}s")
        logger.info(f"  Detection timeout: {detection_timeout}s")

        self.model = YOLO(model_path)
        self.conf_threshold = conf_threshold

        # COCO food classes, plus tableware/context objects (cup, fork, bowl, dining table, etc.)
        self.food_classes = [
            40,  # wine glass
            41,  # cup
            42,  # fork
            43,  # knife
            44,  # spoon
            45,  # bowl
            46,  # banana
            47,  # apple
            48,  # sandwich
            49,  # orange
            50,  # broccoli
            51,  # carrot
            52,  # hot dog
            53,  # pizza
            54,  # donut
            55,  # cake
            59,  # potted plant
            61,  # dining table
        ]

        self.latest_detections: List[Dict[str, Any]] = []
        self.last_detection_time: float = 0
        self.detection_timeout = detection_timeout
        self.frame_count = 0

        logger.info("Food detection model loaded successfully")

    async def process_image(
        self,
        image: Image.Image,
        user_id: str,
        metadata: Optional[dict] = None,
    ):
        """
        Process individual video frames as PIL Images.

        This method is called automatically by the agent for each video frame.
        """
        if image.size[0] == 0 or image.size[1] == 0:
            logger.warning("Skipping invalid frame (zero size)")
            return

        if not self.should_process():
            logger.debug("Skipping frame due to processing interval")
            return

        self.frame_count += 1
        logger.debug(f" Processing frame #{self.frame_count} from user {user_id} (size: {image.size})")

        try:
            # Run YOLO detection
            results = self.model(
                image,
                conf=self.conf_threshold,
                classes=self.food_classes,
                verbose=False,
            )

            detections = self._extract_detections(results)

            if detections:
                self.latest_detections = detections
                self.last_detection_time = time.time()
                detected_items = [
                    f"{d['class_name']} ({d['confidence']:.2f})" for d in detections
                ]
                logger.info(f" Detected {len(detections)} items: {', '.join(detected_items)}")
            else:
                logger.debug("No food items detected in this frame")

        except Exception as e:
            logger.error(f"Error processing frame: {e}", exc_info=True)

    def _extract_detections(self, results) -> List[Dict[str, Any]]:
        """Extract detection information from YOLO results."""
        detections = []

        if not results or len(results) == 0:
            return detections

        result = results[0]
        if result.boxes is None or len(result.boxes) == 0:
            return detections

        boxes = result.boxes
        for i in range(len(boxes)):
            box = boxes.xyxy[i].cpu().numpy()
            conf = float(boxes.conf[i].cpu().numpy())
            cls_id = int(boxes.cls[i].cpu().numpy())
            class_name = result.names.get(cls_id, f"class_{cls_id}")

            bbox = [float(x) for x in box]
            width = bbox[2] - bbox[0]
            height = bbox[3] - bbox[1]
            area = width * height

            detections.append({
                "bbox": bbox,
                "bbox_area": area,
                "confidence": conf,
                "class_id": cls_id,
                "class_name": class_name,
            })

        return detections

    def get_latest_detections(self) -> List[Dict[str, Any]]:
        """
        Get the most recent food detections, but only if they are not stale.
        """
        if time.time() - self.last_detection_time <= self.detection_timeout:
            return self.latest_detections.copy()
        return []  # Return empty if detections are stale

    def reset(self):
        """Reset detection state at the start of a new session."""
        self.latest_detections = []
        self.last_detection_time = 0
        self.frame_count = 0
        logger.info(" FoodDetectionProcessor state reset")

Our FoodDetectionProcessor extends AudioVideoProcessor and ImageProcessorMixin to receive and process video frames in real time. The processor receives video frames from Stream, and its process_image() method handles each frame as a PIL image once per second.

Our processor class runs the YOLOv8 object detection model and uses the food classes of the COCO dataset to detect food objects in the frames.

Note: The food classes in the COCO dataset are limited, so our processor can only detect a handful of food objects (e.g., apple, banana, sandwich, broccoli). For production use, you should pick a model trained on a dataset with far more food-specific classes than COCO.
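If you train or obtain food-specific YOLO weights, the processor accepts them through model_path. A minimal sketch (food_yolo.pt is a hypothetical weights file; remember that the hardcoded food_classes IDs are COCO-specific and must be updated to match the new model's class map):

# Sketch: swap in hypothetical food-specific weights.
# Note: update self.food_classes in FoodDetectionProcessor to match
# the class IDs of the new model before relying on its detections.
food_processor = FoodDetectionProcessor(
    model_path="food_yolo.pt",  # hypothetical custom weights
    conf_threshold=0.3,
)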

Once food is detected, the bounding box coordinates, bounding box area, confidence scores, class IDs, and class names are extracted. We will need this data later for our agent.

get_latest_detections() returns recent detections if they are not stale, and reset() clears the detection history for a new session.
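Before wiring the processor into the agent, you can sanity-check the underlying YOLO detection on a still image. Here is a quick standalone sketch, independent of the Vision Agents runtime (food.jpg is a hypothetical test photo):

# smoke_test.py: run YOLO directly on one image, filtering on the same food class IDs
from PIL import Image
from ultralytics import YOLO

model = YOLO("yolov8n.pt")
image = Image.open("food.jpg")  # hypothetical test photo of a meal

results = model(image, conf=0.2, classes=[46, 47, 48, 49, 50, 51, 52, 53, 54, 55], verbose=False)

for box in results[0].boxes:
    cls_id = int(box.cls[0])
    print(results[0].names[cls_id], round(float(box.conf[0]), 2), box.xyxy[0].tolist())

If the prints show the foods you expect, the processor will see the same detections once frames start flowing.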

Now that we have our food detection processor working, let’s go back to the main.py file and build our nutrition analysis pipeline.

Building the Nutrition Analysis Pipeline

For our calorie estimation app to function well, we need to integrate nutrition analysis into our pipeline.

Once food is detected, we want to estimate the portion size and then search for calorie information from the USDA FoodData Central API.

Let’s go ahead and do that.

main.py


# USDA FoodData Central API configuration
USDA_API_KEY = os.getenv("USDA_API_KEY")  # Get your free key at https://fdc.nal.usda.gov/api-key-signup.html
USDA_API_BASE = "https://api.nal.usda.gov/fdc/v1"



async def search_food_calories(food_name: str, portion_grams: float = 100.0) -> Dict[str, Any]:
    """
    Search for food calorie information using the USDA FoodData Central API.
    """
    try:
        async with httpx.AsyncClient() as client:
            search_url = f"{USDA_API_BASE}/foods/search"
            params = {
                "api_key": USDA_API_KEY,
                "query": food_name,
                "pageSize": 1,
                "dataType": ["Survey (FNDDS)", "Foundation", "SR Legacy"]
            }

            response = await client.get(search_url, params=params, timeout=10.0)
            response.raise_for_status()
            data = response.json()

            if not data.get("foods"):
                logger.warning(f"No nutritional data found for: {food_name}")
                return {
                    "food_name": food_name,
                    "calories": "Unknown",
                    "error": "Food not found in database"
                }

            food_item = data["foods"][0]
            nutrients = {n["nutrientName"]: n["value"] for n in food_item.get("foodNutrients", [])}

            calories_per_100g = nutrients.get("Energy", 0)  # USDA reports nutrient values per 100 g
            calories_for_portion = (calories_per_100g * portion_grams) / 100

            result = {
                "food_name": food_item.get("description", food_name),
                "portion_grams": portion_grams,
                "calories": round(calories_for_portion, 1),
                "protein_g": round(nutrients.get("Protein", 0) * portion_grams / 100, 1),
                "carbs_g": round(nutrients.get("Carbohydrate, by difference", 0) * portion_grams / 100, 1),
                "fat_g": round(nutrients.get("Total lipid (fat)", 0) * portion_grams / 100, 1),
            }

            logger.info(f"Found nutritional data for {food_name}: {result}")
            return result

    except Exception as e:
        logger.error(f"Error fetching calorie data for {food_name}: {e}")
        return {
            "food_name": food_name,
            "calories": "Unknown",
            "error": str(e)
        }


def estimate_portion_size(bbox_area: float, reference_object: str = "plate") -> Dict[str, Any]:
    """
    Estimate portion size based on bounding box area.
    """
    portion_multiplier = bbox_area / 50000  # Rough heuristic: ~50,000 px² is treated as a standard plate
    estimated_weight_grams = portion_multiplier * 150
    estimated_volume_ml = portion_multiplier * 200

    size_description = "small"
    if portion_multiplier > 1.5:
        size_description = "large"
    elif portion_multiplier > 0.8:
        size_description = "medium"

    result = {
        "estimated_weight_grams": round(estimated_weight_grams, 1),
        "estimated_volume_ml": round(estimated_volume_ml, 1),
        "size_description": size_description,
        "reference": reference_object
    }

    logger.info(f"Estimated portion size: {result}")
    return result


async def analyze_food_and_calories(food_detections: List[Dict[str, Any]]) -> str:
    """
    Analyze detected foods and calculate total calories.
    """
    if not food_detections:
        return "No food items detected in the frame."

    results = []
    total_calories = 0

    for detection in food_detections:
        food_name = detection.get("class_name", "unknown food")
        bbox = detection.get("bbox", [0, 0, 0, 0])
        confidence = detection.get("confidence", 0)

        bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
        portion_info = estimate_portion_size(bbox_area)
        calorie_info = await search_food_calories(food_name, portion_info["estimated_weight_grams"])

        # log calorie information to excel spreadsheet
        log_food_to_excel(calorie_info, user_id="demo_user")

        if isinstance(calorie_info.get("calories"), (int, float)):
            total_calories += calorie_info["calories"]

        results.append({
            "food": food_name,
            "confidence": f"{confidence * 100:.1f}%",
            "portion": portion_info,
            "nutrition": calorie_info
        })

    response_parts = [f"I detected {len(food_detections)} food item(s):\n"]

    for i, result in enumerate(results, 1):
        nutrition = result["nutrition"]
        portion = result["portion"]

        response_parts.append(
            f"{i}. {result['food'].title()} "
            f"({portion['size_description']} portion, ~{portion['estimated_weight_grams']}g): "
        )

        if isinstance(nutrition.get("calories"), (int, float)):
            response_parts.append(
                f"{nutrition['calories']} calories, "
                f"{nutrition.get('protein_g', 0)}g protein, "
                f"{nutrition.get('carbs_g', 0)}g carbs, "
                f"{nutrition.get('fat_g', 0)}g fat\n"
            )
        else:
            response_parts.append("Nutritional data unavailable\n")

    if total_calories > 0:
        response_parts.append(f"\nTotal estimated calories: {round(total_calories)} kcal")

    return "".join(response_parts)


In the above code, we have three functions: one fetches nutritional data, one estimates the food portion from its bounding box area, and one ties the food and calorie analysis pipeline together.

  • search_food_calories() fetches nutritional data from the USDA FoodData Central API for the detected food and returns a structured dictionary with dietary values.
  • estimate_portion_size() estimates the weight, volume, and portion size category of the detected food. It uses the bounding box area to estimate portion size.
  • analyze_food_and_calories() calls both functions and returns a readable summary describing the detected food, estimated portion, and calorie information.
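To see the pipeline in action without a camera, you can exercise these functions directly. A quick sketch, assuming the functions above live in main.py and USDA_API_KEY is set in your .env:

# pipeline_test.py: exercise the nutrition pipeline without video
import asyncio
from main import estimate_portion_size, search_food_calories

async def demo():
    # A 400x250 px bounding box: area 100,000 px² -> multiplier 2.0 -> ~300 g, "large"
    portion = estimate_portion_size(bbox_area=400 * 250)
    print(portion)
    info = await search_food_calories("banana", portion["estimated_weight_grams"])
    print(info)  # calories/protein/carbs/fat scaled to the 300 g portion

asyncio.run(demo())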

Implementing Our Calorie Estimation Agent

Now that we have all the necessary functions and processor in place, it’s time to put everything together and bring our Agent to life.

Let’s set up the video application and implement our Agent.

main.py

from food_detection import FoodDetectionProcessor



async def start_agent() -> None:
    """Initialize and start the video AI agent with food detection."""
    try:
        logger.info("=" * 60)
        logger.info("Starting Food Detection Agent")
        logger.info("=" * 60)

        food_processor = FoodDetectionProcessor(
            model_path="yolov8n.pt",
            conf_threshold=0.2,
            interval=1
        )

        llm = openai.LLM(model="gpt-4o-mini")

        @llm.register_function(
            description="Analyzes food items currently visible in the video feed and provides detailed nutritional information including calories, portion size, and macronutrient"
        )
        async def analyze_food() -> str:
            """
            Analyzes detected food items from the video feed.
            Waits up to 1.5 seconds for a detection if none is immediately available.
            """
            try:
                for _ in range(15):  # 15 * 0.1s = 1.5s max wait
                    detections = food_processor.get_latest_detections()
                    if detections:
                        items = [f"{d['class_name']} ({d['confidence']:.2f})" for d in detections]
                        logger.info(f" [TOOL] Detected: {', '.join(items)}")
                        result = await analyze_food_and_calories(detections)
                        logger.info(f"Analysis result: {result}")
                        return result
                    await asyncio.sleep(0.1)

                logger.warning("[TOOL] No detections found after 1.5s wait")
                return "I'm still not seeing any food clearly. Please point your camera at some food and ensure it's well-lit!"

            except Exception as e:
                logger.error(f"Error in analyze_food tool: {e}", exc_info=True)
                return f"Sorry, I had trouble analyzing the food. Please try again."

        agent = Agent(
            edge=getstream.Edge(),
            agent_user=User(name="Food Detection Assistant", id="agent"),
            instructions=(
                "You're a friendly food portion and calorie estimator. "
                "When you detect food items in the video, use the analyze_food tool to get detailed "
                "nutritional information including portion size and calories. "
                "Present the information in a natural, conversational way. "
                "Be encouraging and helpful. Keep responses concise and easy to understand."
            ),
            llm=llm,
            stt=deepgram.STT(),
            tts=cartesia.TTS(),
            processors=[food_processor]
        )


        if agent.image_processors:
            logger.info(f"  Image processor type: {type(agent.image_processors[0]).__name__}")
        logger.info("=" * 60)

        call_id = str(uuid4())

        await agent.create_user()
        call = agent.edge.client.video.call("default", call_id)
        await agent.edge.open_demo(call)

        await asyncio.sleep(5)
        session = await agent.join(call)

        with session:
            food_processor.reset()
            frames_seen = False
            # Wait for the first frame to be processed (up to 10 seconds)
            for _ in range(20):  # 20 * 0.5s = 10s max
                if food_processor.frame_count > 0:
                    logger.info("Video stream confirmed. Ready for food detection!")
                    frames_seen = True
                    break
                await asyncio.sleep(0.5)
            if not frames_seen:
                logger.warning(" No video frames after 10s. Proceeding anyway.")


            await agent.llm.simple_response(
                "Hello! I'm your food detection assistant. "
                "Point your camera at some food and ask me what I see!"
            )

            await agent.finish()

    except KeyboardInterrupt:
        logger.info("Agent stopped by user")
    except Exception as e:
        logger.error(f"Error running agent: {e}", exc_info=True)
        raise
    finally:
        logger.info("Agent shutting down...")


def main() -> None:
    """Main entry point."""
    try:
        asyncio.run(start_agent())
    except KeyboardInterrupt:
        logger.info("Application interrupted by user")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Application failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()


We initialized a FoodDetectionProcessor instance to detect food items in video frames using YOLOv8. The analyze_food function is registered with the LLM as a tool. It waits up to 1.5 seconds for the processor to detect food items in the video feed, then calls analyze_food_and_calories() to compute the nutritional information.

Our agent is configured with an LLM, STT, TTS, and our FoodDetectionProcessor. Once the processor identifies a food item in the video, the agent calls analyze_food to get the nutritional information for the food. We’re using Stream as our video edge network because its low-latency streaming delivers high-quality video, which is ideal for real-time food detection.

This is what our .env file looks like:

# Stream credentials - Get yours at: https://getstream.io/dashboard/
STREAM_API_KEY=your-stream-key
STREAM_API_SECRET=your-stream-secret

OPENAI_API_KEY=your-openai-key


USDA_API_KEY=your-USDA-key
DEEPGRAM_API_KEY=your-deepgram-key
CARTESIA_API_KEY=your-cartesia-key

At this point, our Calorie Estimation App is nearly ready to go! 🎉 Note that analyze_food_and_calories() calls a log_food_to_excel() helper that we implement in the next section; once that is in place, you can test the app by running the command below:

uv run main.py

Next, let’s add one final touch. We will log the calorie information in an Excel file for easy reference by the user.

Logging Calorie Information in an Excel File

For record keeping and easy referencing, let’s log the food calorie information in an Excel file.

# Path to the Excel log file used by log_food_to_excel
FOOD_LOG_FILE = "food_log.xlsx"


def log_food_to_excel(food_result: Dict[str, Any], user_id: str = "user"):
    """
    Append the calorie analysis result to an Excel (.xlsx) log file.
    """
    file_exists = os.path.isfile(FOOD_LOG_FILE)

    if file_exists:
        wb = load_workbook(FOOD_LOG_FILE)
        ws = wb.active
    else:
        wb = Workbook()
        ws = wb.active
        ws.append([
            "Timestamp",
            "User ID",
            "Food Name",
            "Portion (g)",
            "Calories",
            "Protein (g)",
            "Carbs (g)",
            "Fat (g)",
            "Error"
        ])
        # Optional: set basic column widths (runs only at creation, so widths fit the headers)
        for col in ws.columns:
            max_length = 0
            column = col[0].column_letter
            for cell in col:
                try:
                    if len(str(cell.value)) > max_length:
                        max_length = len(str(cell.value))
                except Exception:
                    pass
            adjusted_width = min(max_length + 2, 50)
            ws.column_dimensions[column].width = adjusted_width

    # Append data row
    ws.append([
        datetime.now().isoformat(),
        user_id,
        food_result.get("food_name", "Unknown"),
        food_result.get("portion_grams", "N/A"),
        food_result.get("calories", "Unknown"),
        food_result.get("protein_g", "N/A"),
        food_result.get("carbs_g", "N/A"),
        food_result.get("fat_g", "N/A"),
        food_result.get("error", "")
    ])

    # Save file
    wb.save(FOOD_LOG_FILE)


We are using openpyxl to create or update an Excel sheet with the calorie information of the food.

Recall that we already call the log_food_to_excel function inside analyze_food_and_calories():

async def analyze_food_and_calories(food_detections: List[Dict[str, Any]]) -> str:
    # ... inside the detection loop ...
        calorie_info = await search_food_calories(food_name, portion_info["estimated_weight_grams"])

        # log calorie information to excel spreadsheet
        log_food_to_excel(calorie_info, user_id="demo_user")
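Because each row carries a timestamp, tallying the day's consumption is straightforward. A minimal sketch that reads the sheet created above and sums today's calories:

# daily_total.py: sum today's logged calories from the Excel food log
from datetime import date
from openpyxl import load_workbook

FOOD_LOG_FILE = "food_log.xlsx"

def total_calories_today() -> float:
    ws = load_workbook(FOOD_LOG_FILE).active
    total = 0.0
    # Skip the header row; column 1 is Timestamp, column 5 is Calories
    for row in ws.iter_rows(min_row=2, values_only=True):
        timestamp, calories = row[0], row[4]
        if timestamp and str(timestamp).startswith(date.today().isoformat()):
            if isinstance(calories, (int, float)):
                total += calories
    return total

if __name__ == "__main__":
    print(f"Total calories logged today: {total_calories_today():.0f} kcal")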

For our demonstration purposes, openpyxl works perfectly fine. However, in a production environment, it is better to use a real database, such as Supabase, to store the calorie information and export to an Excel sheet when needed. This is because openpyxl does not support true concurrent writes, which can cause issues when multiple users attempt to log data simultaneously.

Here’s how you could implement this using Supabase:

In your Supabase SQL Editor, run the query below:

CREATE TABLE food_log (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  created_at TIMESTAMPTZ DEFAULT NOW(),
  user_id TEXT NOT NULL,
  food_name TEXT,
  portion_grams REAL,
  calories REAL,
  protein_g REAL,
  carbs_g REAL,
  fat_g REAL,
  error TEXT
);


This creates a food_log table with the necessary columns. (If uuid_generate_v4() is unavailable in your project, enable the uuid-ossp extension or use Postgres’s built-in gen_random_uuid() instead.)

Next, install Supabase:

uv add supabase

Then, import it into your project:

from supabase import create_client, Client

Add your Supabase credentials to the .env file:

SUPABASE_URL=your-project-url.supabase.co
SUPABASE_ANON_KEY=your-anon-key

Add a function to save calorie information in Supabase:


def log_food_to_supabase(food_result: Dict[str, Any], user_id: str = "demo_user"):
    """
    Log food analysis to Supabase database.
    """
    try:
        supabase: Client = create_client(
            os.getenv("SUPABASE_URL"),
            os.getenv("SUPABASE_ANON_KEY")
        )

        data = {
            "user_id": user_id,
            "food_name": food_result.get("food_name", "Unknown"),
            "portion_grams": food_result.get("portion_grams"),
            "calories": food_result.get("calories") if isinstance(food_result.get("calories"), (int, float)) else None,
            "protein_g": food_result.get("protein_g"),
            "carbs_g": food_result.get("carbs_g"),
            "fat_g": food_result.get("fat_g"),
            "error": food_result.get("error", "")
        }

        response = supabase.table("food_log").insert(data).execute()
        logger.info(f"Food logged to Supabase: {response.data[0]['id']}")
    except Exception as e:
        logger.error(f" Failed to log to Supabase: {e}")


Call the function inside analyze_food_and_calories, right after calorie_info is fetched, in place of the Excel logging call:

async def analyze_food_and_calories(food_detections: List[Dict[str, Any]]) -> str:
    # ... inside the detection loop ...
        calorie_info = await search_food_calories(food_name, portion_info["estimated_weight_grams"])

        # log calorie information to Supabase
        log_food_to_supabase(calorie_info, user_id="demo_user")  # use a real user_id in production

You can then provide a download link on demand via a Supabase Edge Function or a client-side export.
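Here is what a client-side export could look like: query the table for one user’s rows and write them to an Excel file with openpyxl. A minimal sketch, assuming the food_log table and the credentials above:

# export_log.py: export a user's Supabase food log to Excel
import os
from openpyxl import Workbook
from supabase import create_client, Client

def export_food_log(user_id: str, out_path: str = "food_log_export.xlsx") -> None:
    supabase: Client = create_client(
        os.getenv("SUPABASE_URL"),
        os.getenv("SUPABASE_ANON_KEY"),
    )
    # Fetch this user's rows, newest first
    response = (
        supabase.table("food_log")
        .select("*")
        .eq("user_id", user_id)
        .order("created_at", desc=True)
        .execute()
    )

    wb = Workbook()
    ws = wb.active
    columns = ["created_at", "food_name", "portion_grams", "calories", "protein_g", "carbs_g", "fat_g"]
    ws.append([c.replace("_", " ").title() for c in columns])  # header row
    for row in response.data:
        ws.append([row.get(col) for col in columns])
    wb.save(out_path)

export_food_log("demo_user")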

Summary

In this tutorial, we built a calorie estimation and tracker app using Vision Agents. We started by implementing a processor to receive frames from the Stream video edge network, detect food objects, and then pass them to the agent.

Next, we built our nutrition analysis pipeline so that once food is detected on camera, we can look up its calorie information from the USDA FoodData Central API. The agent retrieves the food’s calorie information by calling the function registered to it as a tool.

Finally, we added an Excel sheet to store the calorie information for reference.

🛠️ Want to keep building with Vision Agents? Explore these tutorials next:
