DEV Community

agenthustler
agenthustler

Posted on

How to Monitor Dark Web Mentions of Your Brand

Introduction

Data breaches, leaked credentials, and brand impersonation on dark web forums pose serious threats to businesses. Monitoring these mentions proactively can mean the difference between catching a breach early and learning about it from the news. In this tutorial, we'll build a Python-based monitoring system using publicly accessible threat intelligence sources — no Tor browsing required.

Setup

import requests
from bs4 import BeautifulSoup
import pandas as pd
import json
import re
from datetime import datetime
import time
import hashlib

# Optional proxy/rendering-service credentials for reaching bot-protected
# threat-intelligence sites.
# NOTE(review): neither constant is referenced by any function below —
# confirm they are used elsewhere, or remove them.
SCRAPER_API_KEY = "your_key_here"
BASE_URL = "http://api.scraperapi.com"
Enter fullscreen mode Exit fullscreen mode

Monitoring Paste Sites

Paste sites are where leaked data often surfaces first:

def monitor_paste_sites(keywords):
    """Search public paste-site APIs for each keyword and collect matches.

    Returns a list of finding dicts (keyword, source, paste_id, date,
    severity, found_at). Sources that error out are silently skipped.
    """
    paste_sources = [
        "https://psbdmp.ws/api/search/",
    ]

    findings = []
    for term in keywords:
        for source_base in paste_sources:
            query_url = f"{source_base}{term}"
            try:
                resp = requests.get(query_url, timeout=10)
                if resp.status_code == 200:
                    for paste in resp.json():
                        findings.append({
                            "keyword": term,
                            "source": source_base,
                            "paste_id": paste.get("id"),
                            "date": paste.get("time"),
                            "severity": classify_severity(paste.get("text", "")),
                            "found_at": datetime.now().isoformat(),
                        })
            except requests.RequestException:
                # Unreachable source: move on to the next one.
                continue
            # Polite pacing between successful source queries.
            time.sleep(2)

    return findings

def classify_severity(text):
    """Rate paste text as "HIGH", "MEDIUM", or "LOW" by indicator tiers.

    The first tier containing any matching substring (case-insensitive)
    wins; text matching no tier is "LOW".
    """
    tiers = (
        ("HIGH", ("password", "credential", "database", "dump", "leak")),
        ("MEDIUM", ("email", "user", "account", "login")),
    )

    lowered = text.lower()
    for label, indicators in tiers:
        for indicator in indicators:
            if indicator in lowered:
                return label
    return "LOW"
Enter fullscreen mode Exit fullscreen mode

Breach Database Monitoring

Check if company credentials appear in known breach compilations:

def check_breach_databases(domain):
    """Query public breach-lookup endpoints for records tied to *domain*.

    Returns a list of result dicts, one per endpoint that answered 200.
    Endpoints that error out are skipped.
    """
    sources = {
        "haveibeenpwned": f"https://haveibeenpwned.com/api/v3/breaches?domain={domain}",
        "dehashed_search": f"https://api.dehashed.com/search?query=domain:{domain}"
    }

    results = []
    request_headers = {"User-Agent": "BrandMonitor/1.0"}
    for source_name, endpoint in sources.items():
        try:
            resp = requests.get(endpoint, headers=request_headers, timeout=10)
            if resp.status_code == 200:
                payload = resp.json()
                # List responses report their length; anything else counts as one.
                record_count = len(payload) if isinstance(payload, list) else 1
                results.append({
                    "source": source_name,
                    "domain": domain,
                    "breach_count": record_count,
                    "data": payload,
                })
        except requests.RequestException:
            continue
        # Polite pacing between endpoint queries.
        time.sleep(3)

    return results
Enter fullscreen mode Exit fullscreen mode

Threat Intelligence Feed Aggregation

def aggregate_threat_feeds(brand_keywords):
    """Scan open-source threat-intel feeds for case-insensitive keyword hits.

    Returns a list of mention dicts (feed, keyword, found_at); feeds that
    fail to download are skipped.
    """
    feeds = [
        "https://otx.alienvault.com/api/v1/pulses/subscribed",
        "https://raw.githubusercontent.com/stamparm/maltrail/master/trails/static/suspicious/domain.txt"
    ]

    mentions = []
    for feed_url in feeds:
        try:
            feed_body = requests.get(feed_url, timeout=15).text.lower()
        except requests.RequestException:
            continue

        mentions.extend(
            {
                "feed": feed_url,
                "keyword": keyword,
                "found_at": datetime.now().isoformat(),
            }
            for keyword in brand_keywords
            if keyword.lower() in feed_body
        )

    return mentions
Enter fullscreen mode Exit fullscreen mode

Alert System

def send_alert(finding, webhook_url):
    """POST a webhook notification for a HIGH-severity finding.

    Parameters
    ----------
    finding : dict
        A finding as produced by monitor_paste_sites(); must carry the
        "severity", "keyword", "source", and "found_at" keys.
    webhook_url : str
        Incoming-webhook endpoint that accepts a {"text": ...} payload.

    Findings with any other severity are ignored (no request is made).
    """
    if finding["severity"] != "HIGH":
        return

    # BUG FIX: the original indexed the dict with bare names
    # (finding[keyword], finding[source], ...), which raises NameError on
    # every alert; the keys must be string literals.
    payload = {
        "text": (
            f"ALERT: Brand mention detected\n"
            f"Keyword: {finding['keyword']}\n"
            f"Source: {finding['source']}\n"
            f"Severity: {finding['severity']}\n"
            f"Date: {finding['found_at']}"
        )
    }
    # Timeout added so a hung webhook cannot stall the monitoring cycle.
    requests.post(webhook_url, json=payload, timeout=10)

def run_monitoring_cycle(brand_name, domain, webhook_url=None):
    """Execute one full scan: paste sites, breach databases, threat feeds.

    Prints progress as each stage completes, pushes HIGH-severity findings
    to *webhook_url* when one is given, writes a timestamped JSON report
    to the current directory, and returns the report dict.
    """
    keywords = [brand_name, domain, f"@{domain}"]
    print(f"Monitoring for: {keywords}")

    paste_findings = monitor_paste_sites(keywords)
    print(f"Found {len(paste_findings)} paste mentions")

    breach_results = check_breach_databases(domain)
    print(f"Found {len(breach_results)} breach records")

    threat_mentions = aggregate_threat_feeds(keywords)
    print(f"Found {len(threat_mentions)} threat feed mentions")

    # Only paste findings carry a "severity" key; .get() keeps the
    # threat-feed entries from raising here.
    if webhook_url:
        for finding in paste_findings + threat_mentions:
            if finding.get("severity") == "HIGH":
                send_alert(finding, webhook_url)

    report = {
        "brand": brand_name,
        "domain": domain,
        "scan_time": datetime.now().isoformat(),
        "paste_findings": paste_findings,
        "breach_results": breach_results,
        "threat_mentions": threat_mentions
    }

    stamp = datetime.now().strftime('%Y%m%d_%H%M')
    with open(f"scan_{stamp}.json", "w") as f:
        json.dump(report, f, indent=2)

    return report
Enter fullscreen mode Exit fullscreen mode

Conclusion

Brand monitoring on the dark web does not require accessing illegal content. By leveraging public threat intelligence feeds, breach databases, and paste site APIs, you can build an effective early warning system. Use ScraperAPI for reliable access to protected threat intelligence platforms, and set up automated alerts to catch issues before they escalate.

Top comments (0)