Docling v2.102 and new great features!
Introduction
In its latest release, Docling expands its robust parsing capabilities by introducing native support for EPUB and .eml formats.
Adding these to its already extensive list of supported document types makes data preparation for generative AI applications easier. Whether you are chunking text for RAG pipelines, extracting clean metadata, or structuring unstructured files for LLM training, Docling handles the document processing step that feeds downstream workloads such as text chunking, document summarization, and vector store preparation.
Before moving forward, here is a brief introduction to these two formats.
The EPUB Format
The EPUB (Electronic Publication) format was introduced in 2007 by the International Digital Publishing Forum (IDPF) as a successor to the older Open eBook standard, and it is currently maintained by the W3C. Designed as an open, XML-based standard for digital publications, its defining feature is reflowable content. This means the text automatically adjusts and optimizes its layout to fit different screen sizes, resolutions, and orientations — making it the universal standard for e-readers, smartphones, and tablets. Structurally, an .epub file is essentially a zipped archive containing HTML/XHTML files for the text, CSS for styling, images, and XML metadata for navigation and book structure.
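To make the "zipped archive" description concrete, here is a minimal sketch using only the Python standard library. It builds a toy, EPUB-like archive in memory (the file names `OEBPS/content.opf` and `OEBPS/chapter1.xhtml` are illustrative assumptions, and the archive is not a fully valid EPUB) and then reads `META-INF/container.xml` to find the package document, which is where a reader would look for the metadata.

```python
import io
import xml.etree.ElementTree as ET
import zipfile

# Namespace used by META-INF/container.xml in EPUB archives.
CONTAINER_NS = {"c": "urn:oasis:names:tc:opendocument:xmlns:container"}


def build_toy_epub() -> bytes:
    """Build a tiny EPUB-like ZIP in memory (hypothetical content, for illustration)."""
    container_xml = (
        '<?xml version="1.0"?>'
        '<container version="1.0" xmlns="urn:oasis:names:tc:opendocument:xmlns:container">'
        '<rootfiles><rootfile full-path="OEBPS/content.opf" '
        'media-type="application/oebps-package+xml"/></rootfiles>'
        "</container>"
    )
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr("mimetype", "application/epub+zip")
        archive.writestr("META-INF/container.xml", container_xml)
        archive.writestr("OEBPS/content.opf", "<package/>")
        archive.writestr("OEBPS/chapter1.xhtml", "<html><body><p>Hello</p></body></html>")
    return buffer.getvalue()


def locate_package_document(epub_bytes: bytes) -> str:
    """Return the path of the package (OPF) document declared in container.xml."""
    with zipfile.ZipFile(io.BytesIO(epub_bytes)) as archive:
        root = ET.fromstring(archive.read("META-INF/container.xml"))
    rootfile = root.find("c:rootfiles/c:rootfile", CONTAINER_NS)
    if rootfile is None:
        raise ValueError("container.xml does not declare a rootfile")
    return rootfile.attrib["full-path"]


def list_members(epub_bytes: bytes) -> list[str]:
    """List every file stored in the archive."""
    with zipfile.ZipFile(io.BytesIO(epub_bytes)) as archive:
        return archive.namelist()
```

Calling `locate_package_document(build_toy_epub())` returns the path stored in the `full-path` attribute, and `list_members` shows the XHTML, metadata, and `mimetype` entries sitting side by side in the ZIP.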
The EML Format
The EML (Email) file format was developed by Microsoft and follows the Internet Message Format defined in RFC 822 and its successors (RFC 2822, then RFC 5322). It was widely popularized by email clients like Outlook Express, Windows Live Mail, and Mozilla Thunderbird. An .eml file is a plain text file that preserves the raw structure of a single email message. It contains the message header (including metadata like Sender, Recipient, Date, Subject, and routing servers) followed by the message body. The body can include plain text, HTML, and binary data such as file attachments or embedded images, which are encoded into text using MIME (Multipurpose Internet Mail Extensions).
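The header-then-body layout can be seen with Python's standard `email` package. The sketch below parses a small, made-up message (the addresses and subject are placeholders, not data from the project) and pulls out a few header fields plus the plain-text body. It is only an illustration of the format; the pipeline in this article uses the `mailparser` library instead.

```python
from email import message_from_string, policy

# A made-up single-part message: header block, blank line, then the body.
RAW_EML = (
    "From: Alice Example <alice@example.com>\r\n"
    "To: Bob Example <bob@example.com>\r\n"
    "Subject: Quarterly report\r\n"
    "Date: Mon, 06 Jan 2025 09:30:00 +0000\r\n"
    "MIME-Version: 1.0\r\n"
    "Content-Type: text/plain; charset=utf-8\r\n"
    "\r\n"
    "Hi Bob,\r\n"
    "The report is ready for review.\r\n"
)


def summarize_eml(raw: str) -> dict[str, str]:
    """Parse raw message text and return selected headers and the plain-text body."""
    message = message_from_string(raw, policy=policy.default)
    body_part = message.get_body(preferencelist=("plain",))
    return {
        "from": str(message["From"]),
        "to": str(message["To"]),
        "subject": str(message["Subject"]),
        "body": body_part.get_content().strip() if body_part is not None else "",
    }


if __name__ == "__main__":
    for key, value in summarize_eml(RAW_EML).items():
        print(f"{key}: {value}")
```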
Industrializing Document Ingestion: Processing EPUB, EML, and Privacy-First (PII) Pipelines with Docling
As usual, our automated software engineer, Bob, has designed and industrialized a production-ready file processing pipeline. This system bridges the gap between complex unstructured file systems and downstream vectorized AI platforms by orchestrating native format decoders alongside IBM’s open-source Docling engine.
Architectural Overview & Workflow Pipeline
The solution is split into two major layers: a high-throughput CLI processing engine (app/main.py) and an interactive Gradio validation UI (app/ui.py). The operational architecture maps a predictable, reliable path from raw user input down to analytical artifacts:
# main.py
from __future__ import annotations

import argparse
import csv
import json
import re
import xml.etree.ElementTree as ET
import zipfile
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Any

try:
    import mailparser
except ModuleNotFoundError as exc:
    raise SystemExit(
        "Missing dependency 'mailparser'. Activate the project virtual environment and install dependencies with '\n"
        "python3 -m venv .venv && ./.venv/bin/pip install --upgrade pip && ./.venv/bin/pip install -e .[dev]'\n"
        "Then run either './.venv/bin/python app/main.py' or './.venv/bin/python -m app.main'."
    ) from exc

from docling.document_converter import DocumentConverter

SUPPORTED_EXTENSIONS = {".epub", ".eml"}


@dataclass(slots=True)
class ProcessedDocument:
    """Per-document record of generated artifacts and extracted metadata."""

    source_path: str
    source_name: str
    source_extension: str
    relative_path: str
    docling_format: str
    output_directory: str
    markdown_path: str
    json_path: str
    text_path: str
    structured_summary_path: str
    chunks_path: str
    attachment_dir: str | None
    metadata: dict[str, Any]
    chunk_count: int
    text_length: int
    markdown_length: int


@dataclass(slots=True)
class PipelineRunResult:
    """Run-level summary, serialized to manifest.json and returned to callers."""

    run_timestamp_utc: str
    input_root: str
    output_root: str
    chunk_size: int
    obfuscate_contacts: bool
    processed_count: int
    manifest_path: str
    summary_csv_path: str
    chunks_csv_path: str
    documents: list[ProcessedDocument]

    def to_manifest(self) -> dict[str, Any]:
        return {
            "run_timestamp_utc": self.run_timestamp_utc,
            "input_root": self.input_root,
            "output_root": self.output_root,
            "chunk_size": self.chunk_size,
            "obfuscate_contacts": self.obfuscate_contacts,
            "processed_count": self.processed_count,
            "documents": [asdict(item) for item in self.documents],
        }

    def to_dict(self) -> dict[str, Any]:
        payload = self.to_manifest()
        payload["manifest_path"] = self.manifest_path
        payload["summary_csv_path"] = self.summary_csv_path
        payload["chunks_csv_path"] = self.chunks_csv_path
        return payload


class EmailAddressObfuscator:
    """Replace names and addresses with stable, type-based placeholders."""

    def __init__(self) -> None:
        # Maps each normalized original value to its placeholder (e.g. "email-1").
        self.entity_map: dict[str, str] = {}
        self.counters: dict[str, int] = {"person": 0, "email": 0}

    def obfuscate_contacts(self, contacts: list[tuple[str, str]]) -> list[dict[str, str]]:
        return [
            {
                "name": self._obfuscate_value(name, "person") if name else "",
                "address": self._obfuscate_value(address, "email") if address else "",
            }
            for name, address in contacts
        ]

    def _obfuscate_value(self, value: str, entity_type: str) -> str:
        normalized = re.sub(r"\s+", " ", value).strip()
        if not normalized:
            return normalized
        if normalized not in self.entity_map:
            self.counters[entity_type] += 1
            self.entity_map[normalized] = f"{entity_type}-{self.counters[entity_type]}"
        return self.entity_map[normalized]


class DoclingStructuredOutputApp:
    def __init__(self, input_dir: Path, output_root: Path, chunk_size: int, obfuscate: bool = True) -> None:
        self.input_dir = input_dir
        self.output_root = output_root
        self.chunk_size = chunk_size
        self.obfuscate = obfuscate
        self.converter = DocumentConverter()
        # Each run writes into its own UTC-timestamped directory.
        self.timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
        self.run_dir = self.output_root / self.timestamp
        self.email_obfuscator = EmailAddressObfuscator()

    def run(self) -> PipelineRunResult:
        self.run_dir.mkdir(parents=True, exist_ok=True)
        documents = self._discover_documents()
        processed_documents = [self._process_document(path) for path in documents]
        manifest_path = self.run_dir / "manifest.json"
        csv_summary_path = self.run_dir / "summary.csv"
        chunk_csv_path = self.run_dir / "chunks.csv"
        run_result = PipelineRunResult(
            run_timestamp_utc=self.timestamp,
            input_root=str(self.input_dir.resolve()),
            output_root=str(self.run_dir.resolve()),
            chunk_size=self.chunk_size,
            obfuscate_contacts=self.obfuscate,
            processed_count=len(processed_documents),
            manifest_path=str(manifest_path.resolve()),
            summary_csv_path=str(csv_summary_path.resolve()),
            chunks_csv_path=str(chunk_csv_path.resolve()),
            documents=processed_documents,
        )
        manifest_path.write_text(json.dumps(run_result.to_manifest(), indent=2, ensure_ascii=False), encoding="utf-8")
        self._write_csv_summary(csv_summary_path, processed_documents)
        self._write_chunk_csv(chunk_csv_path, processed_documents)
        return run_result

    def _discover_documents(self) -> list[Path]:
        documents = sorted(
            path
            for path in self.input_dir.rglob("*")
            if path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS
        )
        if not documents:
            raise FileNotFoundError(f"No EPUB or EML files were found under {self.input_dir}")
        return documents

    def _process_document(self, source_path: Path) -> ProcessedDocument:
        result = self.converter.convert(source_path)
        document = result.document
        relative_path = source_path.relative_to(self.input_dir)
        slug = self._slugify(relative_path.with_suffix(""))
        destination_dir = self.run_dir / slug
        destination_dir.mkdir(parents=True, exist_ok=True)
        markdown_path = destination_dir / "document.md"
        json_path = destination_dir / "document.json"
        text_path = destination_dir / "document.txt"
        summary_path = destination_dir / "structured_summary.json"
        chunks_path = destination_dir / "chunks.json"

        # Export the Docling document in three forms: markdown, JSON, plain text.
        markdown_content = document.export_to_markdown()
        text_content = document.export_to_text()
        markdown_path.write_text(markdown_content, encoding="utf-8")
        json_path.write_text(json.dumps(document.export_to_dict(), indent=2, ensure_ascii=False), encoding="utf-8")
        text_path.write_text(text_content, encoding="utf-8")

        metadata = self._build_metadata(source_path)
        chunks = self._build_chunks(
            source_path=source_path,
            markdown_content=markdown_content,
            text_content=text_content,
            metadata=metadata,
            max_chunk_chars=self.chunk_size,
        )
        chunks_path.write_text(json.dumps(chunks, indent=2, ensure_ascii=False), encoding="utf-8")

        structured_summary = {
            "source": {
                "path": str(source_path.resolve()),
                "name": source_path.name,
                "extension": source_path.suffix.lower(),
                "relative_path": str(relative_path),
            },
            "docling": {
                "format": str(result.input.format),
                "document_name": document.name,
                "text_length": len(text_content),
                "markdown_length": len(markdown_content),
                "chunk_count": len(chunks),
                "chunk_size": self.chunk_size,
            },
            "metadata": metadata,
            "artifacts": {
                "markdown": str(markdown_path.resolve()),
                "json": str(json_path.resolve()),
                "text": str(text_path.resolve()),
                "chunks": str(chunks_path.resolve()),
            },
        }

        attachment_dir: Path | None = None
        if source_path.suffix.lower() == ".eml":
            attachment_dir = destination_dir / "attachments"
            attachment_metadata = self._extract_eml_attachments(source_path, attachment_dir)
            structured_summary["email"] = {
                key: value for key, value in metadata.items() if key not in {"title", "author", "description"}
            }
            structured_summary["email"]["attachments"] = attachment_metadata
        else:
            structured_summary["publication"] = metadata
        summary_path.write_text(json.dumps(structured_summary, indent=2, ensure_ascii=False), encoding="utf-8")

        return ProcessedDocument(
            source_path=str(source_path.resolve()),
            source_name=source_path.name,
            source_extension=source_path.suffix.lower(),
            relative_path=str(relative_path),
            docling_format=str(result.input.format),
            output_directory=str(destination_dir.resolve()),
            markdown_path=str(markdown_path.resolve()),
            json_path=str(json_path.resolve()),
            text_path=str(text_path.resolve()),
            structured_summary_path=str(summary_path.resolve()),
            chunks_path=str(chunks_path.resolve()),
            attachment_dir=str(attachment_dir.resolve()) if attachment_dir and attachment_dir.exists() else None,
            metadata=metadata,
            chunk_count=len(chunks),
            text_length=len(text_content),
            markdown_length=len(markdown_content),
        )

    def _build_metadata(self, source_path: Path) -> dict[str, Any]:
        if source_path.suffix.lower() == ".eml":
            return self._extract_eml_metadata(source_path)
        return self._extract_epub_metadata(source_path)

    def _extract_epub_metadata(self, source_path: Path) -> dict[str, Any]:
        with zipfile.ZipFile(source_path) as archive:
            # container.xml points at the package (OPF) document that holds the metadata.
            container_xml = archive.read("META-INF/container.xml")
            container_root = ET.fromstring(container_xml)
            namespace = {"container": "urn:oasis:names:tc:opendocument:xmlns:container"}
            rootfile = container_root.find("container:rootfiles/container:rootfile", namespace)
            if rootfile is None:
                raise ValueError(f"Unable to locate package document in EPUB file {source_path}")
            opf_path = rootfile.attrib.get("full-path")
            if not opf_path:
                raise ValueError(f"EPUB package document path missing for {source_path}")
            opf_root = ET.fromstring(archive.read(opf_path))

        ns = {
            "dc": "http://purl.org/dc/elements/1.1/",
            "opf": "http://www.idpf.org/2007/opf",
        }
        metadata_element = opf_root.find("opf:metadata", ns)
        if metadata_element is None:
            return {}
        creators = [
            element.text.strip()
            for element in metadata_element.findall("dc:creator", ns)
            if element.text and element.text.strip()
        ]
        subjects = [
            element.text.strip()
            for element in metadata_element.findall("dc:subject", ns)
            if element.text and element.text.strip()
        ]
        metadata: dict[str, Any] = {
            "title": self._xml_text(metadata_element.find("dc:title", ns)),
            "author": creators[0] if creators else None,
            "creators": creators,
            "language": self._xml_text(metadata_element.find("dc:language", ns)),
            "publisher": self._xml_text(metadata_element.find("dc:publisher", ns)),
            "identifier": self._xml_text(metadata_element.find("dc:identifier", ns)),
            "description": self._xml_text(metadata_element.find("dc:description", ns)),
            "subjects": subjects,
            "date": self._xml_text(metadata_element.find("dc:date", ns)),
            "rights": self._xml_text(metadata_element.find("dc:rights", ns)),
        }
        # Drop empty fields so the summary only lists what the EPUB actually declares.
        return {key: value for key, value in metadata.items() if value not in (None, [], "")}

    def _extract_eml_metadata(self, source_path: Path) -> dict[str, Any]:
        message = mailparser.parse_from_file(str(source_path))
        sent_at = self._normalize_datetime(message.date)
        if self.obfuscate:
            from_contacts = self.email_obfuscator.obfuscate_contacts(message.from_)
            to_contacts = self.email_obfuscator.obfuscate_contacts(message.to)
            obfuscation_info: dict[str, Any] = {
                "enabled": True,
                "strategy": "stable_type_based_ids_from__example/Pii obfuscate.md",
                "fields": ["from", "to"],
            }
        else:
            from_contacts = [self._pair_to_dict(item) for item in message.from_]
            to_contacts = [self._pair_to_dict(item) for item in message.to]
            obfuscation_info = {"enabled": False}
        # Only From/To are obfuscated; cc and bcc are passed through unchanged.
        return {
            "title": message.subject,
            "subject": message.subject,
            "from": from_contacts,
            "to": to_contacts,
            "cc": [self._pair_to_dict(item) for item in message.cc],
            "bcc": [self._pair_to_dict(item) for item in message.bcc],
            "date": sent_at,
            "message_id": message.message_id,
            "attachments_count": len(message.attachments),
            "text_plain_count": len(message.text_plain),
            "text_html_count": len(message.text_html),
            "obfuscation": obfuscation_info,
        }

    def _extract_eml_attachments(self, source_path: Path, attachment_dir: Path) -> list[dict[str, Any]]:
        message = mailparser.parse_from_file(str(source_path))
        if not message.attachments:
            return []
        attachment_dir.mkdir(parents=True, exist_ok=True)
        attachments: list[dict[str, Any]] = []
        for index, attachment in enumerate(message.attachments, start=1):
            original_name = attachment.get("filename") or f"attachment-{index}"
            safe_name = self._slugify(Path(original_name))
            suffix = Path(original_name).suffix
            target_name = f"{safe_name}{suffix}" if suffix and not safe_name.endswith(suffix) else safe_name
            target_path = attachment_dir / target_name
            payload = attachment.get("payload", "")
            binary = attachment.get("binary", False)
            # NOTE (assumption): mail-parser is understood to keep binary payloads as
            # base64 text; if so, they would need base64 decoding before being written.
            if binary:
                target_path.write_bytes(payload if isinstance(payload, bytes) else payload.encode("utf-8", errors="ignore"))
            else:
                target_path.write_text(
                    payload if isinstance(payload, str) else payload.decode("utf-8", errors="ignore"),
                    encoding="utf-8",
                )
            attachments.append(
                {
                    "filename": original_name,
                    "saved_as": str(target_path.resolve()),
                    "content_type": attachment.get("mail_content_type"),
                    "content_disposition": attachment.get("content-disposition"),
                    "binary": binary,
                }
            )
        return attachments

    def _write_csv_summary(self, csv_summary_path: Path, processed_documents: list[ProcessedDocument]) -> None:
        with csv_summary_path.open("w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(
                handle,
                fieldnames=[
                    "source_name",
                    "relative_path",
                    "source_extension",
                    "docling_format",
                    "chunk_count",
                    "output_directory",
                    "structured_summary_path",
                    "chunks_path",
                ],
            )
            writer.writeheader()
            for document in processed_documents:
                writer.writerow(
                    {
                        "source_name": document.source_name,
                        "relative_path": document.relative_path,
                        "source_extension": document.source_extension,
                        "docling_format": document.docling_format,
                        "chunk_count": document.chunk_count,
                        "output_directory": document.output_directory,
                        "structured_summary_path": document.structured_summary_path,
                        "chunks_path": document.chunks_path,
                    }
                )

    def _write_chunk_csv(self, chunk_csv_path: Path, processed_documents: list[ProcessedDocument]) -> None:
        with chunk_csv_path.open("w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(
                handle,
                fieldnames=[
                    "source_name",
                    "chunk_id",
                    "chunk_index",
                    "char_count",
                    "estimated_token_count",
                    "section_heading",
                    "source_extension",
                    "text",
                ],
            )
            writer.writeheader()
            for document in processed_documents:
                # Re-read the per-document chunks.json and flatten it into one CSV.
                chunks = json.loads(Path(document.chunks_path).read_text(encoding="utf-8"))
                for chunk in chunks:
                    writer.writerow(
                        {
                            "source_name": document.source_name,
                            "chunk_id": chunk["chunk_id"],
                            "chunk_index": chunk["chunk_index"],
                            "char_count": chunk["char_count"],
                            "estimated_token_count": chunk["estimated_token_count"],
                            "section_heading": chunk["section_heading"],
                            "source_extension": chunk["source_extension"],
                            "text": chunk["text"],
                        }
                    )

    def _build_chunks(
        self,
        source_path: Path,
        markdown_content: str,
        text_content: str,
        metadata: dict[str, Any],
        *,
        max_chunk_chars: int,
    ) -> list[dict[str, Any]]:
        content = markdown_content.strip() or text_content.strip()
        if not content:
            return []
        source_slug = self._slugify(source_path.relative_to(self.input_dir).with_suffix(""))
        # Paragraphs are separated by blank lines.
        paragraphs = [segment.strip() for segment in re.split(r"\n\s*\n", content) if segment.strip()]
        chunks: list[dict[str, Any]] = []
        buffer: list[str] = []
        buffer_size = 0
        current_heading: str | None = None

        def flush(section_heading: str | None) -> None:
            """Emit the buffered paragraphs as one chunk and reset the buffer."""
            nonlocal buffer, buffer_size
            if not buffer:
                return
            chunk_text = "\n\n".join(buffer).strip()
            chunk_index = len(chunks)
            chunks.append(
                {
                    "chunk_id": f"{source_slug}-chunk-{chunk_index:04d}",
                    "chunk_index": chunk_index,
                    "text": chunk_text,
                    "char_count": len(chunk_text),
                    "estimated_token_count": self._estimate_token_count(chunk_text),
                    "section_heading": section_heading,
                    "source_name": source_path.name,
                    "source_extension": source_path.suffix.lower(),
                    "metadata": metadata,
                }
            )
            buffer = []
            buffer_size = 0

        for paragraph in paragraphs:
            heading = self._extract_heading(paragraph)
            if heading:
                # A new heading closes the current chunk under the previous heading.
                if buffer:
                    flush(current_heading)
                current_heading = heading
            paragraph_length = len(paragraph)
            # The +2 accounts for the blank-line separator between paragraphs.
            if buffer and buffer_size + paragraph_length + 2 > max_chunk_chars:
                flush(current_heading)
            if paragraph_length > max_chunk_chars:
                # Oversized paragraphs are sliced into fixed-size pieces.
                for start in range(0, paragraph_length, max_chunk_chars):
                    piece = paragraph[start : start + max_chunk_chars].strip()
                    if piece:
                        buffer = [piece]
                        buffer_size = len(piece)
                        flush(current_heading)
                continue
            buffer.append(paragraph)
            buffer_size += paragraph_length + 2
        flush(current_heading)
        return chunks

    @staticmethod
    def _extract_heading(paragraph: str) -> str | None:
        stripped = paragraph.strip()
        if stripped.startswith("#"):
            return stripped.lstrip("#").strip() or None
        # Short all-uppercase paragraphs are treated as headings too.
        if stripped.isupper() and len(stripped.split()) <= 12:
            return stripped.title()
        return None

    @staticmethod
    def _estimate_token_count(text: str) -> int:
        # Rough heuristic: about 1.3 tokens per whitespace-separated word.
        return max(1, round(len(text.split()) * 1.3))

    @staticmethod
    def _pair_to_dict(item: tuple[str, str]) -> dict[str, str]:
        display_name, address = item
        return {"name": display_name, "address": address}

    @staticmethod
    def _normalize_datetime(value: Any) -> str | None:
        if value is None:
            return None
        if isinstance(value, datetime):
            return value.isoformat()
        if isinstance(value, str):
            try:
                return parsedate_to_datetime(value).isoformat()
            except (TypeError, ValueError):
                return value
        return str(value)

    @staticmethod
    def _xml_text(element: ET.Element | None) -> str | None:
        if element is None or element.text is None:
            return None
        value = re.sub(r"\s+", " ", element.text).strip()
        return value or None

    @staticmethod
    def _slugify(path_like: Path) -> str:
        raw = str(path_like).replace("/", "-")
        slug = re.sub(r"[^A-Za-z0-9._-]+", "-", raw).strip("-._")
        return slug or "document"


def build_ui_document_rows(run_result: PipelineRunResult) -> list[list[Any]]:
    return [
        [
            item.source_name,
            item.relative_path,
            item.source_extension,
            item.chunk_count,
            item.docling_format,
            item.output_directory,
        ]
        for item in run_result.documents
    ]


def load_document_preview(processed_document: ProcessedDocument, max_chars: int = 4000) -> tuple[str, str, str]:
    summary_payload = json.loads(Path(processed_document.structured_summary_path).read_text(encoding="utf-8"))
    chunks_payload = json.loads(Path(processed_document.chunks_path).read_text(encoding="utf-8"))
    text_preview = Path(processed_document.text_path).read_text(encoding="utf-8")[:max_chars]
    return (
        json.dumps(summary_payload, indent=2, ensure_ascii=False),
        json.dumps(chunks_payload[: min(5, len(chunks_payload))], indent=2, ensure_ascii=False),
        text_preview,
    )


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Build structured outputs from EPUB and EML documents using Docling.")
    parser.add_argument("--input-dir", default="input", type=Path, help="Directory containing EPUB and EML files.")
    parser.add_argument("--output-dir", default="output", type=Path, help="Root directory for timestamped outputs.")
    parser.add_argument(
        "--chunk-size",
        default=1200,
        type=int,
        help="Maximum number of characters per generated chunk for RAG/agent workflows.",
    )
    parser.add_argument(
        "--no-obfuscate",
        dest="obfuscate",
        action="store_false",
        default=True,
        help="Disable From/To obfuscation in EML outputs.",
    )
    return parser.parse_args()


def main() -> int:
    args = parse_args()
    app = DoclingStructuredOutputApp(
        input_dir=args.input_dir,
        output_root=args.output_dir,
        chunk_size=args.chunk_size,
        obfuscate=args.obfuscate,
    )
    run_result = app.run()
    print(json.dumps(run_result.to_dict(), indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

# ui.py
from __future__ import annotations

import json
import os
from pathlib import Path
from typing import Any

import gradio as gr

from app.main import DoclingStructuredOutputApp, PipelineRunResult, build_ui_document_rows, load_document_preview

DEFAULT_INPUT_DIR = Path("input")
DEFAULT_OUTPUT_DIR = Path("output")
DEFAULT_CHUNK_SIZE = 1200
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 7860


def _format_run_summary(run_result: PipelineRunResult) -> str:
    return json.dumps(
        {
            "status": "ok",
            "run_timestamp_utc": run_result.run_timestamp_utc,
            "input_root": run_result.input_root,
            "output_root": run_result.output_root,
            "chunk_size": run_result.chunk_size,
            "obfuscate_contacts": run_result.obfuscate_contacts,
            "processed_count": run_result.processed_count,
            "manifest_path": run_result.manifest_path,
            "summary_csv_path": run_result.summary_csv_path,
            "chunks_csv_path": run_result.chunks_csv_path,
        },
        indent=2,
        ensure_ascii=False,
    )


def _resolve_directory(path_value: str, fallback: Path) -> Path:
    candidate = Path(path_value.strip()) if path_value.strip() else fallback
    return candidate.expanduser().resolve()


def _run_pipeline(input_dir_value: str, output_dir_value: str, chunk_size: int, obfuscate: bool):
    input_dir = _resolve_directory(input_dir_value, DEFAULT_INPUT_DIR)
    output_dir = _resolve_directory(output_dir_value, DEFAULT_OUTPUT_DIR)
    if chunk_size <= 0:
        raise gr.Error("Chunk size must be a positive integer.")
    if not input_dir.exists() or not input_dir.is_dir():
        raise gr.Error(f"Input directory does not exist: {input_dir}")
    output_dir.mkdir(parents=True, exist_ok=True)
    app = DoclingStructuredOutputApp(
        input_dir=input_dir,
        output_root=output_dir,
        chunk_size=chunk_size,
        obfuscate=obfuscate,
    )
    run_result = app.run()
    rows = build_ui_document_rows(run_result)
    manifest_json = json.dumps(run_result.to_dict(), indent=2, ensure_ascii=False)
    summary_json = _format_run_summary(run_result)
    dropdown_choices = [
        (f"{document.relative_path} ({document.source_extension})", index)
        for index, document in enumerate(run_result.documents)
    ]
    first_summary, first_chunks, first_text = ("", "", "")
    selected_index = None
    if run_result.documents:
        # Preselect the first document so the preview panes are not empty.
        selected_index = 0
        first_summary, first_chunks, first_text = load_document_preview(run_result.documents[0])
    return (
        summary_json,
        manifest_json,
        rows,
        gr.update(choices=dropdown_choices, value=selected_index),
        first_summary,
        first_chunks,
        first_text,
        run_result.to_dict(),
    )


def _show_document_preview(selected_index: int | None, run_result_payload: dict[str, Any] | None):
    if run_result_payload is None:
        return "", "", ""
    if selected_index is None:
        return "", "", ""
    documents = run_result_payload.get("documents", [])
    if not isinstance(documents, list) or selected_index < 0 or selected_index >= len(documents):
        return "", "", ""
    document_payload = documents[selected_index]
    summary_text = Path(document_payload["structured_summary_path"]).read_text(encoding="utf-8")
    chunks_payload = json.loads(Path(document_payload["chunks_path"]).read_text(encoding="utf-8"))
    text_preview = Path(document_payload["text_path"]).read_text(encoding="utf-8")[:4000]
    return summary_text, json.dumps(chunks_payload[: min(5, len(chunks_payload))], indent=2, ensure_ascii=False), text_preview


def build_interface() -> gr.Blocks:
    with gr.Blocks(title="Docling EPUB/EML Structured Output UI") as demo:
        run_state = gr.State(value=None)
        gr.Markdown(
            "# Docling EPUB and EML Structured Output UI\n"
            "Run the same recursive Docling pipeline from the browser, toggle EML obfuscation, and inspect generated outputs."
        )
        with gr.Row():
            with gr.Column(scale=1):
                input_dir = gr.Textbox(label="Input directory", value=str(DEFAULT_INPUT_DIR), lines=1)
                output_dir = gr.Textbox(label="Output root directory", value=str(DEFAULT_OUTPUT_DIR), lines=1)
                chunk_size = gr.Number(label="Chunk size", value=DEFAULT_CHUNK_SIZE, precision=0)
                obfuscate = gr.Checkbox(label="Obfuscate EML From/To fields", value=True)
                run_button = gr.Button("Run pipeline", variant="primary")
            with gr.Column(scale=1):
                run_summary = gr.Code(label="Run summary", language="json")
                manifest_preview = gr.Code(label="Manifest preview", language="json")
        with gr.Row():
            results_table = gr.Dataframe(
                headers=["source_name", "relative_path", "extension", "chunk_count", "docling_format", "output_directory"],
                datatype=["str", "str", "str", "number", "str", "str"],
                row_count=1,
                col_count=(6, "fixed"),
                interactive=False,
                label="Processed documents",
            )
        document_selector = gr.Dropdown(label="Inspect a processed document", choices=[], value=None)
        with gr.Row():
            document_summary = gr.Code(label="Structured summary", language="json")
            chunk_preview = gr.Code(label="Chunk preview (first 5 chunks)", language="json")
        text_preview = gr.Textbox(label="Document text preview", lines=18, max_lines=18)
        run_button.click(
            fn=_run_pipeline,
            inputs=[input_dir, output_dir, chunk_size, obfuscate],
            outputs=[
                run_summary,
                manifest_preview,
                results_table,
                document_selector,
                document_summary,
                chunk_preview,
                text_preview,
                run_state,
            ],
        )
        document_selector.change(
            fn=_show_document_preview,
            inputs=[document_selector, run_state],
            outputs=[document_summary, chunk_preview, text_preview],
        )
    return demo


def _get_ui_host() -> str:
    host = os.getenv("UI_HOST", DEFAULT_HOST).strip()
    return host or DEFAULT_HOST


def _get_ui_port() -> int:
    raw_port = os.getenv("UI_PORT", str(DEFAULT_PORT)).strip()
    if not raw_port:
        return DEFAULT_PORT
    try:
        port = int(raw_port)
    except ValueError as exc:
        raise ValueError(f"Invalid UI_PORT value: {raw_port}") from exc
    if port <= 0 or port > 65535:
        raise ValueError(f"UI_PORT must be between 1 and 65535, got: {port}")
    return port


def launch_ui(host: str | None = None, port: int | None = None) -> None:
    resolved_host = host if host is not None else _get_ui_host()
    resolved_port = port if port is not None else _get_ui_port()
    demo = build_interface()
    demo.launch(server_name=resolved_host, server_port=resolved_port)


if __name__ == "__main__":
    launch_ui()

input/ recursive scan⟶DoclingStructuredOutputApp⟶Format Filtering (.epub / .eml)⟶Docling Engine Conversion…
The pipeline operates via an orchestrated runtime cycle:
- Recursive Discovery: The framework crawls the target input path, isolating supported extensions (.epub, .eml) dynamically.
- Docling Normalization: The engine invokes DocumentConverter() to normalize files into a universal document representation, immediately enabling clean exports across three formats: markdown (document.md), raw text (document.txt), and full structural schema (document.json).
- Metadata Mapping: Custom extractors parse internal EPUB XML package elements and EML MIME headers.
- Downstream-Ready Chunking: Extracted text streams are split into token-optimized sections bounded by a configurable character threshold (Cmax).
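The first two steps can be sketched in a few lines. This is a reduced illustration rather than the project's code: `discover_documents` and `convert_to_markdown` are hypothetical helper names, while `DocumentConverter().convert(...)` and `export_to_markdown()` are the same Docling calls used in main.py. The Docling import is deferred so that the discovery part can be run without Docling installed.

```python
import tempfile
from pathlib import Path

SUPPORTED_EXTENSIONS = {".epub", ".eml"}


def discover_documents(input_dir: Path) -> list[Path]:
    """Recursively collect supported files, matching extensions case-insensitively."""
    return sorted(
        path
        for path in input_dir.rglob("*")
        if path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS
    )


def convert_to_markdown(source_path: Path) -> str:
    """Convert one file with Docling and return its markdown export."""
    # Imported lazily: requires the docling package to be installed.
    from docling.document_converter import DocumentConverter

    result = DocumentConverter().convert(source_path)
    return result.document.export_to_markdown()


if __name__ == "__main__":
    # Demonstrate discovery on a throwaway directory with placeholder files.
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "nested").mkdir()
        (root / "book.epub").write_bytes(b"")
        (root / "nested" / "mail.EML").write_text("", encoding="utf-8")
        (root / "notes.pdf").write_bytes(b"")
        for path in discover_documents(root):
            print(path.relative_to(root))
```

The PDF is skipped because only the two new formats are in `SUPPORTED_EXTENSIONS`; in practice each discovered path would then be passed to `convert_to_markdown`.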
The PII Sanitization Engine: Safe EML Ingestion
Deploying communication data into shared LLM infrastructure introduces compliance risks regarding Personally Identifiable Information (PII). To mitigate this threat without breaking semantic link continuity, Bob implemented an automated contact obfuscation system within the email pipeline.
When processing .eml documents, the pipeline evaluates incoming routing vectors (From and To fields). If obfuscation is activated, the application passes identities through a stateful mapping filter, replacing real addresses and human names with stable, type-specific identifiers.
# Implementation snippet from Bob's EmailAddressObfuscator
import re


class EmailAddressObfuscator:
    def __init__(self) -> None:
        # Maps each normalized original value to its stable placeholder.
        self.entity_map: dict[str, str] = {}
        self.counters: dict[str, int] = {"person": 0, "email": 0}

    def obfuscate_contacts(self, contacts: list[tuple[str, str]]) -> list[dict[str, str]]:
        return [
            {
                "name": self._obfuscate_value(name, "person") if name else "",
                "address": self._obfuscate_value(address, "email") if address else "",
            }
            for name, address in contacts
        ]

    def _obfuscate_value(self, value: str, entity_type: str) -> str:
        # Collapse whitespace so the same identity always hits the same map key.
        normalized = re.sub(r"\s+", " ", value).strip()
        if not normalized:
            return normalized
        if normalized not in self.entity_map:
            self.counters[entity_type] += 1
            self.entity_map[normalized] = f"{entity_type}-{self.counters[entity_type]}"
        return self.entity_map[normalized]
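This masking logic keeps identity strings consistent across a single processing batch, because one EmailAddressObfuscator instance is shared by every document in a run. For example, if john.doe@corporate.com appears in several messages within the batch, it is always mapped to the same identifier (e.g., email-1). This retains the sender and recipient relationships needed for contextual understanding while keeping the real From and To values out of the extracted metadata. Note that in the code shown, the cc and bcc fields and the message body are not masked, so they still need separate handling before the output reaches shared staging databases.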
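The stable-identifier behaviour is easy to check in isolation. The sketch below uses a compact stand-in called `StableIdMapper` (a hypothetical name, reduced from the obfuscator in main.py) and feeds it contacts from two made-up messages that share one sender.

```python
import re


class StableIdMapper:
    """Compact stand-in for the obfuscator: one placeholder per distinct value."""

    def __init__(self) -> None:
        self.entity_map: dict[str, str] = {}
        self.counters: dict[str, int] = {"person": 0, "email": 0}

    def map_value(self, value: str, entity_type: str) -> str:
        # Whitespace is collapsed so formatting differences do not create new IDs.
        normalized = re.sub(r"\s+", " ", value).strip()
        if not normalized:
            return normalized
        if normalized not in self.entity_map:
            self.counters[entity_type] += 1
            self.entity_map[normalized] = f"{entity_type}-{self.counters[entity_type]}"
        return self.entity_map[normalized]

    def map_contacts(self, contacts: list[tuple[str, str]]) -> list[dict[str, str]]:
        return [
            {
                "name": self.map_value(name, "person") if name else "",
                "address": self.map_value(address, "email") if address else "",
            }
            for name, address in contacts
        ]


mapper = StableIdMapper()
# Two separate (made-up) messages processed in the same batch.
first_message = mapper.map_contacts([("John Doe", "john.doe@corporate.com")])
second_message = mapper.map_contacts(
    [("Jane Roe", "jane.roe@corporate.com"), ("John  Doe", "john.doe@corporate.com")]
)
print(first_message)
print(second_message)
```

John Doe receives `person-1` and `email-1` in both messages, even with the extra space in his name the second time, while Jane Roe receives `person-2` and `email-2`. A fresh mapper instance starts counting again, so identifiers are only comparable within one run.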
Granular Token-Aware Text Chunking
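To feed large language models efficiently, documents must be split into chunks that fit within model context-window constraints. Bob's design aggregates paragraphs so that layout hierarchy takes priority over blind character counts. Section titles are detected from markdown headers (lines starting with #) or from short all-uppercase lines of at most 12 words, and every new heading closes the chunk being built. A running buffer size $B$ tracks the length of that chunk, counting each paragraph plus 2 characters for the blank-line separator. The next paragraph $P_i$ joins the current chunk only while
$$B + |P_i| + 2 \le C_{\max}, \qquad B = \sum_{P_j \in \text{buffer}} \left(|P_j| + 2\right)$$
Otherwise the buffer is flushed as a finished chunk and $P_i$ starts a new one. Here $P_i$ (Pi) represents the sequential structural paragraphs and $C_{\max}$ (Cmax) represents the maximum character ceiling, set with --chunk-size and defaulting to 1200. A single paragraph longer than $C_{\max}$ is sliced into pieces of at most $C_{\max}$ characters. Token counts are only estimated afterwards, at roughly 1.3 tokens per whitespace-separated word. This keeps chunk boundaries at natural document breaks and keeps section headings attached to their text blocks, which should help vector search retrieval.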
Consolidated Reporting and Analytics
Upon completing a data extraction run, the pipeline generates run-level summaries to ensure tracking and auditable data flow across automated AI systems:
- manifest.json: A complete execution log tracking global timestamps, paths, flags, and an embedded array containing full document properties.
- summary.csv: A high-level tabular matrix mapping source document formats to final analytical targets.
- chunks.csv: A flat database containing every generated text block alongside its estimated token length, parent headings, and source metadata, allowing direct bulk insertion into vector indexing engines.
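As an illustration of how chunks.csv might be consumed downstream, the sketch below reads rows with the column names written by the pipeline and reshapes them into id/text/metadata records. The sample rows and the `load_chunk_records` helper are invented for the example, and the record layout is an assumption about what a vector store loader could accept, not a format required by any particular engine.

```python
import csv
import io
from typing import Any, TextIO

# Invented sample using the same header as the pipeline's chunks.csv.
SAMPLE_CSV = (
    "source_name,chunk_id,chunk_index,char_count,estimated_token_count,"
    "section_heading,source_extension,text\n"
    "book.epub,book-chunk-0000,0,25,4,Intro,.epub,\"# Intro\n\nAlpha paragraph.\"\n"
    "mail.eml,mail-chunk-0000,0,15,3,,.eml,Beta paragraph.\n"
)


def load_chunk_records(handle: TextIO) -> list[dict[str, Any]]:
    """Turn chunks.csv rows into records with typed metadata."""
    records: list[dict[str, Any]] = []
    for row in csv.DictReader(handle):
        records.append(
            {
                "id": row["chunk_id"],
                "text": row["text"],
                "metadata": {
                    "source_name": row["source_name"],
                    "source_extension": row["source_extension"],
                    # Empty CSV cells come back as "", so normalize to None.
                    "section_heading": row["section_heading"] or None,
                    "chunk_index": int(row["chunk_index"]),
                    "estimated_token_count": int(row["estimated_token_count"]),
                },
            }
        )
    return records


# newline="" lets the csv module handle line breaks inside quoted fields.
records = load_chunk_records(io.StringIO(SAMPLE_CSV, newline=""))
for record in records:
    print(record["id"], record["metadata"]["estimated_token_count"])
```

For a real run the handle would come from opening the chunks.csv path reported in the run result, again with `newline=""` and UTF-8 encoding.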
Through this modular architecture, Bob has transformed a complex format issue into an automated, highly reliable pipeline. Enterprise data teams can now safely extend their knowledge pipelines to process EPUB files and EML archives, keeping data secure and fully optimized for advanced generative AI applications.
Conclusion
Through this implementation, Bob has delivered a pipeline that extracts the knowledge held in EPUB volumes and raw .eml archives. By wrapping the multi-format ingestion of the Docling engine in both a flexible CLI script and an interactive Gradio application, this project bridges the gap between raw enterprise data and downstream AI applications. Heading-aware, character-bounded chunking with token estimates, combined with stateful obfuscation of the From and To fields, shows that data ingestion can be structured and privacy-conscious at the same time. Ultimately, this work provides a reusable blueprint for RAG and agentic workflows, turning fragmented corporate communications and literature into structured, embedding-ready analytical assets.
>>> Thanks for reading <<<
Links
- Github repository for this blog: https://github.com/aairom/Docling-eml-epub
- Docling Project: https://docling-project.github.io/docling/
- Docling Github: https://github.com/docling-project/docling






