DEV Community

Gug
Gug

Posted on

Fetching and Storing CVE Data from NVD API using Python

import requests
import time
from database import get_db_connection
import json

# NVD CVE API (version 2.0, JSON) endpoint that the sync pages through.
BASE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

def _extract_base_score(cve):
    """Return the CVSS base score of a CVE record, or None if it has none.

    CVSS v3.1 metrics are preferred, then v3.0, then v2; the first entry of
    the first metric list that carries a ``baseScore`` wins.
    """
    metrics = cve.get('metrics') or {}
    for metric_key in ('cvssMetricV31', 'cvssMetricV30', 'cvssMetricV2'):
        entries = metrics.get(metric_key)
        if not entries:
            # Key absent or empty list: fall through to the next CVSS version.
            continue
        score = entries[0].get('cvssData', {}).get('baseScore')
        if score is not None:
            return score
    return None


def fetch_and_store_cves(total_to_fetch=100):
    """Fetch CVE records from the NVD API and upsert them into the ``cves`` table.

    The function pages through ``BASE_URL`` with ``startIndex`` /
    ``resultsPerPage`` and writes every record with ``INSERT OR REPLACE``,
    so running the sync again does not duplicate rows. Each page is
    committed on its own, so pages stored before a failure are kept.

    Errors (non-200 responses, network, JSON or database exceptions) are
    printed and stop the sync; they are not raised to the caller.

    Args:
        total_to_fetch: Maximum number of records to request. Pass ``None``
            to keep paging until the API reports no more records (a full
            sync, which takes a long time).
    """
    max_page_size = 2000      # Largest resultsPerPage the NVD API accepts.
    delay_between_pages = 6   # Seconds; NVD's guidance for clients without an API key.
    insert_sql = '''
        INSERT OR REPLACE INTO cves
        (id, sourceIdentifier, published, lastModified, vulnStatus, baseScore, details)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    '''

    conn = get_db_connection()
    start_index = 0
    succeeded = True

    print("Starting synchronization...")

    try:
        cursor = conn.cursor()

        while total_to_fetch is None or start_index < total_to_fetch:
            # Never ask for more than the caller's remaining budget.
            if total_to_fetch is None:
                page_size = max_page_size
            else:
                page_size = min(max_page_size, total_to_fetch - start_index)

            params = {
                'startIndex': start_index,
                'resultsPerPage': page_size,
            }

            print(f"Fetching batch starting at {start_index}...")
            response = requests.get(BASE_URL, params=params, timeout=30)

            if response.status_code != 200:
                print(f"Error: API returned {response.status_code}")
                succeeded = False
                break

            data = response.json()
            vulnerabilities = data.get('vulnerabilities', [])

            if not vulnerabilities:
                break  # No more data

            rows = []
            for item in vulnerabilities:
                cve = item['cve']
                rows.append((
                    cve['id'],
                    cve.get('sourceIdentifier', 'N/A'),
                    cve.get('published', ''),
                    cve.get('lastModified', ''),
                    cve.get('vulnStatus', ''),
                    _extract_base_score(cve),
                    json.dumps(cve),  # Keep the full record for later detail views.
                ))

            # Insert or Replace (Deduplication)
            cursor.executemany(insert_sql, rows)
            conn.commit()

            # Advance by what was actually received, not by what was requested,
            # so a short page cannot make the next request skip records.
            start_index += len(vulnerabilities)

            total_results = data.get('totalResults')
            if isinstance(total_results, int) and start_index >= total_results:
                break  # The API has nothing beyond this page.

            # Sleep to avoid rate limiting, but only if another request follows.
            if total_to_fetch is None or start_index < total_to_fetch:
                time.sleep(delay_between_pages)

    except Exception as e:
        # Best-effort sync: report the problem and stop instead of crashing the caller.
        print(f"Exception occurred: {e}")
        succeeded = False
    finally:
        conn.close()

    if succeeded:
        print("Synchronization Complete.")
    else:
        print("Synchronization stopped early because of an error.")
Enter fullscreen mode Exit fullscreen mode

# Run the sync only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    # Fetch 2000 records for the assessment demo
    fetch_and_store_cves(total_to_fetch=2000)

Top comments (0)