Extracting structured data from unstructured text is one of the most practical uses of language models in production. Advisory feeds, incident reports, job postings, legal documents — they all contain structured information buried in natural language. Getting that information out reliably requires more than a naive "respond in JSON" instruction.
This tutorial walks through the full stack: system prompt design, few-shot examples, chain-of-thought for ambiguous fields, JSON mode, and Pydantic validation with retry logic. The running example is CVE advisory extraction, which is genuinely hard because advisories vary wildly in format and verbosity.
What we are extracting
Given raw advisory text like this:
CERT-FR CERTFR-2025-AVI-0312
A critical vulnerability has been identified in Fortinet FortiGate versions
7.0.0 through 7.0.14. An unauthenticated remote attacker can exploit a
stack-based buffer overflow in the SSL-VPN web management interface to
execute arbitrary code. The CVSS v3.1 base score is 9.8 (Critical).
Affected versions: FortiOS 7.0.x < 7.0.15, 7.2.x < 7.2.9.
Apply the vendor patch immediately or disable SSL-VPN if patching is not
immediately possible.
We want this:
{
"cve_id": null,
"cert_id": "CERTFR-2025-AVI-0312",
"cvss_score": 9.8,
"severity": "Critical",
"affected_product": "Fortinet FortiGate / FortiOS",
"affected_versions": ["7.0.0–7.0.14", "7.2.x < 7.2.9"],
"vulnerability_type": "Stack-based buffer overflow",
"attack_vector": "Network (unauthenticated remote)",
"remediation": "Apply vendor patch; disable SSL-VPN if patch cannot be applied immediately"
}
Setup
pip install openai pydantic tenacity python-dotenv
import os
import json
import re
from typing import Optional
from openai import OpenAI
from pydantic import BaseModel, Field, field_validator
from tenacity import retry, stop_after_attempt, wait_exponential
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
Step 1: Define the schema with Pydantic
Defining the output schema first forces you to be precise about what you actually need before you write a single prompt word.
class CVEExtraction(BaseModel):
cve_id: Optional[str] = Field(
None,
description="CVE identifier if present, e.g. CVE-2025-12345"
)
cert_id: Optional[str] = Field(
None,
description="CERT advisory ID if present"
)
cvss_score: Optional[float] = Field(
None,
ge=0.0,
le=10.0,
description="CVSS v3.x base score as a float"
)
severity: Optional[str] = Field(
None,
description="Severity label: Critical, High, Medium, Low, or Informational"
)
affected_product: str = Field(
description="Vendor and product name"
)
affected_versions: list[str] = Field(
default_factory=list,
description="List of affected version ranges"
)
vulnerability_type: str = Field(
description="Technical class of the vulnerability"
)
attack_vector: str = Field(
description="How the vulnerability is exploited (network/local/physical, auth required)"
)
remediation: str = Field(
description="Recommended fix or workaround"
)
@field_validator("cve_id")
@classmethod
def validate_cve_format(cls, v):
if v is not None and not re.match(r"CVE-\d{4}-\d{4,}", v):
raise ValueError(f"Invalid CVE format: {v}")
return v
@field_validator("severity")
@classmethod
def validate_severity(cls, v):
valid = {"Critical", "High", "Medium", "Low", "Informational", None}
if v not in valid:
raise ValueError(f"Invalid severity: {v}")
return v
Step 2: System prompt design
The system prompt does the heavy lifting. Three principles:
- Give the model a role and a goal, not just instructions
- Specify output format explicitly — schema + example
- Handle ambiguity with explicit rules — what to do when a field is missing
SYSTEM_PROMPT = """You are a cybersecurity data extraction engine. Your task is to parse
security advisories and extract structured vulnerability information.
OUTPUT FORMAT: Return a single JSON object conforming exactly to this schema:
{
"cve_id": string or null,
"cert_id": string or null,
"cvss_score": number (0.0–10.0) or null,
"severity": "Critical" | "High" | "Medium" | "Low" | "Informational" | null,
"affected_product": string (required),
"affected_versions": array of strings,
"vulnerability_type": string (required),
"attack_vector": string (required),
"remediation": string (required)
}
RULES:
- Return null for any field not present in the source text. Do NOT infer or guess.
- cvss_score must be numeric, not a string like "9.8 (Critical)".
- If the advisory lists multiple affected version ranges, list each as a separate array element.
- remediation should be a concise actionable summary, not a copy-paste of the full advisory.
- Return ONLY the JSON object. No explanation, no markdown fences, no commentary.
"""
Step 3: Few-shot examples
Few-shot examples dramatically improve consistency on edge cases. Include at least two: one clean advisory and one messy one where fields are implicit.
FEW_SHOT_EXAMPLES = [
{
"role": "user",
"content": """Extract from this advisory:
CVE-2024-21762 affects FortiOS and FortiProxy. An out-of-bounds write
vulnerability (CWE-787) in SSL-VPN allows a remote unauthenticated attacker
to execute arbitrary code or commands via specially crafted HTTP requests.
CVSS: 9.6. Affected: FortiOS 6.0 all versions, 6.2 all versions, 6.4.0–6.4.14,
7.0.0–7.0.14, 7.2.0–7.2.7, 7.4.0–7.4.2. Workaround: disable SSL-VPN."""
},
{
"role": "assistant",
"content": json.dumps({
"cve_id": "CVE-2024-21762",
"cert_id": None,
"cvss_score": 9.6,
"severity": "Critical",
"affected_product": "Fortinet FortiOS and FortiProxy",
"affected_versions": [
"FortiOS 6.0 all versions",
"FortiOS 6.2 all versions",
"FortiOS 6.4.0–6.4.14",
"FortiOS 7.0.0–7.0.14",
"FortiOS 7.2.0–7.2.7",
"FortiOS 7.4.0–7.4.2"
],
"vulnerability_type": "Out-of-bounds write (CWE-787)",
"attack_vector": "Network (remote unauthenticated)",
"remediation": "Disable SSL-VPN as workaround; apply vendor patch when available"
}, indent=2)
},
{
"role": "user",
"content": """Extract from this advisory:
A researcher has disclosed a high-severity issue in Apache Struts. The flaw
allows attackers on the local network with valid credentials to escalate
privileges through a race condition in the file upload handler. No CVE assigned
yet. Vendor fix expected next quarter. Score: 7.5."""
},
{
"role": "assistant",
"content": json.dumps({
"cve_id": None,
"cert_id": None,
"cvss_score": 7.5,
"severity": "High",
"affected_product": "Apache Struts",
"affected_versions": [],
"vulnerability_type": "Race condition / privilege escalation",
"attack_vector": "Local network (authenticated)",
"remediation": "No patch available; monitor vendor advisory for fix"
}, indent=2)
}
]
Step 4: Chain-of-thought for complex cases
Some advisories require reasoning to extract correctly — for example, when the CVSS score is described only in a table or when the affected product must be inferred from context. Add a CoT step before the final extraction on inputs flagged as complex.
def is_complex(text: str) -> bool:
"""Heuristic: long text or multiple products suggests complexity."""
return len(text) > 800 or text.count("CVE-") > 1
def add_cot_prefix(text: str) -> str:
return (
"First, identify the key fields step by step:\n"
"1. Is there a CVE ID? If not, is there another advisory ID?\n"
"2. What is the CVSS score? What severity does it correspond to?\n"
"3. Which product and versions are affected?\n"
"4. What is the technical class of the vulnerability?\n"
"5. What does the attacker need (network access? credentials?)?\n"
"6. What is the fix or workaround?\n\n"
"Now extract the JSON:\n\n"
+ text
)
Step 5: Extraction with retry and validation
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
def extract_cve(advisory_text: str) -> CVEExtraction:
messages = [
{"role": "system", "content": SYSTEM_PROMPT},
*FEW_SHOT_EXAMPLES,
]
user_content = advisory_text
if is_complex(advisory_text):
user_content = add_cot_prefix(advisory_text)
messages.append({"role": "user", "content": f"Extract from this advisory:\n{user_content}"})
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
response_format={"type": "json_object"}, # JSON mode — forces valid JSON output
temperature=0, # deterministic extraction
)
raw = response.choices[0].message.content
data = json.loads(raw)
return CVEExtraction(**data) # Pydantic validates types and constraints
The response_format={"type": "json_object"} parameter guarantees syntactically valid JSON. Pydantic then catches semantic errors (wrong types, out-of-range scores, malformed CVE IDs) and the tenacity decorator retries on any exception — network errors, validation failures, or parse errors alike.
Step 6: Batch processing
from pathlib import Path
def process_advisory_file(path: Path) -> dict:
text = path.read_text(encoding="utf-8")
try:
result = extract_cve(text)
return {"status": "ok", "file": path.name, "data": result.model_dump()}
except Exception as e:
return {"status": "error", "file": path.name, "error": str(e)}
def batch_extract(directory: Path) -> list[dict]:
results = []
for f in directory.glob("*.txt"):
print(f"Processing {f.name}...")
results.append(process_advisory_file(f))
return results
Measuring extraction quality
Define a small labelled test set and track precision per field:
def evaluate(predictions: list[CVEExtraction], ground_truth: list[dict]) -> dict:
fields = ["cve_id", "cvss_score", "severity", "affected_product", "vulnerability_type"]
scores = {f: [] for f in fields}
for pred, gt in zip(predictions, ground_truth):
for field in fields:
pred_val = getattr(pred, field)
gt_val = gt.get(field)
scores[field].append(1 if str(pred_val) == str(gt_val) else 0)
return {f: sum(v) / len(v) for f, v in scores.items() if v}
What actually matters
A few lessons from running this in production on real advisory feeds:
- Temperature = 0 is mandatory for extraction. Any randomness introduces inconsistency across identical inputs.
- JSON mode alone is not enough — it prevents syntax errors but not semantic garbage. Always validate with Pydantic.
- Few-shot examples outperform longer instructions for format compliance. Two well-chosen examples beat two paragraphs of "you must always..." text.
- CoT is a latency trade-off — it adds tokens but reduces errors on complex multi-product advisories. Gate it with a heuristic.
Security teams processing vulnerability feeds at scale — like those using the advisory tracking systems described at AYI NEDJIMI Consultants — typically combine this extraction layer with a downstream enrichment step that cross-references extracted CVEs against asset inventories.
What to build next
- Add a confidence score by asking the model to rate its own extraction (0–1 per field)
- Stream results with
stream=Truefor real-time processing of long advisories - Feed extracted data directly to a vector database for similarity search across historical advisories
- Set up automatic reprocessing when Pydantic validation fails after all retries
Top comments (0)