This article covers the first layer of the full-stack architecture: the Ingestion Pipeline. If this layer fails, the other five layers fail with it. The core engineering challenge: how do you standardize ingestion of multi-source heterogeneous documents (PDF tables / structured rules / HTML) without losing semantic structure?
📦 Source code: production-rag-engineering (esg/services/loading_service.py, esg/services/parsing_service.py)
0. The Pain Point
Before the system went live, the compliance team's workflow looked like this:
- Download corporate ESG reports (PDF, averaging 200–300 pages)
- Open the GRI standards document, cross-check rule by rule
- Manually mark each item: "Met / Partially Met / Not Met"
- Consolidate into Excel, submit for audit
One report: 3–5 days. Miss rate: 15%. GRI standards update annually — syncing takes 2 weeks.
This isn't a headcount problem. It's a scalability bottleneck. In any document-intensive system, once document volume scales up, manual workflows break down at the same three points: slow ingestion, lagging updates, and error rates that grow linearly with volume.
The solution isn't hiring more people — it's building a standardized ingestion pipeline.
1. The Five Stages of the Ingestion Pipeline
The full ingestion flow is divided into five stages, each with a clear boundary of responsibility:
Raw Documents (PDF / HTML / Structured Data)
↓
[Step 1] Loading — Read documents, preserve structure
↓
[Step 2] Parsing — Identify headings, extract clauses
↓
[Step 3] Chunking — Split by semantic boundaries, attach metadata
↓
[Step 4] Dual Storage — Write to both PostgreSQL + Milvus
↓
[Step 5] Retrieval API — Expose a retrieval interface to downstream consumers
Which step causes the most damage when it fails?
Step 2 (Parsing) and Step 3 (Chunking).
The reason is straightforward: if parsing is wrong, everything stored downstream is wrong. If chunking is wrong, retrieval will never surface complete semantic units. Errors in these two steps amplify across the entire system — and they're hard to catch downstream, because vector similarity scores look normal even when the retrieved content is incomplete.
Step 1 (Loading) failures are actually the easiest to detect: page counts don't match, content is visibly missing — you can spot it with the naked eye.
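Those naked-eye checks are also cheap to automate. Here is a minimal sketch, assuming the loader hands back one text string per page and that the expected page count is known from the PDF metadata. `LoadReport` and `check_loading` are illustrative names, not part of the repository:

```python
from dataclasses import dataclass, field


@dataclass
class LoadReport:
    expected_pages: int
    loaded_pages: int
    empty_pages: list = field(default_factory=list)  # 1-based page numbers

    @property
    def ok(self) -> bool:
        # Healthy load: every expected page arrived and none came back blank
        return self.expected_pages == self.loaded_pages and not self.empty_pages


def check_loading(page_texts, expected_pages):
    """Compare extracted pages against the expected count and flag blank pages."""
    empty = [
        number
        for number, text in enumerate(page_texts, start=1)
        if not (text or "").strip()
    ]
    return LoadReport(expected_pages, len(page_texts), empty)
```

A blank page is not always an error (scanned separators, intentionally empty pages), so a failed check is best treated as a prompt for review rather than a hard stop.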
2. Table Structure Preservation: The Hardest Engineering Problem in PDF Parsing
GRI standard PDFs contain extensive tables. A typical example:
| Clause ID | Disclosure Requirement | Data Type | Industry Variation |
|---|---|---|---|
| 302-1 | Energy consumption within the organization | Number + Text | Manufacturing: break down by production line |
| 305-1 | Direct greenhouse gas emissions | Number + Text | Must include calculation methodology |
| 306-3 | Significant spills | Number + Text | Must include spill volume |
Early on, we used PyPDF2. Tables came out as scrambled text. Clause IDs were separated from their disclosure requirements. Industry variation fields were lost. Parse accuracy: 68%.
Switching to pdfplumber delivered two core improvements:
① Preserving table structure
pdfplumber's extract_tables() method identifies cell boundaries and reconstructs each table on a page as a 2D array (a list of rows), which can then be mapped to structured fields. The key parameter is table_settings, which needs to be tuned for GRI's specific table formatting:
import pdfplumber


def extract_gri_table(page):
    """Extract all tables on a pdfplumber page as lists of rows."""
    table_settings = {
        "vertical_strategy": "lines",    # GRI tables have explicit vertical lines
        "horizontal_strategy": "lines",  # and explicit horizontal lines
        "snap_tolerance": 3,             # snap near-parallel lines within 3 points
    }
    return page.extract_tables(table_settings)
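The "mapped to structured fields" step is not shown above, so here is one way it might look. This is a sketch under assumptions: the first row of each table is the header, empty cells may come back as None, and the field names passed in are chosen by the caller. `clean_cell` and `table_to_records` are hypothetical helpers:

```python
def clean_cell(cell):
    """Normalize one cell: None becomes "", line breaks and repeated spaces collapse."""
    return " ".join((cell or "").split())


def table_to_records(table, field_names=None):
    """Map a 2D table (first row = header) to a list of dicts.

    field_names optionally replaces the header row with stable keys.
    Rows that are entirely empty are skipped.
    """
    if not table:
        return []
    header = field_names or [clean_cell(c) for c in table[0]]
    records = []
    for row in table[1:]:
        cells = [clean_cell(c) for c in row]
        if not any(cells):
            continue
        # Pad short rows so every record carries the same keys
        cells += [""] * (len(header) - len(cells))
        records.append(dict(zip(header, cells)))
    return records
```

Passing explicit field names keeps downstream code independent of how the header happens to be worded in a given PDF.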
② Heuristic heading detection
GRI clause headings follow a fixed format — e.g., "306-1 Waste Generation", "302-1 Energy Consumption Within the Organization".
Detection rules:
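def is_clause_title(line: str) -> bool:
    """Heuristic check for a GRI clause heading such as "306-1 Waste Generation"."""
    text = line.strip()
    if not text:  # blank lines would otherwise raise IndexError on text[0]
        return False
    return (
        len(text) < 60                 # Rule 1: line length < 60 characters
        and text[0].isdigit()          # Rule 2: starts with a digit (clause number format)
        # Rule 3: not a pure number line (excludes page numbers)
        and not text.replace("-", "").replace(" ", "").isdigit()
    )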
Heading detection accuracy improved from 82% to 97%.
Final results: table parse accuracy 68% → 99%, heading detection accuracy 82% → 97%.
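Once headings are detected, the parsing step still has to attach body text to the right clause. A minimal sketch of that grouping follows, using a stricter regex than the three rules above. The pattern (three digits, a hyphen, more digits, then a title) is an assumption based on the example headings, and `split_into_clauses` is an illustrative name:

```python
import re

# Assumed heading shape: three digits, a hyphen, one or more digits, then a title.
CLAUSE_HEADING = re.compile(r"^(\d{3}-\d+)\s+(\S.*)$")


def split_into_clauses(lines):
    """Group body lines under the most recent clause heading."""
    clauses = []
    current = None
    for raw in lines:
        line = raw.strip()
        if not line:
            continue
        match = CLAUSE_HEADING.match(line)
        if match and len(line) < 60:
            current = {"clause_id": match.group(1), "title": match.group(2), "body": []}
            clauses.append(current)
        elif current is not None:
            current["body"].append(line)
        # Lines before the first heading (cover text, page numbers) are dropped.
    return clauses
```

A body sentence that happens to start with something like "302-1 requires ..." would be misread as a heading here, so the length limit and the pattern both need checking against real pages.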
Transferability note: This pdfplumber + heuristic rules combination applies to any PDF with a consistent table format, such as legal documents, financial reports, or medical standards. You only need to adjust table_settings and the heading detection rules to match your target document's formatting.
3. Storage Selection: Why PostgreSQL + Milvus
Before settling on the dual-store architecture, we evaluated four options. We used elimination.
❌ Milvus only (vector store)
Test scenario: query for "all clauses under the 2021 Environmental category that require both numeric and textual disclosure."
The pure vector approach converts this query to a vector and runs similarity search. The problem:
- Version (2021), category (Environmental), disclosure type (numeric + text) are all structured fields — vector search can't filter them precisely
- Real-world accuracy: 70%, latency: 5s+
- Weak transactional guarantees — no rollback on write failure
❌ PostgreSQL only (relational store)
Test scenario: a report mentions "significant spills" — the system needs to automatically link this to clause 306-3.
The pure relational approach requires maintaining a synonym table mapping "spill," "overflow," "seepage," "liquid discharge," etc. to 306-3. The problem:
- Long-tail synonym expressions are endless — maintenance cost is unsustainable
- Full table scans on hundred-thousand-scale vectors are too slow — latency grows linearly
- Real-world accuracy: 79%, and it degrades as report language diversifies
❌ Neo4j (knowledge graph)
Head-to-head comparison:
| Metric | PostgreSQL | Neo4j |
|---|---|---|
| Development cost | 2 weeks | 3.2 weeks |
| Simple field query latency | < 10ms | 200ms |
| Accuracy | 94% | 95% |
| Maintenance cost | Low | High (requires graph DB expertise) |
Knowledge graphs have a real advantage in complex relationship reasoning (e.g., multi-tier supply chain compliance). But GRI clause relationships are primarily "field filtering + semantic matching", mostly one-to-one or one-to-many. Introducing a graph store added 60% development cost, made simple field queries at least 20x slower, and gained only 1 percentage point of accuracy. The ROI wasn't there.
✅ Final selection: PostgreSQL + Milvus dual-store
Clear division of responsibility:
| Responsibility | PostgreSQL | Milvus |
|---|---|---|
| Structured field queries | ✅ SQL, < 50ms | ❌ |
| Semantic similarity retrieval | ❌ | ✅ Vector, 94% accuracy |
| Transactions and rollback | ✅ | ❌ (requires extra handling) |
| Version management and audit | ✅ | ❌ |
| Industry variation fields | ✅ JSONB | ❌ |
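The division of labor in the table boils down to "filter exactly, then rank semantically". Here is a toy, in-memory sketch of that flow. In the real system step 1 would be a SQL query and step 2 a vector search restricted to the surviving IDs. The row fields, the two-dimensional vectors, and `hybrid_search` itself are illustrative assumptions:

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def hybrid_search(rows, vectors, query_vec, *, version, category, top_k=3):
    """rows: structured records (the relational side).
    vectors: dict of disclosure_id -> embedding (the vector side).
    """
    # Step 1: exact structured filter, the part the relational store handles
    allowed = [
        r["disclosure_id"]
        for r in rows
        if r["version"] == version and r["category"] == category
    ]
    # Step 2: semantic ranking restricted to the allowed IDs
    scored = [(cosine(query_vec, vectors[i]), i) for i in allowed if i in vectors]
    scored.sort(reverse=True)
    return [i for _, i in scored[:top_k]]
```

Filtering first means the similarity ranking never has to guess at version or category, which is exactly where the vector-only option fell short.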
Why PostgreSQL over MySQL?
One critical difference: JSONB field performance.
GRI clauses include industry variation fields. For example, clause 302-1 (energy consumption): manufacturing companies must break down by production line; financial companies must break down by office area. This is semi-structured data — JSONB is the right fit.
Benchmark — same clause-industry mapping query:
- PostgreSQL JSONB + index: < 10ms
- MySQL JSON field (JSON columns cannot be indexed directly; without a generated-column index the query is a full table scan): > 50ms
At least a 5x performance difference.
4. Heterogeneous Data Modeling: The Three-Layer Design
The knowledge base data model has three layers, each solving a distinct engineering problem:
Category Layer
↓ Solves: fast filtering by version and category
Rule Layer
↓ Solves: precise lookup by clause ID and keyword
Instance Layer
↓ Solves: disclosure type matching and industry-specific adaptation
Category Layer (gri_standards)
CREATE TABLE gri_standards (
    standard_id    VARCHAR(20) PRIMARY KEY,  -- e.g. "GRI_2021"
    version        VARCHAR(10),              -- e.g. "2021"
    category       VARCHAR(50),              -- e.g. "Environmental" / "Social" / "Governance"
    effective_date DATE
);
Purpose: supports filtering by version and category. During historical audits, the system can precisely retrieve the rule set that was in effect at the time.
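Retrieving "the rule set that was in effect at the time" comes down to picking the latest effective_date that is not after the audit date. A small sketch over rows shaped like gri_standards follows. The function name is illustrative, and any effective dates you feed it in testing are sample values, not official GRI dates:

```python
from datetime import date


def standard_in_effect(standards, on_date, category=None):
    """Return the row with the latest effective_date that is <= on_date, or None."""
    candidates = [
        s
        for s in standards
        if s["effective_date"] <= on_date
        and (category is None or s["category"] == category)
    ]
    return max(candidates, key=lambda s: s["effective_date"], default=None)
```

In SQL the same logic is a WHERE effective_date <= :audit_date filter with ORDER BY effective_date DESC LIMIT 1.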
Rule Layer (gri_disclosures)
CREATE TABLE gri_disclosures (
    disclosure_id   VARCHAR(20) PRIMARY KEY,  -- e.g. "302-1"
    standard_id     VARCHAR(20),              -- FK to category layer
    disclosure_name TEXT,                     -- e.g. "Energy consumption within the organization"
    keywords        TEXT[],                   -- e.g. ["energy", "consumption", "usage"]
    page_number     INT
);
Purpose: supports lookup by clause ID and keyword. The keyword array serves as the first filter in the dual-validation mechanism.
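The dual-validation mechanism itself is outside this part, but a first-pass keyword filter over the keywords array might look roughly like this. It is a sketch, assuming single-word keywords and plain word matching; `keyword_candidates` is a hypothetical helper:

```python
import re


def keyword_candidates(text, disclosures):
    """Return (disclosure_id, matched_keywords) for disclosures whose keywords appear in text.

    Matching is on whole lowercase words, so multi-word keywords will not match.
    """
    words = set(re.findall(r"[a-z0-9]+", text.lower()))
    hits = []
    for d in disclosures:
        matched = sorted(k for k in d["keywords"] if k.lower() in words)
        if matched:
            hits.append((d["disclosure_id"], matched))
    return hits
```

A filter like this only narrows the candidate set cheaply. It says nothing about whether the passage actually satisfies the clause.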
Instance Layer (gri_requirements)
CREATE TABLE gri_requirements (
    requirement_id    SERIAL PRIMARY KEY,
    disclosure_id     VARCHAR(20),
    requirement_text  TEXT,
    data_type         VARCHAR(20),  -- e.g. "Number + Text" / "Text only"
    industry_specific JSONB         -- {"Manufacturing":"by production line","Finance":"by office area"}
);
Purpose: JSONB stores industry variations. At detection time, the system automatically applies the disclosure requirements for the company's specific industry — no duplicate development needed.
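Applying the industry variation at detection time can be as small as a dictionary lookup. The sketch below assumes a gri_requirements row has been fetched as a dict, with the JSONB column already decoded into a Python dict. `resolve_requirement` is an illustrative name:

```python
def resolve_requirement(requirement, industry):
    """Combine the base requirement with the industry-specific variation, if any."""
    variations = requirement.get("industry_specific") or {}
    return {
        "disclosure_id": requirement["disclosure_id"],
        "requirement_text": requirement["requirement_text"],
        "data_type": requirement["data_type"],
        # None means this clause has no variation for the given industry
        "industry_rule": variations.get(industry),
    }
```

Because the variation lives in data rather than code, adding a new industry means adding a key to the JSONB document, not a new branch in the detector.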
The core value of the three-layer model: each module only modifies its own layer without affecting the others. When GRI standards update, new clauses are added only to the Rule and Instance layers — the Category layer stays untouched. Industry variation changes only require updating the JSONB fields in the Instance layer.
5. Vectorized Write: Batch Rate-Limiting Design
Clause text is vectorized using text-embedding-3-large and written to Milvus.
Why 20 items per batch?
The OpenAI Embedding API has rate limits (TPM). Single-item calls waste quota; batches that are too large trigger rate limiting. Empirically, 20 items/batch is the sweet spot between throughput and stability:
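import time

from openai import OpenAI, RateLimitError


def batch_embed_and_store(clauses: list, batch_size: int = 20, max_retries: int = 3):
    """Embed clause texts in batches and write each batch to Milvus."""
    client = OpenAI()  # reads OPENAI_API_KEY from the environment
    for i in range(0, len(clauses), batch_size):
        batch = clauses[i:i + batch_size]
        texts = [c["requirement_text"] for c in batch]
        # Exponential backoff: auto-retry on rate limit
        for attempt in range(max_retries):
            try:
                response = client.embeddings.create(
                    model="text-embedding-3-large",
                    input=texts,
                )
                vectors = [item.embedding for item in response.data]
                store_to_milvus(batch, vectors)  # project helper, defined elsewhere
                break
            except RateLimitError:
                time.sleep(2 ** attempt)  # 1s, 2s, 4s
        else:
            # Every attempt was rate-limited: fail loudly instead of silently skipping the batch
            raise RuntimeError(f"Embedding failed for batch starting at index {i}")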
Collection naming convention: GRI_std_{model_name}_{timestamp} — e.g., GRI_std_openai_202510111800.
The timestamp in the name exists because GRI standards update annually. Old version vector collections must be retained for historical report detection — they cannot be overwritten.
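The naming convention is easy to get subtly wrong by hand, so it helps to generate and parse it in one place. A minimal sketch, assuming a minute-resolution timestamp as in the example name. Both helper names are hypothetical:

```python
from datetime import datetime


def collection_name(model_name, created_at):
    """Build GRI_std_{model_name}_{timestamp} with a YYYYMMDDHHMM timestamp."""
    return f"GRI_std_{model_name}_{created_at:%Y%m%d%H%M}"


def latest_collection(names, model_name):
    """Pick the newest collection for a model, or None if there is none.

    Fixed-width timestamps sort correctly as plain strings.
    """
    prefix = f"GRI_std_{model_name}_"
    matching = [
        n
        for n in names
        # Require exactly a 12-digit timestamp after the prefix
        if n.startswith(prefix) and len(n) == len(prefix) + 12 and n[len(prefix):].isdigit()
    ]
    return max(matching, default=None)
```

Older collections stay in the list untouched. Only the lookup for "current" moves forward.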
6. Dual-Store Consistency: Three-Layer Guarantee
The biggest engineering risk in a dual-store architecture is data inconsistency — PostgreSQL write succeeds, Milvus write fails, and the two stores drift apart.
Three-layer protection mechanism:
① Transaction binding
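PostgreSQL and Milvus writes are wrapped in the same logical transaction: the Milvus write runs inside the PostgreSQL transaction block, so a failed Milvus write rolls back the PostgreSQL insert as well. The reverse case, where Milvus succeeds and the PostgreSQL commit then fails, is not undone by this block; the nightly check in ③ is what surfaces it.
def dual_write(clause: dict, vector: list):
    pg_conn = get_pg_connection()  # project helper; transaction() as in psycopg 3 (assumed driver)
    try:
        # The transaction block commits on success and rolls back on any exception
        with pg_conn.transaction():
            insert_to_postgres(pg_conn, clause)
            success = insert_to_milvus(clause["disclosure_id"], vector)
            if not success:
                raise RuntimeError("Milvus write failed")
    except Exception:
        pg_conn.rollback()  # safety net; harmless if the block above already rolled back
        raise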
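One case the write path above does not undo by itself is the reverse failure: the vector write succeeds, then the relational commit fails. A compensating delete is one way to close that gap at write time. The sketch below uses injected callables instead of real drivers so it stays self-contained. Every parameter name is an assumption, and this is not taken from the repository:

```python
def dual_write_with_compensation(clause, vector, pg_insert, pg_commit, pg_rollback,
                                 milvus_insert, milvus_delete):
    """Write to both stores; undo the vector write if the relational commit fails."""
    clause_id = clause["disclosure_id"]
    pg_insert(clause)                      # staged, not yet committed
    try:
        milvus_insert(clause_id, vector)   # expected to raise on failure
    except Exception:
        pg_rollback()                      # nothing was committed; discard the staged row
        raise
    try:
        pg_commit()
    except Exception:
        milvus_delete(clause_id)           # compensating action for the vector store
        pg_rollback()
        raise
```

The compensating delete can itself fail, which is why a scheduled consistency check remains useful as the last line of defense.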
② Unified ID linkage
PostgreSQL's disclosure_id and Milvus's primary key are kept identical. The two stores are linked by ID — at any point, you can verify sync status by comparing IDs across both stores.
③ Scheduled consistency check
A validation script runs nightly, comparing clause counts and ID sets across both stores:
def daily_consistency_check():
    pg_ids = set(get_all_ids_from_postgres())
    milvus_ids = set(get_all_ids_from_milvus())
    missing_in_milvus = pg_ids - milvus_ids      # committed rows with no vector
    missing_in_postgres = milvus_ids - pg_ids    # orphaned vectors
    if missing_in_milvus or missing_in_postgres:
        send_alert(
            f"Inconsistency detected: {len(missing_in_milvus)} records missing from Milvus, "
            f"{len(missing_in_postgres)} missing from PostgreSQL"
        )
Real incident this prevented:
When GRI added new "Climate-Related Financial Disclosures" clauses in 2023, a network timeout caused the Milvus batch write to fail mid-way. The transaction rollback mechanism triggered and the corresponding PostgreSQL inserts were rolled back, so both stores remained consistent.
Without this mechanism: PostgreSQL would have the new clauses, Milvus would have no corresponding vectors. Those clauses would silently never be retrieved. The system wouldn't throw an error — it would just quietly miss them. That's the worst kind of bug to debug.
7. Incremental Updates: Two Trigger Patterns
GRI standards update annually. Updates fall into two categories with completely different handling logic:
Lightweight update (clause description revision)
Trigger: minor wording adjustment to an existing clause; core elements unchanged.
Handling:
- Update the relevant fields in PostgreSQL directly
- Re-generate the vector for that clause, update Milvus incrementally
- Time cost: ~30 minutes
Major update (new clause added)
Trigger: entirely new clause added — e.g., the 2023 Climate-Related Financial Disclosures addition.
Handling:
- Insert new record in PostgreSQL (new disclosure_id)
- Insert new vector in Milvus (new primary key)
- Retain old version collection; name new version separately
- Time cost: ~2 hours (including manual verification)
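Deciding which path an update takes can be automated by diffing the old and new clause sets. The following sketch assumes both versions are available as disclosure_id -> requirement text mappings. `text_fingerprint` and `plan_update` are illustrative names:

```python
import hashlib


def text_fingerprint(text):
    """Hash of whitespace-normalized text, so reflowed lines do not count as changes."""
    normalized = " ".join(text.split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def plan_update(old_clauses, new_clauses):
    """Classify clause IDs as added, revised, or removed between two versions."""
    old_ids, new_ids = set(old_clauses), set(new_clauses)
    revised = sorted(
        cid
        for cid in old_ids & new_ids
        if text_fingerprint(old_clauses[cid]) != text_fingerprint(new_clauses[cid])
    )
    return {
        "added": sorted(new_ids - old_ids),      # candidates for the major-update path
        "revised": revised,                      # candidates for the lightweight path
        "removed": sorted(old_ids - new_ids),
    }
```

Only the "revised" and "added" IDs need new embeddings, which is what keeps the lightweight path down to minutes instead of a full rebuild.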
Dual-version parallel mechanism:
When new clauses go live, reports currently under detection are unaffected. A mapping table keyed by report_id and standard_id binds each report to the GRI version that was active when it was submitted:
CREATE TABLE report_audit (
    report_id    VARCHAR(50),
    standard_id  VARCHAR(20),  -- e.g. "GRI_2021"
    submitted_at TIMESTAMP,
    detected_at  TIMESTAMP
);
Real case: when the 2023 climate clauses were added, 10 reports from 2022 were actively being processed. The new clauses only applied to reports submitted after the update. All 10 existing reports continued running against 2021 rules — 100% conclusion consistency.
This mechanism also solves the audit traceability problem. When regulators request a review, a single query returns: "Report ID=123, GRI Version=2021, Detection Time=2022-12-01" — fully reproducible.
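That "single query" can be sketched end to end with an in-memory SQLite database standing in for PostgreSQL. The column types are simplified, the submitted_at value is a made-up sample, and `audit_trail` is an illustrative helper:

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for PostgreSQL
conn.execute("""
    CREATE TABLE report_audit (
        report_id    TEXT PRIMARY KEY,
        standard_id  TEXT NOT NULL,
        submitted_at TEXT,
        detected_at  TEXT
    )
""")
conn.execute(
    "INSERT INTO report_audit VALUES (?, ?, ?, ?)",
    ("123", "GRI_2021", "2022-11-15T09:00:00", "2022-12-01T00:00:00"),
)


def audit_trail(conn, report_id):
    """Return the standard version and detection time bound to one report, or None."""
    row = conn.execute(
        "SELECT report_id, standard_id, detected_at FROM report_audit WHERE report_id = ?",
        (report_id,),
    ).fetchone()
    if row is None:
        return None
    return {"report_id": row[0], "standard_id": row[1], "detected_at": row[2]}
```

Because the binding is written once at submission and never updated, re-running detection later resolves to the same rule set.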
8. Wrapping Up: The Ingestion Pipeline Decision Tree
When facing a new document ingestion scenario, three questions determine the approach:
Q1: Does the document contain tables or complex formatting?
├─ Yes → Use pdfplumber, configure table_settings. Don't use PyPDF2.
└─ No → Standard text parsing is sufficient.
Q2: Do you need structured field queries (filter by version / type / industry)?
├─ Yes → You need a relational store (PostgreSQL). A vector store alone won't cut it.
└─ No → A pure vector store is viable.
Q3: Will the documents be updated dynamically?
├─ Yes → Design an incremental update mechanism + version binding. Don't do full rebuilds.
└─ No → One-time ingestion is fine.
Transferability of this pipeline:
Every engineering decision here is general-purpose, even though it was validated against a GRI compliance scenario:
- Table parsing + heuristic rules → applicable to any PDF with tabular content
- Dual-store coordination → applicable to any knowledge base requiring "precise filtering + semantic matching"
- Incremental updates + version binding → applicable to any dynamically updated rule library
Legal documents, financial reports, medical standards — if the domain is document-intensive, rules are structured, and conclusions must be auditable, this pipeline transfers directly.
Source Code
All implementations referenced in this article are available here:
👉 github.com/muzinan123/production-rag-engineering
Relevant files for this part:
- esg/services/loading_service.py: multi-tool PDF parsing router
- esg/services/parsing_service.py: 4 structuring strategies
Next up: Once ingestion is complete, the next problem is chunking. GRI clauses and ESG reports are two completely different document types — why do they require two completely different chunking strategies? And there's one document type that shouldn't be chunked at all. → Part 2 — Text Chunking