I knew something was wrong the first time I saw a “candidate” come back with the recruiter’s phone number.
Nothing was broken in the obvious places. Extraction ran. Persistence succeeded. The UI showed a clean-looking result.
But the identity was wrong.
That moment is what this series is really about.
This is Part 1 of How to Architect an Enterprise AI System (And Why the Engineer Still Matters). In Part 0—“The Day My AI Forgot Everything (So I Built a Context-Continuity Inference Stack)”—I argued the thesis: models raise the floor; architecture is still the ceiling. Here’s the first concrete decision that proved it in production:
I stopped designing my extraction pipeline for clean input—and started designing it for adversarial input.
Not adversarial like “attackers.” Adversarial like real email:
- forwarded threads with duplicated headers
- signature blocks with phone numbers that look more “extractable” than the actual subject’s
- HTML bodies full of invisible control characters and weird spacing
- scheduler reschedules that quietly change the meeting details while keeping the thread “about the same thing”
The punchline is unintuitive if you’ve only built demos: a small, boring, deterministic preprocessor matters more than the model call. If you feed the model a contaminated body, you don’t get “slightly worse extraction.” You get a perfectly formatted result that’s anchored to the wrong person.
Key insight (early, because it’s the whole game)
A naive extraction pipeline treats an email body like a document.
My production pipeline treats an email body like a crime scene.
You don’t start by asking the smartest witness in the room what happened. You start by bagging evidence, isolating the relevant portion, and keeping unrelated fingerprints off the sample.
In my case, that means the intake path has a hard pre-model front-end that does three things:
- Sanitizes the input (strip null bytes/control chars, normalize newlines, enforce size limits)
- Detects forwarded content across the mess of formats people actually send
- Handles the scheduler reschedule edge case so “current meeting info” is what downstream logic sees
Only after that do I let the extraction workflow touch the text.
The 7-step pipeline (and why it’s ordered this way)
The streamlined intake endpoint I built exists for the mail add-in container. It’s intentionally narrow: it validates and sanitizes, runs the extraction graph (with research tools where needed), persists the result into the system of record, then formats a response for the add-in.
The ordering is the point. The pipeline is front-loaded with the boring work because that’s where production breaks.
Here’s the data flow at the level that matters for this decision:
flowchart TD
addin[MailAddin] --> api[IntakeEmailRoute]
api --> sanitize[SanitizeAndValidate]
sanitize --> forwardDetect[ForwardDetectAndExtract]
forwardDetect --> reschedule[RescheduleDetect]
reschedule --> extract[LangGraphExtraction]
extract --> persist[RecordStorePersistence]
persist --> response[ResponseFormatting]```
The non-obvious part is that I’m not cleaning text for aesthetics. I’m shaping the input so the extractor sees the right identity boundary: **forwarder vs invitee**.
If you get that boundary wrong, everything downstream becomes expensive:
- the system of record now contains a real-looking but incorrect entity
- dedupe logic starts doing the wrong thing (because it trusts the wrong email/phone)
- follow-on automations fire (messages, reminders, tasks) against the wrong person
- human reviewers waste time doing forensic repair because the entry looks legitimate
So I made the boundary deterministic.
## How it works under the hood
### Sanitization: why it’s non-optional
I keep sanitization at the route boundary because it’s the only place I can guarantee every downstream consumer benefits.
Email is not “text.” Email is a transport format that often contains:
- null bytes (`\x00`) and other control characters
- odd Unicode separators
- HTML that is later converted to text with inconsistent whitespace
- copied content from PDFs or calendar clients with invisible formatting
If you don’t normalize early, you end up debugging regexes, parsers, and prompts that were never wrong—your bytes were.
Here’s a **minimal, runnable** version of the streamlined intake function that demonstrates the contract and ordering. It’s not tied to any web framework so you can run it as a script, but it mirrors how my route is structured: sanitize first, then detect forwarding/reschedules, then call extraction, then persist, then format.
```python
from __future__ import annotations
import json
import re
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple
@dataclass
class EmailPayload:
subject: str
from_address: str
body: str
@dataclass
class ProcessingResult:
correlation_id: str
extracted: Dict[str, Any]
persisted_id: str
flags: Dict[str, Any]
class InputTooLarge(ValueError):
pass
def sanitize_email_body(body: str, *, max_chars: int = 120_000) -> Tuple[str, Dict[str, Any]]:
"""Sanitize email text for downstream deterministic parsing and model calls.
- Enforces a hard size limit (prevents pathological threads and payloads)
- Strips null bytes and most control characters
- Normalizes newlines
Returns:
(sanitized_body, metrics)
"""
if body is None:
body = ""
original_len = len(body)
if original_len > max_chars:
raise InputTooLarge(f"email body too large: {original_len} > {max_chars}")
# Normalize newlines first so subsequent parsing is consistent.
body = body.replace("\r\n", "\n").replace("\r", "\n")
# Remove null bytes explicitly.
body = body.replace("\x00", "")
# Remove remaining control characters except tab/newline.
# Keep \n and \t to preserve structure.
cleaned_chars = []
removed = 0
for ch in body:
code = ord(ch)
if ch in ("\n", "\t"):
cleaned_chars.append(ch)
elif 0 <= code < 32:
removed += 1
else:
cleaned_chars.append(ch)
sanitized = "".join(cleaned_chars)
metrics = {
"original_len": original_len,
"sanitized_len": len(sanitized),
"control_chars_removed": removed,
"null_bytes_removed": original_len - len(body) if "\x00" in body else 0,
}
return sanitized, metrics
FORWARD_MARKERS = [
# Common “forwarded message” separators across mail clients.
re.compile(r"^-{2,}\s*Forwarded message\s*-{2,}$", re.IGNORECASE | re.MULTILINE),
re.compile(r"^Begin forwarded message:\s*$", re.IGNORECASE | re.MULTILINE),
re.compile(r"^Fwd:\s+", re.IGNORECASE | re.MULTILINE),
]
def detect_forwarded(email_text: str) -> Optional[re.Pattern]:
for pat in FORWARD_MARKERS:
if pat.search(email_text):
return pat
return None
def extract_forwarded_block(email_text: str) -> str:
"""Return the portion of the email that most likely contains the forwarded content.
Strategy:
- If a forward marker exists, return the content from the first marker onward.
- Otherwise return original.
This is intentionally conservative: if we find a forward marker, we want to isolate
the forwarded payload so identity fields come from the forwarded message, not the forwarder.
"""
best_idx: Optional[int] = None
for pat in FORWARD_MARKERS:
m = pat.search(email_text)
if m:
idx = m.start()
if best_idx is None or idx < best_idx:
best_idx = idx
return email_text[best_idx:] if best_idx is not None else email_text
RESCHEDULE_SIGNAL = re.compile(r"\b(Former:|Updated:)\b", re.IGNORECASE)
def is_reschedule_notice(email_text: str) -> bool:
return bool(RESCHEDULE_SIGNAL.search(email_text))
def run_extraction_graph(email_text: str, subject: str) -> Dict[str, Any]:
"""Stub for the extraction graph.
In production this is a multi-step workflow (extract → research → validate).
Here we emulate the output shape used downstream.
"""
# Extremely small demo: pull first email address and first phone-looking token.
email_match = re.search(r"[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", email_text, re.IGNORECASE)
phone_match = re.search(r"\+?\d[\d\s().-]{7,}\d", email_text)
return {
"subject": subject,
"candidate_email": email_match.group(0) if email_match else None,
"candidate_phone": phone_match.group(0) if phone_match else None,
}
def persist_record(extracted: Dict[str, Any], correlation_id: str) -> str:
"""Stub for persistence into the system of record."""
# In production this is a network call; we return a deterministic id for the demo.
payload = json.dumps(extracted, sort_keys=True)
return f"rec_{correlation_id[:8]}_{abs(hash(payload)) % 10_000}"
def format_response(result: ProcessingResult) -> Dict[str, Any]:
return {
"correlation_id": result.correlation_id,
"persisted_id": result.persisted_id,
"flags": result.flags,
"extracted": result.extracted,
}
def process_email_streamlined(payload: EmailPayload, *, force_reprocess: bool = False) -> Dict[str, Any]:
"""Streamlined email processing for a mail add-in container.
Core workflow:
1) Input validation and sanitization
2) Forward/reschedule detection and normalization
3) Extraction graph
4) Persistence
5) Response formatting
"""
correlation_id = str(uuid.uuid4())
sanitized_body, sanitize_metrics = sanitize_email_body(payload.body)
forwarded_marker = detect_forwarded(sanitized_body)
focused_text = extract_forwarded_block(sanitized_body)
reschedule = is_reschedule_notice(focused_text)
extracted = run_extraction_graph(focused_text, payload.subject)
persisted_id = persist_record(extracted, correlation_id)
result = ProcessingResult(
correlation_id=correlation_id,
extracted=extracted,
persisted_id=persisted_id,
flags={
"force_reprocess": force_reprocess,
"sanitization": sanitize_metrics,
"is_forwarded": forwarded_marker is not None,
"forward_marker": forwarded_marker.pattern if forwarded_marker else None,
"is_reschedule": reschedule,
},
)
return format_response(result)
if __name__ == "__main__":
sample = EmailPayload(
subject="Fwd: Scheduling",
from_address="recruiter@domain.invalid",
body=(
"Hi — forwarding this.\n\n"
"-- Forwarded message --\n"
"From: Scheduler <no-reply@domain.invalid>\n"
"Invitee Email: candidate@domain.invalid\n\n"
"Updated: Tue 3pm\nFormer: Mon 1pm\n\n"
"Recruiter Signature\n"
"+1 (212) 555-0100\n"
),
)
print(json.dumps(process_email_streamlined(sample), indent=2))
That script demonstrates the exact property I care about: the system is deterministic about what “the input” is before the extraction workflow sees it. The model (or graph) can still be wrong, but now it’s wrong on a stable, bounded, well-structured slice of text—not on a heap of transport artifacts.
Two practical notes from production:
- Size limits aren’t about saving tokens. They’re about preventing “thread bombs” (multi-month threads + embedded legal footers + inline images-as-text) from slowing every downstream stage. Hard limits give you predictable latency and predictable cost.
-
Newline normalization is a correctness issue. A lot of email formats use
\r\n, some use bare\r, and HTML-to-text conversion can produce odd sequences. If you don’t normalize, you get detection patterns that fail “randomly.”
Forwarded email detection: a production feature, not a nice-to-have
The human realization that changed everything: a huge share of production emails are forwarded.
Forwarding isn’t “more text.” It’s an identity inversion.
The top of the email is now the forwarder’s name, phone, and signature—exactly the stuff extractors love to grab—while the actual subject (the person you care about) is often deeper in the forwarded payload.
So I built forwarded-message detection as a first-class step with a battery of patterns that cover the common client formats we see. The goal is not perfection; the goal is to catch the high-frequency formats deterministically and route the body through a “forwarded block extractor” before we do anything probabilistic.
The most important architectural choice here is where it lives:
- It does not live inside a prompt as a “please ignore signatures” instruction.
- It does not live after extraction as a cleanup pass.
- It lives before extraction, as a gate that decides what text is even eligible to be considered the canonical payload.
Here’s a small, runnable harness that demonstrates forwarded detection with a real pattern and a positive match:
import re
from dataclasses import dataclass
from typing import Optional
@dataclass
class ForwardDetectionResult:
is_forwarded: bool
marker: Optional[str]
FORWARD_PATTERNS = [
re.compile(r"^-{2,}\s*Forwarded message\s*-{2,}$", re.IGNORECASE | re.MULTILINE),
re.compile(r"^Begin forwarded message:\s*$", re.IGNORECASE | re.MULTILINE),
]
def detect_forwarded(email_text: str) -> ForwardDetectionResult:
for pat in FORWARD_PATTERNS:
if pat.search(email_text):
return ForwardDetectionResult(True, pat.pattern)
return ForwardDetectionResult(False, None)
if __name__ == "__main__":
email_text = (
"Hi — forwarding this.\n\n"
"------ Forwarded message ------\n"
"From: Person <example@domain.invalid>\n"
"To: Recruiter <recruiter@domain.invalid>\n"
)
result = detect_forwarded(email_text)
print(result)
# Expected: ForwardDetectionResult(is_forwarded=True, marker='^-{2,}\\s*Forwarded message\\s*-{2,}$')
In production I also extract the forwarded block and pass only that (or that plus a small amount of local context) into the extraction workflow. This single decision prevented the most common failure pattern I saw early on: signature contamination.
A realistic contamination looks like this:
- forwarded thread begins with the forwarder’s “Hi, see below”
- then comes the forward marker
- then comes the forwarded content with the actual invitee’s email
- then the forwarder’s signature repeats at the bottom (often twice in long chains)
If you hand the entire body to an extractor, it has to solve an attribution problem (who is who) and an extraction problem (what are the fields) at the same time. Attribution is the harder problem, and it’s unnecessary work if you can reduce the ambiguity deterministically.
Reschedules are their own class of email, so I treat them like one
Reschedules are sneaky because they look like “the same invitation,” but the semantics change.
The content often contains both the old and new time, sometimes both meeting locations, sometimes both conferencing links, and the difference is signaled by a small token like Former: and Updated:. If you treat that as just more text, you can end up extracting a plausible meeting that never actually happens.
So I added a reschedule detector before extraction. That does two things:
- It lets downstream logic treat the email as a reschedule notice and apply different validation rules.
- It makes the extraction workflow’s job easier because it can be told “you are looking at an update; prefer updated fields.”
Here’s a runnable version of the detection:
import re
RESCHEDULE_RE = re.compile(r"\b(Former:|Updated:)\b", re.IGNORECASE)
def is_reschedule_notice(email_text: str) -> bool:
return bool(RESCHEDULE_RE.search(email_text))
if __name__ == "__main__":
original = "Meeting details below"
rescheduled = "Updated: Tue 3pm\nFormer: Mon 1pm"
print(is_reschedule_notice(original)) # False
print(is_reschedule_notice(rescheduled)) # True
The win is subtle but real: reschedule detection belongs before extraction, not after.
If you detect it late, you’ve already asked the extractor to reconcile contradictory fields into a single narrative. Detect it early and you can decide which sections are authoritative—or at minimum, annotate the run so validators know what kind of email they’re dealing with.
A concrete walkthrough: forwarded scheduler email and the identity boundary
Here’s the exact failure pattern that forced this design:
- A recruiter forwards a scheduler invite.
- The forwarded email contains a clean
Invitee Emailfield (the actual candidate). - The forwarder’s signature contains a phone number.
- The extractor sees the signature early (or late) and grabs the phone number.
- Now the “candidate” record contains the recruiter’s phone.
The fix is not “better prompting.” The fix is to treat provenance like a first-class signal.
In my extraction layer, scheduler-specific fields get priority over generic extraction, and the fallback path includes targeted recovery patterns (for example, recovering Invitee Email: from the body when a generic extraction produced something that is clearly from the notification system rather than the human subject).
Below is a complete, runnable Python example that illustrates the same precedence rules I run in production:
- prefer scheduler-provided invitee email when present
- otherwise use the generic extracted email
- filter out notification-system addresses
- recover
Invitee Email:from the body when needed - avoid accidentally “accepting” internal test/staff data
import re
from dataclasses import dataclass
from typing import Dict, Optional
@dataclass
class Candidate:
email: Optional[str] = None
source: Optional[str] = None
def apply_candidate_email(candidate: Candidate, email: str, *, source: str) -> None:
candidate.email = email
candidate.source = source
def is_internal_test_data(value: str, field: str) -> bool:
"""Example guard to keep internal/test identities out of downstream records."""
v = value.lower()
if field == "email":
return v.endswith("@domain.invalid") and v.startswith("test+")
return False
def normalize_email(email: str) -> str:
return email.strip().strip("<>").lower()
INVITEE_EMAIL_RE = re.compile(
r"Invitee\s+Email:\s*\n?\s*([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})",
re.IGNORECASE,
)
def choose_candidate_email(
*,
scheduler_fields: Dict[str, str],
extracted_fields: Dict[str, str],
email_content: str,
) -> Candidate:
candidate = Candidate()
# 1) Highest provenance: explicit invitee email from scheduler fields.
invitee_email = scheduler_fields.get("invitee_email")
if invitee_email:
apply_candidate_email(candidate, normalize_email(invitee_email), source="scheduler")
return candidate
# 2) Next: generic extraction result, but filtered.
generic_email = extracted_fields.get("email")
if generic_email:
e = normalize_email(generic_email)
# Filter out notification-system mailboxes and internal/test data.
if "no-reply@" in e or "noreply@" in e or "notifications@" in e or is_internal_test_data(e, "email"):
generic_email = None
else:
apply_candidate_email(candidate, e, source="generic")
return candidate
# 3) Recovery: search body for Invitee Email.
m = INVITEE_EMAIL_RE.search(email_content)
if m:
apply_candidate_email(candidate, normalize_email(m.group(1)), source="body_recovery")
return candidate
if __name__ == "__main__":
scheduler_fields = {"invitee_email": "candidate@domain.invalid"}
extracted_fields = {"email": "no-reply@domain.invalid"}
email_content = "Invitee Email:\n candidate@domain.invalid\n"
chosen = choose_candidate_email(
scheduler_fields=scheduler_fields,
extracted_fields=extracted_fields,
email_content=email_content,
)
print(chosen)
# Expected: Candidate(email='candidate@domain.invalid', source='scheduler')
That example is small, but it demonstrates the posture: prefer the field with the strongest provenance. In a system that writes records humans will trust, provenance is not a nice-to-have. It’s the difference between a correct entity graph and a polluted one.
Why the naive approach fails
If you skip these pre-model steps, you end up with an extractor that is “correct” on curated examples and brittle on real ones.
Forwarded emails are the perfect trap because they contain two plausible identities:
- the forwarder (often with a full signature block)
- the invitee/candidate (often embedded deeper)
A model can extract either one. That’s the problem. Without deterministic preprocessing, you’re not asking the model to “extract the candidate.” You’re asking it to “extract a person-shaped object from a person-shaped email.”
And if your pipeline writes into a system of record, the cost of being wrong isn’t just an incorrect answer—it’s a wrong record that looks legitimate.
This is where production engineering differs from prompt craft:
- Prompt craft tries to make the model pick the right identity.
- Production engineering reduces the number of identities the model can plausibly pick.
The tradeoff: deterministic gates can be wrong too
A contaminated extraction fails quietly: it produces a confident, internally consistent structure around the wrong entity.
A strict preprocessing step fails loudly: it flags “forwarded” or “reschedule” (or it doesn’t), and I can trace that decision with correlation IDs, metrics, and test cases.
That asymmetry is the entire argument for deterministic gates. The cost of maintaining forward-detection patterns and size limits is observable and bounded. The cost of a contaminated record that looks legitimate is neither.
The “engineer still matters” call
No model asked me to build a forwarded-content gate.
The model would have processed raw email text forever, because it can always produce an answer. The engineer’s job is to notice that the system is answering the wrong question.
The question isn’t “can you extract fields from this blob of text?”
The question is “can you preserve identity boundaries and provenance when the blob contains multiple plausible truths?”
That’s why I start with sanitization, forwarded detection, and reschedule detection—before I spend a single token on extraction.
In Part 2, I’ll zoom in on the next decision: why I prefer high variance plus downstream validation over low variance and brittle parsing, and how that shows up in the extract → research → validate shape of my LangGraph workflow.
The day I stopped trusting email bodies, the pipeline stopped producing confidently incorrect records—and that gave every downstream stage a clean substrate to build on.
🎧 Listen to the Enterprise AI Architecture audiobook
📖 Read the full 13-part series with an AI assistant
Top comments (0)