DEV Community

agenthustler
agenthustler

Posted on

How to Monitor Open Source Vulnerability Disclosures

How to Monitor Open Source Vulnerability Disclosures

When a critical vulnerability drops in a popular open source package, response time matters. Log4Shell, Heartbleed, and the xz backdoor all demonstrated that hours of delay can mean the difference between patched and compromised. Let's build a Python monitor that tracks vulnerability disclosures across multiple sources in real time.

Why Build Your Own Monitor?

Commercial vulnerability scanners have lag. GitHub Advisory Database, NVD, and OSV all update at different speeds. By monitoring all sources simultaneously, you catch disclosures faster and can correlate data that no single source provides.

Architecture

Our monitor watches four sources:

  1. GitHub Advisory Database — fastest for open source packages
  2. NVD (National Vulnerability Database) — authoritative CVE data
  3. OSV (Open Source Vulnerabilities) — Google's aggregated feed
  4. Project-specific channels — mailing lists and security pages

GitHub Advisory Database Monitor

import requests
from datetime import datetime, timedelta, timezone

class GitHubAdvisoryMonitor:
    """Polls the GitHub Advisory Database via the GraphQL v4 API."""

    API_URL = "https://api.github.com/graphql"

    def __init__(self, token):
        """
        Args:
            token: GitHub personal access token, sent as a Bearer credential.
        """
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

    @staticmethod
    def _build_query(since, ecosystem):
        """Build the securityAdvisories GraphQL query string.

        The timestamp is quoted; the ecosystem is interpolated bare because it
        is a GraphQL enum value (e.g. PIP, NPM), not a string literal. Both
        values come from internal callers, not untrusted input.
        """
        return '''
        {
            securityAdvisories(
                first: 50,
                publishedSince: "%s",
                orderBy: {field: PUBLISHED_AT, direction: DESC}
            ) {
                nodes {
                    ghsaId
                    summary
                    severity
                    publishedAt
                    vulnerabilities(ecosystem: %s, first: 10) {
                        nodes {
                            package { name }
                            vulnerableVersionRange
                            firstPatchedVersion { identifier }
                        }
                    }
                }
            }
        }
        ''' % (since, ecosystem)

    def get_recent_advisories(self, ecosystem="PIP", hours=24):
        """Return advisories published in the last *hours* hours.

        Args:
            ecosystem: GraphQL SecurityAdvisoryEcosystem enum name (e.g. "PIP").
            hours: look-back window in hours.

        Returns:
            List of advisory node dicts (possibly empty).

        Raises:
            requests.HTTPError: on a non-2xx API response.
        """
        # Timezone-aware "now": datetime.utcnow() is deprecated since 3.12.
        # GitHub accepts RFC 3339 timestamps with a trailing Z.
        since = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        )
        query = self._build_query(since, ecosystem)
        # timeout prevents the monitor from hanging forever on a stalled API
        response = requests.post(
            self.API_URL,
            json={"query": query},
            headers=self.headers,
            timeout=30
        )
        response.raise_for_status()
        data = response.json()
        return data.get("data", {}).get("securityAdvisories", {}).get("nodes", [])

NVD API Integration

class NVDMonitor:
    """Polls the NVD CVE 2.0 REST API for recently published CVEs."""

    API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

    @staticmethod
    def _severity_label(score):
        """Map a CVSS base score to its standard severity band.

        The original code labeled everything below 7.0 (including 0.0)
        as MEDIUM; this follows the CVSS v3.x qualitative scale.
        """
        if score >= 9.0:
            return "CRITICAL"
        if score >= 7.0:
            return "HIGH"
        if score >= 4.0:
            return "MEDIUM"
        if score > 0:
            return "LOW"
        return "NONE"

    @staticmethod
    def _extract_score(cve):
        """Return the CVSS v3.1 base score, falling back to v3.0, else 0."""
        metrics = cve.get("metrics", {})
        for key in ("cvssMetricV31", "cvssMetricV30"):
            entries = metrics.get(key)
            if entries:
                return entries[0].get("cvssData", {}).get("baseScore", 0)
        return 0

    @staticmethod
    def _parse_cve(item):
        """Flatten one NVD API vulnerability item into our alert dict."""
        cve = item["cve"]
        score = NVDMonitor._extract_score(cve)
        descriptions = cve.get("descriptions", [])
        # Prefer the English description; the first entry is not guaranteed
        # to be English in NVD responses.
        description = next(
            (d["value"] for d in descriptions if d.get("lang") == "en"),
            descriptions[0]["value"] if descriptions else ""
        )
        return {
            "id": cve["id"],
            "description": description,
            "published": cve["published"],
            "cvss_score": score,
            "severity": NVDMonitor._severity_label(score)
        }

    def get_recent_cves(self, hours=24, keyword=None):
        """Return CVEs published in the last *hours* hours.

        Args:
            hours: look-back window in hours.
            keyword: optional free-text filter passed to keywordSearch.

        Returns:
            List of dicts with id, description, published, cvss_score, severity.

        Raises:
            requests.HTTPError: on a non-2xx API response.
        """
        # Timezone-aware "now": datetime.utcnow() is deprecated since 3.12.
        now = datetime.now(timezone.utc)
        start = now - timedelta(hours=hours)
        params = {
            "pubStartDate": start.strftime("%Y-%m-%dT%H:%M:%S.000"),
            "pubEndDate": now.strftime("%Y-%m-%dT%H:%M:%S.000"),
            "resultsPerPage": 100
        }
        if keyword:
            params["keywordSearch"] = keyword
        # timeout prevents an indefinite hang on a stalled NVD endpoint
        response = requests.get(self.API_URL, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        return [self._parse_cve(item) for item in data.get("vulnerabilities", [])]

OSV.dev Integration

class OSVMonitor:
    """Queries OSV.dev (Google's aggregated open source vulnerability feed)."""

    API_URL = "https://api.osv.dev/v1"
    # Shared request timeout (seconds); without one a stalled endpoint
    # would hang the monitor indefinitely.
    TIMEOUT = 30

    @staticmethod
    def _make_query(package_name, ecosystem):
        """Build the OSV query payload for one package."""
        return {"package": {"name": package_name, "ecosystem": ecosystem}}

    def query_package(self, package_name, ecosystem="PyPI"):
        """Return known vulnerabilities for a single package.

        Raises:
            requests.HTTPError: on a non-2xx API response.
        """
        response = requests.post(
            f"{self.API_URL}/query",
            json=self._make_query(package_name, ecosystem),
            timeout=self.TIMEOUT
        )
        response.raise_for_status()
        return response.json().get("vulns", [])

    def batch_query(self, packages):
        """Return OSV results for many packages in one request.

        Args:
            packages: iterable of dicts with "name" and optional "ecosystem"
                (defaults to "PyPI").

        Raises:
            requests.HTTPError: on a non-2xx API response.
        """
        queries = [
            self._make_query(p["name"], p.get("ecosystem", "PyPI"))
            for p in packages
        ]
        response = requests.post(
            f"{self.API_URL}/querybatch",
            json={"queries": queries},
            timeout=self.TIMEOUT
        )
        response.raise_for_status()
        return response.json().get("results", [])

Alert System

class VulnerabilityAlertSystem:
    """Aggregates GitHub, NVD and OSV into one deduplicated alert stream."""

    def __init__(self, github_token, smtp_config):
        """
        Args:
            github_token: token for the GitHub GraphQL API.
            smtp_config: mail-delivery settings. The original code accepted
                this parameter but silently discarded it; it is now kept so
                an email notifier can use it.
        """
        self.github = GitHubAdvisoryMonitor(github_token)
        self.nvd = NVDMonitor()
        self.osv = OSVMonitor()
        self.smtp_config = smtp_config

    def scan_all_sources(self, watched_packages, hours=6):
        """Scan all three feeds and return deduplicated high-priority alerts.

        Args:
            watched_packages: package names to watch (used as NVD keywords
                and OSV package names).
            hours: look-back window passed to the time-based feeds.
        """
        alerts = []
        # GitHub: keep only CRITICAL/HIGH advisories.
        for advisory in self.github.get_recent_advisories(hours=hours):
            if advisory["severity"] in ("CRITICAL", "HIGH"):
                alerts.append({
                    "source": "GitHub",
                    "id": advisory["ghsaId"],
                    "severity": advisory["severity"],
                    "summary": advisory["summary"]
                })
        # NVD: one keyword search per watched package; CVSS >= 7.0 only.
        for pkg in watched_packages:
            for cve in self.nvd.get_recent_cves(hours=hours, keyword=pkg):
                if cve["cvss_score"] >= 7.0:
                    alerts.append({
                        "source": "NVD", "id": cve["id"],
                        "severity": cve["severity"],
                        "summary": cve["description"][:200]
                    })
        # OSV: one batched request for all watched packages.
        osv_results = self.osv.batch_query(
            [{"name": p} for p in watched_packages]
        )
        for result in osv_results:  # index was unused; iterate values directly
            for vuln in result.get("vulns", []):
                alerts.append({
                    "source": "OSV", "id": vuln["id"],
                    "severity": vuln.get("database_specific", {}).get("severity", "UNKNOWN"),
                    "summary": vuln.get("summary", "No summary")
                })
        return self.deduplicate(alerts)

    def deduplicate(self, alerts):
        """Drop alerts whose summary prefix was already seen, keeping order.

        NOTE(review): keying on the first 100 lowercased summary characters
        is a heuristic for cross-source duplicates (the same vuln has
        different IDs on GitHub/NVD/OSV); it can collapse distinct issues
        with similar summaries — confirm this trade-off is acceptable.
        """
        seen = set()
        unique = []
        for alert in alerts:
            key = alert["summary"][:100].lower()
            if key not in seen:
                seen.add(key)
                unique.append(alert)
        return unique

Scraping Security Mailing Lists

For projects that announce vulnerabilities on mailing lists, use ScraperAPI for reliable fetching, ThorData proxies for high-frequency monitoring, and ScrapeOps for tracking scraper health.

Running the Monitor

if __name__ == "__main__":
    # Packages to watch; extend this list for your own dependency set.
    WATCHLIST = ["django", "flask", "requests", "numpy", "pytorch", "tensorflow"]
    monitor = VulnerabilityAlertSystem(github_token="ghp_YOUR_TOKEN", smtp_config={})
    for found in monitor.scan_all_sources(WATCHLIST, hours=6):
        print(f"[{found['severity']}] {found['source']}: {found['id']}")
        print(f"  {found['summary']}")

Deploy this as a cron job or serverless function. When a zero-day drops, you'll know within hours instead of days.

Top comments (0)