Apache Hudi sells itself as one table format, but in practice it is two table formats glued onto a shared transaction log. Every senior data engineer who has been paged at 03:00 by a lakehouse incident knows the truth: the table type you pick on day one decides whether your platform pays at write time so reads stay cheap (Copy-on-Write, CoW) or defers work to read time and compaction so writes stay cheap (Merge-on-Read, MoR) for the next three years. Swapping it later is a migration project, not a config toggle.
This guide is the senior-level playbook for the Hudi Copy-on-Write vs Merge-on-Read decision. It walks through the Hudi fundamentals (the timeline, the file-group anatomy with base parquet plus Avro delta logs, the instants and their REQUESTED/INFLIGHT/COMPLETED states, the commit / deltacommit / compaction / clean / rollback / savepoint vocabulary), the CoW rewrite path that keeps reads fast, the MoR base-plus-logs path that keeps writes fast, the compaction and cleaner contracts that bound storage cost, and the multi-engine read matrix across Spark, Flink, Presto, and Trino. Each section pairs a teaching block with a Solution-Tail interview answer: code, a step-by-step trace, an output table, then a concept-by-concept breakdown of why it works.
When you want hands-on reps immediately after reading, drill the aggregation practice library, rehearse on streaming problems, and build lakehouse muscle with ETL drills.
On this page
- Hudi mental model — table types, timeline, file groups
- Copy-on-Write — full rewrites, read-heavy workloads
- Merge-on-Read — base + delta logs, write-heavy workloads
- Compaction and cleaning — inline vs async scheduler
- Picking the type — read latency vs write rate, multi-engine read
- Cheat sheet — MoR vs CoW recipes
- Frequently asked questions
- Practice on PipeCode
1. Hudi mental model — table types, timeline, file groups
Apache Hudi is a transactional lakehouse format built on three primitives — the timeline, the file group, and the table type
The one-sentence invariant: a Hudi table is a directory of partitioned file groups governed by an append-only timeline of instants, and the table type (COPY_ON_WRITE or MERGE_ON_READ) decides whether every write rewrites a file group's base parquet or appends a row-based delta log next to it. Once you internalise those three primitives, every Hudi interview question — concurrency, compaction sizing, query latency, schema evolution — becomes a property of one of them.
The three primitives in one paragraph.
- The timeline. Every Hudi table has a `.hoodie/` directory that stores an ordered sequence of instants. An instant is a monotonically timestamped action (`commit`, `deltacommit`, `compaction`, `clean`, `rollback`, `savepoint`, `replacecommit`, `restore`) captured as up to three files: `<ts>.<action>.requested`, `<ts>.<action>.inflight`, and `<ts>.<action>`. Together they form the table's truth log.
- The file group. Each partition is split into one or more file groups, each identified by a UUID-style `fileId`. The file group is the unit of update: every upsert against a record key lands in exactly one file group inside a partition. The mapping is stable for the life of the file group, which is what gives Hudi its primary-key semantics on top of immutable object storage.
- The table type. `COPY_ON_WRITE` stores each file group as a single base parquet that is rewritten on every commit that touches it. `MERGE_ON_READ` stores each file group as a base parquet plus a stack of row-based log files (Avro), and only periodically compacts them. The type is set at table creation and is not casually changeable.
The instant state machine.
- REQUESTED. The driver has decided to attempt the action but no work has started. Cheap to roll back.
- INFLIGHT. Executors are writing data files. A crash here leaves orphan files that the next `rollback` instant must clean up.
- COMPLETED. The action's commit metadata (which file groups changed, per-file write stats such as record counts, schema) is durably written. Readers can now observe the action's effect.
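The state machine is small enough to model directly. The sketch below is a toy, not Hudi's own classes: `State`, `advance`, `reader_visible`, and `needs_rollback` are hypothetical names, and `needs_rollback` assumes you already know whether the owning writer is alive, which real Hudi infers from its configuration.

```python
from enum import Enum


class State(Enum):
    REQUESTED = "requested"
    INFLIGHT = "inflight"
    COMPLETED = "completed"


# Legal forward moves in the REQUESTED -> INFLIGHT -> COMPLETED lifecycle
NEXT_STATE = {State.REQUESTED: State.INFLIGHT, State.INFLIGHT: State.COMPLETED}


def advance(state: State) -> State:
    """Move an instant one step forward; COMPLETED is terminal."""
    if state not in NEXT_STATE:
        raise ValueError(f"{state.name} is terminal")
    return NEXT_STATE[state]


def reader_visible(state: State) -> bool:
    """Readers only ever observe COMPLETED instants."""
    return state is State.COMPLETED


def needs_rollback(state: State, writer_alive: bool) -> bool:
    """A pending instant whose writer is gone must be rolled back (simplified)."""
    return state is not State.COMPLETED and not writer_alive
```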
The action vocabulary every senior DE must know cold.
- `commit`: written by CoW tables on every upsert. Each commit produces one new base file version per touched file group.
- `deltacommit`: written by MoR tables on every upsert. Each deltacommit appends a new log slice to each touched file group.
- `compaction`: MoR-only. Merges accumulated log files into a fresh base parquet. Has its own REQUESTED → INFLIGHT → COMPLETED lifecycle; once completed, it shows up on the timeline as a `commit`.
- `clean`: garbage-collects old file versions that no live reader needs, per the cleaner policy (`KEEP_LATEST_COMMITS` or `KEEP_LATEST_FILE_VERSIONS`).
- `rollback`: undoes a failed instant by deleting orphan files. Triggered automatically by the next writer.
- `savepoint` / `restore`: manual checkpoints. `savepoint` pins a particular instant so the cleaner cannot remove it; `restore` rolls the table back to a savepoint.
- `replacecommit`: used by clustering and `INSERT_OVERWRITE`. Replaces a whole set of file groups with new ones.
The 2026 reality.
- CoW is the default for batch analytics tables that are loaded daily or hourly and read heavily by dashboards.
- MoR is the default for streaming sinks (Kafka → Hudi, CDC tables, IoT ingestion) where write rate is high and a few minutes of read lag is acceptable.
- Both types coexist in the same data lake — the choice is per table, never per platform.
Worked example — read the timeline
Detailed explanation.
- Goal. Reconstruct what happened to a Hudi table by listing the `.hoodie/` directory and reading the instants in order.
- Why this matters. When a downstream report drifts from expected numbers, the first diagnostic step is to read the timeline. The instant log shows you every commit, the files it touched, and whether any rollbacks intervened.
- Where it lives. Every Hudi table root has a hidden `.hoodie/` directory; the action files are named `<timestamp>.<action>[.<state>]`, where `<state>` is `requested`, `inflight`, or absent (completed).
Question. A teammate hands you a Hudi table path and asks "what's the latest committed state?" Walk through the directory listing below and identify the latest committed instant, any inflight work, and any rollbacks.
Input.
| filename | meaning |
|---|---|
| 20260613090000.commit | commit, completed |
| 20260613100000.commit.requested | commit, requested |
| 20260613100000.commit.inflight | commit, inflight |
| 20260613100000.commit | commit, completed |
| 20260613110000.commit.requested | commit, requested |
| 20260613110000.rollback | rollback, completed |
| 20260613120000.commit | commit, completed |
Code.
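```python
# List instants from .hoodie/, parse their state, and find the latest completed one
import os
import re

# <timestamp>.<action>[.<state>]; a missing state suffix means the instant completed
INSTANT_RE = re.compile(r"(\d+)\.(\w+)(?:\.(\w+))?$")


def list_instants(hoodie_dir: str) -> list[tuple[str, str, str]]:
    """Return (timestamp, action, state) for every instant file, sorted by timestamp."""
    rows = []
    for fn in os.listdir(hoodie_dir):
        m = INSTANT_RE.match(fn)
        if not m:
            continue  # skip hoodie.properties, metadata/, .temp/ and other non-instant entries
        ts, action, state = m.group(1), m.group(2), m.group(3) or "completed"
        rows.append((ts, action, state))
    return sorted(rows)


def latest_completed_commit(rows):
    """Latest completed action that publishes new data, or None if there is none."""
    return max(
        (r for r in rows
         if r[1] in ("commit", "deltacommit", "replacecommit") and r[2] == "completed"),
        key=lambda r: r[0],
        default=None,
    )
```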
Step-by-step explanation.
- The first instant, `20260613090000.commit`, is completed: the table's earliest visible state.
- The `100000` instant has all three files (requested, inflight, completed). The completed file means it landed cleanly.
- The `110000` instant has only a requested file, with no inflight or completed file: it was abandoned before it ever went inflight. The next writer noticed and emitted `20260613110000.rollback` to clean up. (On a real Hudi timeline the rollback gets its own fresh instant time and records the instant it undid in its metadata; the listing keys it to 110000 to keep the pairing readable.)
- The `120000` instant is the latest committed action. Readers should snapshot the table at this instant.
- The latest completed commit is `20260613120000`. The rollback in between does not produce a new readable version; it only undoes the failed `110000`.
Output.
| Property | Value |
|---|---|
| latest completed commit | 20260613120000 |
| failed / rolled-back instants | 20260613110000 |
| inflight work | (none) |
| completed commits from 0900 on | 3 (0900, 1000, 1200) |
Rule of thumb. Read the timeline before you read the data. The instant log answers "what happened?" cheaper than running a row count.
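To check this reading mechanically, here is a minimal sketch that works on filenames alone (no filesystem access) and collapses each instant's requested/inflight/completed files into its furthest state. The function name `summarise_timeline` is hypothetical, and the regex assumes the `<timestamp>.<action>[.<state>]` naming used in this example.

```python
import re

# <timestamp>.<action>[.<state>]; no state suffix means the instant completed
INSTANT = re.compile(r"^(\d+)\.([a-z]+)(?:\.(requested|inflight))?$")
RANK = {"requested": 0, "inflight": 1, "completed": 2}
DATA_ACTIONS = {"commit", "deltacommit", "replacecommit"}


def summarise_timeline(filenames: list[str]) -> dict:
    """Collapse each instant's files to its furthest state, then summarise."""
    final: dict[tuple[str, str], str] = {}
    for fn in filenames:
        m = INSTANT.match(fn)
        if not m:
            continue  # not an instant file, e.g. hoodie.properties
        ts, action, state = m.group(1), m.group(2), m.group(3) or "completed"
        key = (ts, action)
        final[key] = max(final.get(key, state), state, key=RANK.__getitem__)
    completed = [ts for (ts, action), s in final.items()
                 if action in DATA_ACTIONS and s == "completed"]
    return {
        "latest_completed": max(completed, default=None),
        "pending": sorted(ts for (ts, _), s in final.items() if s != "completed"),
    }
```

Fed the seven filenames from the listing, it reports `20260613120000` as the latest completed commit and `20260613110000` as the only pending instant, which is the one the rollback cleaned up.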
Worked example — file group anatomy
Detailed explanation.
- Goal. Visualise what a single file group looks like on disk for each table type.
- Why this matters. Most Hudi performance bugs are file-group sizing bugs (too few rows per file group, too many log slices before compaction). Knowing the anatomy makes the bug obvious.
- The layout. Inside `partition=<value>/` directories, the file group is identified by a UUID; each base parquet and each log slice carries the fileId in its name plus a commit timestamp.
Question. Given the directory listing below, identify which files belong to the same file group, distinguish base files from log files, and recognise whether the table is CoW or MoR.
Input.
| filename | path |
|---|---|
| f7a2-0_0-100-200_20260613090000.parquet | partition=us/ |
| f7a2-0_0-110-210_20260613100000.parquet | partition=us/ |
| .f7a2-0_20260613100000.log.1_0-120-220 | partition=us/ |
| .f7a2-0_20260613100000.log.2_0-130-230 | partition=us/ |
| 9b1c-0_0-101-201_20260613090000.parquet | partition=us/ |
Code.
```python
# Parse a Hudi data filename into its fileId and kind (base parquet or log slice)
import re

LOG_RE = re.compile(r"\.([a-f0-9-]+)_(\d+)\.log\.(\d+)")      # .<fileId>_<baseCommit>.log.<version>_<writeToken>
BASE_RE = re.compile(r"([a-f0-9-]+)_[\w-]+_(\d+)\.parquet$")  # <fileId>_<writeToken>_<commit>.parquet


def parse_hudi_file(fn: str):
    """Return a dict describing fn, or None if it is not a Hudi data file."""
    if fn.startswith("."):
        m = LOG_RE.match(fn)
        return m and {"kind": "log", "fileId": m.group(1),
                      "baseCommit": m.group(2), "version": int(m.group(3))}
    m = BASE_RE.match(fn)
    return m and {"kind": "base", "fileId": m.group(1), "commit": m.group(2)}
```
Step-by-step explanation.
- The first two parquet files share the fileId `f7a2`. They are two versions of the same file group's base file (commit 0900, then commit 1000). In a MoR table a second base version like this often comes from a compaction; a CoW table would emit one on every commit that touches the group.
- The two `.log` files share the same fileId `f7a2` and carry `baseCommit=1000` in their names. They are delta log slices written after the 1000 base landed. Their presence means this is a MoR table.
- The `9b1c` parquet is a separate file group in the same partition. Each partition can host many file groups; each record key is routed to one and only one fileId.
- The leading dot on log files (`.f7a2...`) marks them as hidden, so engines that list the directory as plain parquet skip them.
Output.
| fileId | base files | log slices | inferred table type |
|---|---|---|---|
| f7a2 | v1 @ 0900, v2 @ 1000 | 2 slices over base 1000 | MoR (because log slices exist) |
| 9b1c | v1 @ 0900 | none | indeterminate (could be CoW or quiet MoR) |
Rule of thumb. The presence of .log files in a partition is the on-disk signature of a MoR table. The absence of log files does not prove CoW — it could be a MoR table that has just been compacted.
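The same classification can be scripted. This minimal sketch assumes the naming patterns shown in the listing (`<fileId>_<writeToken>_<instant>.parquet` for base files, `.<fileId>_<baseInstant>.log.<version>_<writeToken>` for log files), groups files by fileId, and applies the rule of thumb: log files prove MoR, their absence proves nothing. `infer_layout` is a hypothetical helper, not a Hudi API.

```python
import re
from collections import defaultdict

# Base file: <fileId>_<writeToken>_<instant>.parquet
BASE = re.compile(r"^([0-9a-f-]+)_[0-9-]+_(\d+)\.parquet$")
# Log file: .<fileId>_<baseInstant>.log.<version>_<writeToken>
LOG = re.compile(r"^\.([0-9a-f-]+)_(\d+)\.log\.(\d+)(?:_[0-9-]+)?$")


def infer_layout(filenames: list[str]) -> dict[str, dict]:
    """Group files by fileId and label each group MoR or indeterminate."""
    groups = defaultdict(lambda: {"bases": [], "logs": []})
    for fn in filenames:
        if m := BASE.match(fn):
            groups[m.group(1)]["bases"].append(m.group(2))
        elif m := LOG.match(fn):
            groups[m.group(1)]["logs"].append((m.group(2), int(m.group(3))))
    for g in groups.values():
        g["bases"].sort()
        g["logs"].sort()
        # Log files prove MoR; no logs proves nothing (CoW or freshly compacted MoR)
        g["type"] = "MoR" if g["logs"] else "indeterminate"
    return dict(groups)
```

On the five filenames above it labels `f7a2-0` as MoR (bases at 0900 and 1000 plus two log files) and `9b1c-0` as indeterminate. Note that the parsed fileId keeps the `-0` suffix that appears in the filenames.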
Worked example — instants and the REQUESTED → INFLIGHT → COMPLETED states
Detailed explanation.
- Goal. Trace a single commit's lifecycle through the three states.
- Why this matters. Most "the writer died and now nothing works" incidents are an inflight instant blocking the next attempt. Knowing the state machine tells you what to delete (or what to leave alone).
- The lifecycle. Every action, including `compaction` and `clean`, passes through REQUESTED, then INFLIGHT, then COMPLETED. A crash between any two states leaves an artefact for the next writer to inspect.
Question. A writer crashed mid-deltacommit. Which files exist in .hoodie/, what should the next writer do, and what should the next reader do?
Input.
| filename in .hoodie/ | meaning |
|---|---|
| 20260613100000.deltacommit.requested | requested file |
| 20260613100000.deltacommit.inflight | inflight file |
| (no completed file yet) | the deltacommit never finished |
Code.
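```python
# Pseudo-code for the next writer's startup probe (timeline methods are illustrative)
def reconcile(timeline):
    for instant in timeline.pending_instants():
        if instant.state == "INFLIGHT":
            # Half-written data files: with a single writer, any inflight instant
            # belongs to a dead writer, so roll it back before committing anything new
            timeline.schedule_rollback(instant)
        elif instant.state == "REQUESTED" and instant.is_orphan():
            # Planned but never started, and its writer is gone: roll back the plan
            timeline.schedule_rollback(instant)
```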
Step-by-step explanation.
- The next writer opens the table and lists `.hoodie/`. It sees an inflight deltacommit at timestamp 1000 with no completed file.
- Hudi's invariant: any inflight instant whose writer is no longer alive must be rolled back before a new instant can commit. The writer schedules a `rollback` instant.
- The rollback action cleans up what the failed deltacommit wrote, locating those files through the marker files the failed writer left under `.hoodie/.temp/`; depending on the Hudi version, log data is deleted outright or masked with a rollback block.
- The next reader, querying the table during this window, sees only completed instants. The failed inflight is invisible to readers; they never observe a partial state.
Output.
| Actor | Sees | Action |
|---|---|---|
| Next writer | inflight 1000 | emit rollback |
| Next reader | latest completed instant (< 1000) | read as if 1000 never existed |
| Operator | orphan log files | wait for rollback or run hudi-cli rollback |
Rule of thumb. Never manually delete files in .hoodie/ or in partition directories — Hudi's rollback can do it safely. Hand-deletion will desync the timeline from the data and is the canonical way to corrupt a lakehouse.
Interview question on the Hudi mental model
A senior interviewer often opens with: "Explain the difference between a commit and a deltacommit in Hudi, and walk me through what a single upsert produces on disk for a CoW table vs a MoR table." It blends timeline, file group, and table type into one probe.
Solution Using the three-primitive frame
```python
# Pseudo-code: one upsert against record-key=42, value="X" (helper names are illustrative)
def upsert(table, key, value):
    fileId = table.index.lookup_or_assign(key)              # file group routing
    if table.type == "COPY_ON_WRITE":
        base = table.partition.read_base(fileId)            # read the whole base parquet
        merged = base.upsert(key, value)                    # apply the change in memory
        new_base = base.next_version()                      # same fileId, new instant time
        write_parquet(new_base, merged)                     # rewrite the file group
        table.timeline.emit_commit({fileId: new_base})      # commit instant
    else:  # MERGE_ON_READ
        slice_n = table.partition.next_log_slice(fileId)    # next log slice for this group
        append_avro(slice_n, [(key, value, "upsert")])      # append only, no base read
        table.timeline.emit_deltacommit({fileId: slice_n})  # deltacommit instant
```
Step-by-step trace.
| Step | CoW path | MoR path |
|---|---|---|
| 1 | Look up record-key=42 in index → fileId=f7a2 | same |
| 2 | Read base file f7a2_..._<prev>.parquet | (no read; append-only) |
| 3 | Apply upsert to in-memory rows | Encode (key=42, value=X, op=upsert) as Avro |
| 4 | Write new base parquet f7a2_..._<now>.parquet | Append to log slice .f7a2_<baseCommit>.log.<N+1> |
| 5 | Emit commit instant in .hoodie/ | Emit deltacommit instant in .hoodie/ |
| 6 | Old base remains until cleaner GCs it | Base remains untouched; compaction will merge later |
Output:
| Aspect | CoW | MoR |
|---|---|---|
| Instant action | commit | deltacommit |
| Files produced | 1 new base parquet | 1 new log slice (avro) |
| Read latency | low (read one parquet) | higher (read base + merge logs) |
| Write amplification | high (rewrite whole file group) | low (append only) |
| Cleanup actor | cleaner | compaction then cleaner |
Why this works — concept by concept:
- File group routing: the index (bloom, simple, bucket, or record-level, depending on configuration) maps every record key to exactly one fileId in its partition. Both table types use the same routing; the difference is what happens after the route is decided.
- Commit vs deltacommit: the two action names are a deliberate timeline distinction so that downstream tooling can tell at a glance whether log files were touched.
- Snapshot vs read-optimised views: CoW exposes only a snapshot view (the latest base). MoR exposes both a snapshot view (base + logs merged) and a read-optimised view (base only, stale by up to one compaction interval).
- Write amplification trade-off: CoW pays the full O(rows-in-file-group) write cost on every commit; MoR pays O(updated-rows) per append plus a deferred O(rows-in-file-group) at compaction time. Because one compaction absorbs many deltacommits, MoR usually writes fewer total bytes; the price moves to the read side, where snapshot queries merge logs until compaction catches up.
- Cost: CoW write: O(file_group_size) per commit. MoR write: O(delta) per deltacommit + O(file_group_size) per scheduled compaction. CoW read: O(scanned_rows). MoR snapshot read: O(scanned_rows) + O(log_slices × delta_rows).
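The cost bullets can be turned into a back-of-the-envelope model. This sketch assumes every commit touches the same `touched_groups` file groups and that each compaction rewrites each of them once in full; it ignores read-side merge cost entirely. Both function names are hypothetical.

```python
def cow_mb_written(commits: int, touched_groups: int, group_mb: float) -> float:
    """CoW: every commit rewrites each touched file group in full."""
    return commits * touched_groups * group_mb


def mor_mb_written(commits: int, touched_groups: int, group_mb: float,
                   delta_mb_per_group: float, compact_every: int) -> float:
    """MoR: append the delta on each deltacommit, rewrite groups once per compaction."""
    appends = commits * touched_groups * delta_mb_per_group
    compactions = commits // compact_every  # completed compaction rounds
    return appends + compactions * touched_groups * group_mb
```

With 24 hourly commits on 50 touched 256 MB groups, a 0.5 MB delta per group, and one compaction per day, CoW writes 307 200 MB while MoR writes 13 400 MB (600 MB of appends plus one 12 800 MB compaction).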
2. Copy-on-Write — full rewrites, read-heavy workloads
Hudi Copy-on-Write rewrites the touched file group's base parquet on every commit so that readers never merge; the price is write amplification
The mental model in one line: every CoW write reads the affected base file, merges the incoming records, and writes a brand-new base file version; readers always see a single parquet per file group and never run a merge at query time. Once you say "rewrite the file group on write, scan one parquet on read," the entire CoW operational story falls out.
CoW rules in five bullets.
- One file format on disk. Base parquet, full stop. No log files, no Avro, no row-format secondary store.
- Every commit produces a new base file version per touched file group. Old versions remain visible to in-flight readers until the cleaner removes them.
- Readers always do a snapshot read. No merge logic at read time. Vectorised parquet scan, columnar pruning, predicate push-down all behave like a vanilla parquet table.
- No compaction action. There are no log files to merge into a base, so the `compaction` instant simply does not exist for CoW tables.
- Cleaner is the only background job. It GCs old base versions according to the cleaner policy. Without it, your storage cost grows linearly with commit count.
When CoW is the right pick.
- Daily or hourly batch loads. Each commit touches few file groups but every read is hot. Write amplification is bounded by load frequency.
- Dashboard / BI tables. Sub-second scan latency matters more than write latency.
- Tables that downstream Presto / Trino / Athena query directly. A CoW snapshot read is a plain parquet scan, so these engines need no merge logic to serve fresh data.
- Strict GDPR / right-to-be-forgotten deletes. A delete that rewrites the base file genuinely overwrites the rows — easier to certify than MoR, which may carry a delete record in a log slice for days.
Where CoW hurts.
- High-frequency upserts. Rewriting an entire file group on every micro-batch is wasteful when the average delta is a handful of rows.
- Streaming sinks. A Kafka → CoW Hudi sink committing every 30 seconds will spend most of its CPU rewriting parquet, not landing data.
- Big file groups. The bigger the average file group (in MB), the more each rewrite costs per byte of new data. The ratio (file-group-bytes / delta-bytes) is the write amplification factor.
Common interview probes on CoW.
- "What happens to the old base file when you commit a CoW update?" — it remains on disk for in-flight readers; the cleaner removes it later.
- "Can two writers commit to a CoW table at the same time?" Only with optimistic concurrency control (OCC) and a lock provider configured, and only when they touch disjoint file groups; otherwise the later committer aborts.
- "How does delete behave in CoW?" — it triggers a rewrite of every file group that holds a target record, producing a new base file with the deleted rows physically absent.
- "What is the cost of a CoW upsert that updates one row?" — O(file_group_size) in bytes, because the whole base file is rewritten.
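The disjoint-file-group rule from the concurrency probe can be sketched as a set intersection. This is a simplified model of file-level optimistic conflict detection, assuming each writer reports the fileIds it touched; `PendingWrite`, `conflicts`, and `resolve` are hypothetical names, and real Hudi additionally relies on a lock provider to serialise the check.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PendingWrite:
    instant: str
    touched_file_ids: frozenset


def conflicts(a: PendingWrite, b: PendingWrite) -> bool:
    """Two concurrent writers conflict if they touched any common file group."""
    return bool(a.touched_file_ids & b.touched_file_ids)


def resolve(committed: PendingWrite, candidate: PendingWrite) -> str:
    """First committer wins; the candidate aborts if it overlaps (simplified OCC)."""
    return "abort" if conflicts(committed, candidate) else "commit"
```

Checking at file-group granularity is what lets two writers on disjoint groups both succeed, while any shared group forces one of them to retry.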
Worked example — upsert rewrites the file group
Detailed explanation.
- Goal. Show what happens on disk when a single update lands in a CoW table.
- Why this matters. Engineers regularly under-estimate CoW write cost because the logical update is one row. The physical cost is the size of the base file.
- The mechanic. Hudi reads the affected base parquet, applies the upsert in-memory, and writes a new parquet file with a fresh commit timestamp in its name.
Question. A CoW table has a single file group with a 256 MB base parquet. You upsert one row. What does the disk look like before vs after, and what is the write cost?
Input — file group state before.
| fileId | latest base | bytes |
|---|---|---|
| f7a2 | f7a2_..._20260613090000.parquet | 256 MB |
Code.
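```scala
// Spark / Hudi CoW upsert: single-row example (assumes an active SparkSession `spark`)
import spark.implicits._

val df = Seq(("k1", "new_value", 200L))
  .toDF("record_key", "value", "updated_at")

df.write.format("hudi")
  .option("hoodie.table.name", "orders")                          // Hudi writes need a table name
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "record_key")
  .option("hoodie.datasource.write.precombine.field", "updated_at") // breaks ties between duplicate keys in the batch
  // partition path configuration omitted for brevity
  .mode("append")
  .save("s3://lake/orders/")
```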
Step-by-step explanation.
- The driver looks up `record_key=k1` in the index and finds it routes to fileId `f7a2`.
- The executor streams the entire 256 MB base file record by record, holding the incoming upserts in a (spillable) map keyed by record key.
- It applies the upsert: the existing row for `k1` is replaced by the new version.
- It writes a new parquet file `f7a2_..._20260613100000.parquet` containing all rows: the unchanged ones plus the upserted one. The file is roughly 256 MB, plus or minus the delta.
- Hudi emits a `commit` instant referencing the new base file. The old file `..._090000.parquet` is still readable until the cleaner removes it.
Output — disk state after.
| fileId | base files now |
|---|---|
| f7a2 | ..._090000.parquet (256 MB), ..._100000.parquet (~256 MB) |
| Metric | Value |
|---|---|
| Bytes read | 256 MB |
| Bytes written | 256 MB |
| Storage (until clean) | 512 MB |
| Write amplification | 256 MB per row upserted |
Rule of thumb. For a CoW table, the minimum cost of any commit is the size of the smallest touched file group. Plan your commit cadence and batch sizes so that write amplification stays acceptable; one rough heuristic is to commit only once your delta reaches at least 5% of the file group size, which caps write amplification at about 20×.
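A minimal sketch of the two numbers behind this rule, assuming sizes in bytes and treating the 5% threshold as a tunable default; both function names are hypothetical.

```python
MB = 1024 * 1024


def write_amplification(group_bytes: int, delta_bytes: int) -> float:
    """Bytes rewritten per byte of new data when one CoW file group is rewritten."""
    if delta_bytes <= 0:
        raise ValueError("delta_bytes must be positive")
    return group_bytes / delta_bytes


def ready_to_commit(group_bytes: int, buffered_delta_bytes: int,
                    min_ratio: float = 0.05) -> bool:
    """The 5% heuristic: hold the batch until the delta reaches min_ratio of the group."""
    return buffered_delta_bytes >= min_ratio * group_bytes
```

For a 256 MB group, a single 1 KB row yields an amplification of 262 144×, while holding the batch until 13 MB of delta accumulates brings it just under 20×.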
Worked example — read path is one parquet per file group
Detailed explanation.
- Goal. Show that a CoW read is identical in shape to reading a plain parquet table.
- Why this matters. Migrating off CoW rarely buys read latency: CoW reads already run at plain-parquet speed, and MoR can at best match them with its read-optimised view (on staler data) or fall behind with its snapshot view.
- The mechanic. Hudi resolves the latest commit instant, lists the latest base file per file group, and hands the file list to the scan operator.
Question. Given a CoW table with three file groups, what files does a SELECT * read at commit timestamp 1000?
Input — file groups at commit 1000.
| fileId | latest base file |
|---|---|
| f7a2 | f7a2_..._20260613100000.parquet |
| 9b1c | 9b1c_..._20260613090000.parquet |
| a3d4 | a3d4_..._20260613100000.parquet |
Code.
```sql
-- A vanilla scan via Spark SQL or Trino
SELECT *
FROM hudi.orders
WHERE event_date >= DATE '2026-06-01';
```
Step-by-step explanation.
- The query engine asks Hudi for the file list at the latest completed commit (= 1000).
- Hudi returns exactly one base parquet per file group — the most recent version ≤ commit 1000.
- The scan operator reads only these three files, applies partition pruning and parquet predicate push-down, and returns the matching rows.
- There is no merge step; there is no log replay; there is no per-row deduplication. The read is identical in cost to reading three plain parquet files.
Output.
| File scanned | Rows scanned | Why |
|---|---|---|
| f7a2_..._100000.parquet | all rows in f7a2 | latest version at commit 1000 |
| 9b1c_..._090000.parquet | all rows in 9b1c | 9b1c had no commit at 1000 |
| a3d4_..._100000.parquet | all rows in a3d4 | latest version at commit 1000 |
Rule of thumb. A CoW read is "list latest base per file group, scan." If Presto / Trino / Athena performance looks slower than expected, the problem is almost always file-group sizing or partition layout — not Hudi.
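The file-listing step boils down to "newest base instant per fileId, no later than the query instant". This sketch assumes every listed instant is already completed on the timeline (a real Hudi reader also filters out pending instants); `cow_file_list` is a hypothetical name.

```python
def cow_file_list(base_files: list[tuple[str, str]], as_of: str) -> dict[str, str]:
    """Newest base-file instant per fileId that is <= as_of.

    Instants are fixed-width yyyyMMddHHmmss strings, so string order is time order.
    Assumes every listed instant is completed on the timeline.
    """
    latest: dict[str, str] = {}
    for file_id, instant in base_files:
        if instant <= as_of and instant > latest.get(file_id, ""):
            latest[file_id] = instant
    return latest
```

Given the three latest base files above plus the older `f7a2` version at 0900, a query at 1000 picks `f7a2` and `a3d4` at 1000 and `9b1c` at 0900; the same function with an earlier `as_of` doubles as a time-travel file listing.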
Worked example — delete propagation in CoW
Detailed explanation.
- Goal. Show how a delete physically removes the row.
- Why this matters. Compliance teams need certainty that a "delete user 42" call results in user 42's row physically absent from disk, not merely logically suppressed. CoW satisfies this contract more directly than MoR.
- The mechanic. A delete in CoW behaves like an upsert with a tombstone: the affected file group is rewritten without the targeted rows.
Question. A CoW orders table has a user_id=42 row in the file group f7a2_..._090000.parquet (256 MB total). The team issues a delete-by-key. Show the disk state before and after.
Input.
| Property | Value |
|---|---|
| operation | delete |
| record_key | user_id=42 |
| target fileId | f7a2 |
| size before | 256 MB |
Code.
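```scala
// CoW delete by record key (assumes an active SparkSession `spark`)
import spark.implicits._

val deletes = Seq("42").toDF("user_id")   // a partitioned table also needs its partition column here
deletes.write.format("hudi")
  .option("hoodie.table.name", "orders")
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
  .option("hoodie.datasource.write.operation", "delete")
  .option("hoodie.datasource.write.recordkey.field", "user_id")
  .mode("append")
  .save("s3://lake/orders/")
```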
Step-by-step explanation.
- The driver routes `user_id=42` to fileId `f7a2`.
- The executor reads the base file, drops the row matching `user_id=42`, and writes a new base file `..._110000.parquet` without it.
- Commit metadata records the operation as `delete`. Downstream consumers can learn about the delete from the timeline (and, on tables with Hudi's CDC feature enabled, from a CDC query), but the data itself is physically gone from the new base.
- The old base file `..._090000.parquet` still contains `user_id=42` until the cleaner removes that version, at which point the data is irretrievable.
Output.
| Stage | Disk holds row? | Reader sees row? |
|---|---|---|
| Before delete | yes (in ..._090000.parquet) | yes |
| After delete commit | yes (still in ..._090000.parquet) | no (reader uses ..._110000.parquet) |
| After cleaner | no | no |
| After cleaner | no | no |
Rule of thumb. Compliance-grade deletes require both the commit and the next cleaner run. The "retention window" of your cleaner policy is the effective delete latency from a GDPR / right-to-be-forgotten standpoint.
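The retention window can be made concrete with a simplified model of the `KEEP_LATEST_COMMITS` policy, where the cleaner keeps every file version that a query on any of the last N completed commits could still read (N comes from `hoodie.cleaner.commits.retained`). The sketch covers one file group and ignores savepoints, pending compactions, and log files; `cleanable_versions` is a hypothetical name.

```python
def cleanable_versions(versions: list[str], commits: list[str], retained: int) -> list[str]:
    """Simplified KEEP_LATEST_COMMITS for one file group.

    versions: instants at which this file group received a new base file.
    commits:  completed commit instants on the timeline.
    A version is cleanable once a newer version exists at or before the earliest
    retained commit, because no retained snapshot can still read it.
    """
    if retained < 1:
        raise ValueError("retained must be >= 1")
    if not commits:
        return []
    earliest_retained = sorted(commits)[-retained:][0]
    # Newest version visible to a query at the earliest retained commit
    floor = max((v for v in versions if v <= earliest_retained), default=None)
    if floor is None:
        return []
    return sorted(v for v in versions if v < floor)
```

With the delete committed at 110000, retaining one commit makes the 090000 version cleanable at once; retaining two keeps it, because a query at 100000 still reads it. More retained commits means a longer effective delete latency.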
Interview question on CoW operations
A senior interviewer might frame this as: "You inherit a CoW orders table that commits hourly. The last hour added 200 net new rows. The base files are 256 MB each, and the table has 1000 file groups. Estimate the hourly write cost and how you'd reduce it." It blends file-group sizing, commit cadence, and write amplification.
Solution Using a write-amplification computation
```python
# Estimate hourly bytes written for a CoW table
file_groups_total = 1000        # context: only 50 of 1000 groups are touched per commit
file_groups_touched = 50        # observed from commit metadata
avg_base_file_mb = 256
delta_rows = 200
hours_per_day = 24

bytes_per_commit_mb = file_groups_touched * avg_base_file_mb        # 12 800 MB
bytes_per_day_gb = bytes_per_commit_mb * hours_per_day / 1024        # 300 GB
write_amp_per_row = bytes_per_commit_mb * 1024 * 1024 / delta_rows   # bytes rewritten per row (64 MiB)
```
Step-by-step trace.
| Step | Calculation | Value |
|---|---|---|
| 1 | bytes per commit | 50 * 256 = 12 800 MB ≈ 12.5 GB |
| 2 | bytes per day | 12.5 GB * 24 = 300 GB / day |
| 3 | rows per day | 200 * 24 = 4 800 rows |
| 4 | write amplification per row | 12.5 GB / 200 ≈ 64 MB / row |
| 5 | mitigation: increase commit interval to 4 hours | bytes/day = 12.5 * 6 = 75 GB; rows/commit = 800 |
| 6 | mitigation: switch to MoR | bytes/commit ≈ delta size; compaction every 24 commits |
The trace shows that committing every hour for 200 rows is rewriting 12.5 GB of parquet — a write amplification of 64 MB per row. Either coalesce commits into a longer window or switch to MoR.
Output:
| Strategy | Bytes / day | Read latency change | Notes |
|---|---|---|---|
| Hourly CoW | 300 GB | none | wasteful |
| 4-hour CoW | 75 GB | none | simplest fix |
| MoR + hourly deltacommit | ~2 GB | +30% read | needs compaction policy |
| MoR + 6-hour compaction | ~2 GB writes + 12.5 GB / compaction | varies | balanced |
Why this works — concept by concept:
- Write amplification = file-group-size / delta-size — the fundamental CoW knob. Bigger file groups amplify more; bigger deltas amplify less.
- Commit cadence is a cost lever: every doubled interval roughly halves write cost, as long as each commit keeps touching roughly the same set of file groups. The trade-off is data freshness.
- Touched file groups are a cardinality signal — if 50 of 1000 file groups are touched, your record keys cluster well. If 950 of 1000 are touched, your keys are uniform and CoW is paying its worst case.
- MoR is the structural fix — it changes the cost model from O(file_group_size) per commit to O(delta) per commit, paid back at compaction time.
- Cost — CoW write per commit: O(touched_file_groups × file_group_size). CoW read per scan: O(scanned_rows) with vectorised parquet — identical to a vanilla parquet table.
3. Merge-on-Read — base + delta logs, write-heavy workloads
Hudi Merge-on-Read writes changes to row-based log files and defers the parquet rewrite to compaction: fast writes, costlier reads
The mental model in one line: every MoR write appends a row-based Avro log slice to the affected file group; readers either scan just the base (fast but stale) or scan the base plus replay every log slice on top (fresh but slower); compaction periodically folds the logs back into a new base file. Once you say "append on write, merge at read, compact in the background," the MoR operational story falls out.
MoR rules in five bullets.
- Two file formats on disk. Base parquet (columnar) plus delta log files (Avro, row-based). Both carry the same schema.
- Every upsert is a deltacommit. Hudi appends a new log slice referencing the current base file. The base file is not rewritten.
- Two read views. The read-optimised view reads only the base files (very fast, but stale by up to one compaction interval). The snapshot view reads the base plus merges every log slice on top (fresh, but slower).
- Compaction is a separate background job. It reads the base + all logs, applies them in order, and writes a new base parquet, then declares the logs obsolete.
- Cleaner still runs. After compaction, the old base and the merged logs are eligible for GC under the cleaner policy.
When MoR is the right pick.
- High-frequency upserts / CDC. A Debezium → Kafka → Hudi sink benefits massively from append-only writes.
- Streaming sinks with relaxed read freshness. Dashboards that tolerate 5–30 minutes of staleness can serve from the read-optimised view at parquet speeds.
- Tables with skewed update patterns. If most updates target a small fraction of records, MoR keeps log files small and compaction cheap.
- Tables that need both real-time consumers and batch consumers. Real-time readers take the snapshot view; batch readers take the read-optimised view. Same data, two cost profiles.
Where MoR hurts.
- Read-heavy dashboards that need fresh data. Every snapshot read pays the log-merge cost.
- Tables with very few writes but many reads. The base file is fresh enough that MoR's machinery is unnecessary overhead.
- Engines that only support read-optimised view. Some older Presto / Trino versions can only read the base, hiding recent deltacommits entirely from analytics.
Common interview probes on MoR.
- "What is the difference between the read-optimised and snapshot views?" — read-optimised scans the base only; snapshot replays the logs on top.
- "How are log files encoded?" — Avro row-based, organised into log blocks with header metadata (commit time, schema, block type).
- "What is a rollback block?" — a special log-block type emitted when an inflight deltacommit is rolled back; it instructs the reader to ignore prior blocks from that commit.
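- "What governs compaction timing?" The trigger strategy (`hoodie.compact.inline.trigger.strategy`): `NUM_COMMITS` (compact every N deltacommits), `TIME_ELAPSED` (after a configured time has passed), `NUM_COMMITS_AFTER_LAST_REQUEST` (N deltacommits since the last compaction was scheduled), or the combined `NUM_AND_TIME` / `NUM_OR_TIME`.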
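The trigger strategies reduce to a couple of comparisons. Hudi's trigger enum also includes the combined NUM_AND_TIME and NUM_OR_TIME strategies, modelled here too. This is a simplified sketch, not Hudi's scheduler: the caller supplies the deltacommit count from whichever anchor the strategy uses, and the defaults of 5 deltacommits and one hour are assumptions chosen to resemble Hudi's defaults.

```python
from datetime import timedelta


def should_schedule_compaction(strategy: str, deltacommits: int, elapsed: timedelta,
                               max_deltacommits: int = 5,
                               max_elapsed: timedelta = timedelta(hours=1)) -> bool:
    """Simplified compaction trigger check.

    deltacommits: count since the last compaction (or since the last compaction
    request, for NUM_COMMITS_AFTER_LAST_REQUEST); the caller picks the anchor.
    """
    by_num = deltacommits >= max_deltacommits
    by_time = elapsed >= max_elapsed
    if strategy in ("NUM_COMMITS", "NUM_COMMITS_AFTER_LAST_REQUEST"):
        return by_num
    if strategy == "TIME_ELAPSED":
        return by_time
    if strategy == "NUM_AND_TIME":
        return by_num and by_time
    if strategy == "NUM_OR_TIME":
        return by_num or by_time
    raise ValueError(f"unknown strategy: {strategy}")
```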
Worked example — write fast (append to log)
Detailed explanation.
- Goal. Show what happens on disk when an upsert lands in a MoR table.
- Why this matters. Engineers picking MoR want to know the per-write cost is bounded by the delta size, not the base file size — that's the whole point.
- The mechanic. The executor encodes the upsert records as Avro and appends them to a log file that targets the current base file via its `baseCommit` token.
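Question. A MoR table has one file group with a 256 MB base parquet at commit 0900. You deltacommit 200 upserts at 1000, all of them updates to keys already in that file group (with indexes that cannot index log files, such as the bloom index, brand-new keys would go to a base file instead). What lands on disk?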
Input — state before.
| fileId | base | logs |
|---|---|---|
| f7a2 | f7a2_..._090000.parquet (256 MB) | (none) |
Code.
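import org.apache.spark.sql.functions.{current_timestamp, lit}
import spark.implicits._

// 200 upserts to existing keys; updated_at is the precombine (ordering) field
val df = (1 to 200).toDF("record_key")
  .withColumn("value", lit("X"))
  .withColumn("updated_at", current_timestamp())
df.write.format("hudi")
  .option("hoodie.table.name", "orders")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "record_key")
  .option("hoodie.datasource.write.precombine.field", "updated_at")
  .option("hoodie.compact.inline", "false") // no inline compaction; a separate job compacts later
  .mode("append")
  .save("s3://lake/orders/")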
Step-by-step explanation.
- The index routes all 200 record keys to fileId f7a2.
- The executor encodes them as one Avro log block with a header (commit time = 1000, baseCommit = 0900, schema, block type = DATA).
- It appends the block to .f7a2_20260613090000.log.1_..., a new log slice file targeting base commit 0900.
- Hudi emits a deltacommit instant. The base file is untouched.
- A subsequent upsert at commit 1100 appends a new log block. On storage that supports appends, such as HDFS, the block goes into the same log file. On storage that does not, such as S3, it goes into a new log slice file (...log.2).
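The file names above follow Hudi's log naming pattern: .<fileId>_<baseCommit>.log.<version>_<writeToken>. The sketch below models the rollover rule from the last step under that assumption. The helper names are hypothetical, and the write token is treated as an opaque string. Real Hudi also applies conditions not shown here, such as a maximum log-file size.

```python
import re

# Assumed pattern: .<fileId>_<baseCommit>.log.<version>_<writeToken>
LOG_NAME = re.compile(
    r"^\.(?P<file_id>.+)_(?P<base_commit>\d+)\.log\.(?P<version>\d+)_(?P<write_token>.+)$")


def log_file_name(file_id: str, base_commit: str, version: int, write_token: str) -> str:
    return f".{file_id}_{base_commit}.log.{version}_{write_token}"


def next_log_file(existing: list, file_id: str, base_commit: str,
                  write_token: str, storage_supports_append: bool) -> str:
    """Choose the log file the next deltacommit writes to for one file slice."""
    latest_version, latest_name = 0, None
    for name in existing:
        m = LOG_NAME.match(name)
        if m and m["file_id"] == file_id and m["base_commit"] == base_commit:
            if int(m["version"]) > latest_version:
                latest_version, latest_name = int(m["version"]), name
    if latest_name is not None and storage_supports_append:
        return latest_name  # e.g. HDFS: append a new block to the existing log file
    # No log yet, or storage without appends (e.g. S3): roll over to the next version
    return log_file_name(file_id, base_commit, latest_version + 1, write_token)


existing = [".f7a2_20260613090000.log.1_1-0-1"]
print(next_log_file(existing, "f7a2", "20260613090000", "2-0-1", storage_supports_append=False))
# .f7a2_20260613090000.log.2_2-0-1
```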
Output — state after.
| fileId | base | logs |
|---|---|---|
| f7a2 | f7a2_..._090000.parquet (256 MB) | .f7a2_...090000.log.1 (~few KB) |
| Metric | Value |
|---|---|
| Bytes written | ~few KB (just the delta) |
| Bytes read | 0 |
| Write amplification | ≈ 1× (no rewrite) |
Rule of thumb. MoR collapses write amplification from O(file_group_size) to O(delta_size). The cost is moved to compaction, where it is paid once per N deltacommits.
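Here is a back-of-the-envelope version of the table above. It assumes the 200-record delta encodes to about 4 KB; that size is an assumption, not a measurement.

```python
MB = 1024 * 1024


def bytes_written(table_type: str, file_group_bytes: int, delta_bytes: int) -> int:
    """Bytes physically written when one upsert batch lands in a single file group."""
    if table_type == "COPY_ON_WRITE":
        return file_group_bytes   # rewrite the whole base parquet with the updates merged in
    if table_type == "MERGE_ON_READ":
        return delta_bytes        # append only the encoded delta to a log file
    raise ValueError(f"unknown table type: {table_type}")


base_bytes = 256 * MB
delta_bytes = 4 * 1024            # assumption: ~4 KB for 200 encoded records
cow = bytes_written("COPY_ON_WRITE", base_bytes, delta_bytes)
mor = bytes_written("MERGE_ON_READ", base_bytes, delta_bytes)
print(cow // mor)                 # 65536: CoW writes 64 Ki times more bytes for this upsert
```

Compaction later pays the 256 MB rewrite once, amortised over many such deltas.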
Worked example — read paths: read-optimised vs snapshot
Detailed explanation.
- Goal. Show the two read views and the cost difference.
- Why this matters. Operators frequently configure the wrong view for a workload — dashboards that demand freshness get pointed at the read-optimised view (stale) and batch backfills that don't need freshness pay the snapshot cost.
- The mechanic. The query specifies the view via a config (hoodie.datasource.query.type=read_optimized or snapshot). Hudi assembles the file list accordingly.
Question. A MoR file group has a 256 MB base at commit 0900 and three log slices over it (at commits 1000, 1100, 1200). Compare the file lists and CPU profiles for read-optimised vs snapshot scans.
Input — file group state.
| Component | Source |
|---|---|
| base | f7a2_..._090000.parquet (256 MB) |
| log slice 1 | .f7a2_...090000.log.1 (5 MB Avro, 1000 deltas) |
| log slice 2 | .f7a2_...090000.log.2 (3 MB Avro, 600 deltas) |
| log slice 3 | .f7a2_...090000.log.3 (4 MB Avro, 800 deltas) |
Code.
// read-optimised view (fastest, stale by up to one compaction interval)
val ro = spark.read.format("hudi")
.option("hoodie.datasource.query.type", "read_optimized")
.load("s3://lake/orders/")
// snapshot view (fresh, more CPU)
val snap = spark.read.format("hudi")
.option("hoodie.datasource.query.type", "snapshot")
.load("s3://lake/orders/")
Step-by-step explanation.
- The read-optimised scan lists only the latest base file per file group. It ignores log slices entirely. Cost = scan one 256 MB parquet.
- The snapshot scan lists the base plus every log slice that targets it. It loads the base, then replays the logs in commit order, applying each delta to the in-memory dataset (upserts and deletes both).
- The deduplication merge is keyed on the record key; the precombine field decides which version wins when a key appears multiple times.
- After the merge, the snapshot scan emits the consolidated rows to the downstream operator. The cost = base scan + Avro decode of all logs + merge.
Output — cost comparison.
| View | Files read | CPU | Freshness |
|---|---|---|---|
| read-optimised | 1 (base 256 MB) | low (vectorised parquet) | stale to 0900 |
| snapshot | 1 + 3 (base + 12 MB logs) | high (Avro decode + merge) | fresh to 1200 |
Rule of thumb. Default dashboards to read-optimised. Switch to snapshot only when "freshness within minutes" is in the SLA, and budget the extra CPU.
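The two read paths above can be sketched in a few lines of Python. This is a simplified model, not Hudi's reader. It assumes a payload where the highest precombine value wins, with ties going to the later log slice, and a None row marks a delete. LogRecord, snapshot_read, and read_optimized are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

Row = Dict[str, str]


@dataclass
class LogRecord:
    key: str
    precombine: int          # ordering value, e.g. an updated_at epoch
    row: Optional[Row]       # None marks a delete


def read_optimized(base: Dict[str, Tuple[int, Row]]) -> Dict[str, Row]:
    """Read-optimised view: the base file only, log slices ignored."""
    return {key: row for key, (_, row) in base.items()}


def snapshot_read(base: Dict[str, Tuple[int, Row]],
                  log_slices: List[List[LogRecord]]) -> Dict[str, Row]:
    """Snapshot view: replay log slices over the base in commit order."""
    merged: Dict[str, Tuple[int, Optional[Row]]] = dict(base)
    for log_slice in log_slices:                 # oldest slice first
        for rec in log_slice:
            current = merged.get(rec.key)
            if current is None or rec.precombine >= current[0]:
                merged[rec.key] = (rec.precombine, rec.row)
    return {key: row for key, (_, row) in merged.items() if row is not None}


base = {"42": (900, {"status": "created"}), "43": (900, {"status": "created"})}
logs = [[LogRecord("42", 1000, {"status": "paid"})],
        [LogRecord("43", 1100, None)],                       # delete
        [LogRecord("42", 950, {"status": "late-arrival"})]]  # older ordering value loses
print(read_optimized(base)["42"])   # {'status': 'created'}
print(snapshot_read(base, logs))    # {'42': {'status': 'paid'}}
```

The extra work in snapshot_read, decoding and merging every log record, is the CPU the cost table charges to the snapshot view.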
Worked example — log block headers and rollback blocks
Detailed explanation.
- Goal. Show that log files are not free-form Avro — they have a block-header schema that allows mid-file rollbacks.
- Why this matters. A failed deltacommit must be erased from the reader's point of view without rewriting the log file. The rollback block accomplishes this.
- The mechanic. A log block has a 6-byte magic header, version, type, length, content, and footer. The block type can be DATA, DELETE, COMMAND, or CORRUPT.
Question. A deltacommit at 1100 partially wrote a log block and then crashed. How does the rollback at 1110 invalidate the partial block without rewriting the file?
Input — log file body.
| Block | Type | Commit | Status |
|---|---|---|---|
| 1 | DATA | 1000 | OK |
| 2 | DATA | 1100 | partially written (CORRUPT magic at end) |
| 3 | COMMAND | 1110 | rollback target=1100 |
Code.
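# Simplified model of Hudi's log reader scanning one log file.
# A ROLLBACK command block is appended *after* the blocks it invalidates, so DATA
# blocks are buffered and only emitted once the whole file has been read.
# log_file.read_blocks() is a hypothetical iterator over parsed blocks.
def replay_blocks(log_file):
    buffered = []        # (commit_time, records) for every DATA block seen
    rolled_back = set()  # commit times named by ROLLBACK command blocks
    for block in log_file.read_blocks():
        if block.type == "COMMAND" and block.subtype == "ROLLBACK":
            rolled_back.add(block.target_commit)
        elif block.type == "DATA":
            buffered.append((block.commit_time, block.records))
        elif block.type == "CORRUPT":
            continue     # truncated or malformed block: skip it
    for commit_time, records in buffered:
        if commit_time not in rolled_back:
            yield from records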
Step-by-step explanation.
- The reader iterates blocks left-to-right. It first encounters the DATA block from commit 1000 and surfaces its records.
- It encounters the partially-written block from commit 1100. The block magic is incomplete, so the reader marks the block as CORRUPT and skips it.
- It encounters the COMMAND/ROLLBACK block from commit 1110. The block instructs the reader to also skip any DATA blocks belonging to commit 1100 — protection in case the partial block had managed to write its magic.
- The reader produces only the records from commit 1000. The rollback semantics are preserved without modifying the log file.
Output — reader semantics.
| Commit | Block type | Reader treats as |
|---|---|---|
| 1000 | DATA | live |
| 1100 | DATA (corrupt) | dropped |
| 1110 | COMMAND/ROLLBACK | metadata only |
Rule of thumb. Hudi's log format makes rollbacks O(1) — append a COMMAND block. This is why MoR can support concurrent writers with optimistic concurrency control without expensive rewriting.
Interview question on MoR architecture
A senior interviewer might frame this as: "Walk me through a single CDC event lifecycle in a MoR table from ingest to query. Include the deltacommit, the read-optimised view, the snapshot view, and the eventual compaction."
Solution Using a CDC lifecycle trace
T+0 CDC event: UPDATE orders SET status='paid' WHERE id=42
T+0.1s Flink sink hashes id=42 → fileId=f7a2
T+0.2s Encodes (id=42, status='paid', op='U', ts=T) as Avro
T+0.3s Appends to .f7a2_<baseCommit>.log.<N>
T+0.4s Emits deltacommit instant 20260613123400000
T+1m Read-optimised dashboard scan: misses the update
T+1m Snapshot dashboard scan: surfaces the update
T+30m Compaction job triggered (NUM_COMMITS=30 reached)
T+31m Compaction reads base + all log slices, writes new base
T+31m Emits commit (compaction) instant; logs marked obsolete
T+32m Cleaner GCs old base file and merged log slices
T+32m Read-optimised and snapshot now both return id=42 paid
Step-by-step trace.
| Time | Actor | Disk state on fileId f7a2 |
|---|---|---|
| T+0 | (before) | base v0900.parquet only |
| T+0.4s | sink | base v0900.parquet + log slice N (with update) |
| T+31m | compactor | base v0900.parquet + base v1234.parquet + obsolete logs |
| T+32m | cleaner | base v1234.parquet only |
The trace shows that a single update is live in the snapshot view as soon as its deltacommit completes. It reaches the read-optimised view only after compaction has written a fresh base that contains it. The cleaner then reclaims the old base and the merged log slices.
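The lifecycle above can be replayed with a toy file-group model. Everything here is hypothetical and simplified. Rows are plain key/value pairs and the latest write wins. Compaction drops the log slices immediately instead of leaving them for the cleaner. The instant strings are illustrative.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class ToyFileGroup:
    base_versions: List[Tuple[str, Dict[str, str]]] = field(default_factory=list)  # newest last
    logs: List[Tuple[str, Dict[str, str]]] = field(default_factory=list)          # over newest base

    def deltacommit(self, instant: str, upserts: Dict[str, str]) -> None:
        self.logs.append((instant, dict(upserts)))    # append only; the base is untouched

    def read_optimized(self) -> Dict[str, str]:
        return dict(self.base_versions[-1][1])        # newest base file only

    def snapshot(self) -> Dict[str, str]:
        rows = self.read_optimized()
        for _, upserts in self.logs:                  # replay logs in commit order
            rows.update(upserts)
        return rows

    def compact(self, instant: str) -> None:
        self.base_versions.append((instant, self.snapshot()))  # write a fresh base
        self.logs = []            # simplification: merged logs vanish here, not at clean time

    def clean(self, keep_latest: int = 1) -> None:
        self.base_versions = self.base_versions[-keep_latest:]  # GC old base versions


fg = ToyFileGroup(base_versions=[("0900", {"42": "created"})])
fg.deltacommit("1234", {"42": "paid"})
print(fg.read_optimized()["42"], fg.snapshot()["42"])  # created paid
fg.compact("1304")
fg.clean()
print(fg.base_versions)  # [('1304', {'42': 'paid'})]
```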
Output:
| View (at T+5m unless noted) | Sees update? | CPU |
|---|---|---|
| read-optimised | no | low |
| snapshot | yes | high (log merge) |
| Flink streaming read | yes (after its next timeline poll) | engine-managed |
| read-optimised after T+31m | yes | low |
Why this works — concept by concept:
- Deltacommit decouples ingest from rewrite — the writer's job ends at "append the log slice and emit the instant." The compactor is responsible for the eventual parquet rewrite.
- Read-optimised vs snapshot is a freshness lever — same table, two query types, two cost profiles. The platform team picks per consumer.
- Compaction triggers: typically NUM_COMMITS (compact every N deltacommits) or TIME_ELAPSED. A custom strategy can also consider log size or skew.
- Multiple writers via OCC: Hudi's optimistic concurrency control can serialise concurrent writers on the timeline. Conflicts are detected at file-group granularity.
- Cost — write: O(delta_size) per deltacommit. Compaction: O(base_size + sum(log_sizes)) per compaction. Read-optimised: O(scanned_rows). Snapshot: O(scanned_rows + log_sizes_to_merge).
4. Compaction and cleaning — inline vs async scheduler
Hudi compaction collapses log slices into a new base file, and the cleaner GCs obsolete versions. Together they bound storage cost and read latency.
The mental model in one line: compaction folds delta log slices back into the base parquet to keep MoR read latency bounded; the cleaner GCs file versions older than the retention window to keep storage cost bounded. The two are separate actions on the timeline — different triggers, different policies, different operational levers.
Compaction in detail.
- Action name. compaction. Like commit and deltacommit, it has its own REQUESTED → INFLIGHT → COMPLETED state machine.
- What it does. For each file group with at least one log slice, the compactor reads base_v<n>.parquet plus all log slices on top, applies the deltas in commit order, and writes base_v<n+1>.parquet. The logs are then declared obsolete.
- Trigger modes. Inline compaction runs after every Nth deltacommit and blocks the writer until it completes. Async compaction runs in a separate, continuously running process, so the writer is never blocked.
- Strategies. NUM_COMMITS (the default: compact after N deltacommits since the last compaction), TIME_ELAPSED (after T minutes since the last compaction), NUM_COMMITS_AFTER_LAST_REQUEST (N deltacommits since the last scheduled compaction, useful when async).
- Concurrency. Compaction can run alongside writers because it operates on the current base and already-committed log slices. New deltacommits land in new log slices that the compactor picks up in the next round.
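Every timeline action shares the same three-state lifecycle. Below is a minimal sketch of that state machine with hypothetical class names. Hudi's own timeline classes are richer and persist each state as a file under .hoodie/.

```python
from enum import Enum


class State(Enum):
    REQUESTED = "requested"
    INFLIGHT = "inflight"
    COMPLETED = "completed"


# The only legal forward transitions for any action (commit, deltacommit, compaction, clean, ...)
NEXT_STATE = {State.REQUESTED: State.INFLIGHT, State.INFLIGHT: State.COMPLETED}


class Instant:
    def __init__(self, action: str, timestamp: str) -> None:
        self.action = action
        self.timestamp = timestamp
        self.state = State.REQUESTED          # scheduling writes the REQUESTED state first

    def advance(self) -> "Instant":
        nxt = NEXT_STATE.get(self.state)
        if nxt is None:
            raise RuntimeError(f"{self.action}@{self.timestamp} is already {self.state.value}")
        self.state = nxt
        return self


compaction = Instant("compaction", "20260613100000")
compaction.advance()   # REQUESTED -> INFLIGHT: a compactor picked it up
compaction.advance()   # INFLIGHT -> COMPLETED: new base files are visible
print(compaction.state)  # State.COMPLETED
```

The REQUESTED state is what lets scheduling and execution happen in different processes. Any process can advance an instant it finds on the timeline.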
Cleaning in detail.
- Action name. clean. Same state machine as the other actions.
- What it does. Deletes obsolete file versions per the cleaner policy. The policy decides what counts as obsolete.
- Policies.
  - KEEP_LATEST_COMMITS: keep file versions referenced by the latest N commits. This is the recommended policy for incremental query consumers, whose last-read timestamp must fall in the retention window.
  - KEEP_LATEST_FILE_VERSIONS: keep the latest N file versions per file group, regardless of commit timestamp. Simpler, but it does not align with incremental query lag.
  - KEEP_LATEST_BY_HOURS: keep all file versions written in the last T hours.
- Tension. A long retention window costs storage but supports long-running readers, incremental subscribers, and time-travel queries.
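The three policies can be compared on a single file group with a small Python model. This is a sketch under simplifying assumptions, not Hudi's cleaner. Commit times are plain integers (hours), and each policy keeps the file version a reader at the retention boundary would see. retained_versions is a hypothetical helper.

```python
from typing import List


def _keep_from_boundary(versions: List[int], boundary: int) -> List[int]:
    """Keep the version visible at `boundary` and every newer version."""
    visible = [v for v in versions if v <= boundary]
    floor = visible[-1] if visible else versions[0]
    return [v for v in versions if v >= floor]


def retained_versions(versions: List[int], policy: str, n: int,
                      timeline: List[int], now: int) -> List[int]:
    """File versions (by commit time, ascending) one file group keeps under a policy."""
    if not versions:
        return []
    if policy == "KEEP_LATEST_FILE_VERSIONS":
        return versions[-n:]
    if policy == "KEEP_LATEST_COMMITS":
        boundary = timeline[-n] if len(timeline) >= n else timeline[0]
        return _keep_from_boundary(versions, boundary)
    if policy == "KEEP_LATEST_BY_HOURS":
        return _keep_from_boundary(versions, now - n)
    raise ValueError(f"unknown policy: {policy}")


timeline = list(range(1, 25))      # hourly commits 1..24
hot = [10, 18, 20, 22, 23, 24]     # commits that rewrote this (hot) file group
print(retained_versions(hot, "KEEP_LATEST_FILE_VERSIONS", 3, timeline, now=24))  # [22, 23, 24]
print(retained_versions(hot, "KEEP_LATEST_COMMITS", 10, timeline, now=24))       # [10, 18, 20, 22, 23, 24]
print(retained_versions(hot, "KEEP_LATEST_BY_HOURS", 6, timeline, now=24))       # [18, 20, 22, 23, 24]
```

Suppose a consumer last read at commit 18 and therefore needs version 18. That version survives the commit- and hour-based policies but not a 3-version cap on this hot file group.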
Inline vs async — the operational lever.
- Inline compaction. Simpler to operate. The writer completes a deltacommit, hits the threshold, runs compaction synchronously, then returns. Write latency becomes bimodal: cheap when a write does not trigger compaction, expensive when it does.
- Async compaction. Better for steady-state ingest. The writer never blocks; a separate process (a Spark structured streaming job or a Hudi services container) polls the timeline and runs compaction when conditions are met. Operational cost: one more process to monitor.
- Hybrid. "Schedule inline, execute async" — the writer marks compaction as REQUESTED but does not run it; the async process picks up the requested instant and runs it. Common pattern for Flink sinks.
Common interview probes on compaction.
- "Why must compaction never run concurrently with itself on the same file group?" — because the second compactor would read a stale base and produce a wrong merge. Hudi enforces single-compaction-per-file-group via the timeline.
- "What is a scheduled compaction vs an executed compaction?" — the REQUESTED instant is the schedule decision; the COMPLETED instant is execution. The split lets async services do the work.
- "How do you size a compaction interval?" Pick the largest NUM_COMMITS threshold that keeps snapshot read latency tolerable. A smaller N makes snapshot reads cheaper and keeps the read-optimised view fresher, but costs more compaction CPU.
- "What's the relationship between the cleaner's KEEP_LATEST_COMMITS and an incremental query consumer?" The cleaner must retain enough commits that every consumer's "last seen" timestamp is still in the retention window; otherwise the consumer cannot resume.
Worked example — inline compaction every N deltacommits
Detailed explanation.
- Goal. Configure inline compaction to run every 10 deltacommits.
- Why this matters. It is the simplest MoR setup to operate — no separate services to monitor.
- The mechanic. Set hoodie.compact.inline=true and hoodie.compact.inline.max.delta.commits=10. The 10th deltacommit invokes the compactor synchronously and only returns when it completes.
Question. A MoR table is configured with inline compaction every 10 deltacommits. What happens at deltacommit 10, and what is the latency profile of the 10th vs 11th commit?
Input — config.
| Setting | Value |
|---|---|
| hoodie.compact.inline | true |
| hoodie.compact.inline.max.delta.commits | 10 |
| baseline deltacommit latency | ~2 s |
| baseline compaction duration | ~60 s |
Code.
df.write.format("hudi")
.option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
.option("hoodie.compact.inline", "true")
.option("hoodie.compact.inline.max.delta.commits", "10")
.option("hoodie.compact.inline.trigger.strategy", "NUM_COMMITS")
.mode("append").save("s3://lake/orders/")
Step-by-step explanation.
- Deltacommits 1 through 9 each land an Avro log slice in ~2 s and return.
- On the 10th deltacommit, after the log slice is written and the deltacommit instant is COMPLETED, Hudi schedules and executes a compaction instant synchronously.
- The compactor reads the base + 10 log slices, writes a new base parquet, and emits the compaction COMPLETED instant. This takes ~60 s.
- The 10th write call returns ~62 s after invocation (2 s deltacommit + 60 s compaction). Every other call returns in ~2 s.
- After compaction, the 11th deltacommit again starts fresh against the new base, accumulating until the next 10-count threshold.
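The latency profile follows directly from the numbers in the input table. Below is a small model that assumes a fixed ~2 s deltacommit and ~60 s compaction. inline_write_latency_s is a hypothetical helper.

```python
def inline_write_latency_s(commit_n: int, every: int = 10,
                           deltacommit_s: float = 2.0, compaction_s: float = 60.0) -> float:
    """Writer-observed latency of the n-th deltacommit with inline compaction."""
    latency = deltacommit_s
    if commit_n % every == 0:
        latency += compaction_s   # compaction runs synchronously inside this write call
    return latency


latencies = [inline_write_latency_s(n) for n in range(1, 101)]
print(latencies[0], latencies[9], latencies[10])  # 2.0 62.0 2.0
print(sum(latencies) / len(latencies))            # 8.0 on average
```

The 8 s mean hides the problem. One call in ten takes 62 s, so a p99 target sees the compaction spikes.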
Output — latency observed at the writer.
| Commit N | Latency | Notes |
|---|---|---|
| 1 | 2 s | normal |
| 5 | 2 s | normal |
| 10 | 62 s | inline compaction fires |
| 11 | 2 s | normal |
| 20 | 62 s | next inline compaction |
Rule of thumb. Inline compaction is great for batch sinks where bimodal write latency is acceptable. For real-time / streaming sinks, switch to async.
Worked example — async compaction scheduler
Detailed explanation.
- Goal. Decouple the compactor from the writer so the streaming sink never blocks.
- Why this matters. A streaming Flink → Hudi sink that hits inline compaction would see end-to-end latency spike from 2 s to 60+ s every N commits. Async eliminates that.
- The mechanic. The writer only schedules compaction (writes a REQUESTED instant). A separate Spark job or HoodieCompactor service polls for requested-but-not-completed compactions and executes them.
Question. Configure the Flink sink to schedule compactions but not execute them. Show what the compactor service looks like.
Input — Flink sink config.
| Key | Value |
|---|---|
| compaction.async.enabled | false |
| compaction.schedule.enabled | true |
| compaction.delta_commits | 5 |
Code.
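// Standalone compactor service (Spark, hudi-utilities)
import org.apache.hudi.utilities.HoodieCompactor
import org.apache.spark.api.java.JavaSparkContext

val cfg = new HoodieCompactor.Config()
cfg.basePath = "s3://lake/orders/"
cfg.tableName = "orders"
cfg.runningMode = "execute" // execute pre-scheduled (REQUESTED) compactions only
cfg.parallelism = 4
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
new HoodieCompactor(jsc, cfg).compact(0) // 0 = no retries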
Step-by-step explanation.
- The Flink sink writes deltacommits as usual. Every 5th deltacommit, it also writes a compaction.requested instant to the timeline.
- The Flink sink does not execute the compaction; it returns immediately. Latency stays at ~2 s per commit.
- A separate Spark service runs continuously (or on a cron schedule). On each invocation it lists the timeline for REQUESTED-but-not-COMPLETED compactions, picks one, and executes it.
- Execution writes a new base parquet per affected file group and emits the compaction COMPLETED instant.
- The next reader sees the merged base; old log slices are eligible for cleaning.
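The division of labour can be simulated. The writer only appends REQUESTED compactions, and a separate compactor drains whatever is pending whenever it runs. simulate_async and its arguments are hypothetical, and the sketch ignores failures and file-group-level conflicts.

```python
from typing import List, Set, Tuple


def simulate_async(num_deltacommits: int, schedule_every: int,
                   compactor_runs_after: Set[int]) -> Tuple[List[int], List[int]]:
    """Return (still-pending, completed) compactions, identified by deltacommit number."""
    pending: List[int] = []
    completed: List[int] = []
    for n in range(1, num_deltacommits + 1):
        # Writer: deltacommit n, plus a compaction.requested instant every `schedule_every`.
        if n % schedule_every == 0:
            pending.append(n)          # the writer returns immediately; nothing is executed
        # Compactor service: executes every REQUESTED compaction it finds, oldest first.
        if n in compactor_runs_after:
            completed.extend(pending)
            pending = []
    return pending, completed


print(simulate_async(12, 5, compactor_runs_after={7}))  # ([10], [5])
```

A compactor that never runs does not slow the writer down. Requests simply pile up, which is why the compactor service needs its own monitoring.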
Output — service architecture.
| Process | Responsibility | Runs |
|---|---|---|
| Flink sink | deltacommit + schedule compaction | continuous |
| Spark compactor service | execute scheduled compactions | scheduled / continuous |
| Spark cleaner service | GC obsolete versions | scheduled |
Rule of thumb. Async compaction is the production-grade default for streaming MoR tables. Pay the "extra service to monitor" cost; gain consistent write latency.
Worked example — cleaner policies
Detailed explanation.
- Goal. Pick a cleaner policy that bounds storage cost while keeping incremental query consumers safe.
- Why this matters. A cleaner policy that is too aggressive will silently break incremental consumers ("my last seen commit is no longer in the retention window — error"). A policy that is too generous will balloon storage.
- The mechanic. Each policy uses a single threshold to decide what file versions are obsolete and can be GC'd.
Question. Compare KEEP_LATEST_COMMITS=10 vs KEEP_LATEST_FILE_VERSIONS=3 for a CoW table with hourly commits and a long-running incremental subscriber.
Input — workload.
| Property | Value |
|---|---|
| table type | CoW |
| commit cadence | hourly |
| incremental subscriber lag SLA | up to 6 hours |
| average file size | 256 MB per file group |
| file groups | 1 000 |
Code.
# Policy comparison
KEEP_LATEST_COMMITS=10
→ retain up to ~10 file versions per file group (one per retained commit that rewrote it)
→ consumer with 6h lag: OK (lag < 10h retention)
KEEP_LATEST_FILE_VERSIONS=3
→ retain exactly 3 file versions per file group
→ consumer with 6h lag: at risk if a hot file group is rewritten >3 times
Step-by-step explanation.
- With KEEP_LATEST_COMMITS=10, the cleaner keeps the file versions referenced by the latest 10 commits on the timeline. A 6 h-lag consumer is safe because, at an hourly commit cadence, 10 commits give 10 hours of headroom.
- With KEEP_LATEST_FILE_VERSIONS=3, the cleaner keeps only the latest 3 file versions per file group. If a hot file group is rewritten 4 or more times in 6 hours, the version the consumer expects is gone and its incremental query fails.
- The storage cost differs. KEEP_LATEST_COMMITS=10 can retain up to 10 versions of a file group that is rewritten on every commit. KEEP_LATEST_FILE_VERSIONS=3 retains at most 3 versions per file group regardless of commit cadence.
- The right policy depends on the workload. If file groups are touched uniformly, the two policies converge. If some file groups are hot and rewritten far more often than others, KEEP_LATEST_COMMITS is safer.
Output — recommendation.
| Workload | Policy | Why |
|---|---|---|
| Uniform churn + steady consumer lag | KEEP_LATEST_COMMITS | aligns retention with consumer lag |
| Skewed hot file groups | KEEP_LATEST_COMMITS | every commit advances retention uniformly |
| Quiet table, bounded storage | KEEP_LATEST_FILE_VERSIONS | predictable storage |
| Time-travel SLA in hours | KEEP_LATEST_BY_HOURS | direct mapping |
Rule of thumb. Default to KEEP_LATEST_COMMITS. Set the threshold to the number of commits that spans the maximum incremental consumer lag SLA, plus a safety margin (e.g. twice that count). Only switch to KEEP_LATEST_FILE_VERSIONS if storage cost is dominant and you have no incremental consumers.
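The rule of thumb is simple arithmetic. Below is a sketch with a hypothetical helper. The 2× safety factor is the example margin from the rule above, not a Hudi default.

```python
import math


def commits_to_retain(max_lag_hours: float, commits_per_hour: float,
                      safety_factor: float = 2.0) -> int:
    """Value for hoodie.cleaner.commits.retained so the slowest consumer can resume."""
    return math.ceil(max_lag_hours * commits_per_hour * safety_factor)


print(commits_to_retain(6, 1))     # 12: hourly commits, 6 h consumer lag
print(commits_to_retain(24, 1))    # 48: a full day of hourly commits, doubled
print(commits_to_retain(6, 60))    # 720: one commit per minute, 6 h lag
```

Retention is counted in commits, so the same lag SLA needs a much larger threshold on a table that commits every minute.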
Interview question on compaction + cleaning
A senior interviewer might frame this as: "You operate a MoR table with hourly deltacommits and NUM_COMMITS=24 inline compaction. Reads are getting slow at 22-23 commits into the cycle, then snap back after compaction. Design a fix that smooths read latency."
Solution Using async compaction with a tighter schedule strategy
// Writer config — schedule only
val writerOptions = Map(
"hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
"hoodie.compact.inline" -> "false",
"hoodie.compact.schedule.inline" -> "true",
"hoodie.compact.inline.max.delta.commits" -> "6", // schedule every 6
"hoodie.cleaner.policy" -> "KEEP_LATEST_COMMITS",
"hoodie.cleaner.commits.retained" -> "48"
)
// Separate async compactor service runs every 5 minutes
// HoodieCompactor with runningMode = execute, triggered by cron
Step-by-step trace.
| Time | Writer action | Compactor action | Read latency |
|---|---|---|---|
| T+0 | deltacommit 1 | (idle) | base only |
| T+6h | deltacommit 6 + schedule_compaction | (idle) | base + 6 logs |
| T+6h05 | (deltacommit 7) | execute_compaction → new base | snap back to ~ base |
| T+12h | deltacommit 12 + schedule_compaction | (idle) | base + 6 logs |
| T+12h05 | (deltacommit 13) | execute → new base | base only again |
The trace shows that scheduling every 6 commits (rather than 24) caps the log-merge cost on the snapshot read at ~6 log slices and never lets the read view degrade further. The writer never blocks.
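The peak log-slice counts in the output table can be reproduced with a small simulation. It makes three assumptions. Each hourly deltacommit adds one log slice per file group. Readers sample between write calls. Async compaction executes a few minutes after it is scheduled. peak_log_slices is a hypothetical helper.

```python
def peak_log_slices(hours: int, compact_every: int, inline: bool) -> int:
    """Peak log slices a snapshot read merges, sampled after each hourly write call."""
    peak, open_slices = 0, 0
    for hour in range(1, hours + 1):
        open_slices += 1                       # this hour's deltacommit adds one log slice
        due = hour % compact_every == 0
        if due and inline:
            open_slices = 0                    # compacted before the write call returns
        peak = max(peak, open_slices)          # what a reader sees after the call returns
        if due and not inline:
            open_slices = 0                    # async compactor executes minutes later
    return peak


print(peak_log_slices(48, 24, inline=True))    # 23
print(peak_log_slices(48, 6, inline=False))    # 6
```

The sampling assumption hides one edge case. A reader that lands during the 24th write call, between the deltacommit and the end of the inline compaction, could briefly see 24 slices.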
Output:
| Metric | Old (inline every 24) | New (async every 6) |
|---|---|---|
| Peak log slices before compaction | 23 | 6 |
| Peak snapshot read CPU | high | bounded |
| Writer p99 latency | bimodal (2 s / 90 s) | flat (~2 s) |
| Compactor service required | no | yes |
Why this works — concept by concept:
- Tighter compaction interval caps snapshot read cost — the worst-case log-merge work scales with the deltacommit-to-compaction ratio. Smaller ratio = lower peak read CPU.
- Async eliminates writer blocking — the writer's job ends at "emit deltacommit + schedule compaction." A separate process executes.
- Cleaner retains enough commits for consumers: KEEP_LATEST_COMMITS=48 gives any incremental consumer up to 48 commits of lag headroom.
- Schedule + execute split lets compaction scale horizontally: multiple compactor instances can pick up scheduled instants in parallel (with single-compactor-per-file-group enforcement).
- Cost — write per commit: O(delta). Compaction per scheduled run: O(base + sum(log_slices)). Cleaner per run: O(obsolete_files). Snapshot read: O(rows + ≤6 log slices) instead of O(rows + 23 log slices).
5. Picking the type — read latency vs write rate, multi-engine read
The Hudi pick reduces to two questions — how fast must reads be, and how often do writes arrive — and one constraint: which engines query the table
The mental model in one line: MoR for streaming sinks and high-frequency upserts, CoW for daily batch and read-heavy dashboards; the multi-engine read matrix (Spark / Flink / Presto / Trino) decides whether MoR's snapshot view is even available to your consumers. Once you say "two axes, one constraint," the decision falls out.
The decision matrix in one table.
| Read latency | Low write rate | High write rate |
|---|---|---|
| Tight (sub-second) | CoW — read one parquet; cheap | MoR + frequent compaction — snapshot view for the hot rows, RO for the rest |
| Relaxed (minutes) | CoW — simplest setup, fine for warehouse mirrors | MoR + async compaction — streaming sink, CDC, IoT |
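The matrix is small enough to encode directly. Below is a sketch with hypothetical labels. It captures only the two axes, so the engine constraint still has to be checked separately.

```python
DECISION_MATRIX = {
    ("tight", "low"): "COPY_ON_WRITE",
    ("tight", "high"): "MERGE_ON_READ + frequent compaction",
    ("relaxed", "low"): "COPY_ON_WRITE",
    ("relaxed", "high"): "MERGE_ON_READ + async compaction",
}


def pick_table_type(read_latency: str, write_rate: str) -> str:
    """read_latency: 'tight' | 'relaxed'; write_rate: 'low' | 'high'."""
    try:
        return DECISION_MATRIX[(read_latency, write_rate)]
    except KeyError:
        raise ValueError(f"unknown combination: {read_latency!r}, {write_rate!r}") from None


print(pick_table_type("relaxed", "high"))  # MERGE_ON_READ + async compaction
```

The write-rate axis alone decides between CoW and MoR. The read-latency axis only changes how aggressively a MoR table is compacted.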
The multi-engine read matrix.
| Engine | CoW snapshot | MoR snapshot | MoR read-optimised | Notes |
|---|---|---|---|---|
| Spark | yes | yes | yes | reference implementation; all views supported |
| Flink | yes | yes (also streaming read) | yes | first-class streaming write and read |
| Presto | yes | yes (Hudi connector, recent versions) | yes | older versions: RO only |
| Trino | yes | yes (Hudi connector ≥ 360) | yes | sub-second on the RO view |
| Hive | yes | yes | yes | via InputFormat shims |
| Athena | yes | yes | yes | recent Athena engine ≥ v3 |
Why the matrix matters.
- A table that must be readable by every engine in the matrix should default to CoW. Every engine reads CoW snapshot at parquet speed.
- A MoR table can still be read by every engine in the read-optimised view, but not every engine version supports the snapshot view. On those versions, data in uncompacted deltacommits is invisible. Confirm that your engine version supports the snapshot view before committing.
- Flink is the only engine in the table that also writes MoR with first-class streaming semantics. If your pipeline is Flink-centric, MoR is a natural pick.
Common interview probes on the pick.
- "When would you pick CoW for a streaming sink?" — almost never, unless the read SLA is so tight (sub-100 ms parquet scan) that any log-merge cost is unacceptable. Even then, MoR + frequent compaction usually wins.
- "When would you pick MoR for a daily batch table?" — almost never. The write-amplification problem CoW has does not appear in daily batch; the extra MoR complexity is unjustified.
- "Can you migrate from CoW to MoR (or back) without rewriting the table?" — not directly. You write new data into a fresh table with the new type and switch readers over.
- "Does Presto see deltacommits in real time?" — only with the snapshot connector. The read-optimised connector waits for the next compaction.
The 2026 reality.
- Streaming sinks are 80% MoR. CDC, Kafka, and IoT sinks all default to MoR + async compaction.
- Warehouse mirrors / golden tables are 90% CoW. Built once daily, read by every dashboard, no need for log-merge complexity.
- Hudi vs Iceberg vs Delta. Hudi's explicit MoR/CoW table-type split is its differentiator. Delta has MERGE INTO but no equivalent split. Iceberg does offer merge-on-read, but it is built on positional and equality delete files, which is conceptually different from Hudi's base-plus-log file slices.
Worked example — daily batch read (CoW)
Detailed explanation.
- Goal. Configure a CoW table for a nightly warehouse mirror read by 50 BI dashboards.
- Why this matters. Every dashboard query should run at parquet-scan speed; the writer runs once at 02:00 and rewrites whatever changed.
- The mechanic. CoW with a daily commit cadence and a wide cleaner retention so that any long-running BI session does not get its snapshot GC'd mid-query.
Question. Pick the table type and key configs for a nightly orders mirror serving 50 dashboards.
Input — workload.
| Property | Value |
|---|---|
| commits per day | 1 (at 02:00) |
| reads per day | 50 dashboards × ~10 queries each |
| read SLA | < 5 s p99 |
| writer | Spark batch |
Code.
df.write.format("hudi")
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
.option("hoodie.datasource.write.operation", "bulk_insert") // first load
// .option("hoodie.datasource.write.operation", "upsert") // incremental
.option("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")
.option("hoodie.cleaner.commits.retained", "7") // 1 week
.option("hoodie.parquet.small.file.limit", "104857600") // 100 MB
.mode("append").save("s3://lake/orders/")
Step-by-step explanation.
- CoW is the right pick because writes are infrequent (1/day) and reads are very frequent. No log-merge logic ever runs.
- KEEP_LATEST_COMMITS=7 keeps a week of daily commits. A BI session that started early in the week can therefore keep reading the Monday snapshot on Friday morning.
- hoodie.parquet.small.file.limit=104857600 (100 MB) enables Hudi's small-file handling. Incoming inserts are packed into under-sized existing files, so file-group sizing stays healthy.
- BI queries always read the latest base file per file group. That is a vanilla parquet scan, which Presto / Trino / Athena handle natively.
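Small-file handling can be pictured as a greedy bin-packing step. The sketch below is a simplification, not Hudi's partitioner:
- It takes the average record size as given, whereas Hudi estimates it from earlier commits.
- It treats any file under small_file_limit as a candidate and fills it up to max_file_size, the role hoodie.parquet.max.file.size plays.
- It names new file groups with hypothetical new-<i> ids.

```python
from typing import Dict


def assign_inserts(file_sizes: Dict[str, int], num_inserts: int, avg_record_bytes: int,
                   small_file_limit: int, max_file_size: int) -> Dict[str, int]:
    """Map file id -> number of incoming insert records routed to it."""
    assignment: Dict[str, int] = {}
    remaining = num_inserts
    # Top up the smallest under-sized files first.
    for file_id, size in sorted(file_sizes.items(), key=lambda kv: kv[1]):
        if remaining == 0:
            break
        if size >= small_file_limit:
            continue
        take = min((max_file_size - size) // avg_record_bytes, remaining)
        if take > 0:
            assignment[file_id] = take
            remaining -= take
    # Whatever is left opens new file groups, each sized up to max_file_size.
    per_new_file = max(1, max_file_size // avg_record_bytes)
    i = 0
    while remaining > 0:
        take = min(per_new_file, remaining)
        assignment[f"new-{i}"] = take
        remaining -= take
        i += 1
    return assignment


MB = 1024 * 1024
sizes = {"fg-a": 40 * MB, "fg-b": 110 * MB}   # fg-b is already above the 100 MB limit
print(assign_inserts(sizes, 100_000, avg_record_bytes=1024,
                     small_file_limit=100 * MB, max_file_size=120 * MB))
# {'fg-a': 81920, 'new-0': 18080}
```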
Output — operational profile.
| Metric | Value |
|---|---|
| Storage overhead vs raw parquet | ~1× for the latest snapshot (plus .hoodie/ metadata); older versions of rewritten file groups are kept for 7 commits |
| BI query latency | parquet-native |
| Writer runtime | bounded by touched_file_groups × file_group_size |
| Compaction services needed | none |
Rule of thumb. Daily batch + read-heavy = CoW with a long cleaner window. The setup is boring and that is the point.
Worked example — streaming Kafka → MoR sink
Detailed explanation.
- Goal. Configure a MoR table fed by Flink reading Kafka at 50 k events/s.
- Why this matters. A CoW table at this write rate would have to rewrite every touched parquet file group on every checkpoint, far more I/O than the writer can sustain. MoR with async compaction is the textbook fit.
- The mechanic. Flink writes deltacommits every checkpoint interval; a separate Spark service runs async compaction.
Question. Pick the table type, write operation, and compaction strategy for a Kafka sink at 50 k events/s with a 5-minute freshness SLA.
Input — workload.
| Property | Value |
|---|---|
| ingest rate | 50 000 events/s |
| Flink checkpoint interval | 60 s |
| freshness SLA | 5 minutes |
| read SLA | 30 s p99 |
| readers | Spark + Trino |
Code.
-- Flink DDL for a MoR Hudi sink
CREATE TABLE orders_hudi (
order_id STRING,
user_id STRING,
amount DECIMAL(10, 2),
event_ts TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 's3://lake/orders_hudi/',
'table.type' = 'MERGE_ON_READ',
'write.operation' = 'upsert',
'precombine.field' = 'event_ts',
'compaction.async.enabled' = 'false',
'compaction.schedule.enabled' = 'true',
'compaction.delta_commits' = '5',
'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS',
'hoodie.cleaner.commits.retained' = '48'
);
Step-by-step explanation.
- Flink writes one deltacommit per checkpoint (every 60 s). That's ~60 commits/hour, ~3 M events/commit (50 000 × 60).
- Every 5 deltacommits (5 minutes), Flink writes a compaction.requested instant. It does not execute the compaction; the async compactor will.
- A standalone Spark compactor service polls the timeline every minute, finds requested compactions, and executes them. Each compaction merges 5 log slices into a fresh base parquet per affected file group.
- The freshness SLA is met because the snapshot view always merges the latest logs. The read-optimised view trails by about one compaction interval (~5 minutes), plus however long the compactor takes to pick up and execute the requested compaction.
- The cleaner retains 48 commits ≈ 48 minutes of headroom for any incremental consumer.
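The cadence numbers in these steps follow from three inputs. Here is a quick check with a hypothetical helper:

```python
def streaming_profile(events_per_s: int, checkpoint_s: int,
                      compact_every: int, commits_retained: int) -> dict:
    """Derive commit cadence, batch size, and windows for a checkpoint-driven MoR sink."""
    return {
        "commits_per_hour": 3600 // checkpoint_s,
        "events_per_commit": events_per_s * checkpoint_s,
        "compaction_interval_min": compact_every * checkpoint_s / 60,
        "cleaner_headroom_min": commits_retained * checkpoint_s / 60,
    }


print(streaming_profile(50_000, 60, compact_every=5, commits_retained=48))
# {'commits_per_hour': 60, 'events_per_commit': 3000000, 'compaction_interval_min': 5.0, 'cleaner_headroom_min': 48.0}
```

Changing the checkpoint interval moves every derived number at once. Doubling it to 120 s doubles both the compaction interval and the cleaner headroom, even though compaction.delta_commits and the retained-commit count stay the same.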
Output — service topology.
| Component | Cadence | Write cost |
|---|---|---|
| Flink sink | continuous | tens of bytes (Avro append) |
| Spark compactor | every ~5 min | full file-group rewrite per touched fg |
| Spark cleaner | every ~10 min | nothing (just deletes obsolete) |
| Trino dashboard | on read | read-optimised view, parquet scan |
Rule of thumb. Streaming sink + freshness SLA in minutes = MoR + async compaction. Wire the compactor as a separate service from the start; you'll need it for production.
Worked example — multi-engine read matrix in practice
Detailed explanation.
- Goal. Predict how Spark, Flink, Presto, and Trino will see the same MoR table.
- Why this matters. A platform team often inherits a MoR table written by Flink and queried by Trino — they need to know whether Trino sees the same data as Spark.
- The mechanic. Each engine has a connector that supports a subset of views. The connector version matters.
Question. A MoR orders table has a base file at 09:00 and a deltacommit at 09:30. At 09:31, four engines query "show me the latest status for order_id=42." What do they see?
Input — table state at 09:31.
| fileId | base | log slices |
|---|---|---|
| f7a2 | 09:00 base | 1 log slice from 09:30 deltacommit (contains the update for order_id=42) |
Code.
-- All four engines run the same logical query
SELECT order_id, status, _hoodie_commit_time
FROM orders
WHERE order_id = '42';
Step-by-step explanation.
- Spark with query.type=snapshot: reads the base + log slice, merges them, and surfaces the 09:30 status.
- Spark with query.type=read_optimized: reads the base only and surfaces the pre-09:30 status (stale).
- Flink streaming read: picks up the deltacommit on its next timeline poll and emits the 09:30 status.
- Trino (Hudi connector ≥ 360): supports both snapshot and RO modes; default to RO. Switch to snapshot when freshness is required.
- Presto: same story as Trino; the snapshot connector is more recent than the RO connector, so older Presto deployments see only RO.
Output — observed status at 09:31.
| Engine | Mode | Sees update at 09:30? |
|---|---|---|
| Spark | snapshot | yes |
| Spark | read-optimised | no |
| Flink | streaming | yes |
| Trino ≥ 360 | snapshot | yes |
| Trino ≥ 360 | read-optimised | no |
| Presto ≥ 0.270 | snapshot | yes (if connector supports it) |
Rule of thumb. Pin the engine version and the view mode in your platform docs. The "two views" trade-off is the most common cause of "the Trino dashboard disagrees with the Spark batch job" incidents.
Interview question on the pick
A senior interviewer often closes the loop with: "You're starting a greenfield lakehouse. Pick the table type for (a) the curated orders fact table loaded hourly from a CDC stream, (b) the daily dimensional customers table, and (c) the IoT device_events raw landing table. Justify each pick in one sentence."
Solution Using the two-axis decision frame
(a) orders (CDC hourly, queried by Trino + Spark)
→ MERGE_ON_READ
→ write.operation=upsert, async compaction every 6 deltacommits
→ reason: CDC = high write rate, hourly queries can tolerate
a 6-commit log-merge, async compaction smooths writer latency
(b) customers (daily batch, queried by every dashboard)
→ COPY_ON_WRITE
→ write.operation=bulk_insert + nightly upsert
→ reason: 1 commit/day = cheap rewrite, dashboard queries
run at parquet speed, no compaction complexity
(c) device_events (very high ingest, queried by Spark only)
→ MERGE_ON_READ
→ write.operation=insert (no dedup), async compaction every 30 minutes
→ reason: very high write rate; events are append-only (never updated),
so "insert" beats "upsert" on cost; Spark snapshot view is fine
Step-by-step trace.
| Table | Write rate | Read freshness SLA | Pick | Compaction strategy |
|---|---|---|---|---|
| orders | hourly CDC | hourly | MoR | async every 6 deltacommits |
| customers | daily batch | hours | CoW | (none) |
| device_events | continuous | minutes | MoR | async every 30 min |
Output:
| Table | Storage overhead | Read latency | Write latency |
|---|---|---|---|
| orders | ~1.2× (with logs) | base + ≤6 logs to merge | ~2 s per deltacommit |
| customers | ~1× | parquet-native | bounded by touched FG bytes |
| device_events | ~1.3× | base + ≤30-min logs | sub-second append |
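The two-axis frame can be sketched as a small Python function. This illustrates the reasoning and is not a Hudi API. The `TableProfile` fields, the hypothetical `MOR_MIN_COMMITS_PER_DAY` cut-off of 4, and the per-table numbers are assumptions. In particular, 1440 commits/day (one per minute) for `device_events` and the 5-minute and 240-minute SLAs are placeholders chosen to match the three tables above.

```python
from __future__ import annotations

from dataclasses import dataclass

# Hypothetical cut-off: the article contrasts 1 commit/day (CoW) with hourly or continuous writes (MoR).
MOR_MIN_COMMITS_PER_DAY = 4


@dataclass(frozen=True)
class TableProfile:
    name: str
    commits_per_day: float        # write-rate axis
    freshness_sla_minutes: float  # read axis: how stale a query result may be
    needs_dedup: bool             # do incoming records update existing keys?
    compaction_interval_minutes: float | None = None  # MoR only


def pick(p: TableProfile) -> tuple[str, str, str]:
    """Return (table type, write operation, query view) for one table."""
    table_type = "MERGE_ON_READ" if p.commits_per_day >= MOR_MIN_COMMITS_PER_DAY else "COPY_ON_WRITE"
    # Insert vs upsert is orthogonal to the table type.
    operation = "upsert" if p.needs_dedup else "insert"
    if table_type == "COPY_ON_WRITE":
        view = "snapshot"  # CoW snapshot reads are already parquet-native
    elif p.compaction_interval_minutes is not None and p.freshness_sla_minutes >= p.compaction_interval_minutes:
        view = "read_optimized"  # one compaction interval of staleness fits the SLA
    else:
        view = "snapshot"  # fresher than compaction allows: merge log slices at read time
    return table_type, operation, view


tables = [
    TableProfile("orders", commits_per_day=24, freshness_sla_minutes=60, needs_dedup=True,
                 compaction_interval_minutes=6 * 60),  # every 6 hourly deltacommits
    TableProfile("customers", commits_per_day=1, freshness_sla_minutes=240, needs_dedup=True),
    TableProfile("device_events", commits_per_day=1440, freshness_sla_minutes=5, needs_dedup=False,
                 compaction_interval_minutes=30),
]
for t in tables:
    print(t.name, pick(t))
# orders ('MERGE_ON_READ', 'upsert', 'snapshot')
# customers ('COPY_ON_WRITE', 'upsert', 'snapshot')
# device_events ('MERGE_ON_READ', 'insert', 'snapshot')
```

Both MoR tables land on the snapshot view because their freshness SLAs are shorter than their compaction intervals. A table with a 12-hour SLA and 6-hour compaction would get the read-optimised view instead.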
Why this works — concept by concept:
- Two-axis frame is sufficient — read-latency SLA and write rate dominate; everything else is a knob.
- CDC = MoR is a default — high write rate + tolerable read lag is the textbook MoR sweet spot.
- Daily batch = CoW is a default — no log-merge complexity to operate, parquet-native reads.
- Insert vs upsert is orthogonal to type — even MoR can use `insert` when dedup is unnecessary, which lowers write CPU further.
- Multi-engine constraint surfaces late — confirm Trino / Presto / Spark versions support the snapshot view before defaulting to MoR for tables read by all three.
- Cost — write per commit: O(file_group_size) for CoW, O(delta) for MoR. Compaction: O(base + log_slices) per MoR file group per cycle. Read: O(scanned_rows) for CoW and MoR RO; O(scanned_rows + log_slices) for MoR snapshot.
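A back-of-the-envelope Python model puts rough numbers on those asymptotic costs. All sizes are hypothetical: 120 MB file groups, 0.5 MB of changes per file group per commit, 10 touched file groups, and 6 deltacommits per compaction cycle. The model also assumes every commit hits the same file groups.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Workload:
    touched_fgs_per_commit: int  # file groups hit by every commit
    fg_size_mb: float            # base parquet size of one file group
    delta_mb_per_fg: float       # one commit's changes inside one file group
    commits_per_cycle: int       # deltacommits between two MoR compactions


def cow_write_mb(w: Workload) -> float:
    # CoW: O(file_group_size) per touched file group, on every commit.
    return w.commits_per_cycle * w.touched_fgs_per_commit * w.fg_size_mb


def mor_write_mb(w: Workload) -> float:
    # MoR: O(delta) per touched file group, on every deltacommit.
    return w.commits_per_cycle * w.touched_fgs_per_commit * w.delta_mb_per_fg


def mor_merge_mb_per_fg(w: Workload) -> float:
    # O(base + log_slices): what compaction, or a snapshot read just before it, must scan.
    return w.fg_size_mb + w.commits_per_cycle * w.delta_mb_per_fg


def mor_compaction_mb(w: Workload) -> float:
    # One compaction pass over every touched file group.
    return w.touched_fgs_per_commit * mor_merge_mb_per_fg(w)


w = Workload(touched_fgs_per_commit=10, fg_size_mb=120.0, delta_mb_per_fg=0.5, commits_per_cycle=6)
print(cow_write_mb(w))         # 7200.0
print(mor_write_mb(w))         # 30.0
print(mor_compaction_mb(w))    # 1230.0
print(mor_merge_mb_per_fg(w))  # 123.0 (vs 120.0 for the read-optimised view)
```

Under these assumptions, CoW rewrites about 7.2 GB per six-commit cycle. MoR appends 30 MB and then spends about 1.2 GB on one compaction pass. A snapshot read of one file group just before compaction scans 123 MB, against 120 MB for the read-optimised view.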
Cheat sheet — MoR vs CoW recipes
- Default for daily batch tables. `COPY_ON_WRITE` with a `KEEP_LATEST_COMMITS=7` cleaner. Simple, parquet-native reads, bounded write cost.
- Default for streaming sinks. `MERGE_ON_READ` with `write.operation=upsert`, `compaction.schedule.enabled=true`, `compaction.async.enabled=false`, and a separate Spark compactor service.
- Default for very high ingest with no dedup. `MERGE_ON_READ` with `write.operation=insert`, which skips the index lookup and gives the lowest write CPU.
- Pick the `NUM_COMMITS` compaction trigger strategy (`hoodie.compact.inline.trigger.strategy`) by default. Switch to `TIME_ELAPSED` when commit cadence is bursty and you want time-bounded log-merge cost.
- Set `hoodie.compact.inline.max.delta.commits` to the largest value that keeps your snapshot read latency tolerable. Smaller values keep log merges short and the read-optimised view fresher; larger values spend less on compaction.
- Use the read-optimised view for dashboards that tolerate one compaction interval of staleness. Use the snapshot view for anything that needs sub-minute freshness.
- Pin the engine version (Spark, Flink, Presto, Trino) in your platform docs. The Hudi connector capabilities vary across versions; the snapshot view is the most version-sensitive.
- The `KEEP_LATEST_COMMITS=N` cleaner is the default. Pick `N` ≥ max(incremental_consumer_lag_SLA, time-travel_SLA), measured in commits.
- Use `KEEP_LATEST_FILE_VERSIONS=N` only when storage cost is dominant and you have no incremental consumers.
- Compliance-grade deletes need both the delete commit and the next cleaner run. The retention window is the effective delete latency.
- For CoW write-amplification audits, the formula is `touched_file_groups × file_group_size_mb` per commit. If the result divided by net rows added is more than ~50 MB/row, switch to MoR or lengthen the commit interval.
- For MoR read-cost audits, the formula is `base_scan_cost + sum(log_slice_sizes_to_merge)`. If the snapshot scan is at least twice as slow as the read-optimised scan, schedule compaction more frequently.
- Never delete files manually from `.hoodie/` or partition directories. Use `hudi-cli rollback` and let Hudi reconcile the timeline.
- Migration from CoW to MoR (or back) is a fresh-table-and-cutover operation, not an in-place config change.
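The two audit formulas and the retention rule can be scripted as a quick check. This is a minimal sketch. The function names are hypothetical, and the ~50 MB/row and 2× thresholds come from the bullets above. Treating pure-update commits (zero net rows) as unbounded amplification is an assumption. Feed it numbers from your own commit metadata and query logs.

```python
import math


def cow_mb_per_row(touched_file_groups: int, file_group_size_mb: float, net_rows_added: int) -> float:
    # touched_file_groups x file_group_size_mb per commit, divided by net rows added.
    if net_rows_added <= 0:
        return math.inf  # assumption: pure-update commits count as unbounded amplification
    return touched_file_groups * file_group_size_mb / net_rows_added


def cow_should_switch_to_mor(mb_per_row: float, threshold_mb: float = 50.0) -> bool:
    return mb_per_row > threshold_mb


def mor_should_compact_more_often(snapshot_scan_s: float, ro_scan_s: float) -> bool:
    # Snapshot cost = base scan + log slices to merge; flag once it reaches 2x the RO scan.
    return snapshot_scan_s >= 2 * ro_scan_s


def min_commits_retained(consumer_lag_hours: float, time_travel_hours: float, commits_per_hour: float) -> int:
    # N >= max(incremental consumer lag SLA, time-travel SLA), converted from hours to commits.
    return math.ceil(max(consumer_lag_hours, time_travel_hours) * commits_per_hour)


ratio = cow_mb_per_row(touched_file_groups=10, file_group_size_mb=120, net_rows_added=20)
print(ratio, cow_should_switch_to_mor(ratio))            # 60.0 True
print(mor_should_compact_more_often(9.0, 4.0))          # True
print(min_commits_retained(6, 24, commits_per_hour=1))  # 24
```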
Frequently asked questions
What's the difference between Hudi CoW and MoR in one paragraph?
Copy-on-Write rewrites the affected file group's base parquet on every commit, so readers always scan a single up-to-date parquet per file group with zero merge logic — fast reads at the cost of high write amplification. Merge-on-Read appends each upsert as a row-based Avro log slice next to the base parquet and defers the parquet rewrite to a separate compaction action — fast writes at the cost of read-time merging in the snapshot view (or slight read staleness in the read-optimised view). CoW is the textbook pick for daily batch and read-heavy dashboards; MoR is the textbook pick for streaming sinks, CDC pipelines, and high-frequency upserts.
When should I choose Hudi MoR over CoW?
Pick hudi merge on read when the write rate is high relative to the file-group size — typically streaming sinks (Kafka, Debezium CDC, IoT telemetry) and any pipeline with sub-minute commit intervals. The decisive metric is write amplification: if CoW's touched_file_groups × file_group_size per commit is more than ~50 MB per net row added, MoR will pay back its complexity within weeks. The two prerequisites are a tolerable read-freshness SLA (the snapshot view's merge cost or the read-optimised view's compaction-interval staleness must fit your dashboards) and a multi-engine read story that supports both views in your target Spark / Flink / Presto / Trino versions.
How does Hudi compaction work?
Hudi compaction reads each MoR file group's base parquet plus every log slice on top of it, applies the deltas in commit order (using the precombine field to break ties), writes a fresh base parquet, and emits a compaction COMPLETED instant on the timeline. The action can be triggered inline (synchronous to the writer, after every N deltacommits) or async (scheduled by the writer but executed by a separate Spark job such as the `HoodieCompactor` utility). The most common configuration is `hoodie.compact.inline=false` plus `hoodie.compact.schedule.inline=true` plus a separate compactor service polling the timeline, which keeps writer latency consistent while bounding read-merge cost.
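Both the merge and the trigger can be sketched in plain Python. This is a simplified model, not Hudi's compactor. Records are reduced to a key, a precombine value `ts`, and a status. Tie-breaking follows the description above: a later commit wins, and within one log slice the larger precombine value wins. Real merge behaviour depends on the configured record payload or merger.

The trigger function assumes each value of `hoodie.compact.inline.trigger.strategy` compares the deltacommit count and elapsed seconds since the last compaction against `hoodie.compact.inline.max.delta.commits` and `hoodie.compact.inline.max.delta.seconds`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Record:
    key: str
    ts: int      # precombine (ordering) field
    status: str


def compact(base: list[Record], log_slices: list[list[Record]]) -> list[Record]:
    """Fold log slices (oldest deltacommit first) into a fresh base, keyed by record key."""
    merged = {r.key: r for r in base}
    for log in log_slices:
        # Within one slice, duplicate keys collapse: the larger precombine value wins.
        winners: dict[str, Record] = {}
        for rec in log:
            prev = winners.get(rec.key)
            if prev is None or rec.ts > prev.ts:
                winners[rec.key] = rec
        # Commit order: a later slice overrides the base and any earlier slice.
        merged.update(winners)
    return sorted(merged.values(), key=lambda r: r.key)


def should_compact(delta_commits: int, elapsed_s: float, strategy: str,
                   max_delta_commits: int, max_delta_seconds: float) -> bool:
    """Trigger check since the last compaction, modelled on the trigger-strategy values."""
    by_num = delta_commits >= max_delta_commits
    by_time = elapsed_s >= max_delta_seconds
    return {
        "NUM_COMMITS": by_num,
        "TIME_ELAPSED": by_time,
        "NUM_AND_TIME": by_num and by_time,
        "NUM_OR_TIME": by_num or by_time,
    }[strategy]


base = [Record("42", 900, "PLACED"), Record("43", 905, "PLACED")]
logs = [[Record("42", 930, "SHIPPED"), Record("43", 931, "PAID"), Record("43", 929, "CANCELLED")]]
print(compact(base, logs))
# [Record(key='42', ts=930, status='SHIPPED'), Record(key='43', ts=931, status='PAID')]
print(should_compact(6, 1800, "NUM_COMMITS", max_delta_commits=6, max_delta_seconds=3600))   # True
print(should_compact(6, 1800, "NUM_AND_TIME", max_delta_commits=6, max_delta_seconds=3600))  # False
```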
How does Hudi compare to Iceberg and Delta Lake?
Hudi vs Iceberg vs Delta is the most-asked lakehouse interview question in 2026, and the honest answer is that the three formats now overlap heavily but differ on their write-path defaults. Hudi alone exposes the explicit MERGE_ON_READ / COPY_ON_WRITE split at table creation, which gives operators the cleanest lever for write amplification trade-offs. Iceberg defaults to copy-on-write for row-level updates and deletes, and offers merge-on-read through positional and equality delete files for streaming sinks; its strength is multi-engine support and schema evolution. Delta Lake's MERGE INTO collapses inserts, updates, and deletes into one statement and ships with deletion vectors for efficient deletes; it is the strongest pick when Databricks is the primary compute.
How does Hudi support multiple query engines like Spark, Flink, Presto, and Trino?
Hudi ships a per-engine connector that translates the timeline + file-group + log-slice model into the engine's native scan operator. Spark is the reference implementation and supports CoW snapshot, MoR snapshot, and MoR read-optimised views. Flink is the only engine that also writes MoR with first-class streaming semantics, including a streaming read that emits new rows as deltacommits land. Presto and Trino (via the Hudi connector ≥ 360 in 2026) support both CoW snapshot and both MoR views, though the snapshot connector is more recent than the read-optimised one — pin your engine versions in your platform docs to avoid the "Trino sees stale data" class of incident.
Practice on PipeCode
- Drill the aggregation practice library → for the SUM / COUNT / GROUP BY muscle that every Hudi metric report rides on.
- Rehearse on data-aggregation problems → for the multi-key roll-ups CoW dashboards expect.
- Sharpen streaming drills → for the Kafka-style event semantics MoR sinks consume.
- Layer the ETL practice library → for the upsert / merge / dedup logic that drives Hudi write operations.
- Stack the real-time analytics drills → for the snapshot-view freshness questions MoR shops field every week.
- For the broader surface, read top data engineering interview questions →.
- Stack the prerequisites with the only 5 skills you need to become a data engineer →.
- Sharpen the engine axis with the Apache Spark internals course →.
- For pipeline design end-to-end, work through ETL system design for DE interviews →.
Pipecode.ai is Leetcode for Data Engineering — every Hudi recipe above ships with hands-on practice rooms where you write the CoW upsert, the MoR deltacommit, and the async compaction wiring against real graded inputs. PipeCode pairs every reading with 450+ DE-focused problems and a real-time scoring engine, so you never have to guess whether your `hudi merge on read` configuration will hold under streaming load.