Introduction
Data breaches, leaked credentials, and brand impersonation on dark web forums pose serious threats to businesses. Monitoring these mentions proactively can mean the difference between catching a breach early and learning about it from the news. In this tutorial, we'll build a Python-based monitoring system using publicly accessible threat intelligence sources — no Tor browsing required.
Setup
import hashlib
import json
import re
import time
from datetime import datetime
from urllib.parse import quote

import pandas as pd
import requests
from bs4 import BeautifulSoup
# Optional scraping-proxy credentials for threat-intel sites that block
# plain HTTP clients. Replace the placeholder before use, and load real
# keys from an environment variable — never commit them to source control.
SCRAPER_API_KEY = "your_key_here"
BASE_URL = "http://api.scraperapi.com"
Monitoring Paste Sites
Paste sites are where leaked data often surfaces first:
def monitor_paste_sites(keywords):
    """Search public paste-site APIs for mentions of the given keywords.

    Args:
        keywords: iterable of strings (brand names, domains, ...) to query.

    Returns:
        list[dict]: one record per paste hit with keys ``keyword``,
        ``source``, ``paste_id``, ``date``, ``severity`` and ``found_at``.
        Sources that fail or return malformed payloads are skipped.
    """
    paste_sources = [
        "https://psbdmp.ws/api/search/",
    ]
    findings = []
    for keyword in keywords:
        for source in paste_sources:
            # URL-encode the keyword so spaces/special characters
            # don't break the request path.
            url = f"{source}{quote(str(keyword), safe='')}"
            try:
                response = requests.get(url, timeout=10)
                if response.status_code == 200:
                    # response.json() raises ValueError on non-JSON bodies,
                    # which requests.RequestException does NOT cover.
                    data = response.json()
                    # Some APIs wrap results, e.g. {"data": [...]} — normalize.
                    if isinstance(data, dict):
                        data = data.get("data", [])
                    if not isinstance(data, list):
                        continue
                    for paste in data:
                        findings.append({
                            "keyword": keyword,
                            "source": source,
                            "paste_id": paste.get("id"),
                            "date": paste.get("time"),
                            "severity": classify_severity(paste.get("text", "")),
                            "found_at": datetime.now().isoformat()
                        })
            except (requests.RequestException, ValueError):
                # Network failure or malformed JSON: try the next source.
                continue
            time.sleep(2)  # rate-limit courtesy between requests
    return findings
def classify_severity(text):
    """Rate a finding's body text as "HIGH", "MEDIUM", or "LOW".

    Credential/dump language wins over account-related terms; anything
    without either tier's indicators is LOW. Matching is case-insensitive
    substring search.
    """
    lowered = text.lower()
    tiers = (
        ("HIGH", ("password", "credential", "database", "dump", "leak")),
        ("MEDIUM", ("email", "user", "account", "login")),
    )
    for label, indicators in tiers:
        if any(term in lowered for term in indicators):
            return label
    return "LOW"
Breach Database Monitoring
Check if company credentials appear in known breach compilations:
def check_breach_databases(domain):
    """Query public breach databases for records tied to *domain*.

    Args:
        domain: company domain to look up (e.g. ``"example.com"``).

    Returns:
        list[dict]: one entry per source that answered HTTP 200, with
        ``source``, ``domain``, ``breach_count`` and the raw ``data``
        payload. Failed or unreachable sources are skipped.

    Note:
        Both HaveIBeenPwned (``hibp-api-key``) and DeHashed require API
        keys for these endpoints; without one they return 401 and are
        silently skipped here.
    """
    sources = {
        "haveibeenpwned": f"https://haveibeenpwned.com/api/v3/breaches?domain={domain}",
        "dehashed_search": f"https://api.dehashed.com/search?query=domain:{domain}"
    }
    results = []
    headers = {"User-Agent": "BrandMonitor/1.0"}  # loop-invariant: build once
    for name, url in sources.items():
        try:
            response = requests.get(url, headers=headers, timeout=10)
            if response.status_code == 200:
                # json() raises ValueError on non-JSON bodies — caught below.
                data = response.json()
                results.append({
                    "source": name,
                    "domain": domain,
                    # List payloads are breach lists; any other shape counts as one record.
                    "breach_count": len(data) if isinstance(data, list) else 1,
                    "data": data
                })
        except (requests.RequestException, ValueError):
            continue
        time.sleep(3)  # be polite to rate-limited APIs
    return results
Threat Intelligence Feed Aggregation
def aggregate_threat_feeds(brand_keywords):
    """Scan open-source threat-intelligence feeds for brand mentions.

    Args:
        brand_keywords: iterable of strings, matched case-insensitively
            as substrings of each feed body.

    Returns:
        list[dict]: one record per (feed, keyword) hit with ``feed``,
        ``keyword`` and ``found_at`` keys. Unreachable feeds are skipped.
    """
    feeds = [
        "https://otx.alienvault.com/api/v1/pulses/subscribed",
        "https://raw.githubusercontent.com/stamparm/maltrail/master/trails/static/suspicious/domain.txt"
    ]
    mentions = []
    for feed_url in feeds:
        try:
            response = requests.get(feed_url, timeout=15)
            # Bug fix: the original scanned any response body, so a 401/403
            # HTML error page (e.g. OTX without an API key) could produce
            # false keyword matches. Only scan successful responses.
            if response.status_code != 200:
                continue
            content = response.text.lower()
        except requests.RequestException:
            continue
        # Keyword scan kept outside the try: only network I/O can raise here.
        for keyword in brand_keywords:
            if keyword.lower() in content:
                mentions.append({
                    "feed": feed_url,
                    "keyword": keyword,
                    "found_at": datetime.now().isoformat()
                })
    return mentions
Alert System
def send_alert(finding, webhook_url):
    """POST a webhook notification for a HIGH-severity finding.

    Args:
        finding: dict with ``severity``, ``keyword``, ``source`` and
            ``found_at`` keys (as produced by monitor_paste_sites).
        webhook_url: incoming-webhook URL (Slack-style ``text`` payload).

    Findings below HIGH severity are ignored.
    """
    if finding["severity"] != "HIGH":
        return
    # Bug fix: the original used bare names (finding[keyword]) instead of
    # string keys, raising NameError on every HIGH-severity alert.
    payload = {
        "text": (
            f"ALERT: Brand mention detected\n"
            f"Keyword: {finding['keyword']}\n"
            f"Source: {finding['source']}\n"
            f"Severity: {finding['severity']}\n"
            f"Date: {finding['found_at']}"
        )
    }
    requests.post(webhook_url, json=payload, timeout=10)
def run_monitoring_cycle(brand_name, domain, webhook_url=None):
    """Sweep all sources for one brand, alert on HIGH hits, save a report.

    Args:
        brand_name: the brand string to monitor.
        domain: the company domain (also monitored as ``@domain``).
        webhook_url: optional webhook for HIGH-severity alerts.

    Returns:
        dict: the full scan report (also written to a timestamped JSON file).
    """
    keywords = [brand_name, domain, f"@{domain}"]
    print(f"Monitoring for: {keywords}")

    # Sweep each intelligence source in turn.
    paste_findings = monitor_paste_sites(keywords)
    print(f"Found {len(paste_findings)} paste mentions")

    breach_results = check_breach_databases(domain)
    print(f"Found {len(breach_results)} breach records")

    threat_mentions = aggregate_threat_feeds(keywords)
    print(f"Found {len(threat_mentions)} threat feed mentions")

    # Fire webhook alerts for anything rated HIGH (breach results carry
    # no severity field and are excluded, matching the report layout).
    if webhook_url:
        for finding in paste_findings + threat_mentions:
            if finding.get("severity") == "HIGH":
                send_alert(finding, webhook_url)

    report = {
        "brand": brand_name,
        "domain": domain,
        "scan_time": datetime.now().isoformat(),
        "paste_findings": paste_findings,
        "breach_results": breach_results,
        "threat_mentions": threat_mentions
    }
    # Persist the raw scan so runs can be diffed over time.
    with open(f"scan_{datetime.now().strftime('%Y%m%d_%H%M')}.json", "w") as f:
        json.dump(report, f, indent=2)
    return report
Conclusion
Brand monitoring on the dark web does not require accessing illegal content. By leveraging public threat intelligence feeds, breach databases, and paste site APIs, you can build an effective early warning system. Schedule the monitoring cycle to run regularly, register the API keys the breach databases require, and wire up automated alerts so high-severity findings reach your team before they escalate.
Top comments (0)