In this post, I'll automate the process of adding malicious IP addresses, as classified by the third-party service https://proxycheck.io, to AWS WAF.
AWS already provides built-in managed lists you can leverage to protect your resources against malicious IP addresses used for reconnaissance or DDoS attacks, or ones with a generally bad reputation:
- AWSManagedIPReputationList
- AWSManagedReconnaissanceList
- AWSManagedIPDDoSList
But here we're building our own list based on web server access logs, specifically CloudFront access logs.
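If you're wondering where the CSV of IP addresses comes from: below is a minimal, hypothetical sketch of one way to produce it from CloudFront standard access logs, whose tab-separated lines carry the client IP in the c-ip field (the fifth column). It writes the IP into the second CSV column, which is where the Lambda function we build later expects it; the file layout here is an assumption, not part of the walkthrough itself.

import csv
import gzip

def cloudfront_log_to_csv(log_path: str, csv_path: str) -> None:
    """Extract client IPs from a gzipped CloudFront standard log into a CSV."""
    with gzip.open(log_path, "rt") as log, open(csv_path, "w", newline="") as out:
        writer = csv.writer(out)
        for line in log:
            # Skip the #Version / #Fields header lines
            if line.startswith("#"):
                continue
            fields = line.rstrip("\n").split("\t")
            if len(fields) > 4:
                # Column 1: request timestamp, column 2: client IP (c-ip)
                writer.writerow([f"{fields[0]} {fields[1]}", fields[4]])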
Prerequisites
- An API key obtained from https://proxycheck.io/ (they have a generous free plan).
- The necessary IAM permissions to deploy an AWS SAM application and create the underlying resources.
- An ElastiCache Valkey cluster for caching our ProxyCheck API call results, for quota efficiency.
- A Telegram bot, so you can receive notifications about WAF updates.
- A SAM app initialized with the AWS SAM CLI.
Walkthrough
We will create a new AWS SAM template file and include the following YAML definition to define:
- A Lambda function running our Python script, which calls the ProxyCheck API, stores the results in ElastiCache Valkey along with the risk score associated with each scanned IP address, and updates the AWS WAF IP set accordingly.
- An EventBridge rule that acts as an event listener, triggering the Lambda function whenever a new CSV file containing our exported list of IP addresses is written to our S3 bucket.
- The necessary IAM permissions attached to the Lambda function to read files from S3 and update the WAF IP set.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  ProxyCheck WAF automation

  Lambda function that reads IPs from S3, checks risk scores using ProxyCheck.io,
  and updates AWS WAF IP sets with high-risk IPs.

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 900 # 15 minutes
    MemorySize: 512
    LoggingConfig:
      LogFormat: JSON

Resources:
  ProxyCheckWAFAutomation:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: "ProxyCheckWAFAutomation"
      CodeUri: proxy_check_waf_automation/
      Handler: app.lambda_handler
      Runtime: python3.13
      Architectures:
        - x86_64
      # Timeout and MemorySize are inherited from Globals above
      Environment:
        Variables:
          S3_BUCKET_NAME: "proxycheck-waf-automation"
          PROXYCHECK_API_KEY: "xxxx-xxxxx-xxxxx-xxxxx"
          TELEGRAM_BOT_TOKEN: "YOUR_TG_BOT_TOKEN"
          TELEGRAM_CHAT_ID: "-xxxxx"
          REDIS_URL: "redis://username:password@hostname/db_name"
      Events:
        Trigger:
          Type: EventBridgeRule
          Properties:
            Pattern:
              source:
                - "aws.s3"
              # S3 emits "Object Created" for both new uploads and overwrites;
              # there is no "Object Updated" detail-type
              detail-type:
                - "Object Created"
              detail:
                bucket:
                  name:
                    - "proxycheck-waf-automation"
      Policies:
        - S3ReadPolicy:
            # Bucket name only; the policy template expands it to bucket and object ARNs
            BucketName: "proxycheck-waf-automation"
        - Statement:
            - Effect: Allow
              Action:
                - wafv2:GetIPSet
                - wafv2:UpdateIPSet
              Resource: "*"
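One gotcha worth calling out: S3 only delivers events to EventBridge once EventBridge notifications are enabled on the bucket. If the bucket isn't defined in this same template, you can enable them with a one-off boto3 call; a minimal sketch, assuming the bucket name from the template above:

import boto3

s3 = boto3.client("s3")
# Passing an EventBridgeConfiguration (even an empty one) enables EventBridge delivery
s3.put_bucket_notification_configuration(
    Bucket="proxycheck-waf-automation",
    NotificationConfiguration={"EventBridgeConfiguration": {}},
)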
You need to update the environment variable values to match your S3 bucket name, proxycheck.io API key, and Telegram bot token and chat ID. The above template is for demo purposes only; in a real deployment, please use AWS Systems Manager Parameter Store to store your secrets.
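As a minimal sketch of that approach, assuming you've stored the secrets as SecureString parameters under hypothetical names like /proxycheck-waf/api-key, the Lambda could resolve them at startup instead of reading plaintext environment variables:

import boto3

ssm = boto3.client("ssm")

def get_secret(name: str) -> str:
    """Fetch and decrypt a SecureString parameter from SSM Parameter Store."""
    return ssm.get_parameter(Name=name, WithDecryption=True)["Parameter"]["Value"]

# Hypothetical parameter names; adjust to your own naming scheme
# PROXYCHECK_API_KEY = get_secret("/proxycheck-waf/api-key")
# TELEGRAM_BOT_TOKEN = get_secret("/proxycheck-waf/telegram-bot-token")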
Below is the code needed in your Python script:
import json
import csv
import os
import io
import logging
from typing import List, Set, Optional

import boto3
import botocore.exceptions
import redis
import requests

import proxycheck

# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize AWS clients (WAF with CLOUDFRONT scope must be called in us-east-1)
s3_client = boto3.client('s3')
wafv2_client = boto3.client('wafv2', region_name='us-east-1')

# Initialize ProxyCheck client
proxy_checker = proxycheck.Blocking(key=os.environ.get('PROXYCHECK_API_KEY'))

# Initialize Redis client lazily (see get_redis_client below)
REDIS_URL = os.environ.get('REDIS_URL')
redis_client = None
def get_redis_client():
    """Get Redis client with connection retry logic."""
    global redis_client
    if redis_client is None:
        try:
            redis_client = redis.from_url(
                REDIS_URL,
                socket_timeout=5,
                socket_connect_timeout=5,
                retry_on_timeout=True,
                health_check_interval=30
            )
            # Test connection
            redis_client.ping()
            logger.info("Redis connection established successfully")
        except Exception as e:
            logger.warning(f"Failed to connect to Redis: {str(e)}. Will proceed without caching.")
            redis_client = None
    return redis_client
def lambda_handler(event, context):
    """Daily IP Checker Lambda Function

    This function:
    1. Reads a CSV file from S3 containing IPs
    2. Deduplicates the IPs
    3. Checks risk scores using ProxyCheck.io (limited to 1000 requests)
    4. Adds high-risk IPs (score > 33) to the AWS WAF IP set
    5. Sends a Telegram notification about the update
    """
    try:
        # Get configuration from environment variables and the triggering event
        s3_bucket = os.environ.get('S3_BUCKET_NAME')
        s3_key = event["detail"]["object"]["key"]
        waf_scope = 'CLOUDFRONT'
        waf_ip_set_name = "ProxyCheckAutomation"
        waf_ip_set_id = "your_id_here"
        telegram_bot_token = os.environ.get('TELEGRAM_BOT_TOKEN')
        telegram_chat_id = os.environ.get('TELEGRAM_CHAT_ID')

        # Validate required configuration values
        required_vars = [s3_bucket, s3_key, waf_ip_set_name, waf_ip_set_id]
        if not all(required_vars):
            raise ValueError("Missing required configuration values")

        logger.info("Starting daily IP check process")

        # Step 1: Read CSV file from S3
        logger.info(f"Reading CSV file from S3: s3://{s3_bucket}/{s3_key}")
        ips = read_ips_from_s3(s3_bucket, s3_key)
        logger.info(f"Found {len(ips)} unique IPs after deduplication")

        # Step 2: Check IP risk scores (limited to 1000 requests)
        logger.info("Checking IP risk scores with ProxyCheck.io and Redis caching")
        risky_ips = check_ip_risk_scores(ips[:1000])
        logger.info(f"Found {len(risky_ips)} risky IPs (risk score > 33)")

        # Step 3: Add risky IPs to the AWS WAF IP set
        added_count = 0
        if risky_ips:
            logger.info("Adding risky IPs to AWS WAF IP set")
            # Convert IPs to CIDR format
            networks = {f"{ip}/32" for ip in risky_ips}
            added_count = update_single_ipset(wafv2_client, waf_ip_set_name, waf_scope, waf_ip_set_id, networks)
            logger.info(f"Added {added_count} IPs to WAF IP set")
        else:
            logger.info("No risky IPs found, no updates needed")

        # Step 4: Send Telegram notification with cache stats
        if telegram_bot_token and telegram_chat_id:
            # Get cache efficiency stats for the notification
            cache_stats = get_cache_stats()
            send_telegram_notification(
                telegram_bot_token,
                telegram_chat_id,
                len(ips),
                len(risky_ips),
                added_count,
                context,
                cache_stats
            )

        return {
            "statusCode": 200,
            "body": json.dumps({
                "message": "Daily IP check completed successfully",
                "total_ips_processed": len(ips),
                "risky_ips_found": len(risky_ips),
                "ips_added_to_waf": added_count
            })
        }
    except Exception as e:
        logger.error(f"Error in daily IP check: {str(e)}")
        # Send error notification via Telegram if configured
        telegram_bot_token = os.environ.get('TELEGRAM_BOT_TOKEN')
        telegram_chat_id = os.environ.get('TELEGRAM_CHAT_ID')
        if telegram_bot_token and telegram_chat_id:
            send_error_notification(telegram_bot_token, telegram_chat_id, str(e), context)
        raise
def read_ips_from_s3(bucket: str, key: str) -> List[str]:
    """Read and deduplicate IPs from a CSV file in S3."""
    try:
        # Download the CSV file from S3
        response = s3_client.get_object(Bucket=bucket, Key=key)
        csv_content = response['Body'].read().decode('utf-8')

        # Parse the CSV and extract IPs
        ips: Set[str] = set()
        csv_reader = csv.reader(io.StringIO(csv_content))
        for row in csv_reader:
            # Skip empty or too-short rows
            if not row or len(row) < 2:
                continue
            # The IP is in the second column (index 1)
            ip_value = row[1].strip()
            if ip_value and is_valid_ip(ip_value):
                ips.add(ip_value)
        return list(ips)
    except Exception as e:
        logger.error(f"Error reading IPs from S3: {str(e)}")
        raise
def is_valid_ip(ip: str) -> bool:
    """Basic IPv4 validation."""
    try:
        parts = ip.split('.')
        return len(parts) == 4 and all(0 <= int(part) <= 255 for part in parts)
    except (ValueError, AttributeError):
        return False
def get_cached_risk_score(ip: str) -> Optional[int]:
    """Get a cached risk score from Redis."""
    try:
        redis_conn = get_redis_client()
        if redis_conn is None:
            return None
        cache_key = f"ip_risk:{ip}"
        cached_score = redis_conn.get(cache_key)
        if cached_score is not None:
            return int(cached_score.decode('utf-8'))
        return None
    except Exception as e:
        logger.warning(f"Error reading from Redis cache for IP {ip}: {str(e)}")
        return None

def cache_risk_score(ip: str, risk_score: int, ttl_hours: int = 720):
    """Cache a risk score in Redis with a TTL (720 hours = 30 days by default)."""
    try:
        redis_conn = get_redis_client()
        if redis_conn is None:
            return
        cache_key = f"ip_risk:{ip}"
        redis_conn.setex(cache_key, ttl_hours * 3600, str(risk_score))
        logger.debug(f"Cached risk score for IP {ip}: {risk_score}")
    except Exception as e:
        logger.warning(f"Error caching risk score for IP {ip}: {str(e)}")
# Global cache stats for tracking
cache_stats = {"cache_hits": 0, "api_calls": 0, "checked_count": 0}

def get_cache_stats():
    """Get a copy of the current cache statistics."""
    return cache_stats.copy()

def reset_cache_stats():
    """Reset cache statistics."""
    global cache_stats
    cache_stats = {"cache_hits": 0, "api_calls": 0, "checked_count": 0}
def check_ip_risk_scores(ips: List[str]) -> List[str]:
    """Check IP risk scores using ProxyCheck.io with Redis caching and return the risky IPs."""
    risky_ips = []
    reset_cache_stats()  # Reset stats at the beginning of each run
    try:
        logger.info(f"Starting to check {len(ips)} IPs with Redis caching")
        for ip in ips:
            try:
                # First, check whether we have a cached result
                cached_risk = get_cached_risk_score(ip)
                if cached_risk is not None:
                    risk_score = cached_risk
                    cache_stats["cache_hits"] += 1
                    logger.debug(f"Cache hit for IP {ip}: risk score {risk_score}")
                else:
                    # Check the individual IP using the proxy_checker.ip() method
                    ip_result = proxy_checker.ip(ip)
                    risk_score = ip_result.risk()
                    cache_stats["api_calls"] += 1
                    # Cache the result for future runs
                    cache_risk_score(ip, risk_score)
                    logger.debug(f"API call for IP {ip}: risk score {risk_score}")
                cache_stats["checked_count"] += 1
                if risk_score > 33:
                    risky_ips.append(ip)
                    logger.info(f"Risky IP found: {ip} (risk score: {risk_score})")
                # Log progress every 100 IPs
                if cache_stats["checked_count"] % 100 == 0:
                    logger.info(f"Progress: {cache_stats['checked_count']}/{len(ips)} IPs processed, {len(risky_ips)} risky IPs found. Cache hits: {cache_stats['cache_hits']}, API calls: {cache_stats['api_calls']}")
            except Exception as e:
                logger.warning(f"Error checking IP {ip}: {str(e)}")
                continue
    except Exception as e:
        logger.error(f"Error checking IP risk scores: {str(e)}")
        raise
    cache_efficiency = (cache_stats["cache_hits"] / cache_stats["checked_count"] * 100) if cache_stats["checked_count"] > 0 else 0
    logger.info(f"Completed checking {cache_stats['checked_count']} IPs, found {len(risky_ips)} risky IPs. Cache efficiency: {cache_stats['cache_hits']}/{cache_stats['checked_count']} ({cache_efficiency:.1f}% cache hits), API calls saved: {cache_stats['cache_hits']}")
    return risky_ips
def get_ipset_and_token(client, name, scope, ipset_id):
    """
    Retrieve an existing WAFv2 IPSet.
    Returns (lock_token, description, existing_addresses_set).
    """
    try:
        resp = client.get_ip_set(Name=name, Scope=scope, Id=ipset_id)
    except botocore.exceptions.ClientError as e:
        logger.error(f"Error fetching IPSet {name}: {e}")
        raise
    ipset = resp["IPSet"]
    lock_token = resp["LockToken"]
    description = ipset.get("Description", "")
    existing = set(ipset.get("Addresses", []))
    return lock_token, description, existing
def update_single_ipset(client, name, scope, ipset_id, networks, is_ipv6=False):
    """
    Add any networks in `networks` to the specified IPSet.
    `networks` is a set of CIDR network strings (e.g., '192.168.1.1/32').
    Returns the number of new IPs added.
    """
    try:
        # 1) Fetch existing addresses and the lock token required for updates
        lock_token, description, existing_addrs = get_ipset_and_token(
            client, name, scope, ipset_id
        )
        # 2) Compute what is new
        to_add = networks - existing_addrs
        if not to_add:
            logger.info(f"[{name}] no new {'IPv6' if is_ipv6 else 'IPv4'} CIDRs to add.")
            return 0
        # 3) Merge and sort
        updated = sorted(existing_addrs | networks)
        limit = 10000
        if len(updated) > limit:
            logger.error(f"[{name}] total addresses {len(updated)} exceed limit {limit}.")
            raise ValueError(f"IPSet would exceed limit of {limit} addresses")
        # 4) Update the IPSet
        try:
            resp = client.update_ip_set(
                Name=name,
                Scope=scope,
                Id=ipset_id,
                Description=description or 'Daily IP Risk Checker - High Risk IPs',
                Addresses=list(updated),
                LockToken=lock_token
            )
        except botocore.exceptions.ClientError as e:
            logger.error(f"Error updating IPSet {name}: {e}")
            raise
        next_token = resp.get("NextLockToken")
        logger.info(
            f"[{name}] Inserted {len(to_add)} new items; now has {len(updated)} addresses. "
            f"NextLockToken: {next_token}"
        )
        return len(to_add)
    except Exception as e:
        logger.error(f"Error updating IPSet {name}: {str(e)}")
        raise
def send_telegram_notification(bot_token: str, chat_id: str, total_ips: int, risky_ips: int, added_ips: int, context=None, cache_stats=None):
    """Send a Telegram notification about the IP check results."""
    try:
        request_id = context.aws_request_id if context else 'Unknown'
        # Build the cache efficiency message
        cache_message = ""
        if cache_stats and cache_stats.get("checked_count", 0) > 0:
            cache_efficiency = (cache_stats["cache_hits"] / cache_stats["checked_count"] * 100)
            cache_message = f"""
💾 Cache Performance:
• Cache hits: {cache_stats["cache_hits"]}
• API calls: {cache_stats["api_calls"]}
• Efficiency: {cache_efficiency:.1f}%
• API calls saved: {cache_stats["cache_hits"]}"""
        # Failures are reported separately via send_error_notification
        message = f"""🛡️ Daily IP Check Report
📊 Total IPs processed: {total_ips}
⚠️ Risky IPs found: {risky_ips}
🔒 IPs added to WAF: {added_ips}{cache_message}
Status: ✅ Completed successfully
Request ID: {request_id}
"""
        url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
        payload = {
            'chat_id': chat_id,
            'text': message,
            'parse_mode': 'HTML'
        }
        response = requests.post(url, json=payload, timeout=10)
        response.raise_for_status()
        logger.info("Telegram notification sent successfully")
    except Exception as e:
        logger.error(f"Error sending Telegram notification: {str(e)}")
def send_error_notification(bot_token: str, chat_id: str, error_message: str, context=None):
    """Send a Telegram notification about errors."""
    try:
        request_id = context.aws_request_id if context else 'Unknown'
        message = f"""❌ Daily IP Check Error
Error: {error_message}
Request ID: {request_id}
Please check CloudWatch logs for more details.
"""
        url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
        payload = {
            'chat_id': chat_id,
            'text': message,
            'parse_mode': 'HTML'
        }
        response = requests.post(url, json=payload, timeout=10)
        response.raise_for_status()
    except Exception as e:
        logger.error(f"Error sending error notification: {str(e)}")
The above code:
- Reads a CSV file (where IP addresses are stored in the second column) whenever one is uploaded to the proxycheck-waf-automation bucket, and de-duplicates the addresses.
- Checks their risk scores with ProxyCheck's client SDK. Once a risky IP (risk score above 33) is detected, it is added to the WAF IP set using the boto3 client.
- Caches scanned IP addresses in ElastiCache Valkey along with their risk scores for 30 days, so we don't re-scan them within that period.
- Sends a summary report to your Telegram chat.
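Before deploying, you can smoke-test the handler locally by feeding it a synthetic EventBridge payload. A sketch, assuming the environment variables from the template are exported and using a placeholder object key (note this calls S3, ProxyCheck, and WAF for real, so point it at test resources):

from app import lambda_handler

# Minimal shape of the "Object Created" event S3 sends via EventBridge;
# only the fields the handler reads (plus the bucket, for realism) are included
sample_event = {
    "detail": {
        "bucket": {"name": "proxycheck-waf-automation"},
        "object": {"key": "exports/ips.csv"},
    }
}

if __name__ == "__main__":
    print(lambda_handler(sample_event, None))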
You need to set your already-created WAF IP Set ID in the code. In our case, we're updating AWS WAF to protect CloudFront distributions, using an existing rule that references this IP set to block the listed addresses. The above example covers IPv4 addresses; you can apply the same principles to IPv6 addresses, as sketched below.
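As a starting point, here's a minimal sketch of how the IPv6 side could look, using the standard ipaddress module for validation. WAF keeps IPv4 and IPv6 addresses in separate IP sets, so the V6 set name and ID below are hypothetical placeholders:

import ipaddress
from typing import Optional, Tuple

def to_cidr(ip: str) -> Optional[Tuple[str, bool]]:
    """Return (cidr, is_ipv6) for a valid address, or None if it's invalid."""
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return None
    if addr.version == 6:
        return f"{addr.compressed}/128", True
    return f"{addr}/32", False

# Usage idea: split risky IPs by address family, then update each IP set separately.
# v4_nets, v6_nets = set(), set()
# for ip in risky_ips:
#     result = to_cidr(ip)
#     if result:
#         (v6_nets if result[1] else v4_nets).add(result[0])
# update_single_ipset(wafv2_client, "ProxyCheckAutomationV6", "CLOUDFRONT",
#                     "your_ipv6_ipset_id", v6_nets, is_ipv6=True)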
And below are the results!
Thanks for tuning in!