DEV Community

Tanvir Rahman
Tanvir Rahman

Posted on

Automating Harbor Registry Cleanup with Kubernetes

Container registries like Harbor can quickly accumulate unused images, consuming valuable storage space. Manual cleanup is tedious, so this script will safely automates the process while respecting images currently in use by Kubernetes.

The Challenge

After running Harbor in production for several months, we noticed:

  • Storage usage growing exponentially
  • Hundreds of outdated image tags
  • No easy way to identify which images were actually in use
  • Fear of breaking production by deleting important images

Script

#!/usr/bin/env python3
"""
Harbor Image Cleanup Script

This script cleans up old/unused images from Harbor registry while ensuring images
used by Kubernetes deployments are preserved.

Features:
- Connects to Harbor API to list and delete images
- Checks Kubernetes deployments to identify images in use
- Applies configurable retention policies (by age or count)
- Provides detailed logging
- Can be run manually or as a cron job

Requirements:
- Python 3.6+
- Required packages: requests, kubernetes, pyyaml, argparse, logging
"""

import os
import sys
import time
import re
import json
import base64
import logging
import argparse
import datetime
import yaml
import requests
from requests.auth import HTTPBasicAuth
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from typing import Dict, List, Set, Tuple, Optional, Any

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('harbor-cleanup.log')
    ]
)
logger = logging.getLogger('harbor-cleanup')

class HarborClient:
    """Client for interacting with Harbor API"""

    def __init__(self, harbor_url: str, username: str, password: str, verify_ssl: bool = True):
        """
        Initialize Harbor client

        Args:
            harbor_url: URL of Harbor instance (e.g., https://harbor.example.com)
            username: Harbor username
            password: Harbor password
            verify_ssl: Whether to verify SSL certificates
        """
        self.harbor_url = harbor_url.rstrip('/')
        self.api_url = f"{self.harbor_url}/api"
        self.username = username
        self.password = password
        self.verify_ssl = verify_ssl
        self.session = requests.Session()
        self.session.auth = HTTPBasicAuth(username, password)
        self.session.verify = verify_ssl
        # Add X-Xsrftoken header to all requests
        self.session.headers.update({'X-Xsrftoken': 'token'})

        # Test connection
        try:
            self.ping()
            logger.info(f"Successfully connected to Harbor at {harbor_url}")
        except Exception as e:
            logger.error(f"Failed to connect to Harbor: {str(e)}")
            raise

    def ping(self) -> bool:
        """Test connection to Harbor API"""
        response = self.session.get(f"{self.harbor_url}/api/ping")
        response.raise_for_status()
        return True

    def get_projects(self) -> List[Dict]:
        """Get list of projects from Harbor"""
        response = self.session.get(f"{self.harbor_url}/api/projects")
        response.raise_for_status()
        return response.json()

    def get_project_id(self, project_name: str) -> int:
        """
        Get project ID from project name

        Args:
            project_name: Name of the Harbor project

        Returns:
            Project ID as integer

        Raises:
            ValueError: If project not found
        """
        projects = self.get_projects()
        for project in projects:
            if project.get('name') == project_name:
                return project.get('project_id')
        raise ValueError(f"Project {project_name} not found")

    def get_repositories(self, project_name: str) -> List[Dict]:
        """
        Get list of repositories in a project

        Args:
            project_name: Name of the Harbor project

        Returns:
            List of repository objects
        """
        project_id = self.get_project_id(project_name)
        response = self.session.get(f"{self.harbor_url}/api/repositories?project_id={project_id}")
        response.raise_for_status()
        return response.json()

    def get_tags(self, repository_name: str) -> List[Dict]:
        """
        Get list of tags in a repository

        Args:
            repository_name: Full name of repository (project/repo)

        Returns:
            List of tag objects
        """
        # repository_name should be in format "project/repository"
        response = self.session.get(f"{self.harbor_url}/api/repositories/{repository_name}/tags")
        response.raise_for_status()
        return response.json()

    def delete_tag(self, repository_name: str, tag: str) -> bool:
        """
        Delete a tag from Harbor

        Args:
            repository_name: Full name of repository (project/repo)
            tag: Tag to delete

        Returns:
            True if deletion was successful
        """
        response = self.session.delete(
            f"{self.harbor_url}/api/repositories/{repository_name}/tags/{tag}"
        )
        response.raise_for_status()
        logger.info(f"Deleted tag {repository_name}:{tag}")
        return True


class KubernetesClient:
    """Client for interacting with Kubernetes API"""

    def __init__(self, kubeconfig: Optional[str] = None, context: Optional[str] = None):
        """
        Initialize Kubernetes client

        Args:
            kubeconfig: Path to kubeconfig file (defaults to ~/.kube/config)
            context: Kubernetes context to use
        """
        try:
            if kubeconfig:
                config.load_kube_config(kubeconfig, context=context)
            else:
                config.load_kube_config(context=context)
            self.apps_v1 = client.AppsV1Api()
            self.core_v1 = client.CoreV1Api()
            logger.info("Successfully connected to Kubernetes")
        except Exception as e:
            logger.error(f"Failed to connect to Kubernetes: {str(e)}")
            raise

    def get_images_in_use(self) -> Set[str]:
        """
        Get set of all images currently in use in the cluster

        Returns:
            Set of image references (including tags/digests)
        """
        images = set()

        # Check deployments
        try:
            deployments = self.apps_v1.list_deployment_for_all_namespaces()
            for deployment in deployments.items:
                for container in deployment.spec.template.spec.containers:
                    images.add(container.image)
                if deployment.spec.template.spec.init_containers:
                    for container in deployment.spec.template.spec.init_containers:
                        images.add(container.image)
        except ApiException as e:
            logger.error(f"Error getting deployments: {str(e)}")

        # Check statefulsets
        try:
            statefulsets = self.apps_v1.list_stateful_set_for_all_namespaces()
            for statefulset in statefulsets.items:
                for container in statefulset.spec.template.spec.containers:
                    images.add(container.image)
                if statefulset.spec.template.spec.init_containers:
                    for container in statefulset.spec.template.spec.init_containers:
                        images.add(container.image)
        except ApiException as e:
            logger.error(f"Error getting statefulsets: {str(e)}")

        # Check daemonsets
        try:
            daemonsets = self.apps_v1.list_daemon_set_for_all_namespaces()
            for daemonset in daemonsets.items:
                for container in daemonset.spec.template.spec.containers:
                    images.add(container.image)
                if daemonset.spec.template.spec.init_containers:
                    for container in daemonset.spec.template.spec.init_containers:
                        images.add(container.image)
        except ApiException as e:
            logger.error(f"Error getting daemonsets: {str(e)}")

        # Check pods (for CronJobs and Jobs)
        try:
            pods = self.core_v1.list_pod_for_all_namespaces()
            for pod in pods.items:
                for container in pod.spec.containers:
                    images.add(container.image)
                if pod.spec.init_containers:
                    for container in pod.spec.init_containers:
                        images.add(container.image)
        except ApiException as e:
            logger.error(f"Error getting pods: {str(e)}")

        logger.info(f"Found {len(images)} unique images in use in Kubernetes")
        return images


class HarborCleanup:
    """Main class for Harbor cleanup operations"""

    def __init__(
        self,
        harbor_url: str,
        harbor_username: str,
        harbor_password: str,
        kubeconfig: Optional[str] = None,
        kube_context: Optional[str] = None,
        dry_run: bool = False,
        projects: Optional[List[str]] = None,
        exclude_projects: Optional[List[str]] = None,
        keep_days: Optional[int] = None,
        keep_tags: Optional[int] = None,
        skip_in_use: bool = True,
        always_keep_tags: Optional[List[str]] = None,
        verify_ssl: bool = True,
    ):
        """
        Initialize Harbor cleanup

        Args:
            harbor_url: URL of Harbor instance
            harbor_username: Harbor username
            harbor_password: Harbor password
            kubeconfig: Path to kubeconfig file
            kube_context: Kubernetes context to use
            dry_run: If True, don't actually delete anything
            projects: List of projects to clean up (if None, clean all)
            exclude_projects: List of projects to exclude from cleanup
            keep_days: Keep images newer than this many days
            keep_tags: Keep this many most recent tags per repository
            skip_in_use: Skip images in use by Kubernetes
            always_keep_tags: List of tag patterns to always keep (e.g., ['latest', 'stable', 'prod-*'])
            verify_ssl: Whether to verify SSL certificates
        """
        self.harbor_client = HarborClient(harbor_url, harbor_username, harbor_password, verify_ssl)
        self.kube_client = KubernetesClient(kubeconfig, kube_context) if skip_in_use else None
        self.dry_run = dry_run
        self.projects_filter = projects
        self.exclude_projects = exclude_projects or []
        self.keep_days = keep_days
        self.keep_tags = keep_tags
        self.skip_in_use = skip_in_use
        self.always_keep_tags = always_keep_tags or ["latest", "stable", "master"]

        if not self.keep_days and not self.keep_tags:
            logger.warning("No retention policy specified, defaulting to keeping 30 days")
            self.keep_days = 30

        if self.dry_run:
            logger.info("DRY RUN MODE: No images will be deleted")

    def should_keep_tag(self, tag_name: str) -> bool:
        """
        Check if a tag should be kept based on tag patterns

        Args:
            tag_name: Name of the tag

        Returns:
            True if the tag should be kept
        """
        for pattern in self.always_keep_tags:
            if '*' in pattern:
                # Convert glob pattern to regex
                regex_pattern = pattern.replace('.', '\\.').replace('*', '.*')
                if re.match(f"^{regex_pattern}$", tag_name):
                    return True
            elif pattern == tag_name:
                return True
        return False

    def run(self):
        """Run the cleanup process"""
        # Get all images in use by Kubernetes
        k8s_images = self.kube_client.get_images_in_use() if self.skip_in_use else set()

        # Normalize Kubernetes image references to match Harbor format
        normalized_k8s_images = self._normalize_k8s_images(k8s_images)

        # Get projects to clean
        all_projects = self.harbor_client.get_projects()
        projects_to_clean = []

        for project in all_projects:
            project_name = project['name']

            if self.projects_filter and project_name not in self.projects_filter:
                logger.debug(f"Skipping project {project_name} (not in filter)")
                continue

            if project_name in self.exclude_projects:
                logger.debug(f"Skipping project {project_name} (in exclude list)")
                continue

            projects_to_clean.append(project_name)

        logger.info(f"Found {len(projects_to_clean)} projects to clean")

        # Process each project
        for project_name in projects_to_clean:
            self._clean_project(project_name, normalized_k8s_images)

    def _normalize_k8s_images(self, k8s_images: Set[str]) -> Dict[str, Set[str]]:
        """
        Normalize Kubernetes image references to match Harbor format

        Args:
            k8s_images: Set of image references from Kubernetes

        Returns:
            Dict mapping repository names to sets of tags/digests
        """
        normalized = {}
        harbor_domain = self.harbor_client.harbor_url.replace('https://', '').replace('http://', '')

        for image in k8s_images:
            # Skip images not from our Harbor
            if not image.startswith(harbor_domain):
                continue

            # Extract repository and tag/digest
            if '@sha256:' in image:
                repo, digest = image.split('@sha256:', 1)
                digest = f"sha256:{digest}"
                repo = repo.replace(f"{harbor_domain}/", '')

                if repo not in normalized:
                    normalized[repo] = set()
                normalized[repo].add(digest)
            else:
                if ':' in image[image.find('/')+1:]:  # Make sure we're not splitting on the port
                    repo, tag = image.rsplit(':', 1)
                    repo = repo.replace(f"{harbor_domain}/", '')

                    if repo not in normalized:
                        normalized[repo] = set()
                    normalized[repo].add(tag)
                else:
                    # Image without tag (implicitly 'latest')
                    repo = image.replace(f"{harbor_domain}/", '')

                    if repo not in normalized:
                        normalized[repo] = set()
                    normalized[repo].add('latest')

        return normalized

    def _clean_project(self, project_name: str, k8s_images: Dict[str, Set[str]]):
        """
        Clean a single Harbor project

        Args:
            project_name: Name of the Harbor project
            k8s_images: Dict mapping repository names to sets of tags/digests
        """
        logger.info(f"Cleaning project: {project_name}")

        # Get repositories in the project
        try:
            repositories = self.harbor_client.get_repositories(project_name)
        except Exception as e:
            logger.error(f"Error getting repositories for project {project_name}: {str(e)}")
            return

        logger.info(f"Found {len(repositories)} repositories in project {project_name}")

        for repo in repositories:
            repo_name = repo['name']  # Should be in format "project/repo"
            self._clean_repository(repo_name, k8s_images)

    def _clean_repository(self, repo_name: str, k8s_images: Dict[str, Set[str]]):
        """
        Clean a single Harbor repository

        Args:
            repo_name: Full name of the repository (project/repo)
            k8s_images: Dict mapping repository names to sets of tags/digests
        """
        logger.info(f"Cleaning repository: {repo_name}")

        # Get tags in the repository
        try:
            tags = self.harbor_client.get_tags(repo_name)
        except Exception as e:
            logger.error(f"Error getting tags for repository {repo_name}: {str(e)}")
            return

        # Sort tags by creation time (newest first)
        tags.sort(key=lambda x: x.get('created', ''), reverse=True)

        # Keep track of how many tags we've kept
        kept_count = 0

        for i, tag in enumerate(tags):
            tag_name = tag['name']
            created_time = tag.get('created')

            # Check if tag is in use by Kubernetes
            in_use = False
            if self.skip_in_use and repo_name in k8s_images:
                if tag_name in k8s_images[repo_name]:
                    in_use = True
                    logger.info(f"Keeping {repo_name}:{tag_name} (in use by Kubernetes)")

            # Check if we should keep this tag based on retention policies
            should_keep = False

            # Check if tag matches the always-keep patterns
            if self.should_keep_tag(tag_name):
                should_keep = True
                logger.info(f"Keeping {repo_name}:{tag_name} (matches always-keep pattern)")

            # Check age-based retention
            if not should_keep and self.keep_days is not None and created_time:
                created_date = datetime.datetime.fromisoformat(created_time.replace('Z', '+00:00'))
                age_days = (datetime.datetime.now(datetime.timezone.utc) - created_date).days
                if age_days < self.keep_days:
                    should_keep = True
                    logger.info(f"Keeping {repo_name}:{tag_name} (age: {age_days} days, keep_days: {self.keep_days})")

            # Check count-based retention
            if not should_keep and self.keep_tags is not None:
                if kept_count < self.keep_tags:
                    should_keep = True
                    logger.info(f"Keeping {repo_name}:{tag_name} (keep_tags: {self.keep_tags})")

            # Delete or keep tag
            if in_use or should_keep:
                kept_count += 1
            else:
                logger.info(f"Deleting {repo_name}:{tag_name}")

                if not self.dry_run:
                    try:
                        self.harbor_client.delete_tag(repo_name, tag_name)
                    except Exception as e:
                        logger.error(f"Error deleting tag {repo_name}:{tag_name}: {str(e)}")


def get_harbor_credentials_from_kube() -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """
    Attempt to extract Harbor credentials from Kubernetes context

    Returns:
        Tuple of (harbor_url, username, password)
    """
    # Try to load current context
    try:
        config.load_kube_config()
        v1 = client.CoreV1Api()

        # First, try to find harbor-auth secret in harbor namespace
        try:
            secret = v1.read_namespaced_secret("harbor-auth", "harbor")
            harbor_url = base64.b64decode(secret.data.get("url", "")).decode("utf-8")
            username = base64.b64decode(secret.data.get("username", "")).decode("utf-8")
            password = base64.b64decode(secret.data.get("password", "")).decode("utf-8")
            return harbor_url, username, password
        except:
            pass

        # Try to find any Harbor credentials in any namespace
        namespaces = v1.list_namespace()
        for ns in namespaces.items:
            ns_name = ns.metadata.name
            try:
                secrets = v1.list_namespaced_secret(ns_name)
                for secret in secrets.items:
                    if "harbor" in secret.metadata.name.lower():
                        data = secret.data
                        if data:
                            # Try to find url, username, password
                            harbor_url = None
                            username = None
                            password = None

                            for key in data:
                                value = base64.b64decode(data[key]).decode("utf-8")
                                key_lower = key.lower()

                                if "url" in key_lower or "host" in key_lower:
                                    harbor_url = value
                                elif "user" in key_lower:
                                    username = value
                                elif "pass" in key_lower:
                                    password = value

                            if harbor_url and username and password:
                                return harbor_url, username, password
            except:
                continue
    except:
        pass

    return None, None, None


def parse_args():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(
        description="Clean up old/unused images from Harbor registry"
    )

    # Harbor connection options
    harbor_group = parser.add_argument_group("Harbor Connection Options")
    harbor_group.add_argument(
        "--harbor-url", help="Harbor URL (e.g., https://harbor.example.com)"
    )
    harbor_group.add_argument("--harbor-username", help="Harbor username")
    harbor_group.add_argument("--harbor-password", help="Harbor password")
    harbor_group.add_argument(
        "--no-verify-ssl", action="store_true", help="Skip SSL certificate verification"
    )

    # Kubernetes options
    k8s_group = parser.add_argument_group("Kubernetes Options")
    k8s_group.add_argument("--kubeconfig", help="Path to kubeconfig file")
    k8s_group.add_argument("--kube-context", help="Kubernetes context to use")
    k8s_group.add_argument(
        "--no-skip-in-use", action="store_true", 
        help="Don't skip images in use by Kubernetes"
    )

    # Cleanup options
    cleanup_group = parser.add_argument_group("Cleanup Options")
    cleanup_group.add_argument(
        "--dry-run", action="store_true", help="Don't actually delete anything"
    )
    cleanup_group.add_argument(
        "--projects", nargs="*", help="Projects to clean up (if not specified, clean all)"
    )
    cleanup_group.add_argument(
        "--exclude-projects", nargs="*", help="Projects to exclude from cleanup"
    )
    cleanup_group.add_argument(
        "--keep-days", type=int, help="Keep images newer than this many days"
    )
    cleanup_group.add_argument(
        "--keep-tags", type=int, help="Keep this many most recent tags per repository"
    )
    cleanup_group.add_argument(
        "--always-keep-tags", nargs="*", 
        default=["latest", "stable", "master"],
        help="Tag patterns to always keep (supports wildcards, e.g., 'prod-*')"
    )

    # Other options
    parser.add_argument(
        "--config", help="Path to YAML config file (overrides command line options)"
    )
    parser.add_argument(
        "--use-kube-auth", action="store_true",
        help="Extract Harbor credentials from Kubernetes secrets"
    )
    parser.add_argument(
        "--verbose", "-v", action="count", default=0, help="Increase verbosity"
    )

    args = parser.parse_args()

    # Handle verbosity
    if args.verbose == 1:
        logger.setLevel(logging.INFO)
    elif args.verbose >= 2:
        logger.setLevel(logging.DEBUG)

    # Load config file if specified
    if args.config:
        try:
            with open(args.config, 'r') as f:
                config_data = yaml.safe_load(f)

            # Update args with config file values
            for key, value in config_data.items():
                if value is not None:
                    setattr(args, key.replace('-', '_'), value)
        except Exception as e:
            logger.error(f"Error loading config file: {str(e)}")

    # Try to extract Harbor credentials from Kubernetes if requested
    if args.use_kube_auth and not (args.harbor_url and args.harbor_username and args.harbor_password):
        harbor_url, username, password = get_harbor_credentials_from_kube()
        if harbor_url and username and password:
            logger.info("Successfully extracted Harbor credentials from Kubernetes")
            args.harbor_url = args.harbor_url or harbor_url
            args.harbor_username = args.harbor_username or username
            args.harbor_password = args.harbor_password or password
        else:
            logger.warning("Failed to extract Harbor credentials from Kubernetes")

    # Validate required options
    if not args.harbor_url:
        parser.error("Harbor URL is required")
    if not args.harbor_username:
        parser.error("Harbor username is required")
    if not args.harbor_password:
        parser.error("Harbor password is required")

    return args


def main():
    """Main entry point"""
    args = parse_args()

    cleanup = HarborCleanup(
        harbor_url=args.harbor_url,
        harbor_username=args.harbor_username,
        harbor_password=args.harbor_password,
        kubeconfig=args.kubeconfig,
        kube_context=args.kube_context,
        dry_run=args.dry_run,
        projects=args.projects,
        exclude_projects=args.exclude_projects,
        keep_days=args.keep_days,
        keep_tags=args.keep_tags,
        skip_in_use=not args.no_skip_in_use,
        always_keep_tags=args.always_keep_tags,
        verify_ssl=not args.no_verify_ssl,
    )

    try:
        cleanup.run()
        logger.info("Cleanup completed successfully")
    except KeyboardInterrupt:
        logger.info("Cleanup interrupted by user")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Cleanup failed: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()
Enter fullscreen mode Exit fullscreen mode

The Solution

A Python script that:

  1. Integrates with Harbor's API
  2. Checks Kubernetes for active deployments
  3. Applies configurable retention policies
  4. Provides safety mechanisms like dry-run mode

Key Features

# Sample configuration showing main features
cleanup = HarborCleanup(
    harbor_url="https://harbor.example.com",
    harbor_username="admin",
    harbor_password="secret",
    keep_days=30,          # Keep images newer than 30 days
    keep_tags=5,           # Keep 5 most recent tags per repo
    skip_in_use=True,      # Don't delete images used by Kubernetes
    always_keep_tags=[     # Protected tag patterns
        "latest",
        "stable",
        "prod-*"
    ],
    dry_run=True           # Safety first!
)
Enter fullscreen mode Exit fullscreen mode

Safety Mechanisms

  • Dry Run Mode - Preview deletions without actually removing anything
./harbor-cleanup.py --dry-run --keep-days 30
Enter fullscreen mode Exit fullscreen mode
  • Kubernetes Integration - Automatically detects in-use images
# Gets images from:
# - Deployments
# - StatefulSets
# - DaemonSets
# - Pods (for Jobs/CronJobs)
k8s_images = kube_client.get_images_in_use()
Enter fullscreen mode Exit fullscreen mode
  • Protected Tags - Never delete important tags like latest or prod-*

  • Dual Retention Policies - Combine age-based and count-based rules

Getting Started

  • Install dependencies:
pip3 install pyyaml kubernetes requests
Enter fullscreen mode Exit fullscreen mode
  • Run a test dry-run:
./harbor-cleanup.py \
  --use-kube-auth \
  --dry-run \
  --keep-days 30 \
  --keep-tags 5 \
  --always-keep-tags latest stable master "prod-*"
Enter fullscreen mode Exit fullscreen mode
  • For production run (after verifying dry-run):
./harbor-cleanup.py \
  --harbor-url https://harbor.example.com \
  --harbor-username $HARBOR_USER \
  --harbor-password $HARBOR_PASS \
  --keep-days 30 \
  --keep-tags 5
Enter fullscreen mode Exit fullscreen mode

Advanced Usage

Configuration File

Instead of command-line args, use a YAML config:

# config.yaml
harbor-url: https://harbor.example.com
harbor-username: admin
keep-days: 30
keep-tags: 5
exclude-projects:
  - infrastructure
  - legacy
Enter fullscreen mode Exit fullscreen mode

Then run with:

./harbor-cleanup.py --config config.yaml
Enter fullscreen mode Exit fullscreen mode

The script has helped us reduce Harbor storage usage by 70% while maintaining all production-critical images. The Kubernetes integration gives us confidence we won't break running applications.

Would love to hear about your registry cleanup experiences in the comments!

Top comments (0)