DEV Community

Alain Airom (Ayrom)
Alain Airom (Ayrom)

Posted on

Reading Epub and .EML files with Docling for RAG and more…

Docling v2.102 and new great features!

Introduction

In its latest release, Docling expands its robust parsing capabilities by introducing native support for EPUB and .eml formats.

By adding these to its already extensive list of compatible document types, the Docling engine makes data preparation for Generative AI applications smoother than ever. Whether you are chunking text for RAG pipelines, extracting clean metadata, or structuring unstructured files for LLM training, Docling effortlessly streamlines the entire document processing workflow and simplifies downstream generative AI workloads—such as token-aware text chunking, document summarization, and vector store preparation.

Before moving forward, a brief introduction on these two formats.

The EPUB Format

The EPUB (Electronic Publication) format was introduced in 2007 by the International Digital Publishing Forum (IDPF) as a successor to the older Open eBook standard, and it is currently maintained by the W3C. Designed as an open, XML-based standard for digital publications, its defining feature is reflowable content. This means the text automatically adjusts and optimizes its layout to fit different screen sizes, resolutions, and orientations — making it the universal standard for e-readers, smartphones, and tablets. Structurally, an .epub file is essentially a zipped archive containing HTML/XHTML files for the text, CSS for styling, images, and XML metadata for navigation and book structure.

The EML Format

The EML (Email) file format was developed by Microsoft to comply with the industry standard RFC 822 (and later RFC 5322) protocols for electronic mail. It was widely popularized by email clients like Outlook Express, Windows Live Mail, and Mozilla Thunderbird. An .emlfile is a plain text file that preserves the exact, raw architecture of a single email message. It contains the message header (including metadata like Sender, Recipient, Date, Subject, and routing servers) followed by the message body. The body can include plain text, HTML formatting, and binary data—such as file attachments or embedded images—which are encoded into text using MIME (Multipurpose Internet Mail Extensions) formatting.


Industrializing Document Ingestion: Processing EPUB, EML, and Privacy-First (PII) Pipelines with Docling

As usual, our automated software engineer, Bob, has designed and industrialized a production-ready file processing pipeline. This system bridges the gap between complex unstructured file systems and downstream vectorized AI platforms by orchestrating native format decoders alongside IBM’s open-source Docling engine.

Architectural Overview & Workflow Pipeline

The solution is split into two major layers: a high-throughput CLI processing engine (app/main.py) and an interactive Gradio validation UI (app/ui.py). The operational architecture maps a predictable, reliable path from raw user input down to analytical artifacts:

# main.py
from __future__ import annotations

import argparse
import csv
import json
import re
import xml.etree.ElementTree as ET
import zipfile
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Any

try:
    import mailparser
except ModuleNotFoundError as exc:
    raise SystemExit(
        "Missing dependency 'mailparser'. Activate the project virtual environment and install dependencies with '\n"
        "python3 -m venv .venv && ./.venv/bin/pip install --upgrade pip && ./.venv/bin/pip install -e .[dev]'\n"
        "Then run either './.venv/bin/python app/main.py' or './.venv/bin/python -m app.main'."
    ) from exc

from docling.document_converter import DocumentConverter

SUPPORTED_EXTENSIONS = {".epub", ".eml"}


@dataclass(slots=True)
class ProcessedDocument:
    source_path: str
    source_name: str
    source_extension: str
    relative_path: str
    docling_format: str
    output_directory: str
    markdown_path: str
    json_path: str
    text_path: str
    structured_summary_path: str
    chunks_path: str
    attachment_dir: str | None
    metadata: dict[str, Any]
    chunk_count: int
    text_length: int
    markdown_length: int


@dataclass(slots=True)
class PipelineRunResult:
    run_timestamp_utc: str
    input_root: str
    output_root: str
    chunk_size: int
    obfuscate_contacts: bool
    processed_count: int
    manifest_path: str
    summary_csv_path: str
    chunks_csv_path: str
    documents: list[ProcessedDocument]

    def to_manifest(self) -> dict[str, Any]:
        return {
            "run_timestamp_utc": self.run_timestamp_utc,
            "input_root": self.input_root,
            "output_root": self.output_root,
            "chunk_size": self.chunk_size,
            "obfuscate_contacts": self.obfuscate_contacts,
            "processed_count": self.processed_count,
            "documents": [asdict(item) for item in self.documents],
        }

    def to_dict(self) -> dict[str, Any]:
        payload = self.to_manifest()
        payload["manifest_path"] = self.manifest_path
        payload["summary_csv_path"] = self.summary_csv_path
        payload["chunks_csv_path"] = self.chunks_csv_path
        return payload


class EmailAddressObfuscator:
    def __init__(self) -> None:
        self.entity_map: dict[str, str] = {}
        self.counters: dict[str, int] = {"person": 0, "email": 0}

    def obfuscate_contacts(self, contacts: list[tuple[str, str]]) -> list[dict[str, str]]:
        return [
            {
                "name": self._obfuscate_value(name, "person") if name else "",
                "address": self._obfuscate_value(address, "email") if address else "",
            }
            for name, address in contacts
        ]

    def _obfuscate_value(self, value: str, entity_type: str) -> str:
        normalized = re.sub(r"\s+", " ", value).strip()
        if not normalized:
            return normalized
        if normalized not in self.entity_map:
            self.counters[entity_type] += 1
            self.entity_map[normalized] = f"{entity_type}-{self.counters[entity_type]}"
        return self.entity_map[normalized]


class DoclingStructuredOutputApp:
    def __init__(self, input_dir: Path, output_root: Path, chunk_size: int, obfuscate: bool = True) -> None:
        self.input_dir = input_dir
        self.output_root = output_root
        self.chunk_size = chunk_size
        self.obfuscate = obfuscate
        self.converter = DocumentConverter()
        self.timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
        self.run_dir = self.output_root / self.timestamp
        self.email_obfuscator = EmailAddressObfuscator()

    def run(self) -> PipelineRunResult:
        self.run_dir.mkdir(parents=True, exist_ok=True)
        documents = self._discover_documents()
        processed_documents = [self._process_document(path) for path in documents]
        manifest_path = self.run_dir / "manifest.json"
        csv_summary_path = self.run_dir / "summary.csv"
        chunk_csv_path = self.run_dir / "chunks.csv"

        run_result = PipelineRunResult(
            run_timestamp_utc=self.timestamp,
            input_root=str(self.input_dir.resolve()),
            output_root=str(self.run_dir.resolve()),
            chunk_size=self.chunk_size,
            obfuscate_contacts=self.obfuscate,
            processed_count=len(processed_documents),
            manifest_path=str(manifest_path.resolve()),
            summary_csv_path=str(csv_summary_path.resolve()),
            chunks_csv_path=str(chunk_csv_path.resolve()),
            documents=processed_documents,
        )

        manifest_path.write_text(json.dumps(run_result.to_manifest(), indent=2, ensure_ascii=False), encoding="utf-8")
        self._write_csv_summary(csv_summary_path, processed_documents)
        self._write_chunk_csv(chunk_csv_path, processed_documents)
        return run_result

    def _discover_documents(self) -> list[Path]:
        documents = sorted(
            path
            for path in self.input_dir.rglob("*")
            if path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS
        )
        if not documents:
            raise FileNotFoundError(f"No EPUB or EML files were found under {self.input_dir}")
        return documents

    def _process_document(self, source_path: Path) -> ProcessedDocument:
        result = self.converter.convert(source_path)
        document = result.document
        relative_path = source_path.relative_to(self.input_dir)
        slug = self._slugify(relative_path.with_suffix(""))
        destination_dir = self.run_dir / slug
        destination_dir.mkdir(parents=True, exist_ok=True)

        markdown_path = destination_dir / "document.md"
        json_path = destination_dir / "document.json"
        text_path = destination_dir / "document.txt"
        summary_path = destination_dir / "structured_summary.json"
        chunks_path = destination_dir / "chunks.json"

        markdown_content = document.export_to_markdown()
        text_content = document.export_to_text()
        markdown_path.write_text(markdown_content, encoding="utf-8")
        json_path.write_text(json.dumps(document.export_to_dict(), indent=2, ensure_ascii=False), encoding="utf-8")
        text_path.write_text(text_content, encoding="utf-8")

        metadata = self._build_metadata(source_path)
        chunks = self._build_chunks(
            source_path=source_path,
            markdown_content=markdown_content,
            text_content=text_content,
            metadata=metadata,
            max_chunk_chars=self.chunk_size,
        )
        chunks_path.write_text(json.dumps(chunks, indent=2, ensure_ascii=False), encoding="utf-8")

        structured_summary = {
            "source": {
                "path": str(source_path.resolve()),
                "name": source_path.name,
                "extension": source_path.suffix.lower(),
                "relative_path": str(relative_path),
            },
            "docling": {
                "format": str(result.input.format),
                "document_name": document.name,
                "text_length": len(text_content),
                "markdown_length": len(markdown_content),
                "chunk_count": len(chunks),
                "chunk_size": self.chunk_size,
            },
            "metadata": metadata,
            "artifacts": {
                "markdown": str(markdown_path.resolve()),
                "json": str(json_path.resolve()),
                "text": str(text_path.resolve()),
                "chunks": str(chunks_path.resolve()),
            },
        }

        attachment_dir: Path | None = None
        if source_path.suffix.lower() == ".eml":
            attachment_dir = destination_dir / "attachments"
            attachment_metadata = self._extract_eml_attachments(source_path, attachment_dir)
            structured_summary["email"] = {
                key: value for key, value in metadata.items() if key not in {"title", "author", "description"}
            }
            structured_summary["email"]["attachments"] = attachment_metadata
        else:
            structured_summary["publication"] = metadata

        summary_path.write_text(json.dumps(structured_summary, indent=2, ensure_ascii=False), encoding="utf-8")

        return ProcessedDocument(
            source_path=str(source_path.resolve()),
            source_name=source_path.name,
            source_extension=source_path.suffix.lower(),
            relative_path=str(relative_path),
            docling_format=str(result.input.format),
            output_directory=str(destination_dir.resolve()),
            markdown_path=str(markdown_path.resolve()),
            json_path=str(json_path.resolve()),
            text_path=str(text_path.resolve()),
            structured_summary_path=str(summary_path.resolve()),
            chunks_path=str(chunks_path.resolve()),
            attachment_dir=str(attachment_dir.resolve()) if attachment_dir and attachment_dir.exists() else None,
            metadata=metadata,
            chunk_count=len(chunks),
            text_length=len(text_content),
            markdown_length=len(markdown_content),
        )

    def _build_metadata(self, source_path: Path) -> dict[str, Any]:
        if source_path.suffix.lower() == ".eml":
            return self._extract_eml_metadata(source_path)
        return self._extract_epub_metadata(source_path)

    def _extract_epub_metadata(self, source_path: Path) -> dict[str, Any]:
        with zipfile.ZipFile(source_path) as archive:
            container_xml = archive.read("META-INF/container.xml")
            container_root = ET.fromstring(container_xml)
            namespace = {"container": "urn:oasis:names:tc:opendocument:xmlns:container"}
            rootfile = container_root.find("container:rootfiles/container:rootfile", namespace)
            if rootfile is None:
                raise ValueError(f"Unable to locate package document in EPUB file {source_path}")

            opf_path = rootfile.attrib.get("full-path")
            if not opf_path:
                raise ValueError(f"EPUB package document path missing for {source_path}")

            opf_root = ET.fromstring(archive.read(opf_path))

        ns = {
            "dc": "http://purl.org/dc/elements/1.1/",
            "opf": "http://www.idpf.org/2007/opf",
        }
        metadata_element = opf_root.find("opf:metadata", ns)
        if metadata_element is None:
            return {}

        creators = [
            element.text.strip()
            for element in metadata_element.findall("dc:creator", ns)
            if element.text and element.text.strip()
        ]
        subjects = [
            element.text.strip()
            for element in metadata_element.findall("dc:subject", ns)
            if element.text and element.text.strip()
        ]
        metadata: dict[str, Any] = {
            "title": self._xml_text(metadata_element.find("dc:title", ns)),
            "author": creators[0] if creators else None,
            "creators": creators,
            "language": self._xml_text(metadata_element.find("dc:language", ns)),
            "publisher": self._xml_text(metadata_element.find("dc:publisher", ns)),
            "identifier": self._xml_text(metadata_element.find("dc:identifier", ns)),
            "description": self._xml_text(metadata_element.find("dc:description", ns)),
            "subjects": subjects,
            "date": self._xml_text(metadata_element.find("dc:date", ns)),
            "rights": self._xml_text(metadata_element.find("dc:rights", ns)),
        }
        return {key: value for key, value in metadata.items() if value not in (None, [], "")}

    def _extract_eml_metadata(self, source_path: Path) -> dict[str, Any]:
        message = mailparser.parse_from_file(str(source_path))
        sent_at = self._normalize_datetime(message.date)
        if self.obfuscate:
            from_contacts = self.email_obfuscator.obfuscate_contacts(message.from_)
            to_contacts = self.email_obfuscator.obfuscate_contacts(message.to)
            obfuscation_info: dict[str, Any] = {
                "enabled": True,
                "strategy": "stable_type_based_ids_from__example/Pii obfuscate.md",
                "fields": ["from", "to"],
            }
        else:
            from_contacts = [self._pair_to_dict(item) for item in message.from_]
            to_contacts = [self._pair_to_dict(item) for item in message.to]
            obfuscation_info = {"enabled": False}

        return {
            "title": message.subject,
            "subject": message.subject,
            "from": from_contacts,
            "to": to_contacts,
            "cc": [self._pair_to_dict(item) for item in message.cc],
            "bcc": [self._pair_to_dict(item) for item in message.bcc],
            "date": sent_at,
            "message_id": message.message_id,
            "attachments_count": len(message.attachments),
            "text_plain_count": len(message.text_plain),
            "text_html_count": len(message.text_html),
            "obfuscation": obfuscation_info,
        }

    def _extract_eml_attachments(self, source_path: Path, attachment_dir: Path) -> list[dict[str, Any]]:
        message = mailparser.parse_from_file(str(source_path))
        if not message.attachments:
            return []

        attachment_dir.mkdir(parents=True, exist_ok=True)
        attachments: list[dict[str, Any]] = []
        for index, attachment in enumerate(message.attachments, start=1):
            original_name = attachment.get("filename") or f"attachment-{index}"
            safe_name = self._slugify(Path(original_name))
            suffix = Path(original_name).suffix
            target_name = f"{safe_name}{suffix}" if suffix and not safe_name.endswith(suffix) else safe_name
            target_path = attachment_dir / target_name
            payload = attachment.get("payload", "")
            binary = attachment.get("binary", False)
            if binary:
                target_path.write_bytes(payload if isinstance(payload, bytes) else payload.encode("utf-8", errors="ignore"))
            else:
                target_path.write_text(
                    payload if isinstance(payload, str) else payload.decode("utf-8", errors="ignore"),
                    encoding="utf-8",
                )
            attachments.append(
                {
                    "filename": original_name,
                    "saved_as": str(target_path.resolve()),
                    "content_type": attachment.get("mail_content_type"),
                    "content_disposition": attachment.get("content-disposition"),
                    "binary": binary,
                }
            )
        return attachments

    def _write_csv_summary(self, csv_summary_path: Path, processed_documents: list[ProcessedDocument]) -> None:
        with csv_summary_path.open("w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(
                handle,
                fieldnames=[
                    "source_name",
                    "relative_path",
                    "source_extension",
                    "docling_format",
                    "chunk_count",
                    "output_directory",
                    "structured_summary_path",
                    "chunks_path",
                ],
            )
            writer.writeheader()
            for document in processed_documents:
                writer.writerow(
                    {
                        "source_name": document.source_name,
                        "relative_path": document.relative_path,
                        "source_extension": document.source_extension,
                        "docling_format": document.docling_format,
                        "chunk_count": document.chunk_count,
                        "output_directory": document.output_directory,
                        "structured_summary_path": document.structured_summary_path,
                        "chunks_path": document.chunks_path,
                    }
                )

    def _write_chunk_csv(self, chunk_csv_path: Path, processed_documents: list[ProcessedDocument]) -> None:
        with chunk_csv_path.open("w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(
                handle,
                fieldnames=[
                    "source_name",
                    "chunk_id",
                    "chunk_index",
                    "char_count",
                    "estimated_token_count",
                    "section_heading",
                    "source_extension",
                    "text",
                ],
            )
            writer.writeheader()
            for document in processed_documents:
                chunks = json.loads(Path(document.chunks_path).read_text(encoding="utf-8"))
                for chunk in chunks:
                    writer.writerow(
                        {
                            "source_name": document.source_name,
                            "chunk_id": chunk["chunk_id"],
                            "chunk_index": chunk["chunk_index"],
                            "char_count": chunk["char_count"],
                            "estimated_token_count": chunk["estimated_token_count"],
                            "section_heading": chunk["section_heading"],
                            "source_extension": chunk["source_extension"],
                            "text": chunk["text"],
                        }
                    )

    def _build_chunks(
        self,
        source_path: Path,
        markdown_content: str,
        text_content: str,
        metadata: dict[str, Any],
        *,
        max_chunk_chars: int,
    ) -> list[dict[str, Any]]:
        content = markdown_content.strip() or text_content.strip()
        if not content:
            return []

        source_slug = self._slugify(source_path.relative_to(self.input_dir).with_suffix(""))
        paragraphs = [segment.strip() for segment in re.split(r"\n\s*\n", content) if segment.strip()]
        chunks: list[dict[str, Any]] = []
        buffer: list[str] = []
        buffer_size = 0
        current_heading: str | None = None

        def flush(section_heading: str | None) -> None:
            nonlocal buffer, buffer_size
            if not buffer:
                return
            chunk_text = "\n\n".join(buffer).strip()
            chunk_index = len(chunks)
            chunks.append(
                {
                    "chunk_id": f"{source_slug}-chunk-{chunk_index:04d}",
                    "chunk_index": chunk_index,
                    "text": chunk_text,
                    "char_count": len(chunk_text),
                    "estimated_token_count": self._estimate_token_count(chunk_text),
                    "section_heading": section_heading,
                    "source_name": source_path.name,
                    "source_extension": source_path.suffix.lower(),
                    "metadata": metadata,
                }
            )
            buffer = []
            buffer_size = 0

        for paragraph in paragraphs:
            heading = self._extract_heading(paragraph)
            if heading:
                if buffer:
                    flush(current_heading)
                current_heading = heading

            paragraph_length = len(paragraph)
            if buffer and buffer_size + paragraph_length + 2 > max_chunk_chars:
                flush(current_heading)
            if paragraph_length > max_chunk_chars:
                for start in range(0, paragraph_length, max_chunk_chars):
                    piece = paragraph[start : start + max_chunk_chars].strip()
                    if piece:
                        buffer = [piece]
                        buffer_size = len(piece)
                        flush(current_heading)
                continue
            buffer.append(paragraph)
            buffer_size += paragraph_length + 2

        flush(current_heading)
        return chunks

    @staticmethod
    def _extract_heading(paragraph: str) -> str | None:
        stripped = paragraph.strip()
        if stripped.startswith("#"):
            return stripped.lstrip("#").strip() or None
        if stripped.isupper() and len(stripped.split()) <= 12:
            return stripped.title()
        return None

    @staticmethod
    def _estimate_token_count(text: str) -> int:
        return max(1, round(len(text.split()) * 1.3))

    @staticmethod
    def _pair_to_dict(item: tuple[str, str]) -> dict[str, str]:
        display_name, address = item
        return {"name": display_name, "address": address}

    @staticmethod
    def _normalize_datetime(value: Any) -> str | None:
        if value is None:
            return None
        if isinstance(value, datetime):
            return value.isoformat()
        if isinstance(value, str):
            try:
                return parsedate_to_datetime(value).isoformat()
            except (TypeError, ValueError):
                return value
        return str(value)

    @staticmethod
    def _xml_text(element: ET.Element | None) -> str | None:
        if element is None or element.text is None:
            return None
        value = re.sub(r"\s+", " ", element.text).strip()
        return value or None

    @staticmethod
    def _slugify(path_like: Path) -> str:
        raw = str(path_like).replace("/", "-")
        slug = re.sub(r"[^A-Za-z0-9._-]+", "-", raw).strip("-._")
        return slug or "document"


def build_ui_document_rows(run_result: PipelineRunResult) -> list[list[Any]]:
    return [
        [
            item.source_name,
            item.relative_path,
            item.source_extension,
            item.chunk_count,
            item.docling_format,
            item.output_directory,
        ]
        for item in run_result.documents
    ]


def load_document_preview(processed_document: ProcessedDocument, max_chars: int = 4000) -> tuple[str, str, str]:
    summary_payload = json.loads(Path(processed_document.structured_summary_path).read_text(encoding="utf-8"))
    chunks_payload = json.loads(Path(processed_document.chunks_path).read_text(encoding="utf-8"))
    text_preview = Path(processed_document.text_path).read_text(encoding="utf-8")[:max_chars]
    return (
        json.dumps(summary_payload, indent=2, ensure_ascii=False),
        json.dumps(chunks_payload[: min(5, len(chunks_payload))], indent=2, ensure_ascii=False),
        text_preview,
    )


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Build structured outputs from EPUB and EML documents using Docling.")
    parser.add_argument("--input-dir", default="input", type=Path, help="Directory containing EPUB and EML files.")
    parser.add_argument("--output-dir", default="output", type=Path, help="Root directory for timestamped outputs.")
    parser.add_argument(
        "--chunk-size",
        default=1200,
        type=int,
        help="Maximum number of characters per generated chunk for RAG/agent workflows.",
    )
    parser.add_argument(
        "--no-obfuscate",
        dest="obfuscate",
        action="store_false",
        default=True,
        help="Disable From/To obfuscation in EML outputs.",
    )
    return parser.parse_args()


def main() -> int:
    args = parse_args()
    app = DoclingStructuredOutputApp(
        input_dir=args.input_dir,
        output_root=args.output_dir,
        chunk_size=args.chunk_size,
        obfuscate=args.obfuscate,
    )
    run_result = app.run()
    print(json.dumps(run_result.to_dict(), indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
Enter fullscreen mode Exit fullscreen mode

# ui.py
from __future__ import annotations

import json
import os
from pathlib import Path
from typing import Any

import gradio as gr

from app.main import DoclingStructuredOutputApp, PipelineRunResult, build_ui_document_rows, load_document_preview

DEFAULT_INPUT_DIR = Path("input")
DEFAULT_OUTPUT_DIR = Path("output")
DEFAULT_CHUNK_SIZE = 1200
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 7860

def _format_run_summary(run_result: PipelineRunResult) -> str:
    return json.dumps(
        {
            "status": "ok",
            "run_timestamp_utc": run_result.run_timestamp_utc,
            "input_root": run_result.input_root,
            "output_root": run_result.output_root,
            "chunk_size": run_result.chunk_size,
            "obfuscate_contacts": run_result.obfuscate_contacts,
            "processed_count": run_result.processed_count,
            "manifest_path": run_result.manifest_path,
            "summary_csv_path": run_result.summary_csv_path,
            "chunks_csv_path": run_result.chunks_csv_path,
        },
        indent=2,
        ensure_ascii=False,
    )

def _resolve_directory(path_value: str, fallback: Path) -> Path:
    candidate = Path(path_value.strip()) if path_value.strip() else fallback
    return candidate.expanduser().resolve()

def _run_pipeline(input_dir_value: str, output_dir_value: str, chunk_size: int, obfuscate: bool):
    input_dir = _resolve_directory(input_dir_value, DEFAULT_INPUT_DIR)
    output_dir = _resolve_directory(output_dir_value, DEFAULT_OUTPUT_DIR)
    if chunk_size <= 0:
        raise gr.Error("Chunk size must be a positive integer.")
    if not input_dir.exists() or not input_dir.is_dir():
        raise gr.Error(f"Input directory does not exist: {input_dir}")

    output_dir.mkdir(parents=True, exist_ok=True)
    app = DoclingStructuredOutputApp(
        input_dir=input_dir,
        output_root=output_dir,
        chunk_size=chunk_size,
        obfuscate=obfuscate,
    )
    run_result = app.run()
    rows = build_ui_document_rows(run_result)
    manifest_json = json.dumps(run_result.to_dict(), indent=2, ensure_ascii=False)
    summary_json = _format_run_summary(run_result)
    dropdown_choices = [
        (f"{document.relative_path} ({document.source_extension})", index)
        for index, document in enumerate(run_result.documents)
    ]
    first_summary, first_chunks, first_text = ("", "", "")
    selected_index = None
    if run_result.documents:
        selected_index = 0
        first_summary, first_chunks, first_text = load_document_preview(run_result.documents[0])

    return (
        summary_json,
        manifest_json,
        rows,
        gr.update(choices=dropdown_choices, value=selected_index),
        first_summary,
        first_chunks,
        first_text,
        run_result.to_dict(),
    )

def _show_document_preview(selected_index: int | None, run_result_payload: dict[str, Any] | None):
    if run_result_payload is None:
        return "", "", ""
    if selected_index is None:
        return "", "", ""

    documents = run_result_payload.get("documents", [])
    if not isinstance(documents, list) or selected_index < 0 or selected_index >= len(documents):
        return "", "", ""

    document_payload = documents[selected_index]
    summary_text = Path(document_payload["structured_summary_path"]).read_text(encoding="utf-8")
    chunks_payload = json.loads(Path(document_payload["chunks_path"]).read_text(encoding="utf-8"))
    text_preview = Path(document_payload["text_path"]).read_text(encoding="utf-8")[:4000]
    return summary_text, json.dumps(chunks_payload[: min(5, len(chunks_payload))], indent=2, ensure_ascii=False), text_preview

def build_interface() -> gr.Blocks:
    with gr.Blocks(title="Docling EPUB/EML Structured Output UI") as demo:
        run_state = gr.State(value=None)

        gr.Markdown(
            "# Docling EPUB and EML Structured Output UI\n"
            "Run the same recursive Docling pipeline from the browser, toggle EML obfuscation, and inspect generated outputs."
        )

        with gr.Row():
            with gr.Column(scale=1):
                input_dir = gr.Textbox(label="Input directory", value=str(DEFAULT_INPUT_DIR), lines=1)
                output_dir = gr.Textbox(label="Output root directory", value=str(DEFAULT_OUTPUT_DIR), lines=1)
                chunk_size = gr.Number(label="Chunk size", value=DEFAULT_CHUNK_SIZE, precision=0)
                obfuscate = gr.Checkbox(label="Obfuscate EML From/To fields", value=True)
                run_button = gr.Button("Run pipeline", variant="primary")
            with gr.Column(scale=1):
                run_summary = gr.Code(label="Run summary", language="json")
                manifest_preview = gr.Code(label="Manifest preview", language="json")

        with gr.Row():
            results_table = gr.Dataframe(
                headers=["source_name", "relative_path", "extension", "chunk_count", "docling_format", "output_directory"],
                datatype=["str", "str", "str", "number", "str", "str"],
                row_count=1,
                col_count=(6, "fixed"),
                interactive=False,
                label="Processed documents",
            )

        document_selector = gr.Dropdown(label="Inspect a processed document", choices=[], value=None)

        with gr.Row():
            document_summary = gr.Code(label="Structured summary", language="json")
            chunk_preview = gr.Code(label="Chunk preview (first 5 chunks)", language="json")

        text_preview = gr.Textbox(label="Document text preview", lines=18, max_lines=18)

        run_button.click(
            fn=_run_pipeline,
            inputs=[input_dir, output_dir, chunk_size, obfuscate],
            outputs=[
                run_summary,
                manifest_preview,
                results_table,
                document_selector,
                document_summary,
                chunk_preview,
                text_preview,
                run_state,
            ],
        )

        document_selector.change(
            fn=_show_document_preview,
            inputs=[document_selector, run_state],
            outputs=[document_summary, chunk_preview, text_preview],
        )

    return demo

def _get_ui_host() -> str:
    host = os.getenv("UI_HOST", DEFAULT_HOST).strip()
    return host or DEFAULT_HOST

def _get_ui_port() -> int:
    raw_port = os.getenv("UI_PORT", str(DEFAULT_PORT)).strip()
    if not raw_port:
        return DEFAULT_PORT
    try:
        port = int(raw_port)
    except ValueError as exc:
        raise ValueError(f"Invalid UI_PORT value: {raw_port}") from exc
    if port <= 0 or port > 65535:
        raise ValueError(f"UI_PORT must be between 1 and 65535, got: {port}")
    return port

def launch_ui(host: str | None = None, port: int | None = None) -> None:
    resolved_host = host if host is not None else _get_ui_host()
    resolved_port = port if port is not None else _get_ui_port()
    demo = build_interface()
    demo.launch(server_name=resolved_host, server_port=resolved_port)

if __name__ == "__main__":
    launch_ui()
Enter fullscreen mode Exit fullscreen mode

input/ recursive scan⟶DoclingStructuredOutputApp⟶Format Filtering (.epub / .eml)⟶Docling Engine Conversion…

The pipeline operates via an orchestrated runtime cycle:

  1. Recursive Discovery: The framework crawls the target input path, isolating supported extensions (.epub, .eml) dynamically.
  2. Docling Normalization: The engine invokes DocumentConverter() to normalize files into a universal document representation, immediately enabling clean exports across three formats: markdown (document.md), raw text (document.txt), and full structural schema (document.json).
  3. Metadata Mapping: Custom extractors parse internal EPUB XML package elements and EML MIME headers.
  4. Downstream-Ready Chunking: Extracted text streams are split into token-optimized sections bounded by a configurable character threshold (Cmax​).

The PII Sanitization Engine: Safe EML Ingestion

Deploying communication data into shared LLM infrastructure introduces compliance risks regarding Personally Identifiable Information (PII). To mitigate this threat without breaking semantic link continuity, Bob implemented an automated contact obfuscation system within the email pipeline.

When processing .eml documents, the pipeline evaluates incoming routing vectors (From and To fields). If obfuscation is activated, the application passes identities through a stateful mapping filter, replacing real addresses and human names with stable, type-specific identifiers.

# Implementation snippet from Bob's EmailAddressObfuscator
class EmailAddressObfuscator:
    def __init__(self) -> None:
        self.entity_map: dict[str, str] = {}
        self.counters: dict[str, int] = {"person": 0, "email": 0}

    def obfuscate_contacts(self, contacts: list[tuple[str, str]]) -> list[dict[str, str]]:
        return [
            {
                "name": self._obfuscate_value(name, "person") if name else "",
                "address": self._obfuscate_value(address, "email") if address else "",
            }
            for name, address in contacts
        ]    def obfuscate_contacts(self, contacts: list[tuple[str, str]]) -> list[dict[str, str]]:
        return [
            {
                "name": self._obfuscate_value(name, "person") if name else "",
                "address": self._obfuscate_value(address, "email") if address else "",
            }
            for name, address in contacts
        ]
Enter fullscreen mode Exit fullscreen mode

This masking logic guarantees that identity strings remain consistent across a single processing batch. For example, if john.doe@corporate.com appears multiple times across separate communication nodes within the batch, it is mapped to a fixed identifier (e.g., email-1). This retains the transactional relationships needed for deep contextual understanding while ensuring actual sensitive data never hits global staging databases.

Granular Token-Aware Text Chunking

To feed large language models efficiently, documents must be split into chunks that fit within model window constraints. Bob’s design provides custom mathematical paragraph aggregation that prioritizes layout hierarchies over blind character counts. Section titles are extracted via structured markdown headers (e.g., lines starting with #) or detected typographic attributes. The buffer size tracks chunk lengths dynamically:

Where Pi​ represents sequential structural paragraphs and Cmax​ represents the maximum character ceiling constraint. This ensures chunks split cleanly at natural document breaks, keeping section headings attached to their text blocks to maximize vector search retrieval performance.

Consolidated Reporting and Analytics

Upon completing a data extraction run, the pipeline generates run-level summaries to ensure tracking and auditable data flow across automated AI systems:

  • manifest.json: A complete execution log tracking global timestamps, paths, flags, and an embedded array containing full document properties.
  • summary.csv: A high-level tabular matrix mapping source document formats to final analytical targets.
  • chunks.csv: A flat database containing every generated text block alongside its estimated token length, parent headings, and source metadata, allowing direct bulk insertion into vector indexing engines.

Through this modular architecture, Bob has transformed a complex format issue into an automated, highly reliable pipeline. Enterprise data teams can now safely extend their knowledge pipelines to process EPUB files and EML archives, keeping data secure and fully optimized for advanced generative AI applications.


Conclusion

Through this industrialized implementation, Bob has successfully delivered a robust, production-grade pipeline that unlocks the hidden knowledge within complex EPUB volumes and raw .eml archives. By wrapping the powerful multi-format ingestion of the Docling engine into both a flexible CLI automation script and an intuitive, interactive Gradio application, this project bridges the critical gap between raw enterprise data and downstream AI applications. The realization of advanced, token-aware document chunking combined with stateful, privacy-first PII obfuscation proves that data ingestion can be both highly intelligent and strictly compliant. Ultimately, this work provides a scalable blueprint for modern RAG and agentic workflows, transforming fragmented corporate communications and literature into highly structured, embedding-ready, and secure analytical assets.

>>> Thanks for reading <<<

Links

Top comments (0)