DEV Community

Alain Airom
Alain Airom

Posted on

Scaling Document Parsing: Bob’s Guide to Docling in Code Engine Serverless Fleets

Scaling on-demand usage of Docling mass document processing in Code Engine’s Serverless Fleets, implemented by IBM Bob.

A little background: What is Code Engine Serverless Fleets?


IBM Code Engine Serverless Fleets is a strategic container platform designed to simplify and optimize large-scale, parallel computation. It allows users to run compute-intensive workloads efficiently by managing a collection of worker nodes that automatically scale based on the specific resource requirements of the task. Whether you are processing millions of documents or running complex simulations, Fleets provides a fully managed environment where you only pay for the resources you consume, with no costs for idle time.

Key features of Serverless Fleets include:

  • Dynamic Scaling: Automatically scales from a single task to millions, deprovisioning workers immediately upon completion to ensure cost-efficiency.
  • GPU Acceleration: Supports heavy-duty processing with on-demand GPU-enabled instances (such as NVIDIA L40s or H100s) for high-performance workloads.
  • Flexible Resource Allocation: Offers no limits on vCPU, memory, or task duration, supporting seamless integration with your existing VPC network and data stores.
  • Integrated Workflow: Seamlessly connects with Cloud Object Storage (COS) for task state management and data input/output, while providing built-in logging via IBM Cloud Logs.

Docling and Code Engine Serverless Fleets: Scaling Document Intelligence with Docling and Serverless Fleets


Integrating Docling with IBM Code Engine Serverless Fleets creates a powerful, automated pipeline for converting complex documents like PDFs into structured Markdown at scale. This architecture utilizes Cloud Object Storage (COS) as a centralized hub, where input documents are stored and resulting files are automatically saved. By leveraging the official Docling container, the system autonomously spawns a fleet of worker nodes to process conversion tasks in parallel, ensuring high efficiency and performance. Users can optimize throughput by selecting specific worker profiles — such as the mx3d-24x240 — to run multiple Docling threads simultaneously or by deploying Serverless GPUs (like the NVIDIA L40s) for accelerated processing. This setup is ideal for high-volume document workflows, providing a cost-effective environment where compute resources scale dynamically and are deprovisioned immediately after the tasks are completed.


Orchestrating Global Scale: Using Bob’s Blueprint for Scalable Docling via Serverless Fleets


The core of this implementation is DoclingGPU, a production-ready application that integrates IBM’s Docling library with IBM Cloud Code Engine Serverless Fleets. This architecture allows for a cost-efficient pipeline where on-demand GPU or CPU worker pools scale to zero when idle and scale up dynamically to process large batches of documents in parallel. Key features include a web UI for drag-and-drop folder processing and a containerized architecture that ensures consistency across local and cloud environments. By automating the lifecycle of fleet workers, the application can process 1,000 PDFs in approximately two hours for roughly $3.20 using L40s GPUs.

| Profile            | GPU                 | vCPU | RAM   | Use Case               |
| ------------------ | ------------------- | ---- | ----- | ---------------------- |
| `gx3-24x120x1l40s` | 1× NVIDIA L40s 48GB | 24   | 120GB | Standard (recommended) |
| `gx3-48x240x2l40s` | 2× NVIDIA L40s      | 48   | 240GB | Large batches          |
| `gx3-96x480x4l40s` | 4× NVIDIA L40s      | 96   | 480GB | Maximum throughput     |
Enter fullscreen mode Exit fullscreen mode

The system architecture is built on three core principles: cost efficiency, scalability, and flexibility. The application supports three distinct processing modes — Local, Fleet CPU, and Fleet GPU — allowing the same code to run seamlessly on a laptop or a massive cloud-based GPU cluster. The data flow is orchestrated through Cloud Object Storage (COS), where fleet workers pull input documents and push structured Markdown results. To ensure enterprise-grade security, the design incorporates IBM Cloud IAM for authentication, encrypted secrets injected as environment variables, and TLS encryption for all data in transit.

| Mode          | Description                      | Best For               |
| ------------- | -------------------------------- | ---------------------- |
| **Local**     | Docling runs on the Flask server | Dev, small batches     |
| **Fleet CPU** | Code Engine CPU worker pool      | 10–100 documents       |
| **Fleet GPU** | Code Engine GPU pool (L40s/H100) | 100+ complex documents |
Enter fullscreen mode Exit fullscreen mode

Deploying the solution to production is streamlined through I*BM Cloud Code Engine* and Terraform for Infrastructure-as-Code (IaC). The deployment process involves setting up a container registry, configuring Code Engine projects, and defining persistent data stores that link the fleet to COS buckets. A critical part of the setup is the Serverless Fleet configuration, which defines worker profiles (such as gx3-24x120x1l40s for GPUs) and scaling limits to control costs. This production-ready setup also includes health check endpoints and integrated logging via IBM Cloud Logs to monitor high-volume conversion jobs.

For local development and testing, the application can be run using a Docker-based setup or a standard Python 3.11 virtual environment. This allows developers to verify document parsing logic locally on their CPU or a local NVIDIA GPU before pushing to the cloud. The local workflow includes tools for document-specific troubleshooting, such as adjusting batch sizes to prevent out-of-memory errors on smaller machines. Typical processing times for a complex 50-page PDF drop from 3 minutes on a local CPU to just 15 seconds when using a cloud-based GPU, highlighting the value of the fleet-ready implementation.

The application provides a comprehensive REST API to facilitate integration with existing enterprise workflows. Developers can programmatically submit jobs via the /upload and /local/process endpoints, which support both single-file and batch folder processing. The API allows for real-time monitoring of job status (e.g., pending, processing, completed) and provides structured JSON responses containing download URLs for the processed Markdown files. Future-proofing the implementation, the API is designed to support webhooks for asynchronous notifications once large-scale fleet jobs are finalized.


Technical Requirements and Deployment: What is Needed to Run the Application

| Feature                | Description                                                  |
| ---------------------- | ------------------------------------------------------------ |
| 🌐 **Web UI**           | Drag-and-drop upload, folder batch processing, real-time job monitoring |
| 🖥️ **Local Mode**       | Run Docling directly on your machine (CPU or GPU)            |
| ⚡ **Fleet CPU Mode**   | Launch Code Engine CPU fleet for moderate workloads          |
| 🚀 **Fleet GPU Mode**   | Launch Code Engine GPU fleet (NVIDIA L40s/H100) for large workloads |
| 📄 **Output Formats**   | Timestamped Markdown files with YAML frontmatter metadata    |
| 📦 **Batch Processing** | Process entire folders of documents in parallel              |
| 🔄 **Auto-scaling**     | Workers scale to zero when idle, scale up on demand          |
| 🐳 **Containerized**    | Docker + Docker Compose for local development                |
| ☸️ **K8s Ready**        | Kubernetes manifests for non-Code Engine deployments         |
| 🏗️ **IaC**              | Terraform scripts for full IBM Cloud infrastructure provisioning |
Enter fullscreen mode Exit fullscreen mode
codeengine-docling-gpu/
├── app/                          # Flask web application
│   ├── app.py                    # Main Flask application
│   ├── templates/
│   │   └── index.html            # Single-page UI
│   ├── static/
│   │   ├── css/style.css         # IBM Design System-inspired styles
│   │   └── js/app.js             # Frontend JavaScript
│   ├── requirements.txt          # Python dependencies
│   └── Dockerfile                # Web app container
│
├── worker/                       # Docling processing worker
│   ├── worker.py                 # Standalone document processor
│   ├── Dockerfile.gpu            # GPU-enabled worker container
│   ├── entrypoint.sh             # Container entrypoint
│   └── requirements.txt          # Worker dependencies
│
├── Scripts/                      # Automation scripts
│   ├── deploy-codeengine.sh      # IBM Cloud Code Engine deployment
│   ├── build-and-push.sh         # Docker build & push
│   ├── local-run.sh              # Local development runner
│   ├── push-to-github.sh         # GitHub push (excludes _* folders)
│   └── deploy-terraform/         # Terraform IaC
│       ├── main.tf               # IBM Cloud resources
│       ├── variables.tf          # Variable declarations
│       ├── terraform.tfvars.example  # Example configuration
│       └── deploy.sh             # Terraform wrapper script
│
├── k8s/                          # Kubernetes manifests
│   └── deployment.yaml           # Deployment, Service, Ingress, HPA
│
├── Docs/                         # Documentation
│   ├── README.md                 # This file
│   ├── ARCHITECTURE.md           # Architecture diagrams (Mermaid)
│   ├── DEPLOYMENT.md             # Deployment guide
│   ├── LOCAL_DEVELOPMENT.md      # Local development guide
│   └── API.md                    # REST API reference
│
├── input/                        # Local input documents
├── output/                       # Local output (timestamped Markdown)
├── docker-compose.yml            # Multi-service local development
├── .gitignore                    # Git ignore (excludes _* folders)
└── README.md                     # Root project README
Enter fullscreen mode Exit fullscreen mode

Local Deployment

For localized testing and development, the application is designed to run in a standalone environment. A critical dependency is the docling-serve service, which the main application consumes via REST API to perform document parsing without requiring an immediate cloud connection.

> Docling-serve runs Docling as a service!

Cloud Deployment(s)

The implementation supports two primary deployment tiers: a streamlined shell script for immediate setup and a comprehensive Terraform configuration for enterprise-grade environments. The Terraform approach is recommended for long-term management, as it ensures consistent provisioning of the entire stack — including the web application, the Docling worker images, and the required HMAC keys for secure data flow between the fleet and IBM Cloud Object Storage.

  • Shell mode
#!/bin/bash
# ============================================================
# DoclingGPU - IBM Cloud Code Engine Deployment Script
# Deploys the web application as a serverless Code Engine app
# and configures the fleet infrastructure.
# ============================================================

set -euo pipefail

# ============================================================
# Configuration - Edit these or set as environment variables
# ============================================================
CE_REGION="${CE_REGION:-eu-de}"
CE_RESOURCE_GROUP="${CE_RESOURCE_GROUP:-default}"
CE_PROJECT_NAME="${CE_PROJECT_NAME:-doclinggpu-project}"
CE_APP_NAME="${CE_APP_NAME:-doclinggpu-webapp}"
CE_REGISTRY="${CE_REGISTRY:-}"                          # e.g. de.icr.io
CE_REGISTRY_NAMESPACE="${CE_REGISTRY_NAMESPACE:-}"      # e.g. my-namespace
CE_REGISTRY_SECRET="${CE_REGISTRY_SECRET:-ce-auto-icr-private-eu-de}"
COS_INSTANCE_NAME="${COS_INSTANCE_NAME:-doclinggpu-cos}"
COS_INPUT_BUCKET="${COS_INPUT_BUCKET:-doclinggpu-input}"
COS_OUTPUT_BUCKET="${COS_OUTPUT_BUCKET:-doclinggpu-output}"
APP_IMAGE_TAG="${APP_IMAGE_TAG:-latest}"
SECRET_KEY="${SECRET_KEY:-$(openssl rand -hex 32)}"

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
CYAN='\033[0;36m'
NC='\033[0m' # No Color

# ============================================================
# Helper functions
# ============================================================
log_info()    { echo -e "${BLUE}[INFO]${NC} $*"; }
log_success() { echo -e "${GREEN}[OK]${NC} $*"; }
log_warn()    { echo -e "${YELLOW}[WARN]${NC} $*"; }
log_error()   { echo -e "${RED}[ERROR]${NC} $*"; exit 1; }
log_step()    { echo -e "\n${CYAN}━━━ $* ━━━${NC}"; }

check_prereqs() {
    log_step "Checking prerequisites"

    local missing=0
    for cmd in ibmcloud docker; do
        if ! command -v "$cmd" &>/dev/null; then
            log_error "Required command not found: $cmd"
            missing=1
        fi
    done

    # Check ibmcloud plugins
    for plugin in code-engine container-registry; do
        if ! ibmcloud plugin list | grep -q "$plugin"; then
            log_warn "Installing ibmcloud plugin: $plugin"
            ibmcloud plugin install "$plugin" -f
        fi
    done

    [[ $missing -eq 0 ]] && log_success "All prerequisites met"
}

login_ibmcloud() {
    log_step "IBM Cloud Login"

    if [[ -n "${IBMCLOUD_API_KEY:-}" ]]; then
        ibmcloud login --apikey "$IBMCLOUD_API_KEY" -r "$CE_REGION" -g "$CE_RESOURCE_GROUP" -q
        log_success "Logged in with API key"
    else
        log_warn "IBMCLOUD_API_KEY not set. Attempting interactive login..."
        ibmcloud login -r "$CE_REGION" -g "$CE_RESOURCE_GROUP"
    fi

    ibmcloud target -r "$CE_REGION" -g "$CE_RESOURCE_GROUP"
}

setup_registry() {
    log_step "Container Registry Setup"

    if [[ -z "$CE_REGISTRY" ]]; then
        CE_REGISTRY="private.${CE_REGION}.icr.io"
        log_info "Using registry: $CE_REGISTRY"
    fi

    if [[ -z "$CE_REGISTRY_NAMESPACE" ]]; then
        CE_REGISTRY_NAMESPACE="doclinggpu-$(openssl rand -hex 4)"
        log_info "Creating registry namespace: $CE_REGISTRY_NAMESPACE"
        ibmcloud cr namespace-add "$CE_REGISTRY_NAMESPACE" || true
    fi

    # Login to container registry
    ibmcloud cr login
    log_success "Registry configured: ${CE_REGISTRY}/${CE_REGISTRY_NAMESPACE}"
}

build_and_push_image() {
    log_step "Building and Pushing Web App Image"

    local image="${CE_REGISTRY}/${CE_REGISTRY_NAMESPACE}/${CE_APP_NAME}:${APP_IMAGE_TAG}"
    local build_date
    build_date=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

    log_info "Building image: $image"
    docker build \
        --build-arg BUILD_DATE="$build_date" \
        --build-arg VERSION="$APP_IMAGE_TAG" \
        -t "$image" \
        ./app/

    log_info "Pushing image: $image"
    docker push "$image"

    export APP_IMAGE="$image"
    log_success "Image pushed: $image"
}

setup_cos() {
    log_step "Cloud Object Storage Setup"

    # Check if COS instance exists
    if ! ibmcloud resource service-instance "$COS_INSTANCE_NAME" &>/dev/null; then
        log_info "Creating COS instance: $COS_INSTANCE_NAME"
        ibmcloud resource service-instance-create \
            "$COS_INSTANCE_NAME" \
            cloud-object-storage \
            standard \
            global
    else
        log_info "COS instance already exists: $COS_INSTANCE_NAME"
    fi

    # Get COS CRN
    COS_CRN=$(ibmcloud resource service-instance "$COS_INSTANCE_NAME" --output json | \
              python3 -c "import sys,json; print(json.load(sys.stdin)[0]['crn'])")

    # Create buckets
    for bucket in "$COS_INPUT_BUCKET" "$COS_OUTPUT_BUCKET"; do
        log_info "Creating bucket: $bucket"
        ibmcloud cos bucket-create \
            --bucket "$bucket" \
            --ibm-service-instance-id "$COS_CRN" \
            --region "$CE_REGION" 2>/dev/null || log_warn "Bucket may already exist: $bucket"
    done

    log_success "COS configured"
}

setup_ce_project() {
    log_step "Code Engine Project Setup"

    # Create or select project
    if ! ibmcloud ce project get --name "$CE_PROJECT_NAME" &>/dev/null; then
        log_info "Creating Code Engine project: $CE_PROJECT_NAME"
        ibmcloud ce project create --name "$CE_PROJECT_NAME"
    fi

    ibmcloud ce project select --name "$CE_PROJECT_NAME"
    log_success "Project selected: $CE_PROJECT_NAME"
}

setup_ce_secrets() {
    log_step "Code Engine Secrets & Config"

    # Create registry secret if not exists
    if ! ibmcloud ce secret get --name "$CE_REGISTRY_SECRET" &>/dev/null; then
        log_info "Creating registry secret: $CE_REGISTRY_SECRET"
        ibmcloud ce secret create \
            --name "$CE_REGISTRY_SECRET" \
            --format registry \
            --server "$CE_REGISTRY" \
            --username iamapikey \
            --password "$IBMCLOUD_API_KEY"
    fi

    # Create app secrets
    ibmcloud ce secret create \
        --name doclinggpu-secrets \
        --from-literal SECRET_KEY="$SECRET_KEY" \
        --from-literal CE_REGION="$CE_REGION" \
        --from-literal CE_REGISTRY_SECRET="$CE_REGISTRY_SECRET" \
        2>/dev/null || \
    ibmcloud ce secret update \
        --name doclinggpu-secrets \
        --from-literal SECRET_KEY="$SECRET_KEY" \
        --from-literal CE_REGION="$CE_REGION"

    log_success "Secrets configured"
}

deploy_webapp() {
    log_step "Deploying Web Application to Code Engine"

    local image="${APP_IMAGE:-${CE_REGISTRY}/${CE_REGISTRY_NAMESPACE}/${CE_APP_NAME}:${APP_IMAGE_TAG}}"

    # Check if app exists
    if ibmcloud ce app get --name "$CE_APP_NAME" &>/dev/null; then
        log_info "Updating existing app: $CE_APP_NAME"
        ibmcloud ce app update \
            --name "$CE_APP_NAME" \
            --image "$image" \
            --registry-secret "$CE_REGISTRY_SECRET" \
            --env PROCESSING_MODE=fleet-gpu \
            --env CE_REGION="$CE_REGION" \
            --env CE_INPUT_STORE=fleet-input-store \
            --env CE_OUTPUT_STORE=fleet-output-store \
            --env CE_FLEET_TASK_STORE=fleet-task-store \
            --env CE_FLEET_SUBNETPOOL=fleet-subnetpool \
            --env CE_REGISTRY_SECRET="$CE_REGISTRY_SECRET" \
            --env-from-secret doclinggpu-secrets \
            --cpu 1 \
            --memory 4G \
            --min-scale 0 \
            --max-scale 5 \
            --port 8080 \
            --timeout 300
    else
        log_info "Creating new app: $CE_APP_NAME"
        ibmcloud ce app create \
            --name "$CE_APP_NAME" \
            --image "$image" \
            --registry-secret "$CE_REGISTRY_SECRET" \
            --env PROCESSING_MODE=fleet-gpu \
            --env CE_REGION="$CE_REGION" \
            --env CE_INPUT_STORE=fleet-input-store \
            --env CE_OUTPUT_STORE=fleet-output-store \
            --env CE_FLEET_TASK_STORE=fleet-task-store \
            --env CE_FLEET_SUBNETPOOL=fleet-subnetpool \
            --env CE_REGISTRY_SECRET="$CE_REGISTRY_SECRET" \
            --env-from-secret doclinggpu-secrets \
            --cpu 1 \
            --memory 4G \
            --min-scale 0 \
            --max-scale 5 \
            --port 8080 \
            --timeout 300
    fi

    # Get app URL
    APP_URL=$(ibmcloud ce app get --name "$CE_APP_NAME" --output json | \
              python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('status',{}).get('url',''))")

    log_success "App deployed: $APP_URL"
    echo ""
    echo -e "${GREEN}╔══════════════════════════════════════════════════╗${NC}"
    echo -e "${GREEN}║  DoclingGPU deployed successfully!               ║${NC}"
    echo -e "${GREEN}║  URL: ${APP_URL}${NC}"
    echo -e "${GREEN}╚══════════════════════════════════════════════════╝${NC}"
}

print_summary() {
    echo ""
    log_step "Deployment Summary"
    echo "  Region:          $CE_REGION"
    echo "  Project:         $CE_PROJECT_NAME"
    echo "  App:             $CE_APP_NAME"
    echo "  Registry:        ${CE_REGISTRY}/${CE_REGISTRY_NAMESPACE}"
    echo "  COS Input:       $COS_INPUT_BUCKET"
    echo "  COS Output:      $COS_OUTPUT_BUCKET"
    echo ""
    echo "Next steps:"
    echo "  1. Upload documents to COS: ibmcloud cos object-put --bucket $COS_INPUT_BUCKET --key pdfs/doc.pdf --body ./input/doc.pdf"
    echo "  2. Open the web app and start processing"
    echo "  3. Monitor fleets: ibmcloud ce fleet list"
    echo ""
}

# ============================================================
# Main
# ============================================================
main() {
    echo ""
    echo -e "${CYAN}╔══════════════════════════════════════════════════╗${NC}"
    echo -e "${CYAN}║  DoclingGPU - IBM Code Engine Deployment         ║${NC}"
    echo -e "${CYAN}╚══════════════════════════════════════════════════╝${NC}"
    echo ""

    check_prereqs
    login_ibmcloud
    setup_registry
    build_and_push_image
    setup_cos
    setup_ce_project
    setup_ce_secrets
    deploy_webapp
    print_summary
}

# Parse arguments
SKIP_BUILD=false
SKIP_COS=false

while [[ $# -gt 0 ]]; do
    case $1 in
        --skip-build)   SKIP_BUILD=true; shift ;;
        --skip-cos)     SKIP_COS=true; shift ;;
        --region)       CE_REGION="$2"; shift 2 ;;
        --project)      CE_PROJECT_NAME="$2"; shift 2 ;;
        --app-name)     CE_APP_NAME="$2"; shift 2 ;;
        --help|-h)
            echo "Usage: $0 [OPTIONS]"
            echo "Options:"
            echo "  --skip-build    Skip Docker build and push"
            echo "  --skip-cos      Skip COS bucket creation"
            echo "  --region        IBM Cloud region (default: eu-de)"
            echo "  --project       Code Engine project name"
            echo "  --app-name      Code Engine app name"
            exit 0
            ;;
        *) log_error "Unknown option: $1" ;;
    esac
done

main

# Made with Bob
Enter fullscreen mode Exit fullscreen mode
  • IaC mode (to be tuned and adapted)
# ============================================================
# DoclingGPU - Terraform Deployment
# IBM Cloud Code Engine + COS Infrastructure
# ============================================================

terraform {
  required_version = ">= 1.5.0"

  required_providers {
    ibm = {
      source  = "IBM-Cloud/ibm"
      version = ">= 1.65.0"
    }
  }

  # Uncomment to use IBM Cloud Schematics as backend
  # backend "http" {}
}

# ============================================================
# Provider
# ============================================================
provider "ibm" {
  ibmcloud_api_key = var.ibmcloud_api_key
  region           = var.region
}

# ============================================================
# Data Sources
# ============================================================
data "ibm_resource_group" "rg" {
  name = var.resource_group
}

# ============================================================
# Cloud Object Storage
# ============================================================
resource "ibm_resource_instance" "cos" {
  name              = "${var.prefix}-cos"
  resource_group_id = data.ibm_resource_group.rg.id
  service           = "cloud-object-storage"
  plan              = "standard"
  location          = "global"

  tags = var.tags
}

resource "ibm_cos_bucket" "input" {
  bucket_name          = "${var.prefix}-input-${random_id.suffix.hex}"
  resource_instance_id = ibm_resource_instance.cos.id
  region_location      = var.region
  storage_class        = "smart"

  lifecycle {
    prevent_destroy = false
  }
}

resource "ibm_cos_bucket" "output" {
  bucket_name          = "${var.prefix}-output-${random_id.suffix.hex}"
  resource_instance_id = ibm_resource_instance.cos.id
  region_location      = var.region
  storage_class        = "smart"

  lifecycle {
    prevent_destroy = false
  }
}

resource "random_id" "suffix" {
  byte_length = 4
}

# ============================================================
# Code Engine Project
# ============================================================
resource "ibm_code_engine_project" "main" {
  name              = "${var.prefix}-project"
  resource_group_id = data.ibm_resource_group.rg.id
}

# ============================================================
# Code Engine Secrets
# ============================================================
resource "ibm_code_engine_secret" "app_secrets" {
  project_id = ibm_code_engine_project.main.project_id
  name       = "doclinggpu-secrets"
  format     = "generic"

  data = {
    SECRET_KEY      = var.app_secret_key
    CE_REGION       = var.region
    COS_INPUT_BUCKET  = ibm_cos_bucket.input.bucket_name
    COS_OUTPUT_BUCKET = ibm_cos_bucket.output.bucket_name
  }
}

resource "ibm_code_engine_secret" "registry" {
  project_id = ibm_code_engine_project.main.project_id
  name       = "doclinggpu-registry"
  format     = "registry"

  data = {
    server   = var.registry_server
    username = "iamapikey"
    password = var.ibmcloud_api_key
    email    = var.registry_email
  }
}

# ============================================================
# Code Engine Application (Web App)
# ============================================================
resource "ibm_code_engine_app" "webapp" {
  project_id = ibm_code_engine_project.main.project_id
  name       = "${var.prefix}-webapp"

  image_reference = var.webapp_image
  image_secret    = ibm_code_engine_secret.registry.name

  scale_min_instances = 0
  scale_max_instances = var.webapp_max_instances
  scale_cpu_limit     = "1"
  scale_memory_limit  = "4G"
  scale_request_timeout = 300

  run_env_variables {
    type  = "literal"
    name  = "PROCESSING_MODE"
    value = "fleet-gpu"
  }

  run_env_variables {
    type  = "literal"
    name  = "CE_REGION"
    value = var.region
  }

  run_env_variables {
    type  = "literal"
    name  = "CE_INPUT_STORE"
    value = "fleet-input-store"
  }

  run_env_variables {
    type  = "literal"
    name  = "CE_OUTPUT_STORE"
    value = "fleet-output-store"
  }

  run_env_variables {
    type  = "literal"
    name  = "CE_FLEET_TASK_STORE"
    value = "fleet-task-store"
  }

  run_env_variables {
    type  = "literal"
    name  = "CE_FLEET_SUBNETPOOL"
    value = "fleet-subnetpool"
  }

  run_env_variables {
    type       = "secret_key_ref"
    name       = "SECRET_KEY"
    reference  = ibm_code_engine_secret.app_secrets.name
    key        = "SECRET_KEY"
  }

  run_env_variables {
    type       = "secret_key_ref"
    name       = "COS_INPUT_BUCKET"
    reference  = ibm_code_engine_secret.app_secrets.name
    key        = "COS_INPUT_BUCKET"
  }

  run_env_variables {
    type       = "secret_key_ref"
    name       = "COS_OUTPUT_BUCKET"
    reference  = ibm_code_engine_secret.app_secrets.name
    key        = "COS_OUTPUT_BUCKET"
  }

  depends_on = [
    ibm_code_engine_secret.app_secrets,
    ibm_code_engine_secret.registry
  ]
}

# ============================================================
# Outputs
# ============================================================
output "project_id" {
  description = "Code Engine project ID"
  value       = ibm_code_engine_project.main.project_id
}

output "webapp_url" {
  description = "Web application URL"
  value       = ibm_code_engine_app.webapp.endpoint
}

output "cos_input_bucket" {
  description = "COS input bucket name"
  value       = ibm_cos_bucket.input.bucket_name
}

output "cos_output_bucket" {
  description = "COS output bucket name"
  value       = ibm_cos_bucket.output.bucket_name
}

output "cos_instance_id" {
  description = "COS instance CRN"
  value       = ibm_resource_instance.cos.crn
}
Enter fullscreen mode Exit fullscreen mode

Elevating the User Experience: A Command Center for Document Intelligence

The application is designed to be highly intuitive and self-explanatory, providing a seamless bridge between complex serverless infrastructure and the end user. Rather than requiring manual CLI commands for every task, the interface allows users to perform drag-and-drop uploads and initiate batch folder processing with just a few clicks.

Key UI features that simplify the user journey include:

  • Mode Selection: A clear toggle allows users to switch between Local, Fleet CPU, and Fleet GPU processing modes depending on the workload intensity.

  • Real-Time Monitoring: The dashboard provides live updates on the job lifecycle, moving from pending to processing and finally completed.
  • Results Management: Once conversion is finished, the UI generates structured Markdown files with YAML frontmatter metadata, which are immediately available for download via secure URLs.


This design ensures that even users without deep cloud expertise can harness the power of NVIDIA L40s or H100 GPUs to convert thousands of documents into LLM-ready data without ever leaving their browser.


In the DoclingGPU architecture, the system is split into two distinct components: the Main Application (app.py) and the Worker (worker.py). While they work together to process documents, they serve completely different roles in the serverless ecosystem.

Primary Responsibility and Role

  • Main Application (app.py): Acts as the Control Plane and user-facing gateway. It manages the web interface, handles REST API requests, and orchestrates the lifecycle of document processing jobs.
  • Worker (worker.py): Acts as the Data Plane or execution engine. Its sole purpose is to perform the "heavy lifting"—the actual conversion of documents from various formats into structured Markdown using the Docling library.

Execution Environment and Lifecycle

  • Main Application: Typically runs as a persistent or auto-scaling Code Engine Application. It stays active to listen for user uploads and monitor job statuses.
  • Worker: Designed to run as a Serverless Fleet task. Workers are ephemeral; they are spun up on-demand to handle a specific batch of files and are deprovisioned immediately after the task is finished to minimize costs.

Processing Modes and Hardware

  • Main Application: Generally runs on standard CPU profiles because its tasks (routing, API management, and UI rendering) are not computationally intensive.
  • Worker: Can be configured for Fleet CPU or Fleet GPU modes. In high-volume scenarios, workers utilize powerful hardware like NVIDIA L40s or H100 GPUs to accelerate document AI tasks.

Data Handling and Communication

  • Main Application: Handles initial file uploads to temporary storage and interacts with the Code Engine API to trigger worker fleets. It provides the user with final download URLs for processed results.
  • Worker: Interacts directly with Cloud Object Storage (COS). It pulls input files from a mounted /input directory and writes completed Markdown files to a mounted /output directory. It communicates its progress back to the system via task metadata and exit codes.

Technical Stack and Dependencies

  • Main Application: Built using the Flask web framework and uses libraries like requests for internal service communication and subprocess to trigger CLI-based cloud commands.
  • Worker: A standalone script that depends heavily on the Docling library and PyTorch (for GPU acceleration). It includes specific logic for auto-detecting hardware (CUDA for NVIDIA GPUs) and managing multi-threaded document parsing.


# app.py
"""
DoclingGPU - IBM Code Engine Serverless Document Processing Application
Combines IBM Code Engine Serverless Fleets with Docling for GPU-accelerated document conversion.
"""

import os
import json
import uuid
import time
import subprocess
import threading
import logging
from datetime import datetime
from pathlib import Path
from flask import Flask, render_template, request, jsonify, send_file, redirect, url_for, flash
from werkzeug.utils import secure_filename
import requests

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY', 'docling-gpu-secret-key-change-in-prod')

# Configuration
UPLOAD_FOLDER = os.getenv('UPLOAD_FOLDER', '/tmp/uploads')
OUTPUT_FOLDER = os.getenv('OUTPUT_FOLDER', '/tmp/outputs')
LOCAL_INPUT_FOLDER = os.getenv('LOCAL_INPUT_FOLDER', './input')
LOCAL_OUTPUT_FOLDER = os.getenv('LOCAL_OUTPUT_FOLDER', './output')
MAX_CONTENT_LENGTH = int(os.getenv('MAX_CONTENT_LENGTH', str(500 * 1024 * 1024)))  # 500MB default

# IBM Cloud Code Engine / Fleet configuration
CE_PROJECT_ID = os.getenv('CE_PROJECT_ID', '')
CE_REGION = os.getenv('CE_REGION', 'eu-de')
CE_FLEET_TASK_STORE = os.getenv('CE_FLEET_TASK_STORE', 'fleet-task-store')
CE_FLEET_SUBNETPOOL = os.getenv('CE_FLEET_SUBNETPOOL', 'fleet-subnetpool')
CE_INPUT_STORE = os.getenv('CE_INPUT_STORE', 'fleet-input-store')
CE_OUTPUT_STORE = os.getenv('CE_OUTPUT_STORE', 'fleet-output-store')
CE_REGISTRY_SECRET = os.getenv('CE_REGISTRY_SECRET', 'fleet-registry-secret')

# Docling container images
DOCLING_GPU_IMAGE = os.getenv('DOCLING_GPU_IMAGE', 'quay.io/docling-project/docling-serve')
DOCLING_CPU_IMAGE = os.getenv('DOCLING_CPU_IMAGE', 'quay.io/docling-project/docling-serve-cpu')

# Processing mode
PROCESSING_MODE = os.getenv('PROCESSING_MODE', 'local')  # 'local', 'fleet-cpu', 'fleet-gpu'

# In-memory job store (use Redis in production)
jobs = {}

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH

# Ensure directories exist
for folder in [UPLOAD_FOLDER, OUTPUT_FOLDER, LOCAL_INPUT_FOLDER, LOCAL_OUTPUT_FOLDER]:
    Path(folder).mkdir(parents=True, exist_ok=True)


def get_timestamp():
    """Get current timestamp for file naming."""
    return datetime.now().strftime('%Y%m%d_%H%M%S')


def create_job(job_id, files, output_dest, processing_mode, gpu_enabled=False):
    """Create a new processing job."""
    jobs[job_id] = {
        'id': job_id,
        'status': 'pending',
        'files': files,
        'output_dest': output_dest,
        'processing_mode': processing_mode,
        'gpu_enabled': gpu_enabled,
        'created_at': datetime.utcnow().isoformat(),
        'updated_at': datetime.utcnow().isoformat(),
        'progress': 0,
        'total_files': len(files),
        'processed_files': 0,
        'results': [],
        'errors': [],
        'fleet_id': None,
        'log': []
    }
    return jobs[job_id]


def update_job(job_id, **kwargs):
    """Update job status."""
    if job_id in jobs:
        jobs[job_id].update(kwargs)
        jobs[job_id]['updated_at'] = datetime.utcnow().isoformat()


def process_local(job_id, input_files, output_dir):
    """Process documents locally using Docling."""
    try:
        update_job(job_id, status='running', log=['Starting local Docling processing...'])

        from docling.document_converter import DocumentConverter, PdfFormatOption
        from docling.datamodel.base_models import InputFormat
        from docling.datamodel.pipeline_options import PdfPipelineOptions

        # Configure PDF pipeline options (OCR + table structure)
        pipeline_options = PdfPipelineOptions()
        pipeline_options.do_ocr = True
        pipeline_options.do_table_structure = True

        # Build converter accepting ALL Docling-supported formats
        converter = DocumentConverter(
            allowed_formats=[
                InputFormat.PDF,
                InputFormat.DOCX,
                InputFormat.PPTX,
                InputFormat.XLSX,
                InputFormat.HTML,
                InputFormat.MD,
                InputFormat.ASCIIDOC,
                InputFormat.CSV,
                InputFormat.IMAGE,
            ],
            format_options={
                InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
            }
        )

        results = []
        timestamp = get_timestamp()

        for i, input_file in enumerate(input_files):
            try:
                file_name = Path(input_file).stem
                output_file = Path(output_dir) / f"{timestamp}_{file_name}.md"

                log_msg = f"Processing [{i+1}/{len(input_files)}]: {Path(input_file).name}"
                logger.info(log_msg)
                update_job(job_id, log=jobs[job_id]['log'] + [log_msg])

                start_time = time.time()
                result = converter.convert(input_file)
                markdown_content = result.document.export_to_markdown()
                elapsed = time.time() - start_time

                # Write output with metadata header
                with open(output_file, 'w', encoding='utf-8') as f:
                    f.write(f"---\n")
                    f.write(f"source: {Path(input_file).name}\n")
                    f.write(f"processed_at: {datetime.utcnow().isoformat()}\n")
                    f.write(f"processing_time_seconds: {elapsed:.2f}\n")
                    f.write(f"processing_mode: local\n")
                    f.write(f"---\n\n")
                    f.write(markdown_content)

                results.append({
                    'input': str(input_file),
                    'output': str(output_file),
                    'status': 'success',
                    'processing_time': elapsed
                })

                processed = i + 1
                progress = int((processed / len(input_files)) * 100)
                update_job(job_id, 
                          processed_files=processed,
                          progress=progress,
                          results=results)

                success_msg = f"✓ Completed {Path(input_file).name} in {elapsed:.2f}s → {output_file.name}"
                update_job(job_id, log=jobs[job_id]['log'] + [success_msg])

            except Exception as e:
                error_msg = f"✗ Error processing {Path(input_file).name}: {str(e)}"
                logger.error(error_msg)
                update_job(job_id, 
                          errors=jobs[job_id]['errors'] + [error_msg],
                          log=jobs[job_id]['log'] + [error_msg])

        final_status = 'completed' if not jobs[job_id]['errors'] else 'completed_with_errors'
        update_job(job_id, status=final_status, progress=100)
        logger.info(f"Job {job_id} completed. {len(results)} files processed.")

    except ImportError as e:
        error_msg = f"Docling not installed: {str(e)}. Install with: pip install docling"
        logger.error(error_msg)
        update_job(job_id, status='failed', errors=[error_msg])
    except Exception as e:
        error_msg = f"Processing failed: {str(e)}"
        logger.error(error_msg)
        update_job(job_id, status='failed', errors=[error_msg])


def generate_commands_jsonl(input_files, output_dir, num_threads=12):
    """Generate commands.jsonl for Code Engine fleet tasks."""
    commands = []
    for input_file in input_files:
        file_name = Path(input_file).name
        output_name = f"docling_{file_name}.md"
        cmd = {
            "cmds": ["docling"],
            "args": [
                "--num-threads", str(num_threads),
                f"/input/pdfs/{file_name}",
                "--output", f"/output/{output_name}"
            ]
        }
        commands.append(json.dumps(cmd))
    return '\n'.join(commands)


def launch_fleet(job_id, commands_jsonl_path, gpu_enabled=False, max_scale=8):
    """Launch a Code Engine serverless fleet."""
    try:
        fleet_uuid = str(uuid.uuid4())[:8].lower()
        fleet_name = f"fleet-{fleet_uuid}-1"

        if gpu_enabled:
            image = DOCLING_GPU_IMAGE
            cmd = [
                'ibmcloud', 'code-engine', 'fleet', 'create',
                '--name', fleet_name,
                '--tasks-state-store', CE_FLEET_TASK_STORE,
                '--subnetpool-name', CE_FLEET_SUBNETPOOL,
                '--image', image,
                '--registry-secret', CE_REGISTRY_SECRET,
                '--max-scale', '1',
                '--tasks-from-local-file', commands_jsonl_path,
                '--gpu', 'l40s:1',
                '--mount-data-store', f'/input={CE_INPUT_STORE}:/docling',
                '--mount-data-store', f'/output={CE_OUTPUT_STORE}:/docling'
            ]
        else:
            image = DOCLING_CPU_IMAGE
            cmd = [
                'ibmcloud', 'code-engine', 'fleet', 'create',
                '--name', fleet_name,
                '--tasks-state-store', CE_FLEET_TASK_STORE,
                '--subnetpool-name', CE_FLEET_SUBNETPOOL,
                '--image', image,
                '--registry-secret', CE_REGISTRY_SECRET,
                '--worker-profile', 'mx3d-24x240',
                '--max-scale', str(max_scale),
                '--tasks-from-local-file', commands_jsonl_path,
                '--cpu', '12',
                '--memory', '120G',
                '--mount-data-store', f'/input={CE_INPUT_STORE}:/docling',
                '--mount-data-store', f'/output={CE_OUTPUT_STORE}:/docling'
            ]

        log_msg = f"Launching fleet: {fleet_name} ({'GPU' if gpu_enabled else 'CPU'})"
        logger.info(log_msg)
        update_job(job_id, 
                  status='fleet_launching',
                  fleet_id=fleet_name,
                  log=jobs[job_id]['log'] + [log_msg, f"Command: {' '.join(cmd)}"])

        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)

        if result.returncode == 0:
            success_msg = f"Fleet launched successfully: {fleet_name}"
            logger.info(success_msg)
            update_job(job_id, 
                      status='fleet_running',
                      log=jobs[job_id]['log'] + [success_msg, result.stdout])
            return fleet_name
        else:
            error_msg = f"Fleet launch failed: {result.stderr}"
            logger.error(error_msg)
            update_job(job_id, 
                      status='failed',
                      errors=[error_msg],
                      log=jobs[job_id]['log'] + [error_msg])
            return None

    except subprocess.TimeoutExpired:
        error_msg = "Fleet launch timed out"
        update_job(job_id, status='failed', errors=[error_msg])
        return None
    except Exception as e:
        error_msg = f"Fleet launch error: {str(e)}"
        update_job(job_id, status='failed', errors=[error_msg])
        return None


def process_fleet(job_id, input_files, output_dir, gpu_enabled=False):
    """Process documents using Code Engine serverless fleet."""
    try:
        update_job(job_id, status='preparing', log=[f"Preparing fleet processing ({'GPU' if gpu_enabled else 'CPU'})..."])

        # Generate commands.jsonl
        commands_content = generate_commands_jsonl(input_files, output_dir)
        commands_file = Path(UPLOAD_FOLDER) / f"commands_{job_id}.jsonl"

        with open(commands_file, 'w') as f:
            f.write(commands_content)

        log_msg = f"Generated {len(input_files)} tasks in {commands_file}"
        update_job(job_id, log=jobs[job_id]['log'] + [log_msg])

        # Launch fleet
        fleet_id = launch_fleet(job_id, str(commands_file), gpu_enabled=gpu_enabled)

        if fleet_id:
            update_job(job_id, 
                      fleet_id=fleet_id,
                      log=jobs[job_id]['log'] + [f"Fleet {fleet_id} is processing documents..."])

    except Exception as e:
        error_msg = f"Fleet processing error: {str(e)}"
        logger.error(error_msg)
        update_job(job_id, status='failed', errors=[error_msg])


# ============================================================
# Routes
# ============================================================

@app.route('/')
def index():
    """Main page."""
    return render_template('index.html',
                          processing_mode=PROCESSING_MODE,
                          ce_region=CE_REGION)


@app.route('/upload', methods=['POST'])
def upload_files():
    """Handle file upload and start processing."""
    try:
        if 'files' not in request.files and 'folder_path' not in request.form:
            return jsonify({'error': 'No files or folder path provided'}), 400

        job_id = str(uuid.uuid4())
        input_files = []
        timestamp = get_timestamp()

        # Handle file uploads — accept all files, Docling detects format automatically
        if 'files' in request.files:
            files = request.files.getlist('files')
            upload_dir = Path(UPLOAD_FOLDER) / job_id
            upload_dir.mkdir(parents=True, exist_ok=True)

            for file in files:
                if file and file.filename:
                    filename = secure_filename(file.filename)
                    file_path = upload_dir / filename
                    file.save(str(file_path))
                    input_files.append(str(file_path))

        # Handle folder path — collect all files, Docling detects format automatically
        if 'folder_path' in request.form and request.form['folder_path']:
            folder_path = Path(request.form['folder_path'])
            if folder_path.exists() and folder_path.is_dir():
                for p in sorted(folder_path.rglob('*')):
                    if p.is_file():
                        input_files.append(str(p))
            else:
                return jsonify({'error': f'Folder not found: {folder_path}'}), 400

        if not input_files:
            return jsonify({'error': 'No valid files found to process'}), 400

        # Determine output destination
        output_dest = request.form.get('output_dest', OUTPUT_FOLDER)
        processing_mode = request.form.get('processing_mode', PROCESSING_MODE)
        gpu_enabled = request.form.get('gpu_enabled', 'false').lower() == 'true'

        # Create output directory
        if processing_mode == 'local':
            output_dir = Path(LOCAL_OUTPUT_FOLDER) / timestamp
        else:
            output_dir = Path(output_dest) if output_dest else Path(OUTPUT_FOLDER) / timestamp
        output_dir.mkdir(parents=True, exist_ok=True)

        # Create job
        job = create_job(job_id, input_files, str(output_dir), processing_mode, gpu_enabled)

        # Start processing in background thread
        if processing_mode == 'local':
            thread = threading.Thread(
                target=process_local,
                args=(job_id, input_files, str(output_dir))
            )
        elif processing_mode in ('fleet-cpu', 'fleet-gpu'):
            thread = threading.Thread(
                target=process_fleet,
                args=(job_id, input_files, str(output_dir), gpu_enabled or processing_mode == 'fleet-gpu')
            )
        else:
            return jsonify({'error': f'Unknown processing mode: {processing_mode}'}), 400

        thread.daemon = True
        thread.start()

        logger.info(f"Job {job_id} started: {len(input_files)} files, mode={processing_mode}, gpu={gpu_enabled}")

        return jsonify({
            'job_id': job_id,
            'status': 'pending',
            'total_files': len(input_files),
            'processing_mode': processing_mode,
            'gpu_enabled': gpu_enabled,
            'output_dir': str(output_dir)
        })

    except Exception as e:
        logger.error(f"Upload error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/job/<job_id>')
def get_job(job_id):
    """Get job status."""
    if job_id not in jobs:
        return jsonify({'error': 'Job not found'}), 404
    return jsonify(jobs[job_id])


@app.route('/jobs')
def list_jobs():
    """List all jobs."""
    return jsonify(list(jobs.values()))


@app.route('/job/<job_id>/download')
def download_results(job_id):
    """Download job results as a zip file."""
    import zipfile
    import io

    if job_id not in jobs:
        return jsonify({'error': 'Job not found'}), 404

    job = jobs[job_id]
    if job['status'] not in ('completed', 'completed_with_errors'):
        return jsonify({'error': 'Job not yet completed'}), 400

    # Create zip in memory
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for result in job['results']:
            if result.get('status') == 'success' and Path(result['output']).exists():
                zip_file.write(result['output'], Path(result['output']).name)

    zip_buffer.seek(0)
    timestamp = get_timestamp()

    return send_file(
        zip_buffer,
        mimetype='application/zip',
        as_attachment=True,
        download_name=f'docling_results_{timestamp}.zip'
    )


@app.route('/job/<job_id>/result/<filename>')
def download_single_result(job_id, filename):
    """Download a single result file."""
    if job_id not in jobs:
        return jsonify({'error': 'Job not found'}), 404

    job = jobs[job_id]
    for result in job['results']:
        if Path(result.get('output', '')).name == filename:
            if Path(result['output']).exists():
                return send_file(result['output'], as_attachment=True)

    return jsonify({'error': 'File not found'}), 404


@app.route('/local/process', methods=['POST'])
def process_local_folder():
    """Process documents from the local input folder."""
    try:
        input_dir = Path(LOCAL_INPUT_FOLDER)
        if not input_dir.exists():
            return jsonify({'error': f'Input folder not found: {LOCAL_INPUT_FOLDER}'}), 400

        # Collect all files — Docling auto-detects format, no extension filtering needed
        input_files = [str(p) for p in sorted(input_dir.rglob('*')) if p.is_file()]

        if not input_files:
            return jsonify({'error': f'No supported files found in {LOCAL_INPUT_FOLDER}'}), 400

        job_id = str(uuid.uuid4())
        timestamp = get_timestamp()
        output_dir = Path(LOCAL_OUTPUT_FOLDER) / timestamp
        output_dir.mkdir(parents=True, exist_ok=True)

        gpu_enabled = request.json.get('gpu_enabled', False) if request.is_json else False
        processing_mode = request.json.get('processing_mode', 'local') if request.is_json else 'local'

        job = create_job(job_id, input_files, str(output_dir), processing_mode, gpu_enabled)

        thread = threading.Thread(
            target=process_local,
            args=(job_id, input_files, str(output_dir))
        )
        thread.daemon = True
        thread.start()

        return jsonify({
            'job_id': job_id,
            'status': 'pending',
            'total_files': len(input_files),
            'input_dir': str(input_dir),
            'output_dir': str(output_dir)
        })

    except Exception as e:
        logger.error(f"Local process error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/fleet/status/<fleet_id>')
def fleet_status(fleet_id):
    """Get fleet status from IBM Cloud Code Engine."""
    try:
        result = subprocess.run(
            ['ibmcloud', 'ce', 'fleet', 'get', '--id', fleet_id, '--output', 'json'],
            capture_output=True, text=True, timeout=30
        )
        if result.returncode == 0:
            return jsonify(json.loads(result.stdout))
        else:
            return jsonify({'error': result.stderr}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/health')
def health():
    """Health check endpoint."""
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.utcnow().isoformat(),
        'processing_mode': PROCESSING_MODE,
        'version': '1.0.0'
    })


@app.route('/config')
def get_config():
    """Get application configuration (non-sensitive)."""
    return jsonify({
        'processing_mode': PROCESSING_MODE,
        'ce_region': CE_REGION,
        'allowed_formats': ['pdf', 'docx', 'pptx', 'xlsx', 'html', 'md', 'asciidoc', 'csv', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'webp'],
        'max_file_size_mb': MAX_CONTENT_LENGTH // (1024 * 1024),
        'docling_gpu_image': DOCLING_GPU_IMAGE,
        'docling_cpu_image': DOCLING_CPU_IMAGE,
        'local_input_folder': LOCAL_INPUT_FOLDER,
        'local_output_folder': LOCAL_OUTPUT_FOLDER
    })


if __name__ == '__main__':
    port = int(os.getenv('PORT', 8080))
    debug = os.getenv('DEBUG', 'false').lower() == 'true'
    logger.info(f"Starting DoclingGPU app on port {port}, mode={PROCESSING_MODE}")
    app.run(host='0.0.0.0', port=port, debug=debug)

# Made with Bob

Enter fullscreen mode Exit fullscreen mode
# worker.py
"""
DoclingGPU Worker
Standalone document processing worker for IBM Code Engine Serverless Fleet.
Reads documents from /input, converts to Markdown, writes to /output.
Supports both CPU and GPU (CUDA) processing via Docling.
"""

import os
import sys
import json
import time
import logging
import argparse
from pathlib import Path
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)

# Code Engine task metadata
TASK_INDEX = os.getenv('CE_TASK_INDEX', '0')
TASK_ID = os.getenv('CE_TASK_ID', 'local')

# Processing configuration
INPUT_DIR = os.getenv('INPUT_DIR', '/input')
OUTPUT_DIR = os.getenv('OUTPUT_DIR', '/output')
NUM_THREADS = int(os.getenv('DOCLING_NUM_THREADS', '4'))
DEVICE = os.getenv('DOCLING_DEVICE', 'auto')  # 'auto', 'cuda', 'cpu', 'mps'
ENABLE_OCR = os.getenv('DOCLING_ENABLE_OCR', 'true').lower() == 'true'
ENABLE_TABLE_STRUCTURE = os.getenv('DOCLING_ENABLE_TABLE_STRUCTURE', 'true').lower() == 'true'
ENABLE_PICTURE_CLASSIFICATION = os.getenv('DOCLING_ENABLE_PICTURE_CLASSIFICATION', 'false').lower() == 'true'


def detect_device():
    """Auto-detect best available device."""
    if DEVICE != 'auto':
        return DEVICE
    try:
        import torch
        if torch.cuda.is_available():
            gpu_name = torch.cuda.get_device_name(0)
            gpu_mem = torch.cuda.get_device_properties(0).total_memory / (1024**3)
            logger.info(f"GPU detected: {gpu_name} ({gpu_mem:.1f} GB VRAM)")
            return 'cuda'
        elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
            logger.info("Apple MPS detected")
            return 'mps'
    except ImportError:
        pass
    logger.info("Using CPU for processing")
    return 'cpu'


def build_converter(device: str):
    """Build and configure the Docling DocumentConverter for all supported formats."""
    from docling.document_converter import DocumentConverter, PdfFormatOption
    from docling.datamodel.base_models import InputFormat
    from docling.datamodel.pipeline_options import (
        PdfPipelineOptions,
        EasyOcrOptions,
    )

    logger.info(f"Building Docling converter (device={device}, ocr={ENABLE_OCR}, tables={ENABLE_TABLE_STRUCTURE})")

    pipeline_options = PdfPipelineOptions()
    pipeline_options.do_ocr = ENABLE_OCR
    pipeline_options.do_table_structure = ENABLE_TABLE_STRUCTURE

    if ENABLE_OCR:
        # Use EasyOCR which supports GPU acceleration
        ocr_options = EasyOcrOptions(
            force_full_page_ocr=False,
            use_gpu=(device == 'cuda')
        )
        pipeline_options.ocr_options = ocr_options

    if ENABLE_PICTURE_CLASSIFICATION:
        pipeline_options.generate_picture_images = True
        pipeline_options.images_scale = 2.0

    # Set accelerator device
    try:
        from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
        device_map = {
            'cuda': AcceleratorDevice.CUDA,
            'mps': AcceleratorDevice.MPS,
            'cpu': AcceleratorDevice.CPU,
        }
        if device in device_map:
            pipeline_options.accelerator_options = AcceleratorOptions(
                num_threads=NUM_THREADS,
                device=device_map[device]
            )
    except ImportError:
        logger.warning("AcceleratorOptions not available in this Docling version")

    # Accept ALL Docling-supported formats — format is auto-detected per file
    converter = DocumentConverter(
        allowed_formats=[
            InputFormat.PDF,
            InputFormat.DOCX,
            InputFormat.PPTX,
            InputFormat.XLSX,
            InputFormat.HTML,
            InputFormat.MD,
            InputFormat.ASCIIDOC,
            InputFormat.CSV,
            InputFormat.IMAGE,
        ],
        format_options={
            InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
        }
    )

    return converter


def process_file(converter, input_path: Path, output_dir: Path) -> dict:
    """Process a single document file."""
    start_time = time.perf_counter()
    output_file = output_dir / f"docling_{input_path.name}.md"

    try:
        logger.info(f"Processing: {input_path.name}")
        result = converter.convert(str(input_path))
        markdown_content = result.document.export_to_markdown()
        elapsed = time.perf_counter() - start_time

        # Write output with YAML frontmatter metadata
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("---\n")
            f.write(f"source: {input_path.name}\n")
            f.write(f"source_path: {input_path}\n")
            f.write(f"processed_at: {datetime.utcnow().isoformat()}Z\n")
            f.write(f"processing_time_seconds: {elapsed:.3f}\n")
            f.write(f"device: {DEVICE}\n")
            f.write(f"task_index: {TASK_INDEX}\n")
            f.write(f"task_id: {TASK_ID}\n")
            f.write(f"pages: {len(result.document.pages) if hasattr(result.document, 'pages') else 'unknown'}\n")
            f.write("---\n\n")
            f.write(markdown_content)

        logger.info(f"{input_path.name}{output_file.name} ({elapsed:.2f}s)")

        return {
            'input': str(input_path),
            'output': str(output_file),
            'status': 'success',
            'processing_time': elapsed,
            'output_size_bytes': output_file.stat().st_size
        }

    except Exception as e:
        elapsed = time.perf_counter() - start_time
        logger.error(f"✗ Failed to process {input_path.name}: {e}")
        return {
            'input': str(input_path),
            'output': None,
            'status': 'failed',
            'error': str(e),
            'processing_time': elapsed
        }


def process_batch(input_files: list, output_dir: Path, device: str) -> dict:
    """Process a batch of documents."""
    total_start = time.perf_counter()
    results = []
    errors = []

    logger.info(f"=== DoclingGPU Worker ===")
    logger.info(f"Task Index: {TASK_INDEX}")
    logger.info(f"Device: {device}")
    logger.info(f"Files to process: {len(input_files)}")
    logger.info(f"Output directory: {output_dir}")
    logger.info("=" * 40)

    # Build converter once (expensive operation)
    converter = build_converter(device)
    logger.info("Converter initialized, starting batch processing...")

    for i, input_path in enumerate(input_files):
        logger.info(f"[{i+1}/{len(input_files)}] {input_path.name}")
        result = process_file(converter, input_path, output_dir)
        results.append(result)
        if result['status'] == 'failed':
            errors.append(result)

    total_elapsed = time.perf_counter() - total_start
    successful = len([r for r in results if r['status'] == 'success'])

    summary = {
        'task_index': TASK_INDEX,
        'task_id': TASK_ID,
        'device': device,
        'total_files': len(input_files),
        'successful': successful,
        'failed': len(errors),
        'total_processing_time': total_elapsed,
        'avg_time_per_file': total_elapsed / len(input_files) if input_files else 0,
        'results': results
    }

    logger.info("=" * 40)
    logger.info(f"Batch complete: {successful}/{len(input_files)} files processed in {total_elapsed:.2f}s")
    if errors:
        logger.warning(f"Errors: {len(errors)} files failed")

    return summary


def find_input_files(input_dir: Path) -> list:
    """Find all files in the input directory. Docling auto-detects supported formats."""
    # Collect every file recursively — Docling will skip unsupported formats gracefully
    return sorted([p for p in input_dir.rglob('*') if p.is_file()])


def main():
    parser = argparse.ArgumentParser(
        description='DoclingGPU Worker - Process documents with Docling on IBM Code Engine'
    )
    parser.add_argument(
        '--input', '-i',
        default=INPUT_DIR,
        help=f'Input directory or file (default: {INPUT_DIR})'
    )
    parser.add_argument(
        '--output', '-o',
        default=OUTPUT_DIR,
        help=f'Output directory (default: {OUTPUT_DIR})'
    )
    parser.add_argument(
        '--file-list', '-f',
        help='Path to a text file containing list of files to process (one per line)'
    )
    parser.add_argument(
        '--device',
        default=DEVICE,
        choices=['auto', 'cuda', 'cpu', 'mps'],
        help='Processing device (default: auto)'
    )
    parser.add_argument(
        '--threads',
        type=int,
        default=NUM_THREADS,
        help=f'Number of threads (default: {NUM_THREADS})'
    )
    parser.add_argument(
        '--no-ocr',
        action='store_true',
        help='Disable OCR processing'
    )
    parser.add_argument(
        '--no-tables',
        action='store_true',
        help='Disable table structure detection'
    )
    parser.add_argument(
        '--summary-file',
        help='Write processing summary JSON to this file'
    )

    args = parser.parse_args()

    # Override globals from args
    global DEVICE, NUM_THREADS, ENABLE_OCR, ENABLE_TABLE_STRUCTURE
    DEVICE = args.device
    NUM_THREADS = args.threads
    if args.no_ocr:
        ENABLE_OCR = False
    if args.no_tables:
        ENABLE_TABLE_STRUCTURE = False

    # Detect device
    device = detect_device()

    # Determine input files
    input_files = []

    if args.file_list:
        # Read file list from text file
        file_list_path = Path(args.file_list)
        if not file_list_path.exists():
            logger.error(f"File list not found: {args.file_list}")
            sys.exit(1)
        with open(file_list_path) as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith('#'):
                    p = Path(line)
                    if p.exists():
                        input_files.append(p)
                    else:
                        logger.warning(f"File not found: {line}")
    else:
        input_path = Path(args.input)
        if input_path.is_file():
            input_files = [input_path]
        elif input_path.is_dir():
            input_files = find_input_files(input_path)
        else:
            logger.error(f"Input not found: {args.input}")
            sys.exit(1)

    if not input_files:
        logger.warning("No input files found. Exiting.")
        sys.exit(0)

    # Ensure output directory exists
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Process batch
    summary = process_batch(input_files, output_dir, device)

    # Write summary
    if args.summary_file:
        summary_path = Path(args.summary_file)
        with open(summary_path, 'w') as f:
            json.dump(summary, f, indent=2)
        logger.info(f"Summary written to {summary_path}")

    # Also write summary to output dir
    summary_output = output_dir / f"processing_summary_task{TASK_INDEX}.json"
    with open(summary_output, 'w') as f:
        json.dump(summary, f, indent=2)

    # Exit with error code if any failures
    if summary['failed'] > 0:
        logger.warning(f"Completed with {summary['failed']} failures")
        sys.exit(1)

    logger.info("All files processed successfully")
    sys.exit(0)


if __name__ == '__main__':
    main()

# Made with Bob

Enter fullscreen mode Exit fullscreen mode

Conclusion: Mastering the Future of Scalable Document AI

By combining the document-parsing intelligence of Docling with the massive parallel power of IBM Code Engine Serverless Fleets, Bob has created a blueprint for enterprise-grade document processing that is as efficient as it is powerful. From the intuitive Web UI and robust REST API **that manage the control plane, to the ephemeral GPU-accelerated workers that handle the computational heavy lifting, this architecture proves that high-volume data conversion no longer requires permanent, costly infrastructure. Whether deployed locally for rapid prototyping or orchestrated via Terraform for global cloud scale, the DoclingGPU application exemplifies the best of modern serverless design: it is cost-effective, scaling to zero when idle; it is flexible, supporting both **CPU and NVIDIA L40s/H100 GPU workloads; and most importantly, it is accessible. For organizations looking to transform thousands of complex documents into LLM-ready Markdown in minutes rather than days, Bob’s implementation provides the ultimate roadmap for scaling document intelligence without limits.

>>> Thanks for reading <<<

Links

Top comments (0)