import requests
import time
from database import get_db_connection
import json
NVD API URL
BASE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"
def fetch_and_store_cves(total_to_fetch=100):
# NOTE: Fetching ALL NVD data takes hours. For the test, we fetch a subset (e.g., 2000 records).
# Remove 'total_to_fetch' limit for full production sync.
conn = get_db_connection()
cursor = conn.cursor() #Creates a "cursor" object. This is used to execute SQL commands (like INSERT) against the database connection.
start_index = 0
results_per_page = 2000 # Max allowed by NVD is usually 2000
print("Starting synchronization...")
while start_index < total_to_fetch:
params = {
'startIndex': start_index,
'resultsPerPage': results_per_page
}
try:
print(f"Fetching batch starting at {start_index}...")
response = requests.get(BASE_URL, params=params, timeout=30)
if response.status_code != 200:
print(f"Error: API returned {response.status_code}")
break
data = response.json()
vulnerabilities = data.get('vulnerabilities', [])
if not vulnerabilities:
break # No more data
for item in vulnerabilities:
cve = item['cve']
cve_id = cve['id']
source_id = cve.get('sourceIdentifier', 'N/A')
published = cve.get('published', '')
last_modified = cve.get('lastModified', '')
status = cve.get('vulnStatus', '')
# Extract Score (Try V3 first, then V2)
score = None
metrics = cve.get('metrics', {})
if 'cvssMetricV31' in metrics:
score = metrics['cvssMetricV31'][0]['cvssData']['baseScore']
elif 'cvssMetricV2' in metrics:
score = metrics['cvssMetricV2'][0]['cvssData']['baseScore']
# Insert or Replace (Deduplication)
cursor.execute('''
INSERT OR REPLACE INTO cves
(id, sourceIdentifier, published, lastModified, vulnStatus, baseScore, details)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (cve_id, source_id, published, last_modified, status, score, json.dumps(cve)))
conn.commit()
start_index += results_per_page
# Sleep to avoid rate limiting (NVD is strict)
time.sleep(2)
except Exception as e:
print(f"Exception occurred: {e}")
break
conn.close()
print("Synchronization Complete.")
if name == "main":
# Fetch 2000 records for the assessment demo
fetch_and_store_cves(total_to_fetch=2000)
Top comments (0)