I got paged at 6am twice in three months because of the same class of problem.
Upstream API changed a field type. My pipeline ingested the bad data without complaint. Six hours later, a dashboard was showing NaN totals and an ML model was producing garbage predictions. The root cause was at ingestion — but the symptom showed up at reporting.
The fix was always fast. The detection was always late.
I went looking for something that would sit between my data sources and my database, reject bad payloads at the gate, and tell me exactly what was wrong — in real time, before anything touched storage. I couldn't find something that was simple enough to drop into an existing pipeline with a single function call.
So I built it. This is what I learned.
The architecture problem with batch validation
The standard data quality approach looks like this:
Extract → Load → Transform → Test (Great Expectations / dbt) → Alert
The test runs after the INSERT. By the time it fires, bad data has been sitting in your warehouse for however long your pipeline takes to run. Your recovery options are: delete and re-ingest (if the source data hasn't changed), patch bad rows manually, mark a time range as unreliable, or silently move on. None of these are good.
What I wanted was this:
Source → [Quality Gate] → PASS → Database
→ WARN → Quarantine
→ BLOCK → Dead-letter queue
The check before the INSERT, not after. Every batch screened. Every problem caught with enough information to act on immediately.
Why Cloudflare Workers
The constraints were:
- Sub-10ms — can't add meaningful latency to a pipeline
- Stateless — no infrastructure to manage
- In-memory only — can't write raw payload data anywhere
- Global — needs to run close to the source
Cloudflare Workers fit all four. The critical privacy property is architectural, not policy: Workers have no filesystem access. There is no fs.writeFile. Raw payload values physically cannot be persisted at the edge layer — the runtime makes it impossible. What I write to persistent storage (KV, D1, Durable Objects) is exclusively post-aggregation: schema hashes, null rates, type distributions, quality scores.
The full Cloudflare stack I ended up using:
# wrangler.toml
[[kv_namespaces]]
binding = "KV" # schema hash cache + API keys
[[d1_databases]]
binding = "DB" # job history + QA summaries (control plane only)
[durable_objects]
bindings = [{ name = "TENANT_DO", class_name = "TenantDurableObject" }]
# authoritative schema state + billing counters, one DO per tenant
[[queues.producers]]
queue = "datascreeniq-overflow" # large payload overflow
The hot path: what happens in under 10ms
Every request to POST /v1/screen runs this sequence:
1. Parse payload (JSON or CSV)
2. Deterministic hash-based sampling
3. Single-pass column analysis
4. Build schema fingerprint (SHA-256)
5. KV cache lookup for previous schema hash
6. Drift detection vs Durable Object baseline
7. Health score calculation
8. Return PASS / WARN / BLOCK + full report
9. (fire-and-forget) Update DO state, save to D1, fire webhooks
Steps 1–8 are synchronous and block the response. Step 9 runs in waitUntil() after the response is already sent. That's how you hit sub-10ms while still persisting state and delivering webhook alerts.
Deterministic sampling — the part that matters most
If you're running statistical checks at the edge, you can't scan every row on large batches. You need to sample. But naive random sampling has a problem: it's not reproducible. If a customer asks "why didn't you catch that null explosion last Tuesday?", you can't answer.
The sampling is deterministic: seeded from a hash of the source name + field names + row count.
export async function payloadHash(
source: string,
rows: Record<string, unknown>[]
): Promise<string> {
const firstRowKeys = rows.length > 0
? Object.keys(rows[0]).sort().join(",")
: "";
const input = `${source}:${firstRowKeys}:${rows.length}`;
return sha256(input);
}
export function deterministicSample<T>(
rows: T[],
sampleSize: number,
datasetHash: string
): { sampled: T[]; ratio: number } {
const seed = parseInt(datasetHash.substring(0, 8), 16);
const ratio = sampleSize / rows.length;
const sampled: T[] = [];
for (let i = 0; i < rows.length; i++) {
const h = mulberry32(seed ^ (i * 2654435761));
if (h < ratio) {
sampled.push(rows[i]);
if (sampled.length >= sampleSize) break;
}
}
return { sampled, ratio: sampled.length / rows.length };
}
The sampling version ("v2") and ratio are returned in every API response. Any quality decision is permanently auditable — you can always show exactly what was checked and under which strategy.
Single-pass column analysis
The analyzer runs one loop over all sampled rows and builds per-column statistics simultaneously. No second passes, no intermediate data structures beyond what's needed per column.
For each column it computes:
- Null rate and empty string rate (separate — they're different problems)
- Dominant type + type mismatch rate (what fraction of values aren't the expected type)
- Min/max (lexicographic for strings, numeric for numbers)
- p25/p50/p75/p95 via sort + linear interpolation (numeric only)
-
Outlier count via IQR method (values outside
[Q1 - 1.5×IQR, Q3 + 1.5×IQR]) - Approximate distinct count via HyperLogLog
- Duplicate rate derived from distinct count vs total
- Enum tracking for low-cardinality string fields (≤20 unique values)
- Timestamp detection — ISO 8601 strings auto-detected, recency checked
The HyperLogLog implementation uses p=14 (16,384 registers), giving ~0.8% standard error on distinct count estimates. It runs MurmurHash3 internally — no external dependency, pure TypeScript.
const P = 14;
const M = 1 << P; // 16,384 registers
const ALPHA = 0.7213 / (1 + 1.079 / M);
// add() hashes the value, uses top P bits as register index,
// stores the position of the leftmost 1-bit in the remaining bits.
// estimate() uses harmonic mean of register values with bias correction.
Drift detection: 8 signal types
After the column analysis, the current batch is compared against the baseline stored in the tenant's Durable Object. The DO stores: schema fingerprints per source, null rate baselines, known enum value sets, and a rolling 20-batch row count history.
The drift signals, in order of severity:
| Signal | Trigger | Severity |
|---|---|---|
type_changed |
Field type changed from baseline | BLOCK |
row_count_anomaly |
Batch size >10× from average | BLOCK |
null_spike |
Null rate increased >20% from baseline | WARN or BLOCK |
timestamp_stale |
Most recent timestamp >72h old | BLOCK |
field_added |
New field not in previous schema | WARN |
field_removed |
Known field missing | WARN |
empty_string_spike |
Empty string rate >30% | WARN |
new_enum_value |
New value in tracked enum field | WARN |
timestamp_stale |
Most recent timestamp >24h old | WARN |
Type changes are always BLOCK because they silently corrupt downstream aggregations. A field that was a number and is now a string will cause every SUM and AVG query on it to fail or return wrong results — often without an error, just a wrong answer.
The two-layer state architecture
The tenant state has two components with different consistency guarantees:
KV (eventually consistent, ~1ms reads): Schema hash cache. Used for the fast path — if the hash matches the cached value, we skip the DO round-trip. TTL of 24 hours.
Durable Objects (strongly consistent, ~5ms): Source of truth for schema fingerprints, null rate baselines, billing counters, enum sets, row count history. One DO per tenant, keyed as do:tenant:{id}. This was a deliberate design choice after reading the Cloudflare reviewer feedback that using KV as source-of-truth for schema enforcement would be wrong — propagation delays of seconds to minutes mean you could miss drift events during the window.
// Fast path: KV hash match → skip DO
const cachedHash = await env.KV.get(`schema_hash:${tenantId}:${source}`);
if (cachedHash && cachedHash === fingerprint.hash) {
// Schema stable — still fetch DO for null baselines, but common path is fast
tenantState = await fetchTenantState(env, tenantId);
} else {
// Schema changed or cache miss — full DO state comparison
tenantState = await fetchTenantState(env, tenantId);
}
Health score and verdict
The health score is a penalty-based system starting at 1.0:
// Column-level penalties
if (col.nullRate > thresholds.null_rate_warn) score *= (1 - col.nullRate * 0.3);
if (col.typeMismatchRate > thresholds.type_mismatch_warn) score *= (1 - col.typeMismatchRate * 0.5);
if (col.emptyStringRate > 0.2) score *= (1 - col.emptyStringRate * 0.15);
if (col.duplicateRate > 0.1) score *= (1 - col.duplicateRate * 0.1);
// Drift event penalties
// BLOCK severity: −20% | WARN: −8% | INFO: −2%
Penalties are multiplicative (diminishing returns) so a single catastrophic column can't push the score below 0. The verdict logic:
Any BLOCK drift event → BLOCK
Health < 0.5 → BLOCK
Health < 0.8 OR any WARN drift → WARN
Otherwise → PASS
Default thresholds are tuned for general use but overridable per-request via options.thresholds.
The Python SDK
The backend is TypeScript on Cloudflare Workers. The client-facing SDK is Python, published to PyPI.
pip install datascreeniq
import datascreeniq as dsiq
client = dsiq.Client("dsiq_live_...")
rows = [
{"order_id": "ORD-001", "amount": 99.50, "email": "alice@corp.com"},
{"order_id": "ORD-002", "amount": "broken", "email": None},
{"order_id": "ORD-003", "amount": 75.00, "email": None},
]
report = client.screen(rows, source="orders")
print(report.status) # BLOCK
print(report.summary())
# 🚨 BLOCK | Health: 34.0% | Type mismatches: amount | Null rate: email=67% | (7ms)
The raise_on_block() pattern integrates cleanly into any orchestrator:
# Airflow
@task
def quality_gate(rows, source):
report = dsiq.Client().screen(rows, source=source)
if report.is_blocked:
raise ValueError(report.summary())
return report.to_dict()
# Prefect
@task
def screen_data(rows, source):
dsiq.Client().screen(rows, source=source).raise_on_block()
What I got wrong (and had to fix)
Cold start problem. The first batch from a new source has no baseline to compare against. The first call always returns PASS regardless of content — you need at least 2–3 batches before drift detection has meaning. I underestimated how often this confuses new users who test with a single call and assume PASS means "healthy data."
Sampling under-coverage edge cases. The hash-based sampler can undersample in rare cases when the hash distribution is unlucky for a specific seed + row count combination. I added a fallback: if the sampler returns fewer than sampleSize rows, it fills up using a fixed stride through the remaining rows. The fallback kicks in rarely but the determinism guarantee holds.
KV as source-of-truth was wrong. My original design used KV for schema enforcement. I got feedback that KV's eventual consistency model (propagation delays up to 60 seconds at the edge) meant you could miss drift events during the propagation window. Moved authoritative schema state to Durable Objects — KV is now read cache only.
Null vs empty string. These are different problems. A null means the value was absent. An empty string means someone sent "" instead of null — often a serialisation bug, sometimes intentional. Tracking them separately turned out to be important for downstream consumers who handle them differently.
Where it is now
Live at datascreeniq.com. Python SDK on PyPI (pip install datascreeniq). Free tier is 500K rows/month, no credit card.
GitHub: github.com/AppDevIQ/datascreeniq-python
If you're running pipelines that ingest from upstream APIs and have been burned by schema drift, I'd genuinely appreciate feedback — especially on the cold start handling, and on what thresholds make sense for different data domains (financial data vs user events vs IoT streams all have very different null rate norms).

Top comments (0)