The Wake-Up Call
After a security audit, your organization received some sobering findings:
- Vulnerable dependencies in container images
- Hardcoded secrets in the codebase (API keys, passwords, tokens)
- Missing security scanning in CI/CD pipelines
- No automated compliance checks
- Slow security review process (3-5 days per release)
The challenge? Implement a comprehensive DevSecOps pipeline that:
- Maintains daily deployment frequency
- Doesn't add significant delays to the pipeline
- Ensures regulatory compliance (PCI-DSS, GDPR, SOC 2)
- Builds security into the culture, not just the tools
In this article, I'll walk through building a complete DevSecOps pipeline on AWS that addresses all these concerns while maintaining developer velocity.
DevSecOps Pipeline Architecture
The Shift-Left Philosophy
Shift-Left Security means catching security issues as early as possible in the development lifecycle:
Developer → Pre-Commit → CI Pipeline → Pre-Deployment → Production
↓ ↓ ↓ ↓ ↓
Local Scan Git Hooks SAST/DAST Container Scan Runtime Protection
The earlier we catch issues, the cheaper and faster they are to fix.
AWS DevSecOps Stack
Core AWS Services:
- AWS CodePipeline - CI/CD orchestration
- AWS CodeBuild - Build and security scanning
- AWS CodeCommit/GitHub - Source code management
- Amazon ECR - Container registry with scanning
- AWS Secrets Manager - Secret management
- AWS Security Hub - Centralized security findings
- Amazon Inspector - Vulnerability assessment
- AWS Config - Compliance monitoring
- AWS IAM - Access control and least privilege
Phase 1: Secret Management Solution
The Problem: Hardcoded Secrets
Hardcoded secrets are a critical security risk. They can be:
- Exposed in version control
- Leaked in logs or error messages
- Accessible to anyone with repository access
Solution: AWS Secrets Manager
Step 1: Migrate Existing Secrets
import boto3
import json
import re
secrets_client = boto3.client('secretsmanager')
def migrate_secrets_from_codebase():
"""Scan codebase and migrate hardcoded secrets to Secrets Manager"""
# Patterns to detect secrets
secret_patterns = {
'api_key': r'api[_-]?key["\']?\s*[:=]\s*["\']([^"\']+)["\']',
'password': r'password["\']?\s*[:=]\s*["\']([^"\']+)["\']',
'token': r'token["\']?\s*[:=]\s*["\']([^"\']+)["\']',
'secret': r'secret["\']?\s*[:=]\s*["\']([^"\']+)["\']'
}
# Scan files (example)
files_to_scan = ['config.py', 'settings.py', '.env.example']
for file_path in files_to_scan:
with open(file_path, 'r') as f:
content = f.read()
for secret_type, pattern in secret_patterns.items():
matches = re.findall(pattern, content, re.IGNORECASE)
for match in matches:
# Create secret in Secrets Manager
secret_name = f"payment-app/{secret_type}/{file_path}"
secrets_client.create_secret(
Name=secret_name,
SecretString=match,
Description=f"Migrated from {file_path}"
)
print(f"Created secret: {secret_name}")
# Run migration
migrate_secrets_from_codebase()
Step 2: Update Application Code
import boto3
import json
secrets_client = boto3.client('secretsmanager')
def get_secret(secret_name):
"""Retrieve secret from AWS Secrets Manager"""
try:
response = secrets_client.get_secret_value(SecretId=secret_name)
return json.loads(response['SecretString'])
except Exception as e:
print(f"Error retrieving secret: {e}")
raise
# Usage in application
def connect_to_database():
"""Connect to database using secrets from Secrets Manager"""
db_secrets = get_secret('payment-app/database/credentials')
connection = psycopg2.connect(
host=db_secrets['host'],
database=db_secrets['database'],
user=db_secrets['username'],
password=db_secrets['password']
)
return connection
Step 3: Automatic Secret Rotation
import boto3
lambda_client = boto3.client('lambda')
def create_rotation_lambda():
"""Create Lambda function for automatic secret rotation"""
rotation_code = '''
import boto3
import json
import psycopg2
def lambda_handler(event, context):
"""Rotate database password"""
secrets_client = boto3.client('secretsmanager')
# Get current secret
secret_arn = event['SecretId']
current_secret = json.loads(
secrets_client.get_secret_value(SecretId=secret_arn)['SecretString']
)
# Generate new password
new_password = generate_secure_password()
# Update database
conn = psycopg2.connect(
host=current_secret['host'],
database=current_secret['database'],
user=current_secret['username'],
password=current_secret['password']
)
# Update password in database
# ... (implementation)
# Update secret
secrets_client.update_secret(
SecretId=secret_arn,
SecretString=json.dumps({
**current_secret,
'password': new_password
})
)
return {'statusCode': 200}
'''
# Create Lambda function
lambda_client.create_function(
FunctionName='rotate-db-secret',
Runtime='python3.9',
Role='arn:aws:iam::account:role/secret-rotation-role',
Handler='index.lambda_handler',
Code={'ZipFile': rotation_code.encode()}
)
# Enable automatic rotation
secrets_client = boto3.client('secretsmanager')
secrets_client.rotate_secret(
SecretId='payment-app/database/credentials',
RotationLambdaARN='arn:aws:lambda:region:account:function:rotate-db-secret',
RotationRules={
'AutomaticallyAfterDays': 30
}
)
Step 4: Pre-Commit Hook to Detect Secrets
#!/usr/bin/env python3
# .git/hooks/pre-commit
import subprocess
import re
import sys
def detect_secrets():
"""Detect potential secrets before commit"""
# Get staged files
result = subprocess.run(
['git', 'diff', '--cached', '--name-only'],
capture_output=True,
text=True
)
staged_files = result.stdout.strip().split('\n')
# Secret patterns
patterns = [
(r'password\s*[:=]\s*["\']([^"\']+)["\']', 'Password detected'),
(r'api[_-]?key\s*[:=]\s*["\']([^"\']+)["\']', 'API key detected'),
(r'secret\s*[:=]\s*["\']([^"\']+)["\']', 'Secret detected'),
(r'-----BEGIN (RSA |OPENSSH )?PRIVATE KEY-----', 'Private key detected'),
]
violations = []
for file_path in staged_files:
if not file_path:
continue
try:
with open(file_path, 'r') as f:
content = f.read()
for pattern, message in patterns:
if re.search(pattern, content, re.IGNORECASE):
violations.append(f"{file_path}: {message}")
except Exception:
continue
if violations:
print("❌ SECURITY VIOLATION: Potential secrets detected!")
for violation in violations:
print(f" - {violation}")
print("\nPlease use AWS Secrets Manager instead.")
print("See: https://docs.aws.amazon.com/secretsmanager/")
sys.exit(1)
print("✅ No secrets detected in staged files")
return 0
if __name__ == '__main__':
sys.exit(detect_secrets())
Make it executable:
chmod +x .git/hooks/pre-commit
Phase 2: Security Scanning Strategy
SAST (Static Application Security Testing)
AWS CodeBuild with SAST Tools
# buildspec-sast.yml
version: 0.2
phases:
pre_build:
commands:
- echo Installing SAST tools...
- |
# Install Semgrep for SAST
pip install semgrep
- |
# Install Bandit for Python security scanning
pip install bandit
build:
commands:
- echo Running SAST scans...
- |
# Semgrep scan
semgrep --config=auto \
--json \
--output=semgrep-results.json \
.
- |
# Bandit scan (for Python)
bandit -r . \
-f json \
-o bandit-results.json \
|| true
- |
# SonarQube scan (if using)
sonar-scanner \
-Dsonar.projectKey=payment-app \
-Dsonar.sources=. \
-Dsonar.host.url=$SONARQUBE_URL \
-Dsonar.login=$SONARQUBE_TOKEN
post_build:
commands:
- echo Uploading SAST results...
- |
# Upload to AWS Security Hub
aws securityhub batch-import-findings \
--findings file://convert-to-security-hub-format.json
- |
# Fail build if critical issues found
python check-sast-results.py
SAST Results Checker:
# check-sast-results.py
import json
import sys
def check_sast_results():
"""Check SAST results and fail build if critical issues found"""
with open('semgrep-results.json', 'r') as f:
semgrep_results = json.load(f)
critical_issues = []
for result in semgrep_results.get('results', []):
severity = result.get('extra', {}).get('severity', '')
if severity in ['ERROR', 'WARNING']:
critical_issues.append({
'file': result.get('path', ''),
'message': result.get('message', ''),
'severity': severity
})
if critical_issues:
print("Critical security issues found:")
for issue in critical_issues:
print(f" - {issue['file']}: {issue['message']} ({issue['severity']})")
sys.exit(1)
print("SAST scan passed")
return 0
if __name__ == '__main__':
sys.exit(check_sast_results())
DAST (Dynamic Application Security Testing)
AWS CodeBuild with OWASP ZAP:
# buildspec-dast.yml
version: 0.2
phases:
pre_build:
commands:
- echo Installing DAST tools...
- |
# Install OWASP ZAP
wget https://github.com/zaproxy/zaproxy/releases/download/v2.12.0/ZAP_2.12.0_Linux.tar.gz
tar -xzf ZAP_2.12.0_Linux.tar.gz
build:
commands:
- echo Starting application for DAST...
- |
# Start application (example)
docker-compose up -d payment-app
sleep 30 # Wait for app to be ready
- |
# Run OWASP ZAP baseline scan
./ZAP_2.12.0/zap-baseline.py \
-t http://localhost:8080 \
-J zap-report.json
- |
# Run OWASP ZAP full scan (more thorough)
./ZAP_2.12.0/zap-full-scan.py \
-t http://localhost:8080 \
-J zap-full-report.json \
-I # Ignore warnings
post_build:
commands:
- echo Processing DAST results...
- |
# Convert and upload to Security Hub
python process-dast-results.py zap-report.json
- |
# Fail if critical vulnerabilities found
python check-dast-results.py zap-report.json
DAST Results Processor:
# process-dast-results.py
import json
import sys
import boto3
securityhub = boto3.client('securityhub')
def process_dast_results(report_file):
"""Process DAST results and send to Security Hub"""
with open(report_file, 'r') as f:
zap_results = json.load(f)
findings = []
for site in zap_results.get('site', []):
for alert in site.get('alerts', []):
severity_map = {
'High': 'HIGH',
'Medium': 'MEDIUM',
'Low': 'LOW',
'Informational': 'INFORMATIONAL'
}
severity = severity_map.get(alert.get('risk', ''), 'INFORMATIONAL')
if severity in ['HIGH', 'MEDIUM']:
finding = {
'SchemaVersion': '2018-10-08',
'Id': f"zap-{alert.get('pluginid', 'unknown')}",
'ProductArn': 'arn:aws:securityhub:region:account:product/account/default',
'GeneratorId': 'owasp-zap',
'AwsAccountId': 'account-id',
'Types': ['Security Findings'],
'CreatedAt': '2024-01-01T00:00:00Z',
'UpdatedAt': '2024-01-01T00:00:00Z',
'Severity': {
'Label': severity
},
'Title': alert.get('name', 'Unknown vulnerability'),
'Description': alert.get('desc', ''),
'Remediation': {
'Recommendation': {
'Text': alert.get('solution', '')
}
}
}
findings.append(finding)
if findings:
# Batch import to Security Hub
securityhub.batch_import_findings(Findings=findings)
print(f"✅ Imported {len(findings)} findings to Security Hub")
return findings
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python process-dast-results.py <zap-report.json>")
sys.exit(1)
findings = process_dast_results(sys.argv[1])
sys.exit(0 if not findings else 1)
Container Image Scanning
Amazon ECR Image Scanning:
# Enable automatic image scanning on push
aws ecr put-image-scanning-configuration \
--repository-name payment-app \
--image-scanning-configuration scanOnPush=true
# Scan existing images
aws ecr start-image-scan \
--repository-name payment-app \
--image-id imageTag=latest
# Get scan results
aws ecr describe-image-scan-findings \
--repository-name payment-app \
--image-id imageTag=latest
CodeBuild Integration:
# buildspec-container-scan.yml
version: 0.2
phases:
pre_build:
commands:
- echo Logging in to Amazon ECR...
- aws ecr get-login-password --region $AWS_DEFAULT_REGION | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com
- REPOSITORY_URI=$AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$IMAGE_REPO_NAME
- COMMIT_HASH=$(echo $CODEBUILD_RESOLVED_SOURCE_VERSION | cut -c 1-7)
- IMAGE_TAG=${COMMIT_HASH:=latest}
build:
commands:
- echo Building Docker image...
- docker build -t $IMAGE_REPO_NAME:$IMAGE_TAG .
- docker tag $IMAGE_REPO_NAME:$IMAGE_TAG $REPOSITORY_URI:$IMAGE_TAG
post_build:
commands:
- echo Pushing Docker image...
- docker push $REPOSITORY_URI:$IMAGE_TAG
- echo Waiting for image scan to complete...
- |
# Wait for scan to complete (ECR scans automatically on push)
aws ecr wait image-scan-complete \
--repository-name $IMAGE_REPO_NAME \
--image-id imageTag=$IMAGE_TAG \
--max-attempts 30 \
--delay 10
- echo Retrieving scan results...
- |
# Get scan findings
SCAN_RESULTS=$(aws ecr describe-image-scan-findings \
--repository-name $IMAGE_REPO_NAME \
--image-id imageTag=$IMAGE_TAG \
--query 'imageScanFindings' \
--output json)
- |
# Check for critical vulnerabilities
CRITICAL_COUNT=$(echo $SCAN_RESULTS | jq '.findingCounts.CRITICAL // 0')
HIGH_COUNT=$(echo $SCAN_RESULTS | jq '.findingCounts.HIGH // 0')
if [ "$CRITICAL_COUNT" -gt 0 ] || [ "$HIGH_COUNT" -gt 5 ]; then
echo "❌ Critical vulnerabilities found in image!"
echo "Critical: $CRITICAL_COUNT, High: $HIGH_COUNT"
exit 1
fi
echo "Image scan passed (Critical: $CRITICAL_COUNT, High: $HIGH_COUNT)"
- |
# Export scan results to Security Hub
python export-ecr-findings.py $IMAGE_REPO_NAME $IMAGE_TAG
ECR Findings Exporter:
# export-ecr-findings.py
import boto3
import json
import sys
def export_ecr_findings(repo_name, image_tag):
"""Export ECR scan findings to Security Hub"""
ecr = boto3.client('ecr')
securityhub = boto3.client('securityhub')
# Get scan findings
response = ecr.describe_image_scan_findings(
repositoryName=repo_name,
imageId={'imageTag': image_tag}
)
findings = response.get('imageScanFindings', {})
findings_list = findings.get('findings', [])
security_hub_findings = []
for finding in findings_list:
severity_map = {
'CRITICAL': 'CRITICAL',
'HIGH': 'HIGH',
'MEDIUM': 'MEDIUM',
'LOW': 'LOW',
'INFORMATIONAL': 'INFORMATIONAL'
}
severity = severity_map.get(finding.get('severity', ''), 'INFORMATIONAL')
# Only export HIGH and CRITICAL
if severity in ['CRITICAL', 'HIGH']:
security_hub_finding = {
'SchemaVersion': '2018-10-08',
'Id': f"ecr-{repo_name}-{image_tag}-{finding.get('name', 'unknown')}",
'ProductArn': f'arn:aws:securityhub:region:account:product/account/default',
'GeneratorId': 'amazon-ecr-image-scan',
'AwsAccountId': 'account-id',
'Types': ['Vulnerabilities'],
'CreatedAt': finding.get('firstObservedAt', ''),
'UpdatedAt': finding.get('lastObservedAt', ''),
'Severity': {
'Label': severity
},
'Title': finding.get('name', 'Unknown vulnerability'),
'Description': finding.get('description', ''),
'Remediation': {
'Recommendation': {
'Text': finding.get('remediation', {}).get('recommendation', {}).get('text', '')
}
},
'Resources': [{
'Type': 'AwsEcrContainerImage',
'Id': f"{repo_name}:{image_tag}",
'Region': 'us-east-1'
}]
}
security_hub_findings.append(security_hub_finding)
if security_hub_findings:
securityhub.batch_import_findings(Findings=security_hub_findings)
print(f"✅ Exported {len(security_hub_findings)} findings to Security Hub")
return security_hub_findings
if __name__ == '__main__':
if len(sys.argv) < 3:
print("Usage: python export-ecr-findings.py <repo-name> <image-tag>")
sys.exit(1)
findings = export_ecr_findings(sys.argv[1], sys.argv[2])
sys.exit(0)
Dependency Scanning
Scanning Dependencies for Vulnerabilities:
# buildspec-dependency-scan.yml
version: 0.2
phases:
pre_build:
commands:
- echo Installing dependency scanning tools...
- |
# For Python
pip install safety
- |
# For Node.js
npm install -g npm-audit-resolver
- |
# For Java (Maven)
# Use OWASP Dependency-Check
wget https://github.com/jeremylong/DependencyCheck/releases/download/v8.4.0/dependency-check-8.4.0-release.zip
unzip dependency-check-8.4.0-release.zip
build:
commands:
- echo Scanning dependencies...
- |
# Python safety check
safety check --json --output safety-report.json || true
- |
# Node.js npm audit
npm audit --json > npm-audit-report.json || true
- |
# Java/Maven dependency check
./dependency-check/bin/dependency-check.sh \
--project payment-app \
--scan . \
--format JSON \
--out dependency-check-report.json
post_build:
commands:
- echo Processing dependency scan results...
- |
# Process and upload to Security Hub
python process-dependency-results.py
- |
# Fail build if critical vulnerabilities found
python check-dependency-results.py
Phase 3: Complete DevSecOps Pipeline
AWS CodePipeline with Security Gates
{
"pipeline": {
"name": "payment-app-devsecops-pipeline",
"roleArn": "arn:aws:iam::account:role/CodePipelineServiceRole",
"artifactStore": {
"type": "S3",
"location": "payment-app-artifacts"
},
"stages": [
{
"name": "Source",
"actions": [{
"name": "SourceAction",
"actionTypeId": {
"category": "Source",
"owner": "AWS",
"provider": "CodeCommit",
"version": "1"
},
"outputArtifacts": [{"name": "SourceOutput"}],
"configuration": {
"RepositoryName": "payment-app",
"BranchName": "main"
}
}]
},
{
"name": "PreBuildSecurity",
"actions": [{
"name": "SecretScan",
"actionTypeId": {
"category": "Build",
"owner": "AWS",
"provider": "CodeBuild",
"version": "1"
},
"inputArtifacts": [{"name": "SourceOutput"}],
"outputArtifacts": [{"name": "SecretScanOutput"}],
"configuration": {
"ProjectName": "payment-app-secret-scan"
}
}]
},
{
"name": "Build",
"actions": [{
"name": "BuildAndSAST",
"actionTypeId": {
"category": "Build",
"owner": "AWS",
"provider": "CodeBuild",
"version": "1"
},
"inputArtifacts": [{"name": "SourceOutput"}],
"outputArtifacts": [{"name": "BuildOutput"}],
"configuration": {
"ProjectName": "payment-app-build-sast"
}
}]
},
{
"name": "SecurityGates",
"actions": [
{
"name": "DependencyScan",
"actionTypeId": {
"category": "Build",
"owner": "AWS",
"provider": "CodeBuild",
"version": "1"
},
"inputArtifacts": [{"name": "BuildOutput"}],
"outputArtifacts": [{"name": "DependencyScanOutput"}],
"configuration": {
"ProjectName": "payment-app-dependency-scan"
}
},
{
"name": "ContainerBuildAndScan",
"actionTypeId": {
"category": "Build",
"owner": "AWS",
"provider": "CodeBuild",
"version": "1"
},
"inputArtifacts": [{"name": "BuildOutput"}],
"outputArtifacts": [{"name": "ContainerOutput"}],
"configuration": {
"ProjectName": "payment-app-container-build-scan"
}
}
]
},
{
"name": "DAST",
"actions": [{
"name": "DynamicScan",
"actionTypeId": {
"category": "Test",
"owner": "AWS",
"provider": "CodeBuild",
"version": "1"
},
"inputArtifacts": [{"name": "ContainerOutput"}],
"outputArtifacts": [{"name": "DASTOutput"}],
"configuration": {
"ProjectName": "payment-app-dast"
}
}]
},
{
"name": "DeployToStaging",
"actions": [{
"name": "DeployStaging",
"actionTypeId": {
"category": "Deploy",
"owner": "AWS",
"provider": "ECS",
"version": "1"
},
"inputArtifacts": [{"name": "ContainerOutput"}],
"configuration": {
"ClusterName": "payment-staging",
"ServiceName": "payment-service"
}
}]
},
{
"name": "ComplianceCheck",
"actions": [{
"name": "ComplianceValidation",
"actionTypeId": {
"category": "Test",
"owner": "AWS",
"provider": "CodeBuild",
"version": "1"
},
"inputArtifacts": [{"name": "DASTOutput"}],
"configuration": {
"ProjectName": "payment-app-compliance-check"
}
}]
},
{
"name": "DeployToProduction",
"actions": [{
"name": "DeployProduction",
"actionTypeId": {
"category": "Deploy",
"owner": "AWS",
"provider": "ECS",
"version": "1"
},
"inputArtifacts": [{"name": "ContainerOutput"}],
"configuration": {
"ClusterName": "payment-production",
"ServiceName": "payment-service"
}
}]
}
]
}
}
Security Gates Implementation
Lambda Function for Security Gate Evaluation:
# security-gate-evaluator.py
import boto3
import json
securityhub = boto3.client('securityhub')
codebuild = boto3.client('codebuild')
def evaluate_security_gates(build_id):
"""Evaluate security gates before allowing deployment"""
# Get Security Hub findings for this build
findings = securityhub.get_findings(
Filters={
'ResourceId': [{'Value': build_id, 'Comparison': 'EQUALS'}],
'SeverityLabel': [
{'Value': 'CRITICAL', 'Comparison': 'EQUALS'},
{'Value': 'HIGH', 'Comparison': 'EQUALS'}
]
}
)
critical_findings = [
f for f in findings.get('Findings', [])
if f.get('Severity', {}).get('Label') == 'CRITICAL'
]
high_findings = [
f for f in findings.get('Findings', [])
if f.get('Severity', {}).get('Label') == 'HIGH'
]
# Gate rules
gate_passed = True
reasons = []
# Rule 1: No CRITICAL findings allowed
if critical_findings:
gate_passed = False
reasons.append(f"{len(critical_findings)} CRITICAL findings found")
# Rule 2: Maximum 3 HIGH findings allowed
if len(high_findings) > 3:
gate_passed = False
reasons.append(f"{len(high_findings)} HIGH findings found (max 3 allowed)")
# Rule 3: No hardcoded secrets
secret_findings = [
f for f in findings.get('Findings', [])
if 'secret' in f.get('Title', '').lower() or 'password' in f.get('Title', '').lower()
]
if secret_findings:
gate_passed = False
reasons.append(f"{len(secret_findings)} secret-related findings found")
result = {
'gate_passed': gate_passed,
'reasons': reasons,
'critical_count': len(critical_findings),
'high_count': len(high_findings)
}
return result
def lambda_handler(event, context):
"""Lambda handler for security gate evaluation"""
build_id = event.get('build_id')
result = evaluate_security_gates(build_id)
if not result['gate_passed']:
# Stop pipeline
raise Exception(f"Security gate failed: {', '.join(result['reasons'])}")
return {
'statusCode': 200,
'body': json.dumps(result)
}
Phase 4: Compliance Automation
AWS Config for Compliance Monitoring
Enable AWS Config:
# Enable Config
aws configservice put-configuration-recorder \
--configuration-recorder name=default,roleArn=arn:aws:iam::account:role/ConfigRole
# Start recording
aws configservice start-configuration-recorder \
--configuration-recorder-name default
PCI-DSS Compliance Rules:
{
"ConfigRuleName": "pci-dss-encryption-check",
"Description": "Check that EBS volumes are encrypted (PCI-DSS requirement)",
"Scope": {
"ComplianceResourceTypes": ["AWS::EC2::Volume"]
},
"Source": {
"Owner": "AWS",
"SourceIdentifier": "ENCRYPTED_VOLUMES"
}
}
Custom Compliance Rule:
# custom-compliance-rule.py
import boto3
import json
config = boto3.client('config')
def evaluate_compliance(configuration_item):
"""Evaluate if resource is compliant"""
compliance_status = 'COMPLIANT'
annotation = ''
# Example: Check if RDS instance has encryption enabled
if configuration_item['resourceType'] == 'AWS::RDS::DBInstance':
if not configuration_item.get('configuration', {}).get('storageEncrypted', False):
compliance_status = 'NON_COMPLIANT'
annotation = 'RDS instance must have encryption enabled for PCI-DSS compliance'
# Example: Check if security groups allow unrestricted access
if configuration_item['resourceType'] == 'AWS::EC2::SecurityGroup':
ip_permissions = configuration_item.get('configuration', {}).get('ipPermissions', [])
for perm in ip_permissions:
for ip_range in perm.get('ipRanges', []):
if ip_range.get('cidrIp') == '0.0.0.0/0':
compliance_status = 'NON_COMPLIANT'
annotation = 'Security group allows unrestricted access (0.0.0.0/0)'
return {
'compliance_type': compliance_status,
'annotation': annotation
}
def lambda_handler(event, context):
"""Lambda handler for Config custom rule"""
configuration_item = json.loads(event['invokingEvent'])['configurationItem']
evaluation = evaluate_compliance(configuration_item)
config.put_evaluations(
Evaluations=[{
'ComplianceResourceType': configuration_item['resourceType'],
'ComplianceResourceId': configuration_item['resourceId'],
'ComplianceType': evaluation['compliance_type'],
'Annotation': evaluation['annotation'],
'OrderingTimestamp': configuration_item['configurationItemCaptureTime']
}]
)
return evaluation
Automated Compliance Reporting
# compliance-reporter.py
import boto3
from datetime import datetime
config = boto3.client('config')
s3 = boto3.client('s3')
def generate_compliance_report():
"""Generate compliance report for audit"""
# Get compliance summary
response = config.get_compliance_summary_by_config_rule()
report = {
'timestamp': datetime.utcnow().isoformat(),
'compliance_summary': response.get('ComplianceSummariesByConfigRule', []),
'overall_compliance': calculate_overall_compliance(response)
}
# Generate detailed findings
findings = []
for rule_summary in response.get('ComplianceSummariesByConfigRule', []):
if rule_summary.get('ComplianceSummary', {}).get('NonCompliantResourceCount', {}).get('CappedCount', 0) > 0:
findings.append({
'rule_name': rule_summary.get('ConfigRuleName', ''),
'non_compliant_count': rule_summary.get('ComplianceSummary', {}).get('NonCompliantResourceCount', {}).get('CappedCount', 0)
})
report['findings'] = findings
# Save to S3 for audit trail
s3.put_object(
Bucket='compliance-reports',
Key=f"compliance-report-{datetime.utcnow().strftime('%Y-%m-%d')}.json",
Body=json.dumps(report, indent=2)
)
return report
def calculate_overall_compliance(response):
"""Calculate overall compliance percentage"""
total_resources = 0
compliant_resources = 0
for rule_summary in response.get('ComplianceSummariesByConfigRule', []):
summary = rule_summary.get('ComplianceSummary', {})
total_resources += summary.get('ComplianceResourceCount', {}).get('CappedCount', 0)
compliant_resources += summary.get('CompliantResourceCount', {}).get('CappedCount', 0)
if total_resources == 0:
return 100.0
return (compliant_resources / total_resources) * 100
# Schedule with EventBridge
import boto3
events = boto3.client('events')
events.put_rule(
Name='daily-compliance-report',
ScheduleExpression='cron(0 2 * * ? *)', # Daily at 2 AM
State='ENABLED'
)
events.put_targets(
Rule='daily-compliance-report',
Targets=[{
'Id': '1',
'Arn': 'arn:aws:lambda:region:account:function:compliance-reporter',
'Input': json.dumps({})
}]
)
Phase 5: Security Hub Integration
Centralized Security Findings
Enable AWS Security Hub:
# Enable Security Hub
aws securityhub enable-security-hub
# Enable security standards
aws securityhub batch-enable-standards \
--standards-subscription-requests \
StandardsArn=arn:aws:securityhub:region::standards/aws-foundational-security-best-practices/v/1.0.0 \
StandardsArn=arn:aws:securityhub:region::standards/pci-dss/v/3.2.1
Aggregate Findings from All Sources:
# security-findings-aggregator.py
import boto3
from datetime import datetime, timedelta
securityhub = boto3.client('securityhub')
def aggregate_security_findings():
"""Aggregate security findings from all sources"""
# Get findings from last 24 hours
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=1)
findings = securityhub.get_findings(
Filters={
'CreatedAt': [{
'Start': start_time.isoformat(),
'End': end_time.isoformat()
}],
'SeverityLabel': [
{'Value': 'CRITICAL', 'Comparison': 'EQUALS'},
{'Value': 'HIGH', 'Comparison': 'EQUALS'}
]
},
MaxResults=100
)
# Group by source
findings_by_source = {}
for finding in findings.get('Findings', []):
source = finding.get('ProductFields', {}).get('aws/securityhub/SourceIdentifier', 'unknown')
if source not in findings_by_source:
findings_by_source[source] = []
findings_by_source[source].append(finding)
# Generate summary
summary = {
'timestamp': datetime.utcnow().isoformat(),
'total_findings': len(findings.get('Findings', [])),
'findings_by_source': {
source: len(finds) for source, finds in findings_by_source.items()
},
'critical_count': len([f for f in findings.get('Findings', []) if f.get('Severity', {}).get('Label') == 'CRITICAL']),
'high_count': len([f for f in findings.get('Findings', []) if f.get('Severity', {}).get('Label') == 'HIGH'])
}
return summary
Phase 6: Pipeline Performance Optimization
Parallel Execution
Run Security Scans in Parallel:
# buildspec-parallel-security.yml
version: 0.2
phases:
build:
commands:
- echo Running security scans in parallel...
- |
# Run SAST, dependency scan, and secret scan in parallel
semgrep --config=auto . &
safety check &
git-secrets --scan &
wait # Wait for all to complete
- echo All security scans completed
Caching for Faster Builds
CodeBuild Cache Configuration:
{
"cache": {
"type": "LOCAL",
"modes": [
"LOCAL_DOCKER_LAYER_CACHE",
"LOCAL_SOURCE_CACHE"
]
}
}
Docker Layer Caching:
# Dockerfile with layer caching
FROM maven:3.8-openjdk-11 AS dependencies
COPY pom.xml .
RUN mvn dependency:go-offline
FROM dependencies AS build
COPY src ./src
RUN mvn package -DskipTests
FROM openjdk:11-jre-slim
COPY --from=build /app/target/*.jar app.jar
ENTRYPOINT ["java", "-jar", "app.jar"]
Fail-Fast Strategy
Early Failure Detection:
# fail-fast-security-checks.py
import sys
import subprocess
def run_fast_security_checks():
"""Run quick security checks that fail fast"""
checks = [
('Secret detection', 'git-secrets --scan'),
('Basic SAST', 'semgrep --config=auto --error'),
('Dependency vulnerabilities', 'safety check --short-report')
]
failed_checks = []
for check_name, command in checks:
print(f"Running {check_name}...")
result = subprocess.run(
command.split(),
capture_output=True,
timeout=60 # Fail fast with timeout
)
if result.returncode != 0:
failed_checks.append(check_name)
print(f"❌ {check_name} failed")
print(result.stdout.decode())
else:
print(f"✅ {check_name} passed")
if failed_checks:
print(f"\n❌ Failed checks: {', '.join(failed_checks)}")
sys.exit(1)
print("\n✅ All fast security checks passed")
return 0
if __name__ == '__main__':
sys.exit(run_fast_security_checks())
Phase 7: Team Training and Adoption
Developer Onboarding
Security Training Materials:
# Security Best Practices Guide
## Secret Management
- Never commit secrets to version control
- Use AWS Secrets Manager for all secrets
- Rotate secrets regularly
## Dependency Management
- Keep dependencies up to date
- Review security advisories
- Use dependency scanning tools
## Code Security
- Follow OWASP Top 10 guidelines
- Use SAST tools before committing
- Review security findings promptly
Automated Security Reminders
GitHub/GitLab Integration:
# security-reminder-bot.py
import boto3
import requests
def send_security_reminder(pr_number, findings_count):
"""Send reminder about security findings in PR"""
message = f"""
Security Review Required
PR #{pr_number} has {findings_count} security findings that need attention.
Please review and address:
- Critical findings must be fixed before merge
- High findings should be addressed or justified
- Medium/Low findings can be tracked as technical debt
View findings: https://security-hub.aws.amazon.com/findings
"""
# Send to Slack/Teams
webhook_url = os.environ.get('SLACK_WEBHOOK_URL')
requests.post(webhook_url, json={'text': message})
Security Champions Program
Identify and Train Security Champions:
# security-champion-tracker.py
import boto3
def identify_security_champions():
"""Identify developers who actively address security issues"""
codecommit = boto3.client('codecommit')
# Analyze commit history for security-related commits
# Track who fixes security issues
# Recognize top contributors
champions = [
{
'name': 'Developer Name',
'security_fixes': 15,
'areas': ['SAST fixes', 'Dependency updates', 'Secret management']
}
]
return champions
Metrics and Success Criteria
Key Performance Indicators
# devsecops-metrics.py
import boto3
from datetime import datetime, timedelta
def calculate_devsecops_metrics():
"""Calculate DevSecOps pipeline metrics"""
codebuild = boto3.client('codebuild')
securityhub = boto3.client('securityhub')
# Get pipeline metrics
builds = codebuild.batch_get_builds(
ids=get_recent_build_ids()
)
metrics = {
'pipeline_duration': calculate_avg_pipeline_duration(builds),
'security_scan_time': calculate_avg_scan_time(builds),
'findings_detected': get_findings_count_last_30_days(),
'findings_fixed': get_findings_fixed_count(),
'deployment_frequency': get_deployment_frequency(),
'mean_time_to_remediate': calculate_mttr()
}
return metrics
def calculate_avg_pipeline_duration(builds):
"""Calculate average pipeline duration"""
durations = []
for build in builds.get('builds', []):
start = build.get('startTime')
end = build.get('endTime')
if start and end:
duration = (end - start).total_seconds()
durations.append(duration)
return sum(durations) / len(durations) if durations else 0
# Target metrics
target_metrics = {
'pipeline_duration_minutes': 15, # Target: < 15 minutes
'security_scan_time_minutes': 5, # Target: < 5 minutes
'findings_detected_per_week': 0, # Target: Reduce over time
'mean_time_to_remediate_hours': 24, # Target: < 24 hours
'deployment_frequency_per_day': 1 # Target: Maintain daily deployments
}
Best Practices Summary
Do's ✅
- Shift-Left: Run security checks as early as possible
- Automate Everything: Don't rely on manual security reviews
- Fail Fast: Catch issues early, fail builds quickly
- Centralize Findings: Use Security Hub for unified view
- Educate Team: Security is everyone's responsibility
- Measure Everything: Track metrics to improve
Don'ts ❌
- Don't Block Developers: Balance security with velocity
- Don't Ignore False Positives: Tune rules to reduce noise
- Don't Skip Compliance: Automate compliance checks
- Don't Forget Runtime: Security doesn't end at deployment
- Don't Set It and Forget It: Continuously improve the pipeline
Conclusion
Building a DevSecOps pipeline on AWS requires integrating security at every stage of the development lifecycle. Key takeaways:
- AWS Secrets Manager eliminates hardcoded secrets
- Multi-layered scanning (SAST, DAST, container, dependency) catches issues early
- Security Hub provides centralized visibility
- Automated compliance ensures regulatory requirements are met
- Security gates prevent vulnerable code from reaching production
- Performance optimization maintains daily deployment frequency
The result? A secure, compliant, and fast development pipeline that builds security into your culture, not just your tools.
Top comments (0)