Container registries like Harbor can quickly accumulate unused images, consuming valuable storage space. Manual cleanup is tedious, so this script safely automates the process while respecting images currently in use by Kubernetes.
The Challenge
After running Harbor in production for several months, we noticed:
- Storage usage growing exponentially
- Hundreds of outdated image tags
- No easy way to identify which images were actually in use
- Fear of breaking production by deleting important images
The Script
#!/usr/bin/env python3
"""
Harbor Image Cleanup Script
This script cleans up old/unused images from Harbor registry while ensuring images
used by Kubernetes deployments are preserved.
Features:
- Connects to Harbor API to list and delete images
- Checks Kubernetes deployments to identify images in use
- Applies configurable retention policies (by age or count)
- Provides detailed logging
- Can be run manually or as a cron job
Requirements:
- Python 3.7+
- Required packages: requests, kubernetes, pyyaml (argparse and logging are part of the standard library)
"""
import os
import sys
import time
import re
import json
import base64
import logging
import argparse
import datetime
import yaml
import requests
from requests.auth import HTTPBasicAuth
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from typing import Dict, List, Set, Tuple, Optional, Any
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('harbor-cleanup.log')
]
)
logger = logging.getLogger('harbor-cleanup')
class HarborClient:
"""Client for interacting with Harbor API"""
def __init__(self, harbor_url: str, username: str, password: str, verify_ssl: bool = True):
"""
Initialize Harbor client
Args:
harbor_url: URL of Harbor instance (e.g., https://harbor.example.com)
username: Harbor username
password: Harbor password
verify_ssl: Whether to verify SSL certificates
"""
self.harbor_url = harbor_url.rstrip('/')
self.api_url = f"{self.harbor_url}/api"
self.username = username
self.password = password
self.verify_ssl = verify_ssl
self.session = requests.Session()
self.session.auth = HTTPBasicAuth(username, password)
self.session.verify = verify_ssl
# Add X-Xsrftoken header to all requests
self.session.headers.update({'X-Xsrftoken': 'token'})
# Test connection
try:
self.ping()
logger.info(f"Successfully connected to Harbor at {harbor_url}")
except Exception as e:
logger.error(f"Failed to connect to Harbor: {str(e)}")
raise
def ping(self) -> bool:
"""Test connection to Harbor API"""
response = self.session.get(f"{self.harbor_url}/api/ping")
response.raise_for_status()
return True
def get_projects(self) -> List[Dict]:
"""Get list of projects from Harbor"""
response = self.session.get(f"{self.harbor_url}/api/projects")
response.raise_for_status()
return response.json()
def get_project_id(self, project_name: str) -> int:
"""
Get project ID from project name
Args:
project_name: Name of the Harbor project
Returns:
Project ID as integer
Raises:
ValueError: If project not found
"""
projects = self.get_projects()
for project in projects:
if project.get('name') == project_name:
return project.get('project_id')
raise ValueError(f"Project {project_name} not found")
def get_repositories(self, project_name: str) -> List[Dict]:
"""
Get list of repositories in a project
Args:
project_name: Name of the Harbor project
Returns:
List of repository objects
"""
project_id = self.get_project_id(project_name)
response = self.session.get(f"{self.harbor_url}/api/repositories?project_id={project_id}")
response.raise_for_status()
return response.json()
def get_tags(self, repository_name: str) -> List[Dict]:
"""
Get list of tags in a repository
Args:
repository_name: Full name of repository (project/repo)
Returns:
List of tag objects
"""
# repository_name should be in format "project/repository"
response = self.session.get(f"{self.harbor_url}/api/repositories/{repository_name}/tags")
response.raise_for_status()
return response.json()
def delete_tag(self, repository_name: str, tag: str) -> bool:
"""
Delete a tag from Harbor
Args:
repository_name: Full name of repository (project/repo)
tag: Tag to delete
Returns:
True if deletion was successful
"""
response = self.session.delete(
f"{self.harbor_url}/api/repositories/{repository_name}/tags/{tag}"
)
response.raise_for_status()
logger.info(f"Deleted tag {repository_name}:{tag}")
return True
class KubernetesClient:
"""Client for interacting with Kubernetes API"""
def __init__(self, kubeconfig: Optional[str] = None, context: Optional[str] = None):
"""
Initialize Kubernetes client
Args:
kubeconfig: Path to kubeconfig file (defaults to ~/.kube/config)
context: Kubernetes context to use
"""
try:
if kubeconfig:
config.load_kube_config(kubeconfig, context=context)
else:
config.load_kube_config(context=context)
self.apps_v1 = client.AppsV1Api()
self.core_v1 = client.CoreV1Api()
logger.info("Successfully connected to Kubernetes")
except Exception as e:
logger.error(f"Failed to connect to Kubernetes: {str(e)}")
raise
def get_images_in_use(self) -> Set[str]:
"""
Get set of all images currently in use in the cluster
Returns:
Set of image references (including tags/digests)
"""
images = set()
# Check deployments
try:
deployments = self.apps_v1.list_deployment_for_all_namespaces()
for deployment in deployments.items:
for container in deployment.spec.template.spec.containers:
images.add(container.image)
if deployment.spec.template.spec.init_containers:
for container in deployment.spec.template.spec.init_containers:
images.add(container.image)
except ApiException as e:
logger.error(f"Error getting deployments: {str(e)}")
# Check statefulsets
try:
statefulsets = self.apps_v1.list_stateful_set_for_all_namespaces()
for statefulset in statefulsets.items:
for container in statefulset.spec.template.spec.containers:
images.add(container.image)
if statefulset.spec.template.spec.init_containers:
for container in statefulset.spec.template.spec.init_containers:
images.add(container.image)
except ApiException as e:
logger.error(f"Error getting statefulsets: {str(e)}")
# Check daemonsets
try:
daemonsets = self.apps_v1.list_daemon_set_for_all_namespaces()
for daemonset in daemonsets.items:
for container in daemonset.spec.template.spec.containers:
images.add(container.image)
if daemonset.spec.template.spec.init_containers:
for container in daemonset.spec.template.spec.init_containers:
images.add(container.image)
except ApiException as e:
logger.error(f"Error getting daemonsets: {str(e)}")
# Check pods (for CronJobs and Jobs)
try:
pods = self.core_v1.list_pod_for_all_namespaces()
for pod in pods.items:
for container in pod.spec.containers:
images.add(container.image)
if pod.spec.init_containers:
for container in pod.spec.init_containers:
images.add(container.image)
except ApiException as e:
logger.error(f"Error getting pods: {str(e)}")
logger.info(f"Found {len(images)} unique images in use in Kubernetes")
return images
class HarborCleanup:
"""Main class for Harbor cleanup operations"""
def __init__(
self,
harbor_url: str,
harbor_username: str,
harbor_password: str,
kubeconfig: Optional[str] = None,
kube_context: Optional[str] = None,
dry_run: bool = False,
projects: Optional[List[str]] = None,
exclude_projects: Optional[List[str]] = None,
keep_days: Optional[int] = None,
keep_tags: Optional[int] = None,
skip_in_use: bool = True,
always_keep_tags: Optional[List[str]] = None,
verify_ssl: bool = True,
):
"""
Initialize Harbor cleanup
Args:
harbor_url: URL of Harbor instance
harbor_username: Harbor username
harbor_password: Harbor password
kubeconfig: Path to kubeconfig file
kube_context: Kubernetes context to use
dry_run: If True, don't actually delete anything
projects: List of projects to clean up (if None, clean all)
exclude_projects: List of projects to exclude from cleanup
keep_days: Keep images newer than this many days
keep_tags: Keep this many most recent tags per repository
skip_in_use: Skip images in use by Kubernetes
always_keep_tags: List of tag patterns to always keep (e.g., ['latest', 'stable', 'prod-*'])
verify_ssl: Whether to verify SSL certificates
"""
self.harbor_client = HarborClient(harbor_url, harbor_username, harbor_password, verify_ssl)
self.kube_client = KubernetesClient(kubeconfig, kube_context) if skip_in_use else None
self.dry_run = dry_run
self.projects_filter = projects
self.exclude_projects = exclude_projects or []
self.keep_days = keep_days
self.keep_tags = keep_tags
self.skip_in_use = skip_in_use
self.always_keep_tags = always_keep_tags or ["latest", "stable", "master"]
if not self.keep_days and not self.keep_tags:
logger.warning("No retention policy specified, defaulting to keeping 30 days")
self.keep_days = 30
if self.dry_run:
logger.info("DRY RUN MODE: No images will be deleted")
def should_keep_tag(self, tag_name: str) -> bool:
"""
Check if a tag should be kept based on tag patterns
Args:
tag_name: Name of the tag
Returns:
True if the tag should be kept
"""
for pattern in self.always_keep_tags:
if '*' in pattern:
# Convert glob pattern to regex
regex_pattern = pattern.replace('.', '\\.').replace('*', '.*')
if re.match(f"^{regex_pattern}$", tag_name):
return True
elif pattern == tag_name:
return True
return False
def run(self):
"""Run the cleanup process"""
# Get all images in use by Kubernetes
k8s_images = self.kube_client.get_images_in_use() if self.skip_in_use else set()
# Normalize Kubernetes image references to match Harbor format
normalized_k8s_images = self._normalize_k8s_images(k8s_images)
# Get projects to clean
all_projects = self.harbor_client.get_projects()
projects_to_clean = []
for project in all_projects:
project_name = project['name']
if self.projects_filter and project_name not in self.projects_filter:
logger.debug(f"Skipping project {project_name} (not in filter)")
continue
if project_name in self.exclude_projects:
logger.debug(f"Skipping project {project_name} (in exclude list)")
continue
projects_to_clean.append(project_name)
logger.info(f"Found {len(projects_to_clean)} projects to clean")
# Process each project
for project_name in projects_to_clean:
self._clean_project(project_name, normalized_k8s_images)
def _normalize_k8s_images(self, k8s_images: Set[str]) -> Dict[str, Set[str]]:
"""
Normalize Kubernetes image references to match Harbor format
Args:
k8s_images: Set of image references from Kubernetes
Returns:
Dict mapping repository names to sets of tags/digests
"""
normalized = {}
harbor_domain = self.harbor_client.harbor_url.replace('https://', '').replace('http://', '')
for image in k8s_images:
# Skip images not from our Harbor
if not image.startswith(harbor_domain):
continue
# Extract repository and tag/digest
if '@sha256:' in image:
repo, digest = image.split('@sha256:', 1)
digest = f"sha256:{digest}"
repo = repo.replace(f"{harbor_domain}/", '')
if repo not in normalized:
normalized[repo] = set()
normalized[repo].add(digest)
else:
if ':' in image[image.find('/')+1:]: # Make sure we're not splitting on the port
repo, tag = image.rsplit(':', 1)
repo = repo.replace(f"{harbor_domain}/", '')
if repo not in normalized:
normalized[repo] = set()
normalized[repo].add(tag)
else:
# Image without tag (implicitly 'latest')
repo = image.replace(f"{harbor_domain}/", '')
if repo not in normalized:
normalized[repo] = set()
normalized[repo].add('latest')
return normalized
def _clean_project(self, project_name: str, k8s_images: Dict[str, Set[str]]):
"""
Clean a single Harbor project
Args:
project_name: Name of the Harbor project
k8s_images: Dict mapping repository names to sets of tags/digests
"""
logger.info(f"Cleaning project: {project_name}")
# Get repositories in the project
try:
repositories = self.harbor_client.get_repositories(project_name)
except Exception as e:
logger.error(f"Error getting repositories for project {project_name}: {str(e)}")
return
logger.info(f"Found {len(repositories)} repositories in project {project_name}")
for repo in repositories:
repo_name = repo['name'] # Should be in format "project/repo"
self._clean_repository(repo_name, k8s_images)
def _clean_repository(self, repo_name: str, k8s_images: Dict[str, Set[str]]):
"""
Clean a single Harbor repository
Args:
repo_name: Full name of the repository (project/repo)
k8s_images: Dict mapping repository names to sets of tags/digests
"""
logger.info(f"Cleaning repository: {repo_name}")
# Get tags in the repository
try:
tags = self.harbor_client.get_tags(repo_name)
except Exception as e:
logger.error(f"Error getting tags for repository {repo_name}: {str(e)}")
return
# Sort tags by creation time (newest first)
tags.sort(key=lambda x: x.get('created', ''), reverse=True)
# Keep track of how many tags we've kept
kept_count = 0
for i, tag in enumerate(tags):
tag_name = tag['name']
created_time = tag.get('created')
# Check if tag is in use by Kubernetes
in_use = False
if self.skip_in_use and repo_name in k8s_images:
if tag_name in k8s_images[repo_name]:
in_use = True
logger.info(f"Keeping {repo_name}:{tag_name} (in use by Kubernetes)")
# Check if we should keep this tag based on retention policies
should_keep = False
# Check if tag matches the always-keep patterns
if self.should_keep_tag(tag_name):
should_keep = True
logger.info(f"Keeping {repo_name}:{tag_name} (matches always-keep pattern)")
# Check age-based retention
if not should_keep and self.keep_days is not None and created_time:
created_date = datetime.datetime.fromisoformat(created_time.replace('Z', '+00:00'))
age_days = (datetime.datetime.now(datetime.timezone.utc) - created_date).days
if age_days < self.keep_days:
should_keep = True
logger.info(f"Keeping {repo_name}:{tag_name} (age: {age_days} days, keep_days: {self.keep_days})")
# Check count-based retention
if not should_keep and self.keep_tags is not None:
if kept_count < self.keep_tags:
should_keep = True
logger.info(f"Keeping {repo_name}:{tag_name} (keep_tags: {self.keep_tags})")
# Delete or keep tag
if in_use or should_keep:
kept_count += 1
else:
logger.info(f"Deleting {repo_name}:{tag_name}")
if not self.dry_run:
try:
self.harbor_client.delete_tag(repo_name, tag_name)
except Exception as e:
logger.error(f"Error deleting tag {repo_name}:{tag_name}: {str(e)}")
def get_harbor_credentials_from_kube() -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""
Attempt to extract Harbor credentials from Kubernetes context
Returns:
Tuple of (harbor_url, username, password)
"""
# Try to load current context
try:
config.load_kube_config()
v1 = client.CoreV1Api()
# First, try to find harbor-auth secret in harbor namespace
try:
secret = v1.read_namespaced_secret("harbor-auth", "harbor")
harbor_url = base64.b64decode(secret.data.get("url", "")).decode("utf-8")
username = base64.b64decode(secret.data.get("username", "")).decode("utf-8")
password = base64.b64decode(secret.data.get("password", "")).decode("utf-8")
return harbor_url, username, password
except:
pass
# Try to find any Harbor credentials in any namespace
namespaces = v1.list_namespace()
for ns in namespaces.items:
ns_name = ns.metadata.name
try:
secrets = v1.list_namespaced_secret(ns_name)
for secret in secrets.items:
if "harbor" in secret.metadata.name.lower():
data = secret.data
if data:
# Try to find url, username, password
harbor_url = None
username = None
password = None
for key in data:
value = base64.b64decode(data[key]).decode("utf-8")
key_lower = key.lower()
if "url" in key_lower or "host" in key_lower:
harbor_url = value
elif "user" in key_lower:
username = value
elif "pass" in key_lower:
password = value
if harbor_url and username and password:
return harbor_url, username, password
except:
continue
except:
pass
return None, None, None
def parse_args():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(
description="Clean up old/unused images from Harbor registry"
)
# Harbor connection options
harbor_group = parser.add_argument_group("Harbor Connection Options")
harbor_group.add_argument(
"--harbor-url", help="Harbor URL (e.g., https://harbor.example.com)"
)
harbor_group.add_argument("--harbor-username", help="Harbor username")
harbor_group.add_argument("--harbor-password", help="Harbor password")
harbor_group.add_argument(
"--no-verify-ssl", action="store_true", help="Skip SSL certificate verification"
)
# Kubernetes options
k8s_group = parser.add_argument_group("Kubernetes Options")
k8s_group.add_argument("--kubeconfig", help="Path to kubeconfig file")
k8s_group.add_argument("--kube-context", help="Kubernetes context to use")
k8s_group.add_argument(
"--no-skip-in-use", action="store_true",
help="Don't skip images in use by Kubernetes"
)
# Cleanup options
cleanup_group = parser.add_argument_group("Cleanup Options")
cleanup_group.add_argument(
"--dry-run", action="store_true", help="Don't actually delete anything"
)
cleanup_group.add_argument(
"--projects", nargs="*", help="Projects to clean up (if not specified, clean all)"
)
cleanup_group.add_argument(
"--exclude-projects", nargs="*", help="Projects to exclude from cleanup"
)
cleanup_group.add_argument(
"--keep-days", type=int, help="Keep images newer than this many days"
)
cleanup_group.add_argument(
"--keep-tags", type=int, help="Keep this many most recent tags per repository"
)
cleanup_group.add_argument(
"--always-keep-tags", nargs="*",
default=["latest", "stable", "master"],
help="Tag patterns to always keep (supports wildcards, e.g., 'prod-*')"
)
# Other options
parser.add_argument(
"--config", help="Path to YAML config file (overrides command line options)"
)
parser.add_argument(
"--use-kube-auth", action="store_true",
help="Extract Harbor credentials from Kubernetes secrets"
)
parser.add_argument(
"--verbose", "-v", action="count", default=0, help="Increase verbosity"
)
args = parser.parse_args()
# Handle verbosity
if args.verbose == 1:
logger.setLevel(logging.INFO)
elif args.verbose >= 2:
logger.setLevel(logging.DEBUG)
# Load config file if specified
if args.config:
try:
with open(args.config, 'r') as f:
config_data = yaml.safe_load(f)
# Update args with config file values
for key, value in config_data.items():
if value is not None:
setattr(args, key.replace('-', '_'), value)
except Exception as e:
logger.error(f"Error loading config file: {str(e)}")
# Try to extract Harbor credentials from Kubernetes if requested
if args.use_kube_auth and not (args.harbor_url and args.harbor_username and args.harbor_password):
harbor_url, username, password = get_harbor_credentials_from_kube()
if harbor_url and username and password:
logger.info("Successfully extracted Harbor credentials from Kubernetes")
args.harbor_url = args.harbor_url or harbor_url
args.harbor_username = args.harbor_username or username
args.harbor_password = args.harbor_password or password
else:
logger.warning("Failed to extract Harbor credentials from Kubernetes")
# Validate required options
if not args.harbor_url:
parser.error("Harbor URL is required")
if not args.harbor_username:
parser.error("Harbor username is required")
if not args.harbor_password:
parser.error("Harbor password is required")
return args
def main():
"""Main entry point"""
args = parse_args()
cleanup = HarborCleanup(
harbor_url=args.harbor_url,
harbor_username=args.harbor_username,
harbor_password=args.harbor_password,
kubeconfig=args.kubeconfig,
kube_context=args.kube_context,
dry_run=args.dry_run,
projects=args.projects,
exclude_projects=args.exclude_projects,
keep_days=args.keep_days,
keep_tags=args.keep_tags,
skip_in_use=not args.no_skip_in_use,
always_keep_tags=args.always_keep_tags,
verify_ssl=not args.no_verify_ssl,
)
try:
cleanup.run()
logger.info("Cleanup completed successfully")
except KeyboardInterrupt:
logger.info("Cleanup interrupted by user")
sys.exit(1)
except Exception as e:
logger.error(f"Cleanup failed: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()
The Solution
A Python script that:
- Integrates with Harbor's API
- Checks Kubernetes for active deployments
- Applies configurable retention policies
- Provides safety mechanisms like dry-run mode
Key Features
# Sample configuration showing main features
cleanup = HarborCleanup(
harbor_url="https://harbor.example.com",
harbor_username="admin",
harbor_password="secret",
keep_days=30, # Keep images newer than 30 days
keep_tags=5, # Keep 5 most recent tags per repo
skip_in_use=True, # Don't delete images used by Kubernetes
always_keep_tags=[ # Protected tag patterns
"latest",
"stable",
"prod-*"
],
dry_run=True # Safety first!
)
Safety Mechanisms
- Dry Run Mode - Preview deletions without actually removing anything
./harbor-cleanup.py --dry-run --keep-days 30
- Kubernetes Integration - Automatically detects in-use images
# Gets images from:
# - Deployments
# - StatefulSets
# - DaemonSets
# - Pods (for Jobs/CronJobs)
k8s_images = kube_client.get_images_in_use()
- Protected Tags - Never delete important tags like latest or prod-* (see the pattern-matching sketch below)
- Dual Retention Policies - Combine age-based and count-based rules
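The protected-tag check is the same glob-to-regex matching used by should_keep_tag in the script. Here is a standalone sketch of that logic, handy for testing your own patterns before a run (the helper name matches_keep_pattern is just for illustration):

import re

def matches_keep_pattern(tag_name: str, patterns: list) -> bool:
    """Mirror of should_keep_tag: return True if the tag matches any protected pattern."""
    for pattern in patterns:
        if '*' in pattern:
            # Convert the glob pattern to an anchored regex, exactly as the script does
            regex = pattern.replace('.', '\\.').replace('*', '.*')
            if re.match(f"^{regex}$", tag_name):
                return True
        elif pattern == tag_name:
            return True
    return False

protected = ["latest", "stable", "master", "prod-*"]
print(matches_keep_pattern("prod-2024-01-15", protected))  # True  -> always kept
print(matches_keep_pattern("feature-xyz", protected))      # False -> falls through to age/count rules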
Getting Started
- Install dependencies:
pip3 install pyyaml kubernetes requests
- Run a dry run first:
./harbor-cleanup.py \
--use-kube-auth \
--dry-run \
--keep-days 30 \
--keep-tags 5 \
--always-keep-tags latest stable master "prod-*"
- For a production run (after verifying the dry-run output):
./harbor-cleanup.py \
--harbor-url https://harbor.example.com \
--harbor-username $HARBOR_USER \
--harbor-password $HARBOR_PASS \
--keep-days 30 \
--keep-tags 5
Advanced Usage
Configuration File
Instead of command-line args, use a YAML config:
# config.yaml
harbor-url: https://harbor.example.com
harbor-username: admin
keep-days: 30
keep-tags: 5
exclude-projects:
- infrastructure
- legacy
Note that the sample config intentionally omits the password, so supply it at runtime (or use --use-kube-auth):
./harbor-cleanup.py --config config.yaml --harbor-password "$HARBOR_PASS"
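Since the script is designed to run unattended as well as manually, a cron entry works well once the dry run looks right. A minimal sketch, assuming the script and config live under /opt/harbor-cleanup and the cron user has a kubeconfig so --use-kube-auth can pull credentials (all paths are illustrative):

# Weekly cleanup, Sundays at 02:00; Harbor credentials come from Kubernetes secrets
0 2 * * 0 /opt/harbor-cleanup/harbor-cleanup.py --config /opt/harbor-cleanup/config.yaml --use-kube-auth >> /var/log/harbor-cleanup.log 2>&1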
The script has helped us reduce Harbor storage usage by 70% while maintaining all production-critical images. The Kubernetes integration gives us confidence we won't break running applications.
Would love to hear about your registry cleanup experiences in the comments!