How to Monitor Open Source Vulnerability Disclosures
When a critical vulnerability drops in a popular open source package, response time matters. Log4Shell, Heartbleed, and the xz backdoor all demonstrated that hours of delay can mean the difference between patched and compromised. Let's build a Python monitor that tracks vulnerability disclosures across multiple sources in real time.
Why Build Your Own Monitor?
Commercial vulnerability scanners have lag. GitHub Advisory Database, NVD, and OSV all update at different speeds. By monitoring all sources simultaneously, you catch disclosures faster and can correlate data that no single source provides.
Architecture
Our monitor watches four sources:
- GitHub Advisory Database — fastest for open source packages
- NVD (National Vulnerability Database) — authoritative CVE data
- OSV (Open Source Vulnerabilities) — Google's aggregated feed
- Project-specific channels — mailing lists and security pages
GitHub Advisory Database Monitor
from datetime import datetime, timedelta, timezone

import requests
class GitHubAdvisoryMonitor:
    """Polls the GitHub Advisory Database (GraphQL API) for recently
    published security advisories."""

    API_URL = "https://api.github.com/graphql"

    def __init__(self, token):
        """token: GitHub personal access token, sent as a Bearer credential."""
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

    def get_recent_advisories(self, ecosystem="PIP", hours=24, timeout=30):
        """Return advisory nodes published within the last *hours* hours.

        ecosystem: a SecurityAdvisoryEcosystem GraphQL enum value
            (e.g. "PIP", "NPM") — interpolated unquoted into the query.
        timeout: per-request timeout in seconds.
        Returns a (possibly empty) list of advisory node dicts.
        Raises requests.HTTPError on a non-2xx response (auth, rate limit).
        """
        # Timezone-aware "now"; datetime.utcnow() is deprecated (3.12+).
        # strftime gives a clean "...Z" timestamp without microseconds.
        since = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        )
        # NOTE: `ecosystem` must appear as a bare GraphQL enum (no quotes),
        # so it cannot be passed as an ordinary String variable.
        query = '''
        {
          securityAdvisories(
            first: 50,
            publishedSince: "%s",
            orderBy: {field: PUBLISHED_AT, direction: DESC}
          ) {
            nodes {
              ghsaId
              summary
              severity
              publishedAt
              vulnerabilities(ecosystem: %s, first: 10) {
                nodes {
                  package { name }
                  vulnerableVersionRange
                  firstPatchedVersion { identifier }
                }
              }
            }
          }
        }
        ''' % (since, ecosystem)
        response = requests.post(
            self.API_URL,
            json={"query": query},
            headers=self.headers,
            timeout=timeout,  # avoid hanging forever on a stalled connection
        )
        response.raise_for_status()  # surface auth / rate-limit errors early
        data = response.json()
        return data.get("data", {}).get("securityAdvisories", {}).get("nodes", [])
NVD API Integration
class NVDMonitor:
    """Polls the NVD REST API (v2.0) for recently published CVEs."""

    API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

    @staticmethod
    def _severity_for_score(score):
        """Map a CVSS v3.x base score to its qualitative severity label.

        Follows the CVSS v3 rating scale: 9.0+ CRITICAL, 7.0+ HIGH,
        4.0+ MEDIUM, >0 LOW, 0 NONE. (The original labeled everything
        below 7.0 as MEDIUM, over-rating low/informational findings.)
        """
        if score >= 9.0:
            return "CRITICAL"
        if score >= 7.0:
            return "HIGH"
        if score >= 4.0:
            return "MEDIUM"
        if score > 0.0:
            return "LOW"
        return "NONE"

    @staticmethod
    def _parse_cve(item):
        """Flatten one NVD API vulnerability entry into a simple alert dict."""
        cve = item["cve"]
        metrics = cve.get("metrics", {})
        cvss_v31 = metrics.get("cvssMetricV31", [])
        score = cvss_v31[0].get("cvssData", {}).get("baseScore", 0) if cvss_v31 else 0
        descriptions = cve.get("descriptions", [])
        # NVD entries may carry multiple languages; prefer the English one
        # instead of blindly taking descriptions[0].
        description = next(
            (d["value"] for d in descriptions if d.get("lang") == "en"),
            descriptions[0]["value"] if descriptions else "",
        )
        return {
            "id": cve["id"],
            "description": description,
            "published": cve["published"],
            "cvss_score": score,
            "severity": NVDMonitor._severity_for_score(score),
        }

    def get_recent_cves(self, hours=24, keyword=None, timeout=30):
        """Return CVEs published within the last *hours* hours.

        keyword: optional full-text keyword filter (NVD "keywordSearch").
        timeout: per-request timeout in seconds.
        Returns a list of dicts with id/description/published/cvss_score/severity.
        Raises requests.HTTPError on a non-2xx response.
        """
        # Timezone-aware "now"; datetime.utcnow() is deprecated (3.12+).
        # strftime output is unchanged because %z is not used.
        now = datetime.now(timezone.utc)
        start = now - timedelta(hours=hours)
        params = {
            "pubStartDate": start.strftime("%Y-%m-%dT%H:%M:%S.000"),
            "pubEndDate": now.strftime("%Y-%m-%dT%H:%M:%S.000"),
            "resultsPerPage": 100,
        }
        if keyword:
            params["keywordSearch"] = keyword
        response = requests.get(self.API_URL, params=params, timeout=timeout)
        response.raise_for_status()  # NVD rate limits aggressively without a key
        data = response.json()
        return [self._parse_cve(item) for item in data.get("vulnerabilities", [])]
OSV.dev Integration
class OSVMonitor:
    """Queries the OSV.dev API for known vulnerabilities in packages."""

    API_URL = "https://api.osv.dev/v1"

    def query_package(self, package_name, ecosystem="PyPI", timeout=30):
        """Return the list of known vulnerabilities for one package.

        package_name: package name within the given ecosystem.
        ecosystem: OSV ecosystem identifier (e.g. "PyPI", "npm").
        timeout: per-request timeout in seconds.
        Raises requests.HTTPError on a non-2xx response.
        """
        response = requests.post(
            f"{self.API_URL}/query",
            json={
                "package": {
                    "name": package_name,
                    "ecosystem": ecosystem,
                }
            },
            timeout=timeout,  # avoid hanging forever on a stalled connection
        )
        response.raise_for_status()
        return response.json().get("vulns", [])

    def batch_query(self, packages, timeout=30):
        """Query many packages in a single request.

        packages: iterable of dicts with "name" and optional "ecosystem"
            (defaults to "PyPI").
        Returns the per-query results list, aligned with the input order.
        Raises requests.HTTPError on a non-2xx response.
        """
        queries = [
            {"package": {"name": p["name"], "ecosystem": p.get("ecosystem", "PyPI")}}
            for p in packages
        ]
        response = requests.post(
            f"{self.API_URL}/querybatch",
            json={"queries": queries},
            timeout=timeout,
        )
        response.raise_for_status()
        return response.json().get("results", [])
Alert System
class VulnerabilityAlertSystem:
    """Aggregates high-severity alerts from GitHub, NVD, and OSV sources."""

    def __init__(self, github_token, smtp_config):
        """github_token: token for the GitHub GraphQL API.
        smtp_config: mail settings for outbound notifications.
        """
        self.github = GitHubAdvisoryMonitor(github_token)
        self.nvd = NVDMonitor()
        self.osv = OSVMonitor()
        # Bug fix: the original accepted smtp_config but never stored it.
        self.smtp_config = smtp_config

    def scan_all_sources(self, watched_packages, hours=6):
        """Scan all sources for recent high-severity disclosures.

        watched_packages: package names to search NVD/OSV for.
        hours: look-back window for GitHub and NVD.
        Returns a deduplicated list of alert dicts
        (keys: source, id, severity, summary).
        """
        alerts = []
        # GitHub: keep only high-impact advisories.
        for advisory in self.github.get_recent_advisories(hours=hours):
            if advisory["severity"] in ("CRITICAL", "HIGH"):
                alerts.append({
                    "source": "GitHub",
                    "id": advisory["ghsaId"],
                    "severity": advisory["severity"],
                    "summary": advisory["summary"],
                })
        # NVD: one keyword search per watched package; CVSS >= 7.0 only.
        for pkg in watched_packages:
            for cve in self.nvd.get_recent_cves(hours=hours, keyword=pkg):
                if cve["cvss_score"] >= 7.0:
                    alerts.append({
                        "source": "NVD",
                        "id": cve["id"],
                        "severity": cve["severity"],
                        "summary": cve["description"][:200],
                    })
        # OSV: one batch request covers every watched package.
        osv_results = self.osv.batch_query(
            [{"name": p} for p in watched_packages]
        )
        for result in osv_results:  # the enumerate() index was unused
            for vuln in result.get("vulns", []):
                alerts.append({
                    "source": "OSV",
                    "id": vuln["id"],
                    "severity": vuln.get("database_specific", {}).get("severity", "UNKNOWN"),
                    "summary": vuln.get("summary", "No summary"),
                })
        return self.deduplicate(alerts)

    def deduplicate(self, alerts):
        """Drop alerts repeating an earlier alert's summary.

        Two alerts are duplicates when the first 100 characters of their
        summaries match case-insensitively — the same disclosure typically
        carries different IDs (GHSA vs CVE vs OSV) across sources, so the
        summary text is the cross-source key. First occurrence wins.
        """
        seen = set()
        unique = []
        for alert in alerts:
            key = alert["summary"][:100].lower()
            if key not in seen:
                seen.add(key)
                unique.append(alert)
        return unique
Scraping Security Mailing Lists
For projects that announce vulnerabilities on mailing lists, use ScraperAPI for reliable fetching, ThorData proxies for high-frequency monitoring, and ScrapeOps for tracking scraper health.
Running the Monitor
if __name__ == "__main__":
    # Packages whose disclosures we want to be alerted about.
    watched = ["django", "flask", "requests", "numpy", "pytorch", "tensorflow"]

    monitor = VulnerabilityAlertSystem(github_token="ghp_YOUR_TOKEN", smtp_config={})
    # Print one two-line entry per surviving (deduplicated) alert.
    for hit in monitor.scan_all_sources(watched, hours=6):
        print(f"[{hit['severity']}] {hit['source']}: {hit['id']}")
        print(f" {hit['summary']}")
Deploy this as a cron job or serverless function. When a zero-day drops, you'll know within hours instead of days.
Top comments (0)