Maureen Chebet

Building a DevSecOps Pipeline on AWS: From Security Audit to Daily Deployments

The Wake-Up Call

After a security audit, your organization received some sobering findings:

  • Vulnerable dependencies in container images
  • Hardcoded secrets in the codebase (API keys, passwords, tokens)
  • Missing security scanning in CI/CD pipelines
  • No automated compliance checks
  • Slow security review process (3-5 days per release)

The challenge? Implement a comprehensive DevSecOps pipeline that:

  • Maintains daily deployment frequency
  • Doesn't add significant delays to the pipeline
  • Ensures regulatory compliance (PCI-DSS, GDPR, SOC 2)
  • Builds security into the culture, not just the tools

In this article, I'll walk through building a complete DevSecOps pipeline on AWS that addresses all these concerns while maintaining developer velocity.

DevSecOps Pipeline Architecture

The Shift-Left Philosophy

Shift-Left Security means catching security issues as early as possible in the development lifecycle:

Developer → Pre-Commit → CI Pipeline → Pre-Deployment → Production
   ↓            ↓            ↓              ↓              ↓
Local Scan   Git Hooks   SAST/DAST    Container Scan  Runtime Protection

The earlier we catch issues, the cheaper and faster they are to fix.

AWS DevSecOps Stack

Core AWS Services:

  • AWS CodePipeline - CI/CD orchestration
  • AWS CodeBuild - Build and security scanning
  • AWS CodeCommit/GitHub - Source code management
  • Amazon ECR - Container registry with scanning
  • AWS Secrets Manager - Secret management
  • AWS Security Hub - Centralized security findings
  • Amazon Inspector - Vulnerability assessment
  • AWS Config - Compliance monitoring
  • AWS IAM - Access control and least privilege

Phase 1: Secret Management Solution

The Problem: Hardcoded Secrets

Hardcoded secrets are a critical security risk. They can be:

  • Exposed in version control
  • Leaked in logs or error messages
  • Accessible to anyone with repository access

Solution: AWS Secrets Manager

Step 1: Migrate Existing Secrets

import boto3
import json
import re

secrets_client = boto3.client('secretsmanager')

def migrate_secrets_from_codebase():
    """Scan codebase and migrate hardcoded secrets to Secrets Manager"""

    # Patterns to detect secrets
    secret_patterns = {
        'api_key': r'api[_-]?key["\']?\s*[:=]\s*["\']([^"\']+)["\']',
        'password': r'password["\']?\s*[:=]\s*["\']([^"\']+)["\']',
        'token': r'token["\']?\s*[:=]\s*["\']([^"\']+)["\']',
        'secret': r'secret["\']?\s*[:=]\s*["\']([^"\']+)["\']'
    }

    # Scan files (example)
    files_to_scan = ['config.py', 'settings.py', '.env.example']

    for file_path in files_to_scan:
        with open(file_path, 'r') as f:
            content = f.read()

            for secret_type, pattern in secret_patterns.items():
                matches = re.findall(pattern, content, re.IGNORECASE)
                for match in matches:
                    # Create secret in Secrets Manager
                    secret_name = f"payment-app/{secret_type}/{file_path}"
                    secrets_client.create_secret(
                        Name=secret_name,
                        SecretString=match,
                        Description=f"Migrated from {file_path}"
                    )
                    print(f"Created secret: {secret_name}")

# Run migration
migrate_secrets_from_codebase()

Step 2: Update Application Code

import boto3
import json
import psycopg2  # PostgreSQL driver used by connect_to_database below

secrets_client = boto3.client('secretsmanager')

def get_secret(secret_name):
    """Retrieve secret from AWS Secrets Manager"""
    try:
        response = secrets_client.get_secret_value(SecretId=secret_name)
        return json.loads(response['SecretString'])
    except Exception as e:
        print(f"Error retrieving secret: {e}")
        raise

# Usage in application
def connect_to_database():
    """Connect to database using secrets from Secrets Manager"""
    db_secrets = get_secret('payment-app/database/credentials')

    connection = psycopg2.connect(
        host=db_secrets['host'],
        database=db_secrets['database'],
        user=db_secrets['username'],
        password=db_secrets['password']
    )
    return connection
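Calling Secrets Manager on every request adds latency and API cost. A common pattern is to cache secrets in-process with a short TTL (AWS also publishes the `aws-secretsmanager-caching` library for exactly this). Here's a minimal, dependency-free sketch; the `fetcher` callable is a stand-in for the `get_secret` function above:

```python
import time

class SecretCache:
    """Cache secret values in memory for a short TTL.

    fetcher: any callable mapping a secret name to its value
    (e.g. the get_secret function above).
    """
    def __init__(self, fetcher, ttl_seconds=300):
        self._fetcher = fetcher
        self._ttl = ttl_seconds
        self._store = {}  # name -> (value, fetched_at)

    def get(self, name):
        entry = self._store.get(name)
        if entry and time.monotonic() - entry[1] < self._ttl:
            return entry[0]  # still fresh, skip the API call
        value = self._fetcher(name)
        self._store[name] = (value, time.monotonic())
        return value
```

Wrap the fetcher once at module load (`cache = SecretCache(get_secret)`) and call `cache.get(...)` everywhere else; a short TTL also keeps rotated passwords from going stale for long.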

Step 3: Automatic Secret Rotation

import boto3

lambda_client = boto3.client('lambda')

def create_rotation_lambda():
    """Create Lambda function for automatic secret rotation"""

    rotation_code = '''
import boto3
import json
import psycopg2

def lambda_handler(event, context):
    """Rotate database password (simplified; a production rotation function
    must handle the createSecret/setSecret/testSecret/finishSecret steps
    that Secrets Manager passes in event['Step'])"""
    secrets_client = boto3.client('secretsmanager')

    # Get current secret
    secret_arn = event['SecretId']
    current_secret = json.loads(
        secrets_client.get_secret_value(SecretId=secret_arn)['SecretString']
    )

    # Generate new password (generate_secure_password is left to implement,
    # e.g. with secrets.token_urlsafe or the GetRandomPassword API)
    new_password = generate_secure_password()

    # Update database
    conn = psycopg2.connect(
        host=current_secret['host'],
        database=current_secret['database'],
        user=current_secret['username'],
        password=current_secret['password']
    )
    # Update password in database
    # ... (implementation)

    # Update secret
    secrets_client.update_secret(
        SecretId=secret_arn,
        SecretString=json.dumps({
            **current_secret,
            'password': new_password
        })
    )

    return {'statusCode': 200}
'''

    # Package the handler into an in-memory zip (Code['ZipFile'] expects
    # zip archive bytes, not raw source)
    import io
    import zipfile
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, 'w') as zf:
        zf.writestr('index.py', rotation_code)

    lambda_client.create_function(
        FunctionName='rotate-db-secret',
        Runtime='python3.9',
        Role='arn:aws:iam::account:role/secret-rotation-role',
        Handler='index.lambda_handler',
        Code={'ZipFile': buf.getvalue()}
    )

# Enable automatic rotation
secrets_client = boto3.client('secretsmanager')
secrets_client.rotate_secret(
    SecretId='payment-app/database/credentials',
    RotationLambdaARN='arn:aws:lambda:region:account:function:rotate-db-secret',
    RotationRules={
        'AutomaticallyAfterDays': 30
    }
)

Step 4: Pre-Commit Hook to Detect Secrets

#!/usr/bin/env python3
# .git/hooks/pre-commit

import subprocess
import re
import sys

def detect_secrets():
    """Detect potential secrets before commit"""

    # Get staged files
    result = subprocess.run(
        ['git', 'diff', '--cached', '--name-only'],
        capture_output=True,
        text=True
    )
    staged_files = result.stdout.strip().split('\n')

    # Secret patterns
    patterns = [
        (r'password\s*[:=]\s*["\']([^"\']+)["\']', 'Password detected'),
        (r'api[_-]?key\s*[:=]\s*["\']([^"\']+)["\']', 'API key detected'),
        (r'secret\s*[:=]\s*["\']([^"\']+)["\']', 'Secret detected'),
        (r'-----BEGIN (RSA |OPENSSH )?PRIVATE KEY-----', 'Private key detected'),
    ]

    violations = []

    for file_path in staged_files:
        if not file_path:
            continue

        try:
            with open(file_path, 'r') as f:
                content = f.read()
                for pattern, message in patterns:
                    if re.search(pattern, content, re.IGNORECASE):
                        violations.append(f"{file_path}: {message}")
        except Exception:
            continue

    if violations:
        print("❌ SECURITY VIOLATION: Potential secrets detected!")
        for violation in violations:
            print(f"  - {violation}")
        print("\nPlease use AWS Secrets Manager instead.")
        print("See: https://docs.aws.amazon.com/secretsmanager/")
        sys.exit(1)

    print("✅ No secrets detected in staged files")
    return 0

if __name__ == '__main__':
    sys.exit(detect_secrets())

Make it executable:

chmod +x .git/hooks/pre-commit

Phase 2: Security Scanning Strategy

SAST (Static Application Security Testing)

AWS CodeBuild with SAST Tools

# buildspec-sast.yml
version: 0.2
phases:
  pre_build:
    commands:
      - echo Installing SAST tools...
      - |
        # Install Semgrep for SAST
        pip install semgrep
      - |
        # Install Bandit for Python security scanning
        pip install bandit
  build:
    commands:
      - echo Running SAST scans...
      - |
        # Semgrep scan
        semgrep --config=auto \
          --json \
          --output=semgrep-results.json \
          .
      - |
        # Bandit scan (for Python)
        bandit -r . \
          -f json \
          -o bandit-results.json \
          || true
      - |
        # SonarQube scan (if using)
        sonar-scanner \
          -Dsonar.projectKey=payment-app \
          -Dsonar.sources=. \
          -Dsonar.host.url=$SONARQUBE_URL \
          -Dsonar.login=$SONARQUBE_TOKEN
  post_build:
    commands:
      - echo Uploading SAST results...
      - |
        # Upload to AWS Security Hub
        aws securityhub batch-import-findings \
          --findings file://convert-to-security-hub-format.json
      - |
        # Fail build if critical issues found
        python check-sast-results.py

SAST Results Checker:

# check-sast-results.py
import json
import sys

def check_sast_results():
    """Check SAST results and fail build if critical issues found"""

    with open('semgrep-results.json', 'r') as f:
        semgrep_results = json.load(f)

    critical_issues = []

    for result in semgrep_results.get('results', []):
        severity = result.get('extra', {}).get('severity', '')
        if severity in ['ERROR', 'WARNING']:
            critical_issues.append({
                'file': result.get('path', ''),
                'message': result.get('message', ''),
                'severity': severity
            })

    if critical_issues:
        print("Critical security issues found:")
        for issue in critical_issues:
            print(f"  - {issue['file']}: {issue['message']} ({issue['severity']})")
        sys.exit(1)

    print("SAST scan passed")
    return 0

if __name__ == '__main__':
    sys.exit(check_sast_results())

DAST (Dynamic Application Security Testing)

AWS CodeBuild with OWASP ZAP:

# buildspec-dast.yml
version: 0.2
phases:
  pre_build:
    commands:
      - echo Installing DAST tools...
      - |
        # Pull the official OWASP ZAP image; the zap-baseline / zap-full-scan
        # scripts ship with the Docker image, not the Linux tarball
        docker pull ghcr.io/zaproxy/zaproxy:stable
  build:
    commands:
      - echo Starting application for DAST...
      - |
        # Start application (example)
        docker-compose up -d payment-app
        sleep 30  # Wait for app to be ready
      - |
        # Run OWASP ZAP baseline scan against the running app
        docker run --network host -v "$(pwd):/zap/wrk:rw" \
          ghcr.io/zaproxy/zaproxy:stable zap-baseline.py \
          -t http://localhost:8080 \
          -J zap-report.json
      - |
        # Run OWASP ZAP full scan (more thorough)
        docker run --network host -v "$(pwd):/zap/wrk:rw" \
          ghcr.io/zaproxy/zaproxy:stable zap-full-scan.py \
          -t http://localhost:8080 \
          -J zap-full-report.json \
          -I  # Do not fail on warnings
  post_build:
    commands:
      - echo Processing DAST results...
      - |
        # Convert and upload to Security Hub
        python process-dast-results.py zap-report.json
      - |
        # Fail if critical vulnerabilities found
        python check-dast-results.py zap-report.json

DAST Results Processor:

# process-dast-results.py
import json
import sys
import boto3

securityhub = boto3.client('securityhub')

def process_dast_results(report_file):
    """Process DAST results and send to Security Hub"""

    with open(report_file, 'r') as f:
        zap_results = json.load(f)

    findings = []

    for site in zap_results.get('site', []):
        for alert in site.get('alerts', []):
            severity_map = {
                'High': 'HIGH',
                'Medium': 'MEDIUM',
                'Low': 'LOW',
                'Informational': 'INFORMATIONAL'
            }

            severity = severity_map.get(alert.get('risk', ''), 'INFORMATIONAL')

            if severity in ['HIGH', 'MEDIUM']:
                finding = {
                    'SchemaVersion': '2018-10-08',
                    'Id': f"zap-{alert.get('pluginid', 'unknown')}",
                    'ProductArn': 'arn:aws:securityhub:region:account:product/account/default',
                    'GeneratorId': 'owasp-zap',
                    'AwsAccountId': 'account-id',
                    'Types': ['Security Findings'],
                    'CreatedAt': '2024-01-01T00:00:00Z',
                    'UpdatedAt': '2024-01-01T00:00:00Z',
                    'Severity': {
                        'Label': severity
                    },
                    'Title': alert.get('name', 'Unknown vulnerability'),
                    'Description': alert.get('desc', ''),
                    'Remediation': {
                        'Recommendation': {
                            'Text': alert.get('solution', '')
                        }
                    }
                }
                findings.append(finding)

    if findings:
        # Batch import to Security Hub
        securityhub.batch_import_findings(Findings=findings)
        print(f"✅ Imported {len(findings)} findings to Security Hub")

    return findings

if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Usage: python process-dast-results.py <zap-report.json>")
        sys.exit(1)

    findings = process_dast_results(sys.argv[1])
    sys.exit(0 if not findings else 1)
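The buildspec above also calls a `check-dast-results.py` gate that isn't shown. A minimal sketch, assuming the same ZAP JSON shape the processor above reads (sites containing alert lists with a `risk` field):

```python
# check-dast-results.py (sketch)
import json
import sys

def count_blocking_alerts(zap_results, blocking_risks=('High',)):
    """Count alerts whose ZAP risk level should fail the build."""
    count = 0
    for site in zap_results.get('site', []):
        for alert in site.get('alerts', []):
            if alert.get('risk') in blocking_risks:
                count += 1
    return count

def main(report_file):
    with open(report_file) as f:
        zap_results = json.load(f)
    blocking = count_blocking_alerts(zap_results)
    if blocking:
        print(f"DAST gate failed: {blocking} high-risk alert(s)")
        return 1
    print("DAST gate passed")
    return 0

if __name__ == '__main__':
    sys.exit(main(sys.argv[1]))
```

Widen `blocking_risks` to `('High', 'Medium')` once the team has burned down the backlog of medium-risk alerts.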

Container Image Scanning

Amazon ECR Image Scanning:

# Enable automatic image scanning on push
aws ecr put-image-scanning-configuration \
  --repository-name payment-app \
  --image-scanning-configuration scanOnPush=true

# Scan existing images
aws ecr start-image-scan \
  --repository-name payment-app \
  --image-id imageTag=latest

# Get scan results
aws ecr describe-image-scan-findings \
  --repository-name payment-app \
  --image-id imageTag=latest

CodeBuild Integration:

# buildspec-container-scan.yml
version: 0.2
phases:
  pre_build:
    commands:
      - echo Logging in to Amazon ECR...
      - aws ecr get-login-password --region $AWS_DEFAULT_REGION | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com
      - REPOSITORY_URI=$AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$IMAGE_REPO_NAME
      - COMMIT_HASH=$(echo $CODEBUILD_RESOLVED_SOURCE_VERSION | cut -c 1-7)
      - IMAGE_TAG=${COMMIT_HASH:=latest}
  build:
    commands:
      - echo Building Docker image...
      - docker build -t $IMAGE_REPO_NAME:$IMAGE_TAG .
      - docker tag $IMAGE_REPO_NAME:$IMAGE_TAG $REPOSITORY_URI:$IMAGE_TAG
  post_build:
    commands:
      - echo Pushing Docker image...
      - docker push $REPOSITORY_URI:$IMAGE_TAG
      - echo Waiting for image scan to complete...
      - |
        # Wait for scan to complete (ECR scans automatically on push)
        aws ecr wait image-scan-complete \
          --repository-name $IMAGE_REPO_NAME \
          --image-id imageTag=$IMAGE_TAG
      - echo Retrieving scan results...
      - |
        # Get scan findings
        SCAN_RESULTS=$(aws ecr describe-image-scan-findings \
          --repository-name $IMAGE_REPO_NAME \
          --image-id imageTag=$IMAGE_TAG \
          --query 'imageScanFindings' \
          --output json)
      - |
        # Check for critical vulnerabilities (counts are under findingSeverityCounts)
        CRITICAL_COUNT=$(echo $SCAN_RESULTS | jq '.findingSeverityCounts.CRITICAL // 0')
        HIGH_COUNT=$(echo $SCAN_RESULTS | jq '.findingSeverityCounts.HIGH // 0')

        if [ "$CRITICAL_COUNT" -gt 0 ] || [ "$HIGH_COUNT" -gt 5 ]; then
          echo "❌ Critical vulnerabilities found in image!"
          echo "Critical: $CRITICAL_COUNT, High: $HIGH_COUNT"
          exit 1
        fi

        echo "Image scan passed (Critical: $CRITICAL_COUNT, High: $HIGH_COUNT)"
      - |
        # Export scan results to Security Hub
        python export-ecr-findings.py $IMAGE_REPO_NAME $IMAGE_TAG

ECR Findings Exporter:

# export-ecr-findings.py
import boto3
import json
import sys

def export_ecr_findings(repo_name, image_tag):
    """Export ECR scan findings to Security Hub"""

    ecr = boto3.client('ecr')
    securityhub = boto3.client('securityhub')

    # Get scan findings
    response = ecr.describe_image_scan_findings(
        repositoryName=repo_name,
        imageId={'imageTag': image_tag}
    )

    findings = response.get('imageScanFindings', {})
    findings_list = findings.get('findings', [])

    security_hub_findings = []

    for finding in findings_list:
        # ECR severity labels already match Security Hub's, so map directly
        severity = finding.get('severity', 'INFORMATIONAL')
        if severity not in ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW', 'INFORMATIONAL'):
            severity = 'INFORMATIONAL'

        # Only export HIGH and CRITICAL
        if severity in ['CRITICAL', 'HIGH']:
            security_hub_finding = {
                'SchemaVersion': '2018-10-08',
                'Id': f"ecr-{repo_name}-{image_tag}-{finding.get('name', 'unknown')}",
                'ProductArn': f'arn:aws:securityhub:region:account:product/account/default',
                'GeneratorId': 'amazon-ecr-image-scan',
                'AwsAccountId': 'account-id',
                'Types': ['Vulnerabilities'],
                'CreatedAt': finding.get('firstObservedAt', ''),
                'UpdatedAt': finding.get('lastObservedAt', ''),
                'Severity': {
                    'Label': severity
                },
                'Title': finding.get('name', 'Unknown vulnerability'),
                'Description': finding.get('description', ''),
                'Remediation': {
                    'Recommendation': {
                        'Text': finding.get('remediation', {}).get('recommendation', {}).get('text', '')
                    }
                },
                'Resources': [{
                    'Type': 'AwsEcrContainerImage',
                    'Id': f"{repo_name}:{image_tag}",
                    'Region': 'us-east-1'
                }]
            }
            security_hub_findings.append(security_hub_finding)

    if security_hub_findings:
        securityhub.batch_import_findings(Findings=security_hub_findings)
        print(f"✅ Exported {len(security_hub_findings)} findings to Security Hub")

    return security_hub_findings

if __name__ == '__main__':
    if len(sys.argv) < 3:
        print("Usage: python export-ecr-findings.py <repo-name> <image-tag>")
        sys.exit(1)

    findings = export_ecr_findings(sys.argv[1], sys.argv[2])
    sys.exit(0)

Dependency Scanning

Scanning Dependencies for Vulnerabilities:

# buildspec-dependency-scan.yml
version: 0.2
phases:
  pre_build:
    commands:
      - echo Installing dependency scanning tools...
      - |
        # For Python
        pip install safety
      - |
        # For Node.js
        npm install -g npm-audit-resolver
      - |
        # For Java (Maven)
        # Use OWASP Dependency-Check
        wget https://github.com/jeremylong/DependencyCheck/releases/download/v8.4.0/dependency-check-8.4.0-release.zip
        unzip dependency-check-8.4.0-release.zip
  build:
    commands:
      - echo Scanning dependencies...
      - |
        # Python safety check (JSON output flags vary by safety version)
        safety check --json > safety-report.json || true
      - |
        # Node.js npm audit
        npm audit --json > npm-audit-report.json || true
      - |
        # Java/Maven dependency check
        ./dependency-check/bin/dependency-check.sh \
          --project payment-app \
          --scan . \
          --format JSON \
          --out dependency-check-report.json
  post_build:
    commands:
      - echo Processing dependency scan results...
      - |
        # Process and upload to Security Hub
        python process-dependency-results.py
      - |
        # Fail build if critical vulnerabilities found
        python check-dependency-results.py
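The `check-dependency-results.py` gate referenced above isn't shown either. A sketch for the safety report, hedged because safety's JSON shape varies by version (older releases emit a bare list of findings, newer ones wrap them in a dict):

```python
# check-dependency-results.py (sketch)
import json
import sys

def count_vulnerabilities(report):
    """Count findings in a safety JSON report.

    Handles both the legacy bare-list format and the newer dict format
    with a 'vulnerabilities' key.
    """
    if isinstance(report, list):
        return len(report)
    return len(report.get('vulnerabilities', []))

def main():
    with open('safety-report.json') as f:
        report = json.load(f)
    vulns = count_vulnerabilities(report)
    if vulns:
        print(f"Dependency gate failed: {vulns} known vulnerabilities")
        return 1
    print("Dependency gate passed")
    return 0

if __name__ == '__main__':
    sys.exit(main())
```

The npm audit and Dependency-Check reports can be gated the same way with their own counters.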

Phase 3: Complete DevSecOps Pipeline

AWS CodePipeline with Security Gates

{
  "pipeline": {
    "name": "payment-app-devsecops-pipeline",
    "roleArn": "arn:aws:iam::account:role/CodePipelineServiceRole",
    "artifactStore": {
      "type": "S3",
      "location": "payment-app-artifacts"
    },
    "stages": [
      {
        "name": "Source",
        "actions": [{
          "name": "SourceAction",
          "actionTypeId": {
            "category": "Source",
            "owner": "AWS",
            "provider": "CodeCommit",
            "version": "1"
          },
          "outputArtifacts": [{"name": "SourceOutput"}],
          "configuration": {
            "RepositoryName": "payment-app",
            "BranchName": "main"
          }
        }]
      },
      {
        "name": "PreBuildSecurity",
        "actions": [{
          "name": "SecretScan",
          "actionTypeId": {
            "category": "Build",
            "owner": "AWS",
            "provider": "CodeBuild",
            "version": "1"
          },
          "inputArtifacts": [{"name": "SourceOutput"}],
          "outputArtifacts": [{"name": "SecretScanOutput"}],
          "configuration": {
            "ProjectName": "payment-app-secret-scan"
          }
        }]
      },
      {
        "name": "Build",
        "actions": [{
          "name": "BuildAndSAST",
          "actionTypeId": {
            "category": "Build",
            "owner": "AWS",
            "provider": "CodeBuild",
            "version": "1"
          },
          "inputArtifacts": [{"name": "SourceOutput"}],
          "outputArtifacts": [{"name": "BuildOutput"}],
          "configuration": {
            "ProjectName": "payment-app-build-sast"
          }
        }]
      },
      {
        "name": "SecurityGates",
        "actions": [
          {
            "name": "DependencyScan",
            "actionTypeId": {
              "category": "Build",
              "owner": "AWS",
              "provider": "CodeBuild",
              "version": "1"
            },
            "inputArtifacts": [{"name": "BuildOutput"}],
            "outputArtifacts": [{"name": "DependencyScanOutput"}],
            "configuration": {
              "ProjectName": "payment-app-dependency-scan"
            }
          },
          {
            "name": "ContainerBuildAndScan",
            "actionTypeId": {
              "category": "Build",
              "owner": "AWS",
              "provider": "CodeBuild",
              "version": "1"
            },
            "inputArtifacts": [{"name": "BuildOutput"}],
            "outputArtifacts": [{"name": "ContainerOutput"}],
            "configuration": {
              "ProjectName": "payment-app-container-build-scan"
            }
          }
        ]
      },
      {
        "name": "DAST",
        "actions": [{
          "name": "DynamicScan",
          "actionTypeId": {
            "category": "Test",
            "owner": "AWS",
            "provider": "CodeBuild",
            "version": "1"
          },
          "inputArtifacts": [{"name": "ContainerOutput"}],
          "outputArtifacts": [{"name": "DASTOutput"}],
          "configuration": {
            "ProjectName": "payment-app-dast"
          }
        }]
      },
      {
        "name": "DeployToStaging",
        "actions": [{
          "name": "DeployStaging",
          "actionTypeId": {
            "category": "Deploy",
            "owner": "AWS",
            "provider": "ECS",
            "version": "1"
          },
          "inputArtifacts": [{"name": "ContainerOutput"}],
          "configuration": {
            "ClusterName": "payment-staging",
            "ServiceName": "payment-service"
          }
        }]
      },
      {
        "name": "ComplianceCheck",
        "actions": [{
          "name": "ComplianceValidation",
          "actionTypeId": {
            "category": "Test",
            "owner": "AWS",
            "provider": "CodeBuild",
            "version": "1"
          },
          "inputArtifacts": [{"name": "DASTOutput"}],
          "configuration": {
            "ProjectName": "payment-app-compliance-check"
          }
        }]
      },
      {
        "name": "DeployToProduction",
        "actions": [{
          "name": "DeployProduction",
          "actionTypeId": {
            "category": "Deploy",
            "owner": "AWS",
            "provider": "ECS",
            "version": "1"
          },
          "inputArtifacts": [{"name": "ContainerOutput"}],
          "configuration": {
            "ClusterName": "payment-production",
            "ServiceName": "payment-service"
          }
        }]
      }
    ]
  }
}
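Save the definition above as `pipeline.json` and create the pipeline with the CLI (this assumes the referenced CodeBuild projects, ECS services, and IAM role already exist):

```shell
# Create the pipeline from the JSON definition
aws codepipeline create-pipeline --cli-input-json file://pipeline.json

# Later edits to pipeline.json can be applied in place
aws codepipeline update-pipeline --cli-input-json file://pipeline.json
```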

Security Gates Implementation

Lambda Function for Security Gate Evaluation:

# security-gate-evaluator.py
import boto3
import json

securityhub = boto3.client('securityhub')
codebuild = boto3.client('codebuild')

def evaluate_security_gates(build_id):
    """Evaluate security gates before allowing deployment"""

    # Get Security Hub findings for this build
    findings = securityhub.get_findings(
        Filters={
            'ResourceId': [{'Value': build_id, 'Comparison': 'EQUALS'}],
            'SeverityLabel': [
                {'Value': 'CRITICAL', 'Comparison': 'EQUALS'},
                {'Value': 'HIGH', 'Comparison': 'EQUALS'}
            ]
        }
    )

    critical_findings = [
        f for f in findings.get('Findings', [])
        if f.get('Severity', {}).get('Label') == 'CRITICAL'
    ]

    high_findings = [
        f for f in findings.get('Findings', [])
        if f.get('Severity', {}).get('Label') == 'HIGH'
    ]

    # Gate rules
    gate_passed = True
    reasons = []

    # Rule 1: No CRITICAL findings allowed
    if critical_findings:
        gate_passed = False
        reasons.append(f"{len(critical_findings)} CRITICAL findings found")

    # Rule 2: Maximum 3 HIGH findings allowed
    if len(high_findings) > 3:
        gate_passed = False
        reasons.append(f"{len(high_findings)} HIGH findings found (max 3 allowed)")

    # Rule 3: No hardcoded secrets
    secret_findings = [
        f for f in findings.get('Findings', [])
        if 'secret' in f.get('Title', '').lower() or 'password' in f.get('Title', '').lower()
    ]
    if secret_findings:
        gate_passed = False
        reasons.append(f"{len(secret_findings)} secret-related findings found")

    result = {
        'gate_passed': gate_passed,
        'reasons': reasons,
        'critical_count': len(critical_findings),
        'high_count': len(high_findings)
    }

    return result

def lambda_handler(event, context):
    """Lambda handler for security gate evaluation"""

    build_id = event.get('build_id')
    result = evaluate_security_gates(build_id)

    if not result['gate_passed']:
        # Stop pipeline
        raise Exception(f"Security gate failed: {', '.join(result['reasons'])}")

    return {
        'statusCode': 200,
        'body': json.dumps(result)
    }

Phase 4: Compliance Automation

AWS Config for Compliance Monitoring

Enable AWS Config:

# Enable Config
aws configservice put-configuration-recorder \
  --configuration-recorder name=default,roleArn=arn:aws:iam::account:role/ConfigRole

# Start recording
aws configservice start-configuration-recorder \
  --configuration-recorder-name default
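Config also needs a delivery channel (an S3 bucket for configuration snapshots and history) before the recorder will start; a minimal setup, assuming a bucket named `my-config-bucket` already exists with the right bucket policy:

```shell
# Point Config at an S3 bucket for snapshots and history
aws configservice put-delivery-channel \
  --delivery-channel name=default,s3BucketName=my-config-bucket
```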

PCI-DSS Compliance Rules:

{
  "ConfigRuleName": "pci-dss-encryption-check",
  "Description": "Check that EBS volumes are encrypted (PCI-DSS requirement)",
  "Scope": {
    "ComplianceResourceTypes": ["AWS::EC2::Volume"]
  },
  "Source": {
    "Owner": "AWS",
    "SourceIdentifier": "ENCRYPTED_VOLUMES"
  }
}
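The JSON above is already in the `ConfigRule` shape, so registering it is one call (the filename is illustrative):

```shell
# Register the managed ENCRYPTED_VOLUMES rule defined above
aws configservice put-config-rule \
  --config-rule file://pci-dss-encryption-check.json
```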

Custom Compliance Rule:

# custom-compliance-rule.py
import boto3
import json

config = boto3.client('config')

def evaluate_compliance(configuration_item):
    """Evaluate if resource is compliant"""

    compliance_status = 'COMPLIANT'
    annotation = ''

    # Example: Check if RDS instance has encryption enabled
    if configuration_item['resourceType'] == 'AWS::RDS::DBInstance':
        if not configuration_item.get('configuration', {}).get('storageEncrypted', False):
            compliance_status = 'NON_COMPLIANT'
            annotation = 'RDS instance must have encryption enabled for PCI-DSS compliance'

    # Example: Check if security groups allow unrestricted access
    if configuration_item['resourceType'] == 'AWS::EC2::SecurityGroup':
        ip_permissions = configuration_item.get('configuration', {}).get('ipPermissions', [])
        for perm in ip_permissions:
            for ip_range in perm.get('ipRanges', []):
                if ip_range.get('cidrIp') == '0.0.0.0/0':
                    compliance_status = 'NON_COMPLIANT'
                    annotation = 'Security group allows unrestricted access (0.0.0.0/0)'

    return {
        'compliance_type': compliance_status,
        'annotation': annotation
    }

def lambda_handler(event, context):
    """Lambda handler for Config custom rule"""

    configuration_item = json.loads(event['invokingEvent'])['configurationItem']
    evaluation = evaluate_compliance(configuration_item)

    config.put_evaluations(
        Evaluations=[{
            'ComplianceResourceType': configuration_item['resourceType'],
            'ComplianceResourceId': configuration_item['resourceId'],
            'ComplianceType': evaluation['compliance_type'],
            'Annotation': evaluation['annotation'],
            'OrderingTimestamp': configuration_item['configurationItemCaptureTime']
        }],
        ResultToken=event['resultToken']  # required by put_evaluations
    )

    return evaluation

Automated Compliance Reporting

# compliance-reporter.py
import boto3
import json
from datetime import datetime

config = boto3.client('config')
s3 = boto3.client('s3')

def generate_compliance_report():
    """Generate compliance report for audit"""

    # Get compliance summary
    response = config.get_compliance_summary_by_config_rule()

    report = {
        'timestamp': datetime.utcnow().isoformat(),
        'compliance_summary': response.get('ComplianceSummariesByConfigRule', []),
        'overall_compliance': calculate_overall_compliance(response)
    }

    # Generate detailed findings
    findings = []
    for rule_summary in response.get('ComplianceSummariesByConfigRule', []):
        if rule_summary.get('ComplianceSummary', {}).get('NonCompliantResourceCount', {}).get('CappedCount', 0) > 0:
            findings.append({
                'rule_name': rule_summary.get('ConfigRuleName', ''),
                'non_compliant_count': rule_summary.get('ComplianceSummary', {}).get('NonCompliantResourceCount', {}).get('CappedCount', 0)
            })

    report['findings'] = findings

    # Save to S3 for audit trail
    s3.put_object(
        Bucket='compliance-reports',
        Key=f"compliance-report-{datetime.utcnow().strftime('%Y-%m-%d')}.json",
        Body=json.dumps(report, indent=2)
    )

    return report

def calculate_overall_compliance(response):
    """Calculate overall compliance percentage"""

    total_resources = 0
    compliant_resources = 0

    for rule_summary in response.get('ComplianceSummariesByConfigRule', []):
        summary = rule_summary.get('ComplianceSummary', {})
        compliant = summary.get('CompliantResourceCount', {}).get('CappedCount', 0)
        non_compliant = summary.get('NonCompliantResourceCount', {}).get('CappedCount', 0)
        total_resources += compliant + non_compliant
        compliant_resources += compliant

    if total_resources == 0:
        return 100.0

    return (compliant_resources / total_resources) * 100

# Schedule the reporter Lambda with EventBridge
import json
import boto3

events = boto3.client('events')

events.put_rule(
    Name='daily-compliance-report',
    ScheduleExpression='cron(0 2 * * ? *)',  # Daily at 2 AM
    State='ENABLED'
)

events.put_targets(
    Rule='daily-compliance-report',
    Targets=[{
        'Id': '1',
        'Arn': 'arn:aws:lambda:region:account:function:compliance-reporter',
        'Input': json.dumps({})
    }]
)
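To sanity-check the percentage math without calling AWS, here's a quick example against a mocked response in the shape the reporter expects (the rule names and counts are made up for illustration):

```python
# Self-contained sanity check of the compliance-percentage calculation,
# using a hand-built response instead of a real AWS Config API call.
def calculate_overall_compliance(response):
    total = 0
    compliant = 0
    for rule_summary in response.get('ComplianceSummariesByConfigRule', []):
        summary = rule_summary.get('ComplianceSummary', {})
        ok = summary.get('CompliantResourceCount', {}).get('CappedCount', 0)
        bad = summary.get('NonCompliantResourceCount', {}).get('CappedCount', 0)
        total += ok + bad
        compliant += ok
    if total == 0:
        return 100.0
    return (compliant / total) * 100

mock_response = {
    'ComplianceSummariesByConfigRule': [
        {
            'ConfigRuleName': 's3-bucket-encryption',  # illustrative rule name
            'ComplianceSummary': {
                'CompliantResourceCount': {'CappedCount': 8},
                'NonCompliantResourceCount': {'CappedCount': 2},
            },
        },
        {
            'ConfigRuleName': 'iam-password-policy',  # illustrative rule name
            'ComplianceSummary': {
                'CompliantResourceCount': {'CappedCount': 10},
                'NonCompliantResourceCount': {'CappedCount': 0},
            },
        },
    ]
}

# 18 compliant out of 20 total resources
print(calculate_overall_compliance(mock_response))  # -> 90.0
```

This makes the edge case explicit too: with no rules at all, the function reports 100% rather than dividing by zero.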

Phase 5: Security Hub Integration

Centralized Security Findings

Enable AWS Security Hub:

# Enable Security Hub
aws securityhub enable-security-hub

# Enable security standards
aws securityhub batch-enable-standards \
  --standards-subscription-requests \
    StandardsArn=arn:aws:securityhub:region::standards/aws-foundational-security-best-practices/v/1.0.0 \
    StandardsArn=arn:aws:securityhub:region::standards/pci-dss/v/3.2.1

Aggregate Findings from All Sources:

# security-findings-aggregator.py
import boto3
from datetime import datetime, timedelta

securityhub = boto3.client('securityhub')

def aggregate_security_findings():
    """Aggregate security findings from all sources"""

    # Get findings from last 24 hours
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=1)

    findings = securityhub.get_findings(
        Filters={
            'CreatedAt': [{
                'Start': start_time.isoformat(),
                'End': end_time.isoformat()
            }],
            'SeverityLabel': [
                {'Value': 'CRITICAL', 'Comparison': 'EQUALS'},
                {'Value': 'HIGH', 'Comparison': 'EQUALS'}
            ]
        },
        MaxResults=100
    )

    # Group by source
    findings_by_source = {}
    for finding in findings.get('Findings', []):
        source = finding.get('ProductFields', {}).get('aws/securityhub/SourceIdentifier', 'unknown')
        if source not in findings_by_source:
            findings_by_source[source] = []
        findings_by_source[source].append(finding)

    # Generate summary
    summary = {
        'timestamp': datetime.utcnow().isoformat(),
        'total_findings': len(findings.get('Findings', [])),
        'findings_by_source': {
            source: len(finds) for source, finds in findings_by_source.items()
        },
        'critical_count': len([f for f in findings.get('Findings', []) if f.get('Severity', {}).get('Label') == 'CRITICAL']),
        'high_count': len([f for f in findings.get('Findings', []) if f.get('Severity', {}).get('Label') == 'HIGH'])
    }

    return summary
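One way to make these daily summaries visible on a dashboard (not shown in the pipeline above, so treat the namespace and metric names as my own suggestions) is to push the counts to CloudWatch as custom metrics:

```python
def build_metric_data(summary):
    """Convert the aggregator's summary dict into CloudWatch MetricData
    entries. Metric names here are illustrative, not a fixed convention."""
    return [
        {'MetricName': 'CriticalFindings', 'Value': summary['critical_count'], 'Unit': 'Count'},
        {'MetricName': 'HighFindings', 'Value': summary['high_count'], 'Unit': 'Count'},
        {'MetricName': 'TotalFindings', 'Value': summary['total_findings'], 'Unit': 'Count'},
    ]

def publish_summary(summary, namespace='DevSecOps/SecurityFindings'):
    """Publish the summary to CloudWatch (requires AWS credentials)."""
    import boto3  # imported here so build_metric_data stays testable offline
    cloudwatch = boto3.client('cloudwatch')
    cloudwatch.put_metric_data(Namespace=namespace,
                               MetricData=build_metric_data(summary))

# Example of the payload shape, without calling AWS:
sample = {'critical_count': 2, 'high_count': 5, 'total_findings': 7}
print(build_metric_data(sample)[0])
```

Separating the pure `build_metric_data` helper from the `put_metric_data` call keeps the payload logic unit-testable without AWS credentials.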

Phase 6: Pipeline Performance Optimization

Parallel Execution

Run Security Scans in Parallel:

# buildspec-parallel-security.yml
version: 0.2
phases:
  build:
    commands:
      - echo Running security scans in parallel...
      - |
        # Run SAST, dependency scan, and secret scan in parallel.
        # A bare `wait` always exits 0, so wait on each PID individually
        # to make sure a failing scan actually fails the build.
        semgrep --config=auto . & SAST_PID=$!
        safety check & DEPS_PID=$!
        git-secrets --scan & SECRETS_PID=$!
        FAILED=0
        wait $SAST_PID || FAILED=1
        wait $DEPS_PID || FAILED=1
        wait $SECRETS_PID || FAILED=1
        exit $FAILED
      - echo All security scans completed

Caching for Faster Builds

CodeBuild Cache Configuration:

{
  "cache": {
    "type": "LOCAL",
    "modes": [
      "LOCAL_DOCKER_LAYER_CACHE",
      "LOCAL_SOURCE_CACHE"
    ]
  }
}

Docker Layer Caching:

# Dockerfile with layer caching
FROM maven:3.8-openjdk-11 AS dependencies
WORKDIR /app
COPY pom.xml .
RUN mvn dependency:go-offline

FROM dependencies AS build
COPY src ./src
RUN mvn package -DskipTests

FROM openjdk:11-jre-slim
WORKDIR /app
COPY --from=build /app/target/*.jar app.jar
ENTRYPOINT ["java", "-jar", "app.jar"]

Fail-Fast Strategy

Early Failure Detection:

# fail-fast-security-checks.py
import sys
import subprocess

def run_fast_security_checks():
    """Run quick security checks that fail fast"""

    checks = [
        ('Secret detection', 'git-secrets --scan'),
        ('Basic SAST', 'semgrep --config=auto --error'),
        ('Dependency vulnerabilities', 'safety check --short-report')
    ]

    failed_checks = []

    for check_name, command in checks:
        print(f"Running {check_name}...")
        try:
            result = subprocess.run(
                command.split(),
                capture_output=True,
                timeout=60  # Fail fast with timeout
            )
        except subprocess.TimeoutExpired:
            failed_checks.append(check_name)
            print(f"{check_name} timed out")
            continue

        if result.returncode != 0:
            failed_checks.append(check_name)
            print(f"{check_name} failed")
            print(result.stdout.decode())
            print(result.stderr.decode())
        else:
            print(f"{check_name} passed")

    if failed_checks:
        print(f"\n❌ Failed checks: {', '.join(failed_checks)}")
        sys.exit(1)

    print("\n✅ All fast security checks passed")
    return 0

if __name__ == '__main__':
    sys.exit(run_fast_security_checks())

Phase 7: Team Training and Adoption

Developer Onboarding

Security Training Materials:

# Security Best Practices Guide

## Secret Management
- Never commit secrets to version control
- Use AWS Secrets Manager for all secrets
- Rotate secrets regularly

## Dependency Management
- Keep dependencies up to date
- Review security advisories
- Use dependency scanning tools

## Code Security
- Follow OWASP Top 10 guidelines
- Use SAST tools before committing
- Review security findings promptly

Automated Security Reminders

GitHub/GitLab Integration:

# security-reminder-bot.py
import os
import requests

def send_security_reminder(pr_number, findings_count):
    """Send reminder about security findings in PR"""

    message = f"""
 Security Review Required

PR #{pr_number} has {findings_count} security findings that need attention.

Please review and address:
- Critical findings must be fixed before merge
- High findings should be addressed or justified
- Medium/Low findings can be tracked as technical debt

View findings: https://security-hub.aws.amazon.com/findings
"""

    # Send to Slack/Teams (SLACK_WEBHOOK_URL must be set in the environment)
    webhook_url = os.environ['SLACK_WEBHOOK_URL']
    requests.post(webhook_url, json={'text': message})
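The policy spelled out in the reminder message can also be enforced mechanically as a merge gate. This helper is a sketch of that policy (the function name and the `high_justified` flag are my own, not part of the pipeline above):

```python
def merge_allowed(critical_count, high_count, high_justified=False):
    """Apply the PR security policy:
    - critical findings always block the merge
    - high findings block unless a justification has been recorded
    - medium/low findings are tracked as technical debt and never block
    """
    if critical_count > 0:
        return False
    if high_count > 0 and not high_justified:
        return False
    return True

print(merge_allowed(0, 0))                        # -> True
print(merge_allowed(1, 0))                        # -> False (critical blocks)
print(merge_allowed(0, 3))                        # -> False (unjustified high)
print(merge_allowed(0, 3, high_justified=True))   # -> True
```

A check like this can run as a required CI status so the rule is enforced consistently rather than relying on reviewers remembering the policy.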

Security Champions Program

Identify and Train Security Champions:

# security-champion-tracker.py
import boto3

def identify_security_champions():
    """Identify developers who actively address security issues"""

    codecommit = boto3.client('codecommit')  # used by the commit-history analysis, elided below

    # Analyze commit history for security-related commits
    # Track who fixes security issues
    # Recognize top contributors

    champions = [
        {
            'name': 'Developer Name',
            'security_fixes': 15,
            'areas': ['SAST fixes', 'Dependency updates', 'Secret management']
        }
    ]

    return champions

Metrics and Success Criteria

Key Performance Indicators

# devsecops-metrics.py
import boto3
from datetime import datetime, timedelta

def calculate_devsecops_metrics():
    """Calculate DevSecOps pipeline metrics"""

    codebuild = boto3.client('codebuild')
    securityhub = boto3.client('securityhub')

    # Get pipeline metrics (get_recent_build_ids and the other helper
    # functions referenced below are elided for brevity)
    builds = codebuild.batch_get_builds(
        ids=get_recent_build_ids()
    )

    metrics = {
        'pipeline_duration': calculate_avg_pipeline_duration(builds),
        'security_scan_time': calculate_avg_scan_time(builds),
        'findings_detected': get_findings_count_last_30_days(),
        'findings_fixed': get_findings_fixed_count(),
        'deployment_frequency': get_deployment_frequency(),
        'mean_time_to_remediate': calculate_mttr()
    }

    return metrics

def calculate_avg_pipeline_duration(builds):
    """Calculate average pipeline duration"""
    durations = []
    for build in builds.get('builds', []):
        start = build.get('startTime')
        end = build.get('endTime')
        if start and end:
            duration = (end - start).total_seconds()
            durations.append(duration)

    return sum(durations) / len(durations) if durations else 0

# Target metrics
target_metrics = {
    'pipeline_duration_minutes': 15,  # Target: < 15 minutes
    'security_scan_time_minutes': 5,  # Target: < 5 minutes
    'findings_detected_per_week': 0,  # Target: Reduce over time
    'mean_time_to_remediate_hours': 24,  # Target: < 24 hours
    'deployment_frequency_per_day': 1  # Target: Maintain daily deployments
}
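The measured metrics can be compared against these targets automatically. A small helper (the function name is mine) flags any KPI that misses its target; note that most targets are upper bounds, while deployment frequency is a lower bound:

```python
def evaluate_against_targets(measured, targets):
    """Return {kpi: (measured, target)} for every KPI that misses its
    target. Deployment frequency is a lower bound (at least daily);
    everything else is treated as an upper bound."""
    lower_bound_keys = {'deployment_frequency_per_day'}
    misses = {}
    for key, target in targets.items():
        value = measured.get(key)
        if value is None:
            continue  # metric not collected this period
        if key in lower_bound_keys:
            if value < target:
                misses[key] = (value, target)
        elif value > target:
            misses[key] = (value, target)
    return misses

# Illustrative numbers: only pipeline duration misses its target
measured = {
    'pipeline_duration_minutes': 18,
    'security_scan_time_minutes': 4,
    'deployment_frequency_per_day': 1,
}
targets = {
    'pipeline_duration_minutes': 15,
    'security_scan_time_minutes': 5,
    'deployment_frequency_per_day': 1,
}
print(evaluate_against_targets(measured, targets))
# -> {'pipeline_duration_minutes': (18, 15)}
```

Feeding this output into the reporting Lambda turns the KPI table from documentation into an alert source.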

Best Practices Summary

Do's ✅

  1. Shift-Left: Run security checks as early as possible
  2. Automate Everything: Don't rely on manual security reviews
  3. Fail Fast: Catch issues early, fail builds quickly
  4. Centralize Findings: Use Security Hub for unified view
  5. Educate Team: Security is everyone's responsibility
  6. Measure Everything: Track metrics to improve

Don'ts ❌

  1. Don't Block Developers: Balance security with velocity
  2. Don't Ignore False Positives: Tune rules to reduce noise
  3. Don't Skip Compliance: Automate compliance checks
  4. Don't Forget Runtime: Security doesn't end at deployment
  5. Don't Set It and Forget It: Continuously improve the pipeline

Conclusion

Building a DevSecOps pipeline on AWS requires integrating security at every stage of the development lifecycle. Key takeaways:

  1. AWS Secrets Manager eliminates hardcoded secrets
  2. Multi-layered scanning (SAST, DAST, container, dependency) catches issues early
  3. Security Hub provides centralized visibility
  4. Automated compliance ensures regulatory requirements are met
  5. Security gates prevent vulnerable code from reaching production
  6. Performance optimization maintains daily deployment frequency

The result? A secure, compliant, and fast development pipeline that builds security into your culture, not just your tools.

Additional Resources
