How to go from finding spam in your inbox to automatically reporting the infrastructure behind it
In my previous article, I covered how to harden your email domain with SPF, DKIM, and DMARC. The configuration works well. It kills the vast majority of inbound spam before it ever reaches your device.
But there's a category of spam those tools can't touch: mail from operators who set up authentication correctly, on purpose, specifically to evade your filters. Fully authenticated. Low spam scores. Rotating domains across a dozen TLDs. AI-generated cover text to confuse content classifiers.
That mail gets through. It lands in your Junk folder, caught by client-side content analysis. And there it sits.
Marking it as junk and deleting it is the wrong response. That spam is coming from infrastructure that Spamhaus may not know about yet — and Spamhaus is how blocklists get built that protect everyone. If you have evidence of an active spam campaign, reporting it is the right move.
The problem is that reporting spam manually is tedious. Spamhaus has a submission portal, but visiting it for each individual message is not a workflow anyone will sustain.
This article covers how to automate it.
What We're Building
A Python script that:
- Connects to your mail server via IMAP and watches your Junk folder
- Parses each message to extract the sending IP, envelope domains, and malicious URLs
- Looks up the sending IP against RIPE Stat to identify the hosting infrastructure
- Submits the IP, domains, URLs, and a raw email sample to the Spamhaus API
- Marks processed messages with a custom IMAP flag — no local files or database required
- Deduplicates indicators within each run so the same IP or domain is only submitted once
- Reports a grouped summary of your submission status after each run
Why IMAP Flags for State
The script uses a custom IMAP keyword flag ($SpamhausProcessed) to track which messages have already been processed. This means:
- No local flat files or database to manage
- State survives the script being moved to a different machine
- You can inspect it from any mail client that shows keyword flags
- If your mail provider changes, the flag moves with the message
On startup, the script runs a functional capability test — it attempts to set and immediately remove a test flag on the first available message. If your server doesn't support custom keywords, the script aborts cleanly rather than failing silently mid-run.
A message is flagged as processed once it has been examined, regardless of whether individual API submissions succeeded. This is an intentional design choice: it prevents the script from reprocessing the same message indefinitely if a single indicator fails. Spamhaus returns HTTP 208 for already-known indicators, which handles any duplicate submissions across runs gracefully.
Prerequisites
Spamhaus account and API token:
Register at submit.spamhaus.org, then go to auth.spamhaus.org/account, scroll to "API Key Creation", and create a key. Copy it immediately — it's only shown once.
Python dependencies:
pip install bs4 requests
Environment variables:
export IMAP_SERVER=mail.example.com
export IMAP_PORT=993
export IMAP_USER=you@example.com
export IMAP_PASSWORD=your_imap_password
export SPAMHAUS_TOKEN=your_spamhaus_api_token
Optional variables:
export IMAP_FOLDER=Junk # folder to watch (default: Junk)
export DRY_RUN=1 # parse without submitting or flagging
export DELAY=2 # seconds between new API submissions (default: 2)
export VERBOSE_LIST=1 # log every submission with its status
The Spamhaus Submission API
Spamhaus exposes a REST API at https://submit.spamhaus.org/portal/api/v1. All requests require a Bearer token header.
Four submission types are relevant here:
| Endpoint | Threat type | What it submits |
|---|---|---|
POST submissions/add/ip |
spam |
Sending IP address |
POST submissions/add/domain |
spam |
Sending or landing domain |
POST submissions/add/url |
scam |
Malicious URL from message body |
POST submissions/add/email |
spam |
Raw email as evidence |
On threat type codes: The threat types used here are conservative defaults — spam for IPs and domains, scam for URLs. Stronger classifications like bulletproof or phish require evidence beyond what's available from a single message. The API documentation shows example codes that don't always work for your account tier. Verify valid codes first:
curl -s -H "Authorization: Bearer $SPAMHAUS_TOKEN" \
https://submit.spamhaus.org/portal/api/v1/lookup/threats-types
Conservative classifications aren't a weakness — Spamhaus has far more context than any individual submitter and will reclassify based on their own intelligence. What carries the most weight is the raw email submission. The full message gives analysts everything: the authentication chain in the headers, the sending infrastructure, the evasion techniques in the HTML, and the campaign fingerprint in the MIME structure. A precise threat type label matters far less than giving Spamhaus the evidence to make that determination themselves.
Rate limiting: The API returns HTTP 429 when you exceed your submission rate. The script retries up to 3 times with a 60-second wait between attempts.
HTTP 208 means already reported. If you submit something Spamhaus already has, they return 208. This is not an error — the script logs it as "already reported" and moves on. No sleep is applied on 208 responses; the delay only fires on successful new submissions (200) to pace actual API writes.
How the Script Works
IP Extraction
When a message arrives at your mail server, your MTA performs an SPF check and writes a Received-SPF header recording the result. That header contains client-ip= — the IP address of the server that connected to deliver the message. That's the sending IP we want.
The script reads only the topmost Received-SPF header because headers are prepended on arrival — the topmost one was written by your server when the message came in, and is the only one you can trust. Lower headers could have been injected by the spammer before sending, forged to make the mail look like it came from somewhere legitimate.
spf_headers = msg.get_all('Received-SPF') or []
if spf_headers:
match = re.search(r'client-ip=([0-9a-fA-F.:]+)', str(spf_headers[0]))
If no Received-SPF header is present, no IP is extracted and the IP submission is skipped. The alternative — walking the Received chain — risks reporting a legitimate forwarding service or ESP as the spam source. Domain, URL, and email submissions still proceed regardless.
Private, loopback, link-local, and reserved addresses are filtered using Python's ipaddress module, which covers the full RFC 1918/4193/6598 range correctly.
Domain Extraction
Domains are extracted from four sources for maximum coverage:
-
From,Reply-To, andReturn-Pathheaders usingemail.utils.getaddressesfor RFC-compliant address parsing -
DKIM-Signature d=tag, which identifies the signing domain regardless of whatFromclaims
The primary domain (used as the anchor for the raw email submission) prefers DKIM d= over Return-Path. Spammers often separate these deliberately — DKIM signs for the infrastructure domain while Return-Path uses a throwaway address.
All domains are IDNA-normalized before submission to collapse internationalized variants.
URL Extraction
URLs are extracted from the HTML body and normalized before deduplication:
- Tracking parameters (
utm_*,fbclid,gclid, etc.) are stripped - Query parameters are sorted so
?b=2&a=1and?a=1&b=2deduplicate correctly - Hostnames are lowercased
- Default ports (
:80,:443) are stripped - Malformed URLs are discarded rather than passed through
Unsubscribe links are skipped. Landing domains are extracted from each URL and submitted as domain indicators alongside the full URL — in spam campaigns, the URL domain is often the highest-value IOC.
Authentication Results
SPF, DKIM, and DMARC results are parsed from the topmost Authentication-Results header. Line folding is stripped before parsing so compound headers on multiple lines are read correctly. Results inform the submission reason string but do not change the threat type — authentication success alone doesn't imply intent.
RIR Enrichment
For each sending IP that passes the deduplication check, the script queries RIPE Stat (which aggregates all five RIRs globally) to get the network name, organization, and country. This enriches the submission reason with real infrastructure data:
Spam source. RIR: netname=EXAMPLE-NET org=Example Hosting Ltd country=XX.
Auth: spf=pass dkim=pass dmarc=pass (p=none). Found in Junk folder.
Results are cached using lru_cache(maxsize=2048). Each IP lookup makes an HTTP request to RIPE Stat — without caching, a batch of 50 messages from the same sending IP would trigger 50 identical network requests. With caching, the first call for a given IP hits the network and stores the result; every subsequent call with the same IP returns the stored result instantly.
The maxsize=2048 cap prevents unbounded memory growth in daemon mode. Without a limit, the cache accumulates one entry per unique IP seen since the script started — a slow memory leak over weeks of continuous operation. Once 2048 entries are cached, the least recently used are evicted to make room for new ones. For a personal inbox this limit is effectively never reached, but it's the right engineering choice regardless.
The lookup is deferred until after the deduplication check — no network I/O for IPs already seen in the current run.
Deduplication
Three layers work together:
Within a single run, a state_tracker dict holds sets of already-seen IPs, domains, URLs, and email domains. The same indicator is only submitted once per run regardless of how many messages contain it.
Across runs, the IMAP flag on each message means already-processed messages are skipped entirely on the next run.
At the Spamhaus level, HTTP 208 handles any indicators that slip through — the API is idempotent.
The Full Script
Save this as spam-monitor.py (The latest version is on Github:
#!/usr/bin/env python3
"""
spam-monitor.py — Automated spam analysis and Spamhaus submission
Monitors an IMAP Junk folder for spam, extracts infrastructure indicators,
and submits them to the Spamhaus API. Uses a custom IMAP flag for state
tracking — no local database or flat files required.
Required environment variables:
IMAP_SERVER — e.g. mail.example.com
IMAP_PORT — e.g. 993 (default)
IMAP_USER — your full email address
IMAP_PASSWORD — your IMAP password
SPAMHAUS_TOKEN — your Spamhaus submission API token
Optional environment variables:
IMAP_FOLDER — folder to watch (default: Junk)
DRY_RUN — set to "1" to parse without submitting (default: 0)
DELAY — seconds between API calls (default: 2)
VERBOSE_LIST — set to "1" to log every submission with its status (default: 0)
Usage:
python3 spam-monitor.py # run once
python3 spam-monitor.py --daemon # run continuously
DRY_RUN=1 python3 spam-monitor.py # dry run
"""
import imaplib
import email
import email.policy
import os
import re
import sys
import json
import time
import logging
import argparse
import socket
import ipaddress
import urllib.request
import requests
from collections import defaultdict
from email.utils import getaddresses
from functools import lru_cache
from urllib.parse import urlparse, urlencode, parse_qsl, urlunparse
from bs4 import BeautifulSoup
# ─────────────────────────────────────────────
# CONFIGURATION FROM ENVIRONMENT
# ─────────────────────────────────────────────
IMAP_SERVER = os.environ.get('IMAP_SERVER', '')
IMAP_PORT = int(os.environ.get('IMAP_PORT', 993))
IMAP_USER = os.environ.get('IMAP_USER', '')
IMAP_PASSWORD = os.environ.get('IMAP_PASSWORD', '')
SPAMHAUS_TOKEN = os.environ.get('SPAMHAUS_TOKEN', '')
IMAP_FOLDER = os.environ.get('IMAP_FOLDER', 'Junk')
DRY_RUN = os.environ.get('DRY_RUN', '0').strip() == '1'
DELAY = float(os.environ.get('DELAY', '2'))
VERBOSE_LIST = os.environ.get('VERBOSE_LIST', '0').strip() == '1'
SPAMHAUS_API = 'https://submit.spamhaus.org/portal/api/v1'
RIR_API = 'https://stat.ripe.net/data/whois/data.json'
PROCESSED_FLAG = '$SpamhausProcessed'
CAPABILITY_FLAG = '$SpamhausCapabilityTest'
_TRACKING_PARAMS = frozenset({
'utm_source', 'utm_medium', 'utm_campaign', 'utm_term', 'utm_content',
'fbclid', 'gclid', 'msclkid', 'mc_eid', 'mc_cid',
})
socket.setdefaulttimeout(60)
# ─────────────────────────────────────────────
# LOGGING
# ─────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────
# UTILITIES
# ─────────────────────────────────────────────
def _normalize_domain(domain):
if not domain:
return ''
try:
return domain.strip().encode('idna').decode('ascii').lower()
except Exception:
return domain.strip().lower()
def _is_internal_ip(ip):
try:
return _is_internal_addr(ipaddress.ip_address(ip))
except ValueError:
return True
def _is_internal_addr(addr):
return (addr.is_private or addr.is_loopback or
addr.is_link_local or addr.is_reserved)
# ─────────────────────────────────────────────
# EMAIL PARSING
# ─────────────────────────────────────────────
def extract_sending_ip(msg):
spf_headers = msg.get_all('Received-SPF') or []
if spf_headers:
match = re.search(r'client-ip=([0-9a-fA-F.:]+)', str(spf_headers[0]))
if match:
ip = match.group(1).strip()
if not _is_internal_ip(ip):
return ip
return None
def extract_envelope_domains(msg):
domains = set()
for field in ('From', 'Reply-To', 'Return-Path'):
headers_raw = [str(h) for h in (msg.get_all(field) or [])]
for _, addr in getaddresses(headers_raw):
if '@' in addr:
domain = _normalize_domain(addr.rsplit('@', 1)[1])
if domain:
domains.add(domain)
for dkim_header in msg.get_all('DKIM-Signature') or []:
flat = re.sub(r'\s+', '', str(dkim_header))
match = re.search(r'\bd=([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', flat, re.IGNORECASE)
if match:
domains.add(_normalize_domain(match.group(1)))
return domains
def extract_primary_domain(msg):
for dkim_header in msg.get_all('DKIM-Signature') or []:
flat = re.sub(r'\s+', '', str(dkim_header))
match = re.search(r'\bd=([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', flat, re.IGNORECASE)
if match:
return _normalize_domain(match.group(1))
headers_raw = [str(h) for h in (msg.get_all('Return-Path') or [])]
for _, addr in getaddresses(headers_raw):
if '@' in addr:
return _normalize_domain(addr.rsplit('@', 1)[1])
return None
def extract_auth_results(msg):
auth_headers = msg.get_all('Authentication-Results') or []
if not auth_headers:
return {'spf': 'unknown', 'dkim': 'unknown', 'dmarc': 'unknown', 'dmarc_policy': 'unknown'}
auth = re.sub(r'\s+', ' ', str(auth_headers[0]))
def extract(pattern):
m = re.search(pattern, auth, re.IGNORECASE)
return m.group(1).lower() if m else 'unknown'
spf = extract(r'\bspf=(pass|fail|softfail|neutral|none|permerror|temperror)\b')
dkim = extract(r'\bdkim=(pass|fail|none|policy|neutral|temperror|permerror)\b')
dmarc = extract(r'\bdmarc=(pass|fail|none|bestguesspass|temperror|permerror)\b')
dmarc_policy = extract(r'\b(?:policy\.[A-Za-z_-]*|p)=([A-Za-z]+)')
return {'spf': spf, 'dkim': dkim, 'dmarc': dmarc, 'dmarc_policy': dmarc_policy}
def normalize_url(href):
try:
parsed = urlparse(href)
port = parsed.port
clean_params = sorted(
(k, v) for k, v in parse_qsl(parsed.query)
if k.lower() not in _TRACKING_PARAMS
)
hostname = _normalize_domain(parsed.hostname or '')
if not hostname:
return None
if (parsed.scheme == 'https' and port == 443) or (parsed.scheme == 'http' and port == 80):
port = None
netloc = hostname if port is None else f'{hostname}:{port}'
return urlunparse(parsed._replace(netloc=netloc, query=urlencode(clean_params)))
except Exception:
return None
def extract_cta_urls(msg):
urls = set()
for part in msg.walk():
if part.get_content_type() == 'text/html':
soup = None
try:
html = part.get_payload(decode=True).decode('utf-8', errors='ignore')
soup = BeautifulSoup(html, 'html.parser')
for a in soup.find_all('a', href=True):
href = a['href'].strip()
if not href.startswith(('http://', 'https://')):
continue
if any(s in href.lower() for s in ('unsub', 'optout', 'opt-out', 'remove', 'list-unsubscribe')):
continue
normalized = normalize_url(href)
if normalized:
urls.add(normalized)
except Exception as e:
log.debug(f'URL extraction error: {e}')
finally:
if soup:
soup.decompose()
return list(urls)
@lru_cache(maxsize=2048)
def rir_lookup(ip):
if not ip:
return {}
try:
url = f'{RIR_API}?resource={ip}'
req = urllib.request.Request(url, headers={'Accept': 'application/json'})
with urllib.request.urlopen(req, timeout=8) as resp:
data = json.loads(resp.read())
records = data.get('data', {}).get('records', [])
result = {}
for group in records:
for record in group:
key = record.get('key', '').lower()
if key in ('netname', 'org', 'country', 'descr'):
result[key] = record.get('value', '')
return result
except Exception as e:
log.debug(f'RIR lookup failed for {ip}: {e}')
return {}
def parse_message(raw_bytes):
msg = email.message_from_bytes(raw_bytes, policy=email.policy.default)
return {
'ip': extract_sending_ip(msg),
'primary_domain': extract_primary_domain(msg),
'envelope_domains': extract_envelope_domains(msg),
'urls': extract_cta_urls(msg),
'auth': extract_auth_results(msg),
'subject': str(msg.get('Subject', '')),
'rspamd': str(msg.get('X-Rspamd-Score', 'N/A')),
}
# ─────────────────────────────────────────────
# SPAMHAUS API
# ─────────────────────────────────────────────
THREAT_IP = 'spam'
THREAT_DOMAIN = 'spam'
THREAT_URL = 'scam'
THREAT_EMAIL = 'spam'
REASON_IP = lambda ripe, auth: (
f'Spam source. RIR: netname={ripe.get("netname","unknown")} '
f'org={ripe.get("org", ripe.get("descr","unknown"))} '
f'country={ripe.get("country","unknown")}. '
f'Auth: spf={auth.get("spf")} dkim={auth.get("dkim")} '
f'dmarc={auth.get("dmarc")} (p={auth.get("dmarc_policy","unknown")}). '
f'Found in Junk folder.'
)
REASON_DOMAIN = 'Spam domain found in Junk folder.'
REASON_URL = 'Scam URL extracted from spam email body.'
REASON_EMAIL = 'Spam email found in Junk folder.'
def spamhaus_request(endpoint, payload=None, method='POST', retries=3):
url = f'{SPAMHAUS_API}/{endpoint}'
headers = {'Authorization': f'Bearer {SPAMHAUS_TOKEN}'}
for attempt in range(1, retries + 1):
try:
resp = requests.request(
method, url,
headers=headers,
json=payload if payload is not None else None,
timeout=30
)
if resp.status_code == 429:
log.warning(f'Rate limited — waiting 60s (attempt {attempt}/{retries})')
time.sleep(60)
continue
elif resp.status_code == 208:
return 208, resp.json() if resp.text else {}
elif not resp.ok:
try:
err_payload = resp.json()
except Exception:
err_payload = {'error': resp.text}
log.error(f'HTTP {resp.status_code}: {err_payload}')
return resp.status_code, err_payload
return resp.status_code, resp.json() if resp.text else {}
except Exception as e:
log.error(f'Request error: {e}')
return 0, {}
return 429, {'message': 'rate limit retries exhausted'}
def submit(submission_type, key, object_value, threat_type, reason):
label = key.replace('email:', '') if submission_type == 'email' else key
if DRY_RUN:
log.info(f' [DRY RUN] Would submit {submission_type.upper()}: {label}')
return
status, body = spamhaus_request(f'submissions/add/{submission_type}', {
'threat_type': threat_type,
'reason': reason,
'source': {'object': object_value}
})
if status in (200, 208):
log.info(f' {submission_type.upper()} {label} — {"OK" if status == 200 else "already reported"}')
if status == 200:
time.sleep(DELAY)
else:
log.warning(f' {submission_type.upper()} {label} — failed ({status}): {body}')
def check_submission_count():
status, data = spamhaus_request('submissions/count', method='GET')
if status != 200:
log.warning(f'Could not fetch submission count: HTTP {status}')
return
total = data.get('total', 0)
matched = data.get('matched', 0)
new = total - matched
pct_matched = int(matched / total * 100) if total else 0
pct_new = int(new / total * 100) if total else 0
log.info(
f'Spamhaus totals (30 days): {total} submitted — '
f'{matched} corroborated ({pct_matched}%), '
f'{new} new intelligence ({pct_new}%)'
)
status, items = spamhaus_request('submissions/list?items=10000', method='GET')
if status != 200:
log.warning(f'Could not fetch submissions list: HTTP {status}')
return
groups = defaultdict(lambda: {'listed': 0, 'checked': 0, 'pending': 0})
for item in items:
t = item.get('submission_type', 'unknown')
if item.get('listed'):
groups[t]['listed'] += 1
elif item.get('last_check'):
groups[t]['checked'] += 1
else:
groups[t]['pending'] += 1
for t, counts in sorted(groups.items()):
log.info(
f' {t.upper()}: {counts["listed"]} listed, '
f'{counts["checked"]} checked/not listed, '
f'{counts["pending"]} pending'
)
if VERBOSE_LIST:
log.info('--- Verbose submission list ---')
for item in items:
stype = item.get('submission_type', '?')
if stype == 'email':
obj = item.get('attributes', {}).get('subject', '(no subject)')
else:
obj = item.get('source', {}).get('object', '?')
listed = item.get('listed')
if listed:
status_str = f'listed: {", ".join(listed)}'
elif item.get('last_check'):
status_str = 'checked, not listed'
else:
status_str = 'pending review'
log.info(f' {stype.upper()} {obj} — {status_str}')
# ─────────────────────────────────────────────
# PROCESSING
# ─────────────────────────────────────────────
def process_message(raw_bytes, state_tracker):
parsed = parse_message(raw_bytes)
auth = parsed['auth']
log.info(f' IP={parsed["ip"]} primary_domain={parsed["primary_domain"]}')
log.info(f' Subject: {parsed["subject"]}')
log.info(f' Rspamd: {parsed["rspamd"]}')
log.info(f' Auth: spf={auth.get("spf")} dkim={auth.get("dkim")} dmarc={auth.get("dmarc")} (p={auth.get("dmarc_policy")})')
if parsed['ip'] and parsed['ip'] not in state_tracker['ips']:
state_tracker['ips'].add(parsed['ip'])
ripe = rir_lookup(parsed['ip'])
if ripe:
log.info(f' RIR: netname={ripe.get("netname")} country={ripe.get("country")}')
submit('ip', parsed['ip'], parsed['ip'], THREAT_IP, REASON_IP(ripe, auth))
for domain in parsed['envelope_domains']:
if domain not in state_tracker['domains']:
state_tracker['domains'].add(domain)
submit('domain', domain, domain, THREAT_DOMAIN, REASON_DOMAIN)
if parsed['primary_domain'] and parsed['primary_domain'] not in state_tracker['emails']:
state_tracker['emails'].add(parsed['primary_domain'])
key = f'email:{parsed["primary_domain"]}'
MAX_EMAIL_BYTES = 1024 * 1024
email_sample = raw_bytes[:MAX_EMAIL_BYTES].decode('utf-8', errors='replace')
submit('email', key, email_sample, THREAT_EMAIL, REASON_EMAIL)
for url in parsed['urls']:
if url not in state_tracker['urls']:
state_tracker['urls'].add(url)
submit('url', url, url, THREAT_URL, REASON_URL)
try:
hostname = _normalize_domain(urlparse(url).hostname or '')
if hostname and hostname not in parsed['envelope_domains'] and hostname not in state_tracker['domains']:
state_tracker['domains'].add(hostname)
submit('domain', hostname, hostname, THREAT_DOMAIN,
f'Landing domain extracted from spam URL. {REASON_DOMAIN}')
except Exception as e:
log.debug(f'Could not extract landing domain from URL: {e}')
# ─────────────────────────────────────────────
# IMAP
# ─────────────────────────────────────────────
def connect_imap():
conn = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT, timeout=60)
conn.login(IMAP_USER, IMAP_PASSWORD)
log.info(f'Connected to {IMAP_SERVER}:{IMAP_PORT} as {IMAP_USER}')
return conn
def run_once():
if not all([IMAP_SERVER, IMAP_USER, IMAP_PASSWORD, SPAMHAUS_TOKEN]):
log.error('Missing required environment variables.')
sys.exit(1)
if DRY_RUN:
log.info('*** DRY RUN mode — no submissions or flags will be applied ***')
conn = None
total_processed = 0
try:
conn = connect_imap()
if conn.select(f'"{IMAP_FOLDER}"', readonly=False)[0] != 'OK':
log.error(f'Could not select folder: {IMAP_FOLDER}')
return
status, data = conn.uid('search', None, f'NOT KEYWORD {PROCESSED_FLAG}')
if status != 'OK' or not data[0]:
log.info(f'Folder {IMAP_FOLDER}: No unprocessed messages.')
return
uids = data[0].split()
log.info(f'Folder {IMAP_FOLDER}: {len(uids)} unprocessed message(s)')
if not DRY_RUN:
test_status, _ = conn.uid('store', uids[0], '+FLAGS', CAPABILITY_FLAG)
if test_status != 'OK':
log.critical('IMAP server rejected custom keyword flags — cannot track state. Aborting.')
return
try:
conn.uid('store', uids[0], '-FLAGS', CAPABILITY_FLAG)
except Exception:
pass
state_tracker = {'ips': set(), 'domains': set(), 'urls': set(), 'emails': set()}
for uid in uids:
status, msg_data = conn.uid('fetch', uid, '(RFC822)')
if status != 'OK' or not msg_data or not msg_data[0]:
continue
raw_bytes = msg_data[0][1]
log.info(f'Processing message UID {uid.decode()}')
try:
process_message(raw_bytes, state_tracker)
total_processed += 1
if not DRY_RUN:
conn.uid('store', uid, '+FLAGS', PROCESSED_FLAG)
log.info(f' Flagged message UID {uid.decode()} as processed')
except Exception as e:
log.error(f' Failed to process message UID {uid.decode()}: {e}')
finally:
log.info(f'Done. {total_processed} message(s) processed.')
if conn:
if total_processed:
try:
check_submission_count()
except Exception as e:
log.error(f'Could not fetch submission count: {e}')
try:
conn.logout()
except Exception:
pass
def run_daemon(interval=300):
log.info(f'Daemon mode — checking every {interval}s')
while True:
try:
run_once()
except Exception as e:
log.error(f'Error in run loop: {e}')
log.info(f'Sleeping {interval}s...')
time.sleep(interval)
# ─────────────────────────────────────────────
# ENTRY POINT
# ─────────────────────────────────────────────
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Spam monitor and Spamhaus submitter')
parser.add_argument('--daemon', action='store_true', help='Run continuously')
parser.add_argument('--interval', type=int, default=300,
help='Daemon check interval in seconds (default: 300)')
args = parser.parse_args()
if args.daemon:
run_daemon(args.interval)
else:
run_once()
Running It
Always dry run first:
DRY_RUN=1 python3 spam-monitor.py
This parses every message and logs what would be submitted without touching the API or setting any flags. Check the output carefully before running live.
Single run:
python3 spam-monitor.py
Daemon mode (checks every 5 minutes):
python3 spam-monitor.py --daemon --interval 300
Cron job (every 10 minutes):
*/10 * * * * cd /path/to/script && python3 spam-monitor.py
Full submission detail:
VERBOSE_LIST=1 python3 spam-monitor.py
After each run that processes at least one message, the script logs a summary of your Spamhaus submissions for the past 30 days, broken down by type and listing status:
Spamhaus totals (30 days): 312 submitted — 187 corroborated (59%), 125 new intelligence (40%)
DOMAIN: 84 listed, 12 checked/not listed, 7 pending
EMAIL: 41 listed, 8 checked/not listed, 3 pending
IP: 73 listed, 11 checked/not listed, 5 pending
URL: 35 listed, 6 checked/not listed, 4 pending
Known Limitations
This script is designed for personal use — a single inbox, running periodically, low submission volume. It works well in that context. A few things to be aware of:
IP extraction requires Received-SPF. The script only extracts IPs from the topmost Received-SPF header, which your MTA writes on arrival. If that header is absent — unusual on modern mail providers but possible on misconfigured servers — no IP is submitted. The Received chain is not used as a fallback because it risks reporting legitimate forwarding infrastructure.
Domains from envelope headers may include spoofed legitimate domains. If a message spoofs paypal.com in the From header and your server doesn't drop it, the script will attempt to report it. Spamhaus's analyst review process handles false positives, but it's worth monitoring your submission acceptance rate.
URL landing domains may be legitimate redirectors. CDN hostnames, link shorteners, and ESP tracking domains sometimes appear in spam. The script submits them — whether that's useful depends on the campaign.
State is tied to IMAP keyword support. Most modern IMAP servers support custom keywords (Dovecot, Cyrus, Gmail). Some hosted providers don't. The script tests for support at startup and aborts if the server rejects the flag.
This is a personal-use tool, not an enterprise pipeline. SQLite-backed state, archive-instead-of-delete retention, and multi-account support are the natural next steps if you outgrow it. Follow github.com/Sageth/spamhaus-reporting for updates.
A Note on Signal Quality
Your submissions are breadcrumbs, not the whole map. Spamhaus has vastly better detection infrastructure than any individual submitter. What you're providing is timing — you're reporting infrastructure while it's actively sending, not after the campaign has ended.
The RIR enrichment matters more than the reason text. Spamhaus analysts can see that a netname maps to a specific hosting provider known for bulletproof services. That infrastructure context is more useful than a paragraph of narrative about what the spam said.
Submitting the raw email alongside the IP and domain gives analysts the full picture — headers showing the authentication chain, HTML showing the evasion techniques, MIME structure showing the hidden text.
What the blocklists do:
The Spamhaus Blocklist (SBL) lists IP addresses observed sending spam or hosting malicious infrastructure. Mail servers that check Spamhaus reject connections from listed IPs at the SMTP level — before a message is even accepted. A listed IP can't deliver mail to anyone using Spamhaus-backed filtering until the operator cleans up their act and gets delisted.
The Domain Blocklist (DBL) lists domains observed in spam campaigns — sending domains, hosting domains, and URLs found in message bodies. DBL listings propagate into DNS firewalls, email security products, and browser filters. A listed domain gets blocked across every product that queries Spamhaus, not just email.
The Hash Blocklist (HBL) lists cryptographic hashes of malicious content — email addresses, file hashes, cryptocurrency wallet addresses. Less visible in day-to-day reporting, but your raw email submissions contribute to it.
What the submission statuses mean:
After submission, Spamhaus reviews each indicator and returns one of three statuses:
- Listed — Spamhaus has confirmed the indicator as malicious and added it to the relevant blocklist. This is the outcome that matters.
- Checked, not listed — Spamhaus reviewed the indicator and didn't list it, either because it didn't meet their threshold or because it's already been cleaned up.
- Pending — the indicator is in the review queue. High submission volumes mean some indicators take time to process.
The corroboration percentage in the summary log shows how many of your submissions matched intelligence Spamhaus already had. A high corroboration rate means you're seeing the same infrastructure they're already tracking — your timing confirmation is still useful. A high new intelligence rate means you're getting there first.
If a range gets listed on SBL, the operator has to spin up new infrastructure, acquire new IP space, reconfigure sending, and rebuild sending reputation from scratch. If a domain gets listed on DBL, they need new domains, new DNS, new authentication records. That costs time and money, and it's the friction that makes automated spam campaigns expensive to sustain.
See also: Why Your Email Is an Open Door for Spammers — And How to Lock It
Top comments (0)