Introduction
At pdfparse.net, we run a document parsing system that turns PDFs into structured, queryable data at scale. Our stack is Cloudflare-native, which means global distribution, predictable costs, and no servers to babysit.
This article focuses on invoice parsing because it combines structured fields, messy layouts, and long OCR runtimes. We will build a pipeline that accepts large batches, survives failures, and keeps users informed about progress end to end.
Architecture Overview
At a high level, the pipeline looks like this:
HTTP API → Queue → Workflow → Mistral Batch OCR → Database
- API accepts document IDs and creates a job record.
- Queues smooth bursts and split work into predictable batches.
- Workflows orchestrate long-running OCR with retries and durable state.
- Mistral performs OCR in batch mode.
- Database stores job status and extracted invoice data.
We keep the pieces small and loosely coupled so each one can be retried safely without restarting the entire job.
Project Setup
Prerequisites
- Cloudflare account with Workers paid plan (for Workflows)
- Mistral AI API key
- Node.js 18+ and pnpm
Dependencies
We keep the dependency list focused:
{
  "dependencies": {
    "@mistralai/mistralai": "^1.0.0",
    "drizzle-orm": "^0.44.0",
    "effect": "^3.19.0",
    "zod": "^3.23.0"
  },
  "devDependencies": {
    "wrangler": "^3.90.0",
    "@cloudflare/workers-types": "^4.0.0"
  }
}
Project Structure
document-parser/
├── packages/
│ ├── db/ # Shared database schema + migrations
│ ├── queue-manager/ # HTTP + queue consumer
│ └── workflows/ # Durable workflow logic
Database Setup (D1 + Drizzle)
We store jobs, batch processes, and extracted invoice fields in D1.
import { sqliteTable, text, integer, numeric } from "drizzle-orm/sqlite-core";

// One row per submitted job; a job fans out into many batches.
export const jobs = sqliteTable("jobs", {
  id: integer("id").primaryKey({ autoIncrement: true }),
  jobStatus: text("job_status").default("queued").notNull(),
  createdAt: integer("created_at", { mode: "timestamp" })
    .$defaultFn(() => new Date()),
});

// One row per Mistral batch, linked to its parent job and workflow instance.
export const batch_processes = sqliteTable("batch_processes", {
  id: integer("id").primaryKey({ autoIncrement: true }),
  batchId: text("batch_id").notNull().unique(),
  workflowInstanceId: text("workflow_instance_id").notNull().unique(),
  job_id: integer("job_id").references(() => jobs.id, { onDelete: "cascade" }),
  status: text("status").default("init"),
  // Number of OCR requests that finished; written by the update-status step.
  completedRequests: integer("completed_requests").default(0),
});

// Extracted invoice fields; dates are stored as text.
export const invoices = sqliteTable("invoices", {
  invoiceId: text("invoice_id").primaryKey(),
  vendorName: text("vendor_name").notNull(),
  invoiceDate: text("invoice_date").notNull(),
  dueDate: text("due_date").notNull(),
  totalAmountUsd: numeric("total_amount_usd").notNull(),
});
Create the database and run migrations:
wrangler d1 create document-parser-db
pnpm drizzle-kit generate
wrangler d1 migrations apply document-parser-db --remote
Worker Bindings
Add bindings to both the queue worker and workflow:
{
  "r2_buckets": [
    { "binding": "DOCUMENT_BUCKET", "bucket_name": "<YOUR_BUCKET_NAME>" }
  ],
  "d1_databases": [
    {
      "binding": "DB",
      "database_name": "<YOUR_DB_NAME>",
      "database_id": "<YOUR_DB_ID>"
    }
  ],
  "queues": {
    "producers": [{ "binding": "JOB_QUEUE", "queue": "<YOUR_QUEUE_NAME>" }],
    "consumers": [{ "queue": "<YOUR_QUEUE_NAME>" }]
  },
  "workflows": [
    {
      "binding": "WORKFLOW",
      "name": "<YOUR_WORKFLOW_NAME>",
      "class_name": "<YOUR_WORKFLOW_CLASS_NAME>"
    }
  ]
}
Also add a MISTRAL_API_KEY secret and an R2 access URL for presigned links (the workflow code reads it as env.R2_URL).
Queue Manager
The queue manager handles two things: accepting document IDs via HTTP and fanning them out into batches.
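import { createDB, jobs } from "@repo/db";

// Payload carried by each queue message and passed to the workflow as params.
type BatchMessage = {
  jobId: number;
  documentKeys: string[];
  batchIndex: number;
  totalBatches: number;
};

type Env = {
  DB: D1Database;
  JOB_QUEUE: Queue<BatchMessage>;
  WORKFLOW: Workflow;
};

// Split an array into consecutive slices of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    if (request.method !== "POST") {
      return new Response("Method Not Allowed", { status: 405 });
    }
    const { documentIds } = (await request.json()) as { documentIds?: unknown };
    if (!Array.isArray(documentIds) || documentIds.length === 0) {
      return new Response("documentIds must be a non-empty array", { status: 400 });
    }
    const db = await createDB(env.DB);
    const [job] = await db
      .insert(jobs)
      .values({ jobStatus: "queued" })
      .returning();
    const batches = chunk(documentIds as string[], 50);
    await env.JOB_QUEUE.sendBatch(
      batches.map((batch, index) => ({
        body: {
          jobId: job.id,
          documentKeys: batch,
          batchIndex: index,
          totalBatches: batches.length,
        },
      }))
    );
    return Response.json({ jobId: job.id });
  },
  async queue(batch: MessageBatch<BatchMessage>, env: Env): Promise<void> {
    for (const message of batch.messages) {
      // Ack only after the workflow instance exists, so a failed create is redelivered.
      await env.WORKFLOW.create({ params: message.body });
      message.ack();
    }
  },
};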
Why 50 documents per batch? It is a good balance between throughput and retry cost. If files are large or highly variable, use smaller batches to keep failure recovery cheap.
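To make the trade-off concrete, here is a small sketch of the arithmetic behind a batch size. The helper planBatches and its field names are hypothetical and not part of our codebase; the sketch only assumes that one batch maps to one queue message and that a failed batch is retried as a whole.

```typescript
// Hypothetical helper: summarizes what a given batch size implies for a job.
interface BatchPlan {
  batchCount: number;         // queue messages (and workflow instances) created
  lastBatchSize: number;      // size of the final, possibly partial, batch
  worstCaseRetryDocs: number; // documents re-submitted if one full batch fails
}

function planBatches(totalDocs: number, batchSize: number): BatchPlan {
  if (!Number.isInteger(totalDocs) || totalDocs < 0) {
    throw new RangeError("totalDocs must be a non-negative integer");
  }
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const batchCount = Math.ceil(totalDocs / batchSize);
  const remainder = totalDocs % batchSize;
  // An empty job has no batches; otherwise the last batch is the remainder,
  // or a full batch when the division is exact.
  const lastBatchSize =
    totalDocs === 0 ? 0 : remainder === 0 ? batchSize : remainder;
  return {
    batchCount,
    lastBatchSize,
    worstCaseRetryDocs: Math.min(batchSize, totalDocs),
  };
}

// 1,200 documents at 50 per batch: 24 batches, at most 50 documents redone.
const atFifty = planBatches(1200, 50);
// The same job at 10 per batch: 120 batches, at most 10 documents redone.
const atTen = planBatches(1200, 10);
console.log(atFifty, atTen);
```

Smaller batches shrink the cost of any single retry but multiply the number of messages and workflow instances, which is the balance the batch size is tuning.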
Workflows: Durable, Retryable OCR
Workflows are a great fit for OCR: they can pause, retry, and keep state across minutes or hours without manual orchestration.
Step 1: Submit a Batch (with Effect.ts)
We start by assembling a batch payload and handing it off to Mistral.
We use Effect.ts to make failure handling explicit and consistent. It keeps errors typed and composable, which matters when you are orchestrating network calls, file uploads, and database updates.
import { Effect } from "effect";
import { Mistral } from "@mistralai/mistralai";

const batchJob = await step.do("submit-batch", async () => {
  const program = Effect.gen(function* (_) {
    // One presigned URL per document, in the same order as documentKeys.
    const urls = yield* _(generatePresignedUrls(documentKeys, env.R2_URL));
    const requests = urls.map((url, i) => ({
      custom_id: documentKeys[i],
      body: {
        model: "mistral-ocr-latest",
        document: { type: "document_url", document_url: url },
      },
    }));
    // The batch input is JSONL: one JSON request per line.
    const jsonl = requests.map((r) => JSON.stringify(r)).join("\n");
    const client = new Mistral({ apiKey: env.MISTRAL_API_KEY });
    const file = new File([jsonl], `batch_${jobId}.jsonl`, {
      type: "application/x-ndjson",
    });
    const uploadedFile = yield* _(
      Effect.tryPromise({
        try: () => client.files.upload({ file, purpose: "batch" }),
        catch: (error) => new BatchUploadError({ cause: error }),
      })
    );
    return yield* _(
      Effect.tryPromise({
        try: () =>
          client.batch.jobs.create({
            inputFiles: [uploadedFile.id],
            model: "mistral-ocr-latest",
            endpoint: "/v1/ocr",
            metadata: { jobId: String(jobId) },
          }),
        catch: (error) => new BatchCreationError({ cause: error }),
      })
    );
  });
  return Effect.runPromise(program);
});
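The JSONL assembly in the middle of this step is easy to get subtly wrong (mismatched keys and URLs, duplicate IDs), and it can be pulled out into a pure function that is testable without any network access. The following is a minimal sketch under that assumption; buildBatchJsonl is a hypothetical name, and the request shape simply mirrors the one used in the step above.

```typescript
// Shape of one batch request line, mirroring the submit-batch step above.
interface OcrBatchRequest {
  custom_id: string;
  body: {
    model: string;
    document: { type: "document_url"; document_url: string };
  };
}

// Hypothetical helper: pairs each document key with its presigned URL and
// serializes the requests as JSONL (one JSON object per line).
function buildBatchJsonl(
  documentKeys: string[],
  urls: string[],
  model = "mistral-ocr-latest"
): string {
  if (documentKeys.length !== urls.length) {
    throw new Error(
      `Expected ${documentKeys.length} URLs but received ${urls.length}`
    );
  }
  const seen = new Set<string>();
  const lines = documentKeys.map((key, i) => {
    // custom_id is how results are matched back to documents, so it must be unique.
    if (seen.has(key)) {
      throw new Error(`Duplicate custom_id: ${key}`);
    }
    seen.add(key);
    const request: OcrBatchRequest = {
      custom_id: key,
      body: {
        model,
        document: { type: "document_url", document_url: urls[i] },
      },
    };
    // JSON.stringify escapes embedded newlines, so each request stays on one line.
    return JSON.stringify(request);
  });
  return lines.join("\n");
}

const exampleJsonl = buildBatchJsonl(
  ["invoices/a.pdf", "invoices/b.pdf"],
  ["https://example.com/a", "https://example.com/b"]
);
console.log(exampleJsonl);
```

Failing fast on a length mismatch or a duplicate key keeps a malformed batch from ever reaching the OCR provider, where the same mistake would only surface after the batch finishes.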
Step 2: Poll for Completion
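We wait for Mistral to finish by polling inside the workflow. The polling step fails while the batch is still running, and the retries option on step.do() re-runs it on a schedule; with a limit of 60 and a 30-second delay, that is a window of about 30 minutes if the delay is constant. Sleeping steps are not billed, so long polling stays cost-effective.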
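const results = await step.do(
  "poll-status",
  // The step config is the second argument; the callback is the third.
  { retries: { limit: 60, delay: "30 seconds", backoff: "constant" } },
  async () => {
    // Step callbacks do not share closures, so build the client again here.
    const client = new Mistral({ apiKey: env.MISTRAL_API_KEY });
    const program = Effect.gen(function* (_) {
      const status = yield* _(
        Effect.tryPromise({
          try: () => client.batch.jobs.get({ jobId: batchJob.id }),
          catch: (error) => new NetworkError({ cause: error }),
        })
      );
      if (status.status !== "SUCCESS") {
        // A failed Effect rejects the promise, which makes step.do() retry.
        return yield* _(Effect.fail(new Error("Still processing")));
      }
      // `await` is not valid inside a generator, so wrap the download as an Effect.
      const raw = yield* _(
        Effect.tryPromise({
          try: () => downloadFile(status.outputFile),
          catch: (error) => new NetworkError({ cause: error }),
        })
      );
      return parseJsonlResults(raw);
    });
    return Effect.runPromise(program);
  }
);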
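The parsing helper is not shown in this article. As a rough sketch of what such a parser might look like, the version below reads the output one line at a time and records success or failure per document instead of throwing on the first bad line. The output line shape (custom_id, response.status_code, response.body, error) is an assumption and should be checked against real batch output; parseBatchOutput is a hypothetical name, not our production helper.

```typescript
// Assumed shape of one line in the batch output file; verify before relying on it.
interface BatchOutputLine {
  custom_id?: string;
  response?: { status_code?: number; body?: unknown } | null;
  error?: unknown;
}

type ParsedResult =
  | { ok: true; documentKey: string; body: unknown }
  | { ok: false; documentKey: string | null; reason: string };

// Hypothetical parser: one result per non-empty line, never throws on bad input.
function parseBatchOutput(jsonl: string): ParsedResult[] {
  const results: ParsedResult[] = [];
  for (const rawLine of jsonl.split("\n")) {
    const line = rawLine.trim();
    if (line === "") continue; // tolerate blank lines and a trailing newline
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      results.push({ ok: false, documentKey: null, reason: "invalid JSON" });
      continue;
    }
    if (typeof parsed !== "object" || parsed === null) {
      results.push({ ok: false, documentKey: null, reason: "not a JSON object" });
      continue;
    }
    const entry = parsed as BatchOutputLine;
    const key = typeof entry.custom_id === "string" ? entry.custom_id : null;
    const status = entry.response?.status_code;
    if (key === null) {
      results.push({ ok: false, documentKey: null, reason: "missing custom_id" });
    } else if (entry.error != null) {
      results.push({
        ok: false,
        documentKey: key,
        reason: `error: ${JSON.stringify(entry.error)}`,
      });
    } else if (typeof status === "number" && status >= 200 && status < 300) {
      results.push({ ok: true, documentKey: key, body: entry.response?.body });
    } else {
      results.push({
        ok: false,
        documentKey: key,
        reason: `status ${status ?? "missing"}`,
      });
    }
  }
  return results;
}
```

Returning a tagged result per line means one unreadable document shows up as a single failed row rather than failing the whole step and triggering another round of polling.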
Step 3: Save Results
Once results are ready, we persist the extracted fields and keep writes simple.
await step.do("save-results", async () => {
  for (const result of results) {
    await db.insert(invoices).values({ /* ... */ });
  }
});
Step 4: Update Batch Status
After saving results, we update the batch record and mark the parent job complete once all batches finish.
import { eq } from "drizzle-orm";

await step.do("update-status", async () => {
  await db
    .update(batch_processes)
    .set({
      status: "completed",
      completedRequests: results.length,
    })
    .where(eq(batch_processes.batchId, batchJob.id));
  const allBatches = await db
    .select()
    .from(batch_processes)
    .where(eq(batch_processes.job_id, jobId));
  // Require every expected batch row to exist (totalBatches comes from the
  // workflow params) so an early finisher cannot close the job prematurely.
  const allComplete =
    allBatches.length === totalBatches &&
    allBatches.every(
      (b) => b.status === "completed" || b.status === "failed"
    );
  if (allComplete) {
    await db
      .update(jobs)
      .set({ jobStatus: "completed" })
      .where(eq(jobs.id, jobId));
  }
});
Failure Modes We Plan For
- OCR provider outages: workflow retries with backoff, status remains "processing".
- Bad documents: mark failures per document, not per job.
- Duplicate delivery: use idempotent keys (batch ID + document ID).
- Database contention: chunk writes and keep transactions short.
This keeps the pipeline stable without manual cleanup.
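As an illustration of the duplicate-delivery point, here is a minimal in-memory sketch of an idempotency key built from batch ID and document ID. The names idempotencyKey and dedupeByKey are hypothetical, and the in-memory Set only stands in for whatever durable check you use.

```typescript
// Hypothetical key builder. JSON-encoding the pair avoids collisions that a
// plain separator would allow, e.g. ("a:b", "c") versus ("a", "b:c").
function idempotencyKey(batchId: string, documentId: string): string {
  return JSON.stringify([batchId, documentId]);
}

// Keeps only items whose key has not been seen yet, and records new keys
// in `alreadySeen` so a later redelivery of the same item is dropped.
function dedupeByKey<T extends { batchId: string; documentId: string }>(
  items: T[],
  alreadySeen: Set<string> = new Set<string>()
): T[] {
  const fresh: T[] = [];
  for (const item of items) {
    const key = idempotencyKey(item.batchId, item.documentId);
    if (alreadySeen.has(key)) continue;
    alreadySeen.add(key);
    fresh.push(item);
  }
  return fresh;
}

const seenKeys = new Set<string>();
const firstDelivery = dedupeByKey(
  [
    { batchId: "b1", documentId: "d1" },
    { batchId: "b1", documentId: "d1" }, // duplicate within one delivery
    { batchId: "b1", documentId: "d2" },
  ],
  seenKeys
);
// A redelivered message repeats (b1, d2); only (b2, d2) is new.
const redelivery = dedupeByKey(
  [
    { batchId: "b1", documentId: "d2" },
    { batchId: "b2", documentId: "d2" },
  ],
  seenKeys
);
console.log(firstDelivery.length, redelivery.length);
```

In D1 the durable equivalent would be a unique index over the same pair plus an insert that ignores conflicts, so a retried step can write the same rows twice without creating duplicates.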
Observability and Status
We expose job status directly from D1. Each job has a jobStatus and per-batch metadata, which lets us show progress like "12/24 batches complete." Internally, we log workflow step timings to identify slow OCR runs and throttling.
If you need an external-facing dashboard, this structure makes it trivial to compute percent complete without storing extra state.
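A sketch of that computation, assuming the batch rows for a job have already been read from D1 and that the expected batch count is known from when the job was created. The function summarizeProgress and its field names are hypothetical.

```typescript
// Minimal view of a batch_processes row; only the status matters here.
interface BatchRow {
  status: string | null;
}

interface JobProgress {
  completed: number;
  failed: number;
  total: number;
  percent: number; // share of batches that have settled (completed or failed)
  label: string;
}

// Hypothetical helper: derives progress from batch rows without extra state.
function summarizeProgress(rows: BatchRow[], totalBatches: number): JobProgress {
  const completed = rows.filter((r) => r.status === "completed").length;
  const failed = rows.filter((r) => r.status === "failed").length;
  const settled = completed + failed;
  // Guard against division by zero for a job with no batches.
  const percent =
    totalBatches === 0 ? 0 : Math.round((settled / totalBatches) * 100);
  return {
    completed,
    failed,
    total: totalBatches,
    percent,
    label: `${completed}/${totalBatches} batches complete`,
  };
}

// 12 completed and 12 still in their initial state, out of 24 expected.
const exampleRows: BatchRow[] = [
  ...Array.from({ length: 12 }, () => ({ status: "completed" })),
  ...Array.from({ length: 12 }, () => ({ status: "init" })),
];
const progress = summarizeProgress(exampleRows, 24);
console.log(progress.label, progress.percent);
```

Counting failed batches toward the percentage is a design choice: it lets a progress bar reach 100% even when some batches fail, while the label and the failed count still show what actually succeeded.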