DEV Community

Roberto Belotti


How I Built a Drone-Based Crack Detection Pipeline on AWS

I needed to build a pipeline that takes drone footage of infrastructure (bridges, facades, roads), detects surface defects like cracks and corrosion, and delivers actionable reports to engineers who don't care about ML.

Sounds straightforward. It wasn't.

This article walks through the architecture decisions, the Python code that ties it all together, and the lessons I learned about what happens when computer vision meets real-world AWS constraints.

The problem (and why it's not a model problem)

Let me be clear upfront: this project is not about training a state-of-the-art detection model. A YOLOv8 checkpoint fine-tuned on a public defect dataset is a perfectly serviceable starting point for structural defects. The hard part is everything else.

When a drone lands after a 15-minute inspection flight, you have:

  • Hundreds of high-resolution images (4K, 8-12 MB each)
  • GPS metadata embedded in EXIF
  • No guarantee of consistent lighting, angle, or overlap
  • An engineer waiting for a report, not a folder of annotated JPEGs

The real engineering challenge is the pipeline: ingest, process, store, report. And the architecture decision that shapes everything else: where does inference run?

Edge vs cloud vs hybrid (the decision that changes everything)

I explored three options before writing a single line of code.

Option A: Full edge inference. Run YOLO on a Jetson Nano strapped to the drone. Process frames in real-time, store results on an SD card, download after landing. Pros: zero connectivity dependency, immediate triage. Cons: 5W power budget, thermal throttling at altitude, model size limited to what fits on 4GB RAM. And you only see results when the drone is back on the ground.

Option B: Full cloud inference. Upload raw frames to S3, trigger a Lambda (or Fargate task) to run detection, store results in DynamoDB. Pros: unlimited compute, easy to swap models, centralized results. Cons: you need connectivity during or after flight, and processing 500 images at 8MB each means moving ~4GB to the cloud before anything happens.

Option C: Hybrid (where I want to end up). Lightweight triage on-device flags "interesting" frames during flight. After landing, the full-resolution flagged images get uploaded to S3 and processed by a beefier model in the cloud. Best of both: fast triage, accurate detection, no wasted bandwidth on clear sky shots.

For this project I went with a simplified version of Option B (cloud-only), because the primary use case is batch processing of post-flight image dumps. The edge component is a future iteration.

Architecture overview


Three S3 prefixes (raw/, annotated/, reports/), one DynamoDB table, one processing function. No orchestrator, no Step Functions state machine. Deliberately simple.

Project structure

drone-defect-detector/
├── src/
│   ├── __init__.py
│   ├── detector.py          # YOLOv8 inference wrapper
│   ├── pipeline.py          # Orchestrates ingest → detect → report
│   ├── report.py            # Generates annotated images + summary
│   ├── s3.py                # S3 upload/download helpers
│   └── models.py            # Pydantic models for detections
├── lambda/
│   └── handler.py           # Lambda entry point
├── tests/
│   ├── test_detector.py
│   ├── test_pipeline.py
│   └── conftest.py
├── Dockerfile
├── pyproject.toml
└── README.md

The detection wrapper

The first thing I built was a thin wrapper around Ultralytics YOLOv8. The goal: isolate the ML dependency behind a clean interface so the rest of the pipeline doesn't care what model runs underneath.

# src/detector.py
from dataclasses import dataclass
from pathlib import Path

import cv2
import numpy as np
from ultralytics import YOLO


@dataclass(frozen=True)
class Detection:
    """A single detected defect."""
    label: str
    confidence: float
    bbox: tuple[int, int, int, int]  # x1, y1, x2, y2
    area_px: int


class DefectDetector:
    """Wraps YOLOv8 for structural defect detection."""

    # Defect classes we care about. These must match the class names of a
    # checkpoint fine-tuned on defect data: the default COCO-pretrained
    # yolov8n.pt has none of them, so with it every box is filtered out.
    DEFECT_CLASSES = {"crack", "corrosion", "spalling", "delamination"}

    def __init__(
        self,
        model_path: str = "yolov8n.pt",
        confidence_threshold: float = 0.4,
        device: str = "cpu",
    ) -> None:
        self._model = YOLO(model_path)
        self._conf_threshold = confidence_threshold
        self._device = device

    def detect(self, image_path: Path) -> list[Detection]:
        """Run inference on a single image. Returns detected defects."""
        results = self._model.predict(
            source=str(image_path),
            conf=self._conf_threshold,
            device=self._device,
            verbose=False,
        )

        detections: list[Detection] = []
        for result in results:
            for box in result.boxes:
                label = result.names[int(box.cls)]
                if label not in self.DEFECT_CLASSES:
                    continue

                x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
                detections.append(
                    Detection(
                        label=label,
                        confidence=float(box.conf),
                        bbox=(x1, y1, x2, y2),
                        area_px=(x2 - x1) * (y2 - y1),
                    )
                )

        return detections

    def annotate(
        self, image_path: Path, detections: list[Detection]
    ) -> np.ndarray:
        """Draw bounding boxes on the image. Returns annotated frame."""
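        img = cv2.imread(str(image_path))
        if img is None:  # cv2.imread returns None instead of raising
            raise FileNotFoundError(f"Could not read image: {image_path}")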

        colors = {
            "crack": (0, 0, 255),       # red
            "corrosion": (0, 165, 255),  # orange
            "spalling": (0, 255, 255),   # yellow
            "delamination": (255, 0, 0), # blue
        }

        for det in detections:
            color = colors.get(det.label, (0, 255, 0))
            x1, y1, x2, y2 = det.bbox
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)

            text = f"{det.label} {det.confidence:.0%}"
            (tw, th), _ = cv2.getTextSize(
                text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1
            )
            cv2.rectangle(
                img, (x1, y1 - th - 8), (x1 + tw + 4, y1), color, -1
            )
            cv2.putText(
                img, text, (x1 + 2, y1 - 4),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1,
            )

        return img

A few notes on this:

Why yolov8n.pt (nano)? Because this runs inside a Lambda or a lightweight Fargate container. The nano variant is about 6 MB and runs inference in roughly 50 ms per image on CPU. For a batch pipeline where you're processing hundreds of images post-flight, that's fast enough. Whatever size you pick, the weights you deploy should be a checkpoint fine-tuned on defect data; swapping one in doesn't change the interface.

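Why filter by DEFECT_CLASSES? The COCO-pretrained model detects 80 everyday object classes, and none of them is a structural defect, so with the stock yolov8n.pt the filter discards everything and the pipeline reports zero defects. The filter becomes meaningful once you use a model fine-tuned on a crack/corrosion dataset (like the RDD2022 road damage dataset) and map its class names onto DEFECT_CLASSES. The architecture is identical either way.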

Why frozen=True on the dataclass? Detections are immutable facts. Once you've detected a crack at coordinates (x1, y1, x2, y2) with confidence 0.87, that shouldn't change downstream.
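A quick illustration of what frozen=True buys: attribute assignment raises FrozenInstanceError, dataclasses.replace produces a modified copy instead, and instances become hashable. The Detection below mirrors the one in src/detector.py; the values are made up.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class Detection:
    label: str
    confidence: float
    bbox: tuple[int, int, int, int]  # x1, y1, x2, y2
    area_px: int


det = Detection("crack", 0.87, (10, 20, 110, 60), 100 * 40)

try:
    det.confidence = 0.95  # assignment on a frozen dataclass raises
except FrozenInstanceError:
    mutated = False
else:
    mutated = True

# A "changed" detection is a new object; the original stays intact.
relabelled = replace(det, label="spalling")
```

Hashability is a side benefit: detections can be deduplicated with a set or used as dictionary keys.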

The pipeline: from S3 event to report

The pipeline orchestrates the full flow: download images from S3, run detection, generate annotated outputs, upload results.

# src/pipeline.py
import json
import logging
from pathlib import Path
from datetime import datetime, timezone

import cv2

from .detector import DefectDetector, Detection
from .report import ReportGenerator
from .s3 import S3Client

logger = logging.getLogger(__name__)


class InspectionPipeline:
    """End-to-end: S3 download → detection → annotation → upload."""

    def __init__(
        self,
        bucket: str,
        model_path: str = "yolov8n.pt",
        confidence_threshold: float = 0.4,
    ) -> None:
        self._bucket = bucket
        self._detector = DefectDetector(
            model_path=model_path,
            confidence_threshold=confidence_threshold,
        )
        self._s3 = S3Client(bucket)
        self._report = ReportGenerator()

    def process_inspection(
        self, inspection_id: str, raw_prefix: str, work_dir: Path
    ) -> dict:
        """Process all images in an S3 prefix. Returns summary."""
        images_dir = work_dir / "images"
        output_dir = work_dir / "output"
        images_dir.mkdir(parents=True, exist_ok=True)
        output_dir.mkdir(parents=True, exist_ok=True)

        # 1. Download raw images
        image_keys = self._s3.list_images(raw_prefix)
        logger.info(
            "Inspection %s: found %d images", inspection_id, len(image_keys)
        )

        local_paths = []
        for key in image_keys:
            local_path = images_dir / Path(key).name
            self._s3.download(key, local_path)
            local_paths.append(local_path)

        # 2. Run detection on each image
        all_results: dict[str, list[Detection]] = {}
        total_defects = 0

        for path in local_paths:
            detections = self._detector.detect(path)
            all_results[path.name] = detections
            total_defects += len(detections)

            if detections:
                # Generate annotated image (imwrite returns False on failure)
                annotated = self._detector.annotate(path, detections)
                annotated_path = output_dir / f"annotated_{path.name}"
                if not cv2.imwrite(str(annotated_path), annotated):
                    raise OSError(f"Could not write {annotated_path}")

                # Upload annotated image
                self._s3.upload(
                    annotated_path,
                    f"annotated/{inspection_id}/{annotated_path.name}",
                )

            logger.info(
                "  %s: %d defects found", path.name, len(detections)
            )

        # 3. Generate summary report
        summary = self._build_summary(inspection_id, all_results)
        report_path = output_dir / f"report_{inspection_id}.json"
        report_path.write_text(json.dumps(summary, indent=2))

        self._s3.upload(
            report_path,
            f"reports/{inspection_id}/report.json",
        )

        # 4. Generate visual report (matplotlib)
        chart_path = self._report.generate_charts(
            summary, output_dir / f"charts_{inspection_id}.png"
        )
        self._s3.upload(
            chart_path,
            f"reports/{inspection_id}/charts.png",
        )

        logger.info(
            "Inspection %s complete: %d images, %d defects",
            inspection_id,
            len(local_paths),
            total_defects,
        )

        return summary

    def _build_summary(
        self,
        inspection_id: str,
        results: dict[str, list[Detection]],
    ) -> dict:
        """Build a structured summary from detection results."""
        defect_counts: dict[str, int] = {}
        high_severity: list[dict] = []

        for filename, detections in results.items():
            for det in detections:
                defect_counts[det.label] = (
                    defect_counts.get(det.label, 0) + 1
                )
                # Model confidence is a proxy here, not physical severity
                if det.confidence >= 0.75:
                    high_severity.append(
                        {
                            "file": filename,
                            "label": det.label,
                            "confidence": round(det.confidence, 3),
                            "bbox": det.bbox,
                            "area_px": det.area_px,
                        }
                    )

        return {
            "inspection_id": inspection_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "total_images": len(results),
            "images_with_defects": sum(
                1 for dets in results.values() if dets
            ),
            "total_defects": sum(len(d) for d in results.values()),
            "defect_counts": defect_counts,
            "high_severity_detections": high_severity,
        }

The report generator

Engineers don't want JSON. They want a chart that says "this bridge has 14 cracks, mostly on the north face, and 3 of them are high-confidence."

# src/report.py
from pathlib import Path

import matplotlib
matplotlib.use("Agg")  # non-interactive backend (Lambda has no display)
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker


class ReportGenerator:
    """Generates visual reports from inspection summaries."""

    def generate_charts(self, summary: dict, output_path: Path) -> Path:
        """Create a summary chart with defect distribution."""
        fig, axes = plt.subplots(1, 2, figsize=(14, 5))
        fig.suptitle(
            f"Inspection Report — {summary['inspection_id']}",
            fontsize=14,
            fontweight="bold",
        )

        # Chart 1: Defect counts by type
        defect_counts = summary.get("defect_counts", {})
        if defect_counts:
            labels = list(defect_counts.keys())
            values = list(defect_counts.values())
            colors = self._get_colors(labels)

            bars = axes[0].barh(labels, values, color=colors)
            axes[0].set_xlabel("Count")
            axes[0].set_title("Defects by Type")
            axes[0].xaxis.set_major_locator(
                ticker.MaxNLocator(integer=True)
            )

            for bar, val in zip(bars, values):
                axes[0].text(
                    bar.get_width() + 0.2, bar.get_y() + bar.get_height() / 2,
                    str(val), va="center", fontsize=10,
                )
        else:
            axes[0].text(
                0.5, 0.5, "No defects detected",
                ha="center", va="center", transform=axes[0].transAxes,
            )

        # Chart 2: Coverage overview
        total = summary["total_images"]
        with_defects = summary["images_with_defects"]
        clean = total - with_defects

        axes[1].pie(
            [with_defects, clean],
            labels=["With defects", "Clean"],
            autopct="%1.0f%%",
            colors=["#e74c3c", "#2ecc71"],
            startangle=90,
        )
        axes[1].set_title(
            f"Image Coverage ({total} images)"
        )

        fig.tight_layout()
        fig.savefig(output_path, dpi=150, bbox_inches="tight")
        plt.close(fig)  # release this figure explicitly (warm containers are reused)

        return output_path

    @staticmethod
    def _get_colors(labels: list[str]) -> list[str]:
        color_map = {
            "crack": "#e74c3c",
            "corrosion": "#e67e22",
            "spalling": "#f1c40f",
            "delamination": "#3498db",
        }
        return [color_map.get(label, "#95a5a6") for label in labels]

S3 helpers (the boring part that matters)

# src/s3.py
import logging
from pathlib import Path

import boto3
from botocore.config import Config

logger = logging.getLogger(__name__)

IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".tiff", ".bmp"}


class S3Client:
    """Thin wrapper around boto3 S3 operations."""

    def __init__(self, bucket: str) -> None:
        self._bucket = bucket
        self._client = boto3.client(
            "s3",
            config=Config(
                retries={"max_attempts": 3, "mode": "adaptive"}
            ),
        )

    def list_images(self, prefix: str) -> list[str]:
        """List image keys under a prefix."""
        paginator = self._client.get_paginator("list_objects_v2")
        keys: list[str] = []

        for page in paginator.paginate(
            Bucket=self._bucket, Prefix=prefix
        ):
            for obj in page.get("Contents", []):
                if Path(obj["Key"]).suffix.lower() in IMAGE_EXTENSIONS:
                    keys.append(obj["Key"])

        return sorted(keys)

    def download(self, key: str, local_path: Path) -> None:
        """Download a single object to a local file."""
        logger.debug("Downloading s3://%s/%s", self._bucket, key)
        self._client.download_file(self._bucket, key, str(local_path))

    def upload(self, local_path: Path, key: str) -> None:
        """Upload a local file to S3."""
        content_type = self._guess_content_type(local_path)
        logger.debug("Uploading to s3://%s/%s", self._bucket, key)
        self._client.upload_file(
            str(local_path),
            self._bucket,
            key,
            ExtraArgs={"ContentType": content_type},
        )

    @staticmethod
    def _guess_content_type(path: Path) -> str:
        mapping = {
            ".json": "application/json",
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
        }
        return mapping.get(path.suffix.lower(), "application/octet-stream")
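_guess_content_type covers the four extensions the pipeline writes. If you later upload TIFF or BMP copies too, the standard library's mimetypes module is a drop-in alternative. A sketch as a plain function rather than the class's static method; note that mimetypes also reads the host's mime.types files, so rare extensions can resolve slightly differently across machines.

```python
import mimetypes
from pathlib import Path


def guess_content_type(path: Path) -> str:
    """Content type for an S3 upload, falling back to a generic binary type."""
    content_type, _encoding = mimetypes.guess_type(path.name)
    return content_type or "application/octet-stream"
```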

Lambda handler

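The glue that connects S3 events to the pipeline. When an object lands under the raw/ prefix, S3 sends an event notification and this function fires. Scope the notification to the raw/ prefix, so the function's own writes to annotated/ and reports/ don't invoke it as well.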

# lambda/handler.py
import json
import logging
import os
import tempfile
from pathlib import Path
from urllib.parse import unquote_plus

from src.pipeline import InspectionPipeline

logger = logging.getLogger()
logger.setLevel(logging.INFO)


# Built once per execution environment, so warm invocations reuse the
# already-loaded model instead of reloading it for every event.
pipeline = InspectionPipeline(
    bucket=os.environ["BUCKET_NAME"],
    model_path=os.environ.get("MODEL_PATH", "yolov8n.pt"),
    confidence_threshold=float(os.environ.get("CONFIDENCE_THRESHOLD", "0.4")),
)


def handler(event: dict, context) -> dict:
    """Lambda entry point. Triggered by S3 PutObject events."""

    # Extract inspection ID from the S3 key
    # Expected format: raw/{inspection_id}/image_001.jpg
    records = event.get("Records", [])
    processed_inspections = set()

    for record in records:
        key = unquote_plus(record["s3"]["object"]["key"])
        parts = key.split("/")

        if len(parts) < 3 or parts[0] != "raw":
            logger.warning("Unexpected key format: %s", key)
            continue

        inspection_id = parts[1]
        if inspection_id in processed_inspections:
            continue

        with tempfile.TemporaryDirectory() as tmp:
            summary = pipeline.process_inspection(
                inspection_id=inspection_id,
                raw_prefix=f"raw/{inspection_id}/",
                work_dir=Path(tmp),
            )

        processed_inspections.add(inspection_id)
        logger.info(
            "Processed inspection %s: %s",
            inspection_id,
            json.dumps(summary, default=str),
        )

    return {
        "statusCode": 200,
        "body": json.dumps(
            {"processed": list(processed_inspections)}
        ),
    }
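One caveat with this handler: S3 emits one notification per object, so uploading 500 images can invoke the function up to 500 times, and each invocation reprocesses the whole raw/{inspection_id}/ prefix, possibly before the upload has finished. The processed_inspections set only deduplicates within a single invocation. A common workaround is to have the uploader write a marker object last and react only to that. A minimal sketch of the filter, assuming a hypothetical marker named _COMPLETE; with a suffix filter on the notification set to the marker name, the function isn't invoked per image at all.

```python
from urllib.parse import unquote_plus

# Hypothetical marker object the uploader writes after the last image,
# e.g. raw/bridge-42/_COMPLETE (an empty object is enough).
MARKER_NAME = "_COMPLETE"


def inspections_to_process(event: dict) -> list[str]:
    """Inspection IDs whose completion marker appears in an S3 event."""
    ids: list[str] = []
    for record in event.get("Records", []):
        # S3 URL-encodes keys in event payloads (spaces arrive as '+').
        key = unquote_plus(record["s3"]["object"]["key"])
        parts = key.split("/")
        is_marker = len(parts) == 3 and parts[0] == "raw" and parts[2] == MARKER_NAME
        if is_marker and parts[1] not in ids:
            ids.append(parts[1])
    return ids


def make_record(key: str) -> dict:
    """Minimal stand-in for one S3 event record (only the fields used above)."""
    return {"s3": {"object": {"key": key}}}


event = {
    "Records": [
        make_record("raw/bridge-42/img_001.jpg"),
        make_record("raw/bridge-42/_COMPLETE"),
    ]
}
print(inspections_to_process(event))
```

The marker has no image extension, so S3Client.list_images already skips it when the pipeline lists the prefix.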

Why Lambda might be wrong here (and what I'd use instead)

This is the part where I second-guess my own architecture.

Lambda with a container image works for small inspections (50-100 images). But it has hard limits:

  • 15-minute timeout. Processing 500 high-res images with YOLOv8 nano on CPU takes ~25 seconds of pure inference, but add S3 downloads, annotation, uploads, and report generation, and you're looking at 3-5 minutes. Doable, but tight for larger inspections.
  • 10 GB ephemeral storage, at most. /tmp defaults to 512 MB, so you have to raise it explicitly. 500 images at 8 MB each = 4 GB just for the raw files. Add annotated outputs and you're flirting with the limit.
  • No GPU. Lambda doesn't offer GPUs. YOLOv8 nano on CPU is fine, but if you want to run a larger model (yolov8m, yolov8l) for better accuracy, you need GPU-backed compute, such as an ECS task on GPU EC2 instances or a SageMaker endpoint.
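On the storage limit specifically: process_inspection downloads every image before running detection, so peak disk usage grows with the inspection size. A sketch of the alternative, which processes one image at a time and deletes it afterwards so only one raw image sits on disk at once. download and detect are injected callables, hypothetical stand-ins for S3Client.download and a step that runs DefectDetector.detect (plus annotation and upload); injecting them also makes the loop easy to test.

```python
from pathlib import Path
from typing import Callable, TypeVar

T = TypeVar("T")


def process_streaming(
    keys: list[str],
    work_dir: Path,
    download: Callable[[str, Path], None],
    detect: Callable[[Path], T],
) -> dict[str, T]:
    """Download, process, and delete each image in turn."""
    results: dict[str, T] = {}
    for key in keys:
        local_path = work_dir / Path(key).name
        download(key, local_path)
        try:
            results[local_path.name] = detect(local_path)
        finally:
            # Free the disk space before fetching the next image.
            local_path.unlink(missing_ok=True)
    return results
```

The trade-off is that downloads and inference no longer overlap; if that matters, a small bounded queue of prefetched images keeps disk usage capped while hiding some of the S3 latency.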

For production at scale, I'd swap the detection Lambda for an ECS task running on GPU-enabled EC2 instances (Fargate doesn't offer GPUs). The trigger changes only slightly (S3 event → SQS → ECS task), but you get no 15-minute cap, more storage, and GPU access.

The pipeline code doesn't change at all. That's the whole point of keeping the infrastructure concerns (Lambda handler, S3 events) separate from the business logic (detector, pipeline, report).

Dockerfile

FROM public.ecr.aws/lambda/python:3.12

# System deps for OpenCV
RUN dnf install -y mesa-libGL && dnf clean all

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ ${LAMBDA_TASK_ROOT}/src/
COPY lambda/ ${LAMBDA_TASK_ROOT}/

CMD ["handler.handler"]
# requirements.txt
ultralytics>=8.2.0,<9.0.0
opencv-python-headless>=4.9.0,<5.0.0
boto3>=1.34.0,<2.0.0
matplotlib>=3.8.0,<4.0.0
pydantic>=2.6.0,<3.0.0

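Note: opencv-python-headless, not opencv-python. The headless variant doesn't pull in Qt/GTK GUI dependencies, which keeps the container image smaller and avoids display-related errors in Lambda. One catch: ultralytics itself declares opencv-python as a dependency, so check which OpenCV package actually ends up in the image (pip list | grep opencv). The mesa-libGL install in the Dockerfile is what covers you if the full package sneaks in.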

Testing with moto (no AWS account required)

# tests/conftest.py
import pytest
import boto3
from moto import mock_aws


@pytest.fixture
def aws_credentials(monkeypatch):
    monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing")
    monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
    monkeypatch.setenv("AWS_SECURITY_TOKEN", "testing")
    monkeypatch.setenv("AWS_DEFAULT_REGION", "eu-central-1")


@pytest.fixture
def s3_bucket(aws_credentials):
    with mock_aws():
        client = boto3.client("s3", region_name="eu-central-1")
        client.create_bucket(
            Bucket="test-inspection",
            CreateBucketConfiguration={
                "LocationConstraint": "eu-central-1"
            },
        )
        yield "test-inspection"
# tests/test_detector.py
from pathlib import Path
import numpy as np
import cv2
import pytest

from src.detector import DefectDetector


@pytest.fixture
def sample_image(tmp_path: Path) -> Path:
    """Create a synthetic test image with a dark line (simulated crack)."""
    img = np.ones((640, 640, 3), dtype=np.uint8) * 200  # gray background
    cv2.line(img, (100, 100), (400, 300), (30, 30, 30), 3)  # dark line
    path = tmp_path / "test_crack.jpg"
    cv2.imwrite(str(path), img)
    return path


def test_detector_returns_list(sample_image: Path):
    detector = DefectDetector(confidence_threshold=0.1)
    results = detector.detect(sample_image)
    assert isinstance(results, list)


def test_detector_annotate_preserves_dimensions(sample_image: Path):
    detector = DefectDetector(confidence_threshold=0.1)
    detections = detector.detect(sample_image)
    annotated = detector.annotate(sample_image, detections)
    original = cv2.imread(str(sample_image))
    assert annotated.shape == original.shape

Lessons learned

1. The model is the easiest part. I spent maybe 10% of my time on inference code and 90% on the pipeline around it: S3 key conventions, error handling, report formatting, container packaging. If you're building a CV pipeline and you think the hard part is the model, you haven't started the hard part yet.

2. Separate infrastructure from logic. The DefectDetector class doesn't know about S3, Lambda, or AWS. The InspectionPipeline doesn't know about Lambda events. The handler is just glue. This means I can run the exact same pipeline locally for testing (instantiate InspectionPipeline in a short script and call process_inspection), or swap the Lambda trigger for Fargate without touching any business logic.

3. opencv-python-headless saves headaches. I lost an hour debugging an import error in Lambda because the full OpenCV package tried to load libGL.so. The headless variant just works. Always use it in server/serverless environments.

4. S3 key conventions are your schema. raw/{inspection_id}/, annotated/{inspection_id}/, reports/{inspection_id}/. Simple, predictable, greppable. No database needed to track which inspection produced which outputs.

5. Confidence thresholds are a product decision, not a technical one. Setting the threshold at 0.4 catches more potential defects but generates more false positives. Setting it at 0.8 is more precise but misses borderline cracks. The right value depends on whether your users prefer "flag everything, I'll triage manually" or "only show me what you're sure about." I made it configurable via environment variable and let the ops team decide.
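One way to ground that decision is to sweep the threshold over a small, human-reviewed sample and show the ops team the precision/recall trade-off. A sketch, assuming you have each detection's confidence paired with a reviewer's verdict (real defect or not) plus the total number of defects the reviewer found; the sample data is made up for illustration.

```python
def precision_recall_at(
    scored: list[tuple[float, bool]],
    total_true_defects: int,
    threshold: float,
) -> tuple[float, float]:
    """Precision and recall when keeping detections with confidence >= threshold.

    scored: (confidence, is_real_defect) for every detection the model produced.
    total_true_defects: defects a human reviewer found in the same images.
    """
    kept = [is_real for conf, is_real in scored if conf >= threshold]
    true_positives = sum(kept)
    # Conventions for empty denominators: nothing kept means no false alarms,
    # and no real defects means nothing was missed.
    precision = true_positives / len(kept) if kept else 1.0
    recall = true_positives / total_true_defects if total_true_defects else 1.0
    return precision, recall


# Made-up review data: 6 detections, 5 real defects in the images overall.
sample = [(0.92, True), (0.85, True), (0.78, False), (0.55, True), (0.45, False), (0.41, True)]

for t in (0.4, 0.6, 0.8):
    p, r = precision_recall_at(sample, total_true_defects=5, threshold=t)
    print(f"threshold={t:.1f} precision={p:.2f} recall={r:.2f}")
```

A table like this turns "flag everything" versus "only what you're sure about" into a concrete choice the ops team can make per asset type.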

What's next

  • Edge triage module: a lightweight ONNX model running on-device that flags frames worth uploading, which I estimate could cut upload bandwidth by 60-70%
  • GPS overlay: extract EXIF GPS data and map defect locations on a geo-referenced grid
  • Severity scoring: use defect area (in pixels, relative to image resolution) as a proxy for physical size, and flag anything above a threshold
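The severity idea can be sketched in a few lines: normalise area_px by the frame's pixel count so the score doesn't depend on camera resolution, then flag anything above a cut-off. The 1% default is a placeholder, not a calibrated value, and converting to physical size would additionally need flight altitude and camera parameters (ground sampling distance), which this sketch ignores.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Detection:
    # Mirrors Detection in src/detector.py
    label: str
    confidence: float
    bbox: tuple[int, int, int, int]  # x1, y1, x2, y2
    area_px: int


def relative_area(det: Detection, image_width: int, image_height: int) -> float:
    """Fraction of the frame covered by the detection's bounding box."""
    return det.area_px / (image_width * image_height)


def flag_severe(
    detections: list[Detection],
    image_width: int,
    image_height: int,
    min_fraction: float = 0.01,  # placeholder threshold, not calibrated
) -> list[Detection]:
    """Detections whose bounding box covers at least min_fraction of the frame."""
    return [
        d for d in detections
        if relative_area(d, image_width, image_height) >= min_fraction
    ]
```

Bounding-box area overstates thin diagonal cracks, so a segmentation mask would be a better basis once the model supports it.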

The code is on GitHub: github.com/biscolab/drone-defect-detector


I write about cloud architecture, AI in production, and the engineering decisions nobody puts in the README. Follow me on LinkedIn for the short version.
