Staying on top of vulnerabilities is critical for security teams. Here's how to scrape and analyze NVD and CVE vulnerability data programmatically.
Why Scrape Vulnerability Databases?
Security teams need to track new CVEs affecting their stack, analyze trends, build custom dashboards, and correlate vulnerabilities with infrastructure.
Setup
import requests
import pandas as pd
from datetime import datetime, timedelta
import time
SCRAPER_KEY = "YOUR_SCRAPERAPI_KEY"  # ScraperAPI credential; not referenced by the snippets below — presumably for proxy routing (see Proxy Strategy section) — TODO confirm
Using the NVD API
def search_nvd(keyword, days_back=30):
    """Search the NVD CVE API (v2.0) for vulnerabilities matching *keyword*.

    Args:
        keyword: Free-text term passed to NVD's ``keywordSearch`` filter.
        days_back: Size of the publication window, in days ending today.

    Returns:
        DataFrame with columns ``cve_id``, ``published``, ``description``,
        ``cvss_score``, ``severity``. On request failure (network error or
        non-200 status) an *empty* DataFrame with the same columns is
        returned, so downstream column access never raises KeyError.
    """
    columns = ["cve_id", "published", "description", "cvss_score", "severity"]
    # Capture "now" once so start/end describe one consistent window.
    now = datetime.now()
    start_date = (now - timedelta(days=days_back)).strftime("%Y-%m-%dT00:00:00.000")
    end_date = now.strftime("%Y-%m-%dT23:59:59.999")
    url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
    params = {
        "keywordSearch": keyword,
        "pubStartDate": start_date,
        "pubEndDate": end_date,
        "resultsPerPage": 100
    }
    try:
        resp = requests.get(url, params=params, timeout=30)
    except requests.RequestException:
        # Best-effort: network trouble yields an empty (but well-formed) frame.
        return pd.DataFrame(columns=columns)
    if resp.status_code != 200:
        return pd.DataFrame(columns=columns)
    data = resp.json()
    vulnerabilities = []
    for item in data.get("vulnerabilities", []):
        cve = item.get("cve", {})
        metrics = cve.get("metrics", {})
        cvss_score = None
        severity = None
        # NVD nests CVSS under version-specific keys; prefer the newest
        # available. Older CVEs only carry v3.0 or v2 metrics, which the
        # original code silently dropped.
        for metric_key in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
            entries = metrics.get(metric_key, [])
            if entries:
                cvss_data = entries[0].get("cvssData", {})
                cvss_score = cvss_data.get("baseScore")
                # v3.x stores baseSeverity inside cvssData; v2 stores it at
                # the metric level.
                severity = cvss_data.get("baseSeverity") or entries[0].get("baseSeverity")
                break
        descriptions = cve.get("descriptions", [])
        # .get("lang") guards against malformed entries missing the key.
        desc = next((d["value"] for d in descriptions if d.get("lang") == "en"), "")
        vulnerabilities.append({
            "cve_id": cve.get("id"),
            "published": cve.get("published", "")[:10],
            "description": desc[:300],
            "cvss_score": cvss_score,
            "severity": severity,
        })
    return pd.DataFrame(vulnerabilities, columns=columns)
Risk Prioritization
def calculate_risk_score(vuln_df, tech_stack):
    """Score vulnerabilities by blending CVSS severity with stack relevance.

    ``risk_score = (cvss/10) * 50 + relevance * 50`` — a value in [0, 100]
    where relevance is the fraction of *tech_stack* entries mentioned in
    the description (capped at 1.0).

    Args:
        vuln_df: DataFrame with at least ``description`` and ``cvss_score``
            columns (as produced by ``search_nvd``).
        tech_stack: List of technology names to match against descriptions.

    Returns:
        A *new* DataFrame (the input is not mutated) with added
        ``relevance`` and ``risk_score`` columns, sorted by ``risk_score``
        descending. An empty input is handled gracefully: the original
        code raised KeyError when ``search_nvd`` came back empty.
    """
    scored = vuln_df.copy()  # don't mutate the caller's frame in place
    if scored.empty:
        # A failed fetch yields a frame with no 'description' column, so
        # the .apply below would KeyError; short-circuit with the schema.
        scored["relevance"] = pd.Series(dtype=float)
        scored["risk_score"] = pd.Series(dtype=float)
        return scored

    def _relevance(description):
        # Fraction of stack technologies named in the description, in [0, 1].
        desc_lower = str(description).lower()
        matches = sum(1 for tech in tech_stack if tech.lower() in desc_lower)
        return min(matches / max(len(tech_stack), 1), 1.0)

    scored["relevance"] = scored["description"].apply(_relevance)
    scored["risk_score"] = (
        # Missing CVSS scores count as 0 rather than poisoning the sum as NaN.
        (scored["cvss_score"].fillna(0) / 10) * 0.5 +
        scored["relevance"] * 0.5
    ) * 100
    return scored.sort_values("risk_score", ascending=False)
# Example run: pull the last week of web-server CVEs, rank them against our
# stack, and surface anything scoring above the critical threshold.
tech_stack = ["nginx", "postgresql", "python", "django", "redis", "docker"]
vulns = calculate_risk_score(search_nvd("web server", days_back=7), tech_stack)
critical = vulns[vulns["risk_score"] > 60]
for _, v in critical.iterrows():
    print(f"[CRITICAL] {v['cve_id']} (CVSS: {v['cvss_score']}) - {v['description'][:80]}...")
Trend Analysis
def vulnerability_trends(keyword, months=6):
    """Count CVEs matching *keyword* in consecutive ~30-day windows.

    Useful for simple trend charts: each row is one 30-day window, most
    recent first.

    Args:
        keyword: ``keywordSearch`` term for the NVD API.
        months: Number of 30-day windows to sample going back from now.

    Returns:
        DataFrame with columns ``month`` (YYYY-MM of the window start) and
        ``count`` (NVD ``totalResults`` for the window). Windows whose
        request fails are skipped.
    """
    url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
    # Fix the reference time once so consecutive windows tile exactly;
    # calling datetime.now() inside the loop could leave gaps or overlaps
    # between adjacent windows.
    now = datetime.now()
    monthly_data = []
    for i in range(months):
        start = (now - timedelta(days=30 * (i + 1))).strftime("%Y-%m-%dT00:00:00.000")
        end = (now - timedelta(days=30 * i)).strftime("%Y-%m-%dT23:59:59.999")
        params = {
            "keywordSearch": keyword,
            "pubStartDate": start,
            "pubEndDate": end,
            "resultsPerPage": 1,  # only totalResults is needed, not records
        }
        try:
            # Original call had no timeout: a stalled NVD response would
            # hang the whole trend scan indefinitely.
            resp = requests.get(url, params=params, timeout=30)
        except requests.RequestException:
            resp = None
        if resp is not None and resp.status_code == 200:
            total = resp.json().get("totalResults", 0)
            monthly_data.append({"month": start[:7], "count": total})
        # NVD asks unauthenticated clients to pace requests (~6s apart);
        # no need to sleep after the final request.
        if i < months - 1:
            time.sleep(6)
    return pd.DataFrame(monthly_data)
Proxy Strategy
- ScraperAPI — Distribute requests across IPs for NVD rate limits
- ThorData — Residential proxies for Exploit-DB scraping
- ScrapeOps — Track API success rates and quota consumption
Conclusion
Automated vulnerability scraping transforms reactive security into proactive defense. Build the pipeline, schedule daily scans, and let the data prioritize your patching efforts.
Top comments (0)