agenthustler
How to Build a Supply Chain Disruption Early Warning System

Supply chain disruptions cost businesses billions. Port closures, factory shutdowns, shipping delays — early detection means early response. Here's how to build an automated monitoring system that scrapes supply chain signals from public sources.

Data Sources for Supply Chain Intelligence

  • Maritime tracking: Port congestion data, vessel positions
  • News monitoring: Factory closures, labor strikes, natural disasters
  • Government data: Import/export statistics, trade restrictions
  • Social media: On-the-ground reports from logistics workers
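These sources differ in format, freshness, and whether they need JavaScript rendering, so it helps to describe them in one registry before wiring up scrapers. A minimal sketch; the URLs and polling intervals here are illustrative placeholders, not vetted endpoints:

```python
# Hypothetical source registry: each signal category maps to example
# endpoints, a polling interval in hours, and whether JS rendering is needed.
SOURCES = {
    "maritime": {
        "urls": ["https://www.marinetraffic.com/en/data/?asset_type=ports"],
        "poll_hours": 6,
        "needs_js_render": True,   # live vessel maps are JS-heavy
    },
    "news": {
        "urls": ["https://www.reuters.com/business/"],
        "poll_hours": 1,
        "needs_js_render": False,
    },
    "government": {
        "urls": ["https://www.census.gov/foreign-trade/data/"],
        "poll_hours": 24,
        "needs_js_render": False,
    },
}

def due_sources(hours_since_last_run):
    """Return the categories whose polling interval has elapsed."""
    return [name for name, cfg in SOURCES.items()
            if hours_since_last_run >= cfg["poll_hours"]]
```

A scheduler can then call `due_sources()` each cycle and only hit the endpoints that are actually due, which keeps API credit usage proportional to how fast each source changes.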

Setting Up Multi-Source Scraping

import requests
from bs4 import BeautifulSoup
from datetime import datetime
import json
import re

API_KEY = "YOUR_SCRAPERAPI_KEY"

class SupplyChainMonitor:
    def __init__(self, api_key):
        self.api_key = api_key
        self.alerts = []

    def scrape(self, url, render=False):
        params = {
            "api_key": self.api_key,
            "url": url,
            "render": "true" if render else "false"
        }
        resp = requests.get(
            "https://api.scraperapi.com", params=params, timeout=60
        )
        return BeautifulSoup(resp.text, "html.parser")

ScraperAPI handles varying anti-bot protections across logistics sites, news platforms, and government portals.
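Even with an API in the middle, individual fetches will occasionally fail on rate limits or transient blocks, so it pays to wrap the fetch in a small retry loop. A sketch; `fetch_with_retries` is our own helper, not part of ScraperAPI, and the backoff values are arbitrary defaults:

```python
import time

def fetch_with_retries(fetch, url, max_attempts=3, backoff_seconds=2):
    """Call fetch(url) until it succeeds, backing off between attempts.

    `fetch` is any callable that returns the response and raises on
    failure, e.g. a thin wrapper around SupplyChainMonitor.scrape.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch(url)
        except Exception:
            if attempt == max_attempts:
                raise                          # give up after the last try
            time.sleep(backoff_seconds * attempt)  # linear backoff
```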

Monitoring Port Congestion

Port congestion is the earliest physical signal of supply chain stress:

# Method of SupplyChainMonitor; add it inside the class body
def check_port_congestion(self, port_name):
    soup = self.scrape(
        f"https://www.marinetraffic.com/en/data/?asset_type=ports&port={port_name}"
    )
    congestion_data = {}
    stats = soup.find_all("div", class_="port-stat")
    for stat in stats:
        label = stat.find("span", class_="label")
        value = stat.find("span", class_="value")
        if label and value:
            congestion_data[label.get_text(strip=True)] = value.get_text(strip=True)

    raw = congestion_data.get("Vessels Waiting", "0").replace(",", "")
    vessels_waiting = int(raw) if raw.isdigit() else 0  # guard against layout changes
    if vessels_waiting > 50:
        self.alerts.append({
            "type": "port_congestion",
            "port": port_name,
            "vessels_waiting": vessels_waiting,
            "severity": "high" if vessels_waiting > 100 else "medium",
            "timestamp": datetime.utcnow().isoformat()
        })
    return congestion_data
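A fixed threshold of 50 vessels treats every port the same, but Shanghai's normal queue dwarfs a regional port's. One refinement is to grade congestion relative to a per-port baseline. A sketch; the baseline figures below are illustrative, not real statistics, and in practice you would compute them from your own historical scrapes:

```python
# Illustrative baselines: typical vessels-waiting counts per port.
PORT_BASELINES = {"Shanghai": 60, "Rotterdam": 25, "Los-Angeles": 30}

def congestion_severity(port_name, vessels_waiting, baselines=PORT_BASELINES):
    """Classify congestion as a multiple of the port's usual queue."""
    baseline = baselines.get(port_name)
    if not baseline:
        return "unknown"          # no history for this port yet
    ratio = vessels_waiting / baseline
    if ratio >= 2.0:
        return "high"             # double the usual queue
    if ratio >= 1.5:
        return "medium"
    return "normal"
```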

News-Based Disruption Detection

DISRUPTION_KEYWORDS = [
    r"factory\s+(shutdown|closure|fire|explosion)",
    r"port\s+(closure|strike|congestion|blockage)",
    r"supply\s+(shortage|disruption|crisis|crunch)",
    r"shipping\s+(delay|container\s+shortage|backlog)",
    r"(earthquake|tsunami|hurricane|typhoon|flood)",
    r"trade\s+(ban|restriction|sanction|embargo)",
]

# Method of SupplyChainMonitor; DISRUPTION_KEYWORDS stays at module level
def scan_news(self, news_url):
    soup = self.scrape(news_url)
    articles = soup.find_all("article")
    disruptions = []
    for article in articles[:20]:
        title = article.find(["h1", "h2", "h3"])
        if not title:
            continue
        text = article.get_text().lower()
        score = sum(1 for kw in DISRUPTION_KEYWORDS if re.search(kw, text))
        if score >= 2:
            disruptions.append({
                "title": title.get_text(strip=True),
                "relevance_score": score,
            })
    return sorted(disruptions, key=lambda x: x["relevance_score"], reverse=True)
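It's worth sanity-checking the keyword patterns against a few sample headlines before pointing them at a live feed. The helper below mirrors the scoring rule inside `scan_news` (it repeats the pattern list so the snippet runs on its own):

```python
import re

DISRUPTION_KEYWORDS = [
    r"factory\s+(shutdown|closure|fire|explosion)",
    r"port\s+(closure|strike|congestion|blockage)",
    r"supply\s+(shortage|disruption|crisis|crunch)",
    r"shipping\s+(delay|container\s+shortage|backlog)",
    r"(earthquake|tsunami|hurricane|typhoon|flood)",
    r"trade\s+(ban|restriction|sanction|embargo)",
]

def disruption_score(text):
    """Count how many keyword patterns match (the same rule scan_news applies)."""
    text = text.lower()
    return sum(1 for kw in DISRUPTION_KEYWORDS if re.search(kw, text))
```

A headline like "Typhoon forces port closure and shipping delays" matches three patterns and clears the `score >= 2` bar, while ordinary business news scores zero.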

Alert Aggregation and Risk Scoring

# Method of SupplyChainMonitor
def generate_risk_report(self):
    severity_scores = {"high": 3, "medium": 2, "low": 1}
    total_risk = sum(severity_scores.get(a["severity"], 0) for a in self.alerts)
    return {
        "generated_at": datetime.utcnow().isoformat(),
        "total_alerts": len(self.alerts),
        "risk_score": total_risk,
        "risk_level": (
            "critical" if total_risk > 15
            else "elevated" if total_risk > 8
            else "normal"
        ),
        "alerts": self.alerts,
    }

monitor = SupplyChainMonitor(API_KEY)
monitor.check_port_congestion("Shanghai")
monitor.check_port_congestion("Los-Angeles")
report = monitor.generate_risk_report()
print(json.dumps(report, indent=2))

Scheduling Automated Checks

import schedule
import time

def supply_chain_check():
    monitor = SupplyChainMonitor(API_KEY)
    for port in ["Shanghai", "Rotterdam", "Los-Angeles", "Singapore"]:
        monitor.check_port_congestion(port)
    report = monitor.generate_risk_report()
    if report["risk_level"] in ("critical", "elevated"):
        send_alert_email(report)  # your notification hook (email, Slack, etc.)

schedule.every(6).hours.do(supply_chain_check)

# schedule only queues jobs; a loop must pump them
while True:
    schedule.run_pending()
    time.sleep(60)

For scraping logistics sites across regions, ThorData provides geo-targeted residential proxies. ScrapeOps monitors scraper health across all data sources.


Supply chain intelligence is a competitive advantage. Automated monitoring catches disruptions hours or days before mainstream news — extend with ML-based anomaly detection for even earlier warnings.
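As a first step toward that anomaly-detection extension, even a rolling z-score over the vessels-waiting series catches unusual spikes with no training infrastructure at all. A minimal statistical sketch; the history values below are synthetic:

```python
import statistics

def is_anomalous(history, latest, z_threshold=2.0):
    """Flag `latest` if it sits more than z_threshold standard deviations
    above the mean of the recent history (one-sided: only spikes alert)."""
    if len(history) < 5:
        return False              # not enough history to judge
    mean = statistics.mean(history)
    stdev = statistics.stdev(history)
    if stdev == 0:
        return latest != mean     # flat history: any change is notable
    return (latest - mean) / stdev > z_threshold

# Synthetic example: a stable queue, then a sudden spike
history = [42, 45, 40, 44, 43, 41, 46]
```

Feed it the last few weeks of scraped counts per port and it will flag a jump long before the fixed 50-vessel threshold would fire at a normally quiet port.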

Happy scraping!
