DEV Community

Gowtham Potureddi

Apache Hudi Merge-on-Read vs Copy-on-Write: Picking the Right Table Type

Apache Hudi sells itself as one table format, but in practice it is two table formats glued onto a shared transaction log. Every senior data engineer who has been paged at 03:00 by a lakehouse incident knows the truth: the table type you pick on day one decides whether your platform pays its merge cost at write time (Copy-on-Write, CoW) or at read time (Merge-on-Read, MoR) for the next three years. Swapping it later is a migration project, not a config toggle.

This guide is the senior-level playbook for the Hudi Copy-on-Write vs Merge-on-Read decision. It walks through the Hudi mental model (file-group anatomy with base parquet plus Avro delta logs; instants and their REQUESTED/INFLIGHT/COMPLETED states; the commit / deltacommit / compaction / clean / rollback / savepoint vocabulary), the CoW rewrite path that keeps reads fast, the MoR base-plus-logs path that keeps writes fast, the compaction and cleaner contracts that bound storage cost, and the multi-engine read matrix across Spark, Flink, Presto, and Trino. Each section pairs a teaching block with an interview answer in Solution-Tail form: code, a step-by-step trace, an output table, then a concept-by-concept breakdown of why it works.






1. Hudi mental model — table types, timeline, file groups

Apache Hudi is a transactional lakehouse format built on three primitives — the timeline, the file group, and the table type

The one-sentence invariant: a Hudi table is a directory of partitioned file groups governed by an append-only timeline of instants, and the table type (COPY_ON_WRITE or MERGE_ON_READ) decides whether every write rewrites a file group's base parquet or appends a row-based delta log next to it. Once you internalise those three primitives, every Hudi interview question — concurrency, compaction sizing, query latency, schema evolution — becomes a property of one of them.


The three primitives at a glance.

  • The timeline. Every Hudi table has a .hoodie/ metadata directory that stores an ordered sequence of instants. An instant is a monotonically-timestamped action (commit, deltacommit, compaction, clean, rollback, savepoint, replacecommit, restore), captured as up to three files (<ts>.<action>.requested, <ts>.<action>.inflight, <ts>.<action>). Together they form the table's truth log. This is the pre-1.0 layout used throughout this article; Hudi 1.x moves the active timeline under .hoodie/timeline/ and adds a completion time to completed-instant file names.
  • The file group. Each partition is split into one or more file groups, identified by a UUID-style fileId. A file group is the unit of record placement: every upsert against a record key lands in exactly one file group inside a partition. The mapping is stable for the life of the file group, which is what gives Hudi its primary-key semantics on top of immutable object storage.
  • The table type. COPY_ON_WRITE stores each file group as a single base parquet that is rewritten on every commit. MERGE_ON_READ stores each file group as a base parquet plus a stack of row-based log files (Avro), and only periodically compacts them. The type is set at table creation and is not casually changeable.

The instant state machine.

  • REQUESTED. Driver has decided to attempt the action but no work has started. Cheap to roll back.
  • INFLIGHT. Executors are writing data files. A crash here leaves orphan files that the next rollback instant must clean up.
  • COMPLETED. The action's commit metadata (which file groups changed, which rows, schema, stats) is durably written. Readers can now observe the action's effect.

The action vocabulary every senior DE must know cold.

  • commit — written by CoW tables on every upsert. Each commit produces one new base file version per touched file group.
  • deltacommit — written by MoR tables on every upsert. Each deltacommit appends a new log file slice to each touched file group.
  • compaction — MoR-only. Merges accumulated log files into a fresh base parquet. Has its own REQUESTED → INFLIGHT → COMPLETED lifecycle, and a completed compaction is recorded on the timeline as a commit.
  • clean — garbage-collects old file versions that no live reader needs, per the cleaner policy (KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, or KEEP_LATEST_BY_HOURS).
  • rollback — undoes a failed instant by deleting orphan files. Triggered automatically by the next writer (in multi-writer setups, lazily by the cleaner once the failed writer's heartbeat has expired).
  • savepoint / restore — manual checkpoints. savepoint protects a particular instant's files from the cleaner; restore rolls the table back to a savepoint.
  • replacecommit — used by clustering and INSERT_OVERWRITE. Replaces a whole set of file groups with new ones.

The 2026 reality.

  • CoW is the default for batch analytics tables that are loaded daily or hourly and read heavily by dashboards.
  • MoR is the default for streaming sinks (Kafka → Hudi, CDC tables, IoT ingestion) where write rate is high and a few minutes of read lag is acceptable.
  • Both types coexist in the same data lake — the choice is per table, never per platform.

Worked example — read the timeline

Detailed explanation.

  • Goal. Reconstruct what happened to a Hudi table by listing the .hoodie/ directory and reading the instants in order.
  • Why this matters. When a downstream report drifts from expected numbers, the first diagnostic step is to read the timeline. The instant log shows you every commit, the files it touched, and whether any rollbacks intervened.
  • Where it lives. Every Hudi table root has a hidden .hoodie/ directory; the action files are named <timestamp>.<action>[.<state>] where <state> is requested, inflight, or absent (completed).

Question. A teammate hands you a Hudi table path and asks "what's the latest committed state?" Walk through the directory listing below and identify the latest committed instant, any inflight work, and any rollbacks.

Input.

filename | meaning
20260613090000.commit | commit, completed
20260613100000.commit.requested | commit, requested
20260613100000.commit.inflight | commit, inflight
20260613100000.commit | commit, completed
20260613110000.commit.requested | commit, requested
20260613110000.rollback | rollback, completed
20260613120000.commit | commit, completed

Code.

# List instants from .hoodie/, parse state, find the latest completed
import os, re

def list_instants(hoodie_dir: str) -> list[tuple[str, str, str]]:
    """Returns (timestamp, action, state) for each file."""
    rows = []
    for fn in os.listdir(hoodie_dir):
        m = re.match(r"(\d+)\.(\w+)(?:\.(\w+))?$", fn)
        if not m:
            continue
        ts, action, state = m.group(1), m.group(2), m.group(3) or "completed"
        rows.append((ts, action, state))
    return sorted(rows)

def latest_completed_commit(rows):
    return max(
        (r for r in rows if r[1] in ("commit", "deltacommit", "replacecommit")
         and r[2] == "completed"),
        key=lambda r: r[0],
        default=None,
    )

Step-by-step explanation.

  1. The first instant 20260613090000.commit is completed — the table's earliest visible state.
  2. The 100000 instant has all three files (requested, inflight, completed). The completed file means it landed cleanly.
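  3. The 110000 instant has only a requested file, with no inflight and no completed file. It was abandoned before it ever reached INFLIGHT. The next writer noticed and emitted a rollback to clean it up. In a live table the rollback instant gets its own, later timestamp and names the failed instant in its metadata; the listing above simplifies this by filing it under 110000.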
  4. The 120000 instant is the latest committed action. Readers should snapshot the table at this instant.
  5. So the latest completed commit is 20260613120000. The rollback in between produces no new readable version; it only undoes the failed 110000 attempt.

Output.

Property | Value
latest completed commit | 20260613120000
failed / rolled-back instants | 20260613110000
inflight work | (none)
commits since 0900 | 3 (0900, 1000, 1200)

Rule of thumb. Read the timeline before you read the data. The instant log answers "what happened?" cheaper than running a row count.
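A slightly fuller sketch builds the whole Output table from a list of filenames instead of a live directory. It assumes the pre-1.0 `<ts>.<action>[.<state>]` naming used in this example, and `summarize_timeline` is a hypothetical helper, not a Hudi API. It also cannot tell which instant a rollback undid, because that answer lives inside the rollback's metadata rather than in its filename.

```python
import re
from collections import defaultdict

# <ts>.<action> (completed) or <ts>.<action>.requested / <ts>.<action>.inflight
INSTANT_RE = re.compile(r"^(\d+)\.(\w+)(?:\.(requested|inflight))?$")
WRITE_ACTIONS = {"commit", "deltacommit", "replacecommit"}


def summarize_timeline(filenames: list[str]) -> dict:
    """Summarise .hoodie/ instant files (hypothetical helper, pre-1.0 naming)."""
    states: dict[tuple[str, str], set[str]] = defaultdict(set)
    for fn in filenames:
        m = INSTANT_RE.match(fn)
        if not m:
            continue  # hoodie.properties, archived/, metadata/, ...
        ts, action = m.group(1), m.group(2)
        states[(ts, action)].add(m.group(3) or "completed")

    def pick(actions: set[str], completed: bool) -> list[str]:
        # Timestamps of instants with one of `actions`, filtered by completion
        return sorted(ts for (ts, action), seen in states.items()
                      if action in actions and ("completed" in seen) == completed)

    done = pick(WRITE_ACTIONS, True)
    return {
        "latest_completed": done[-1] if done else None,
        "completed_writes": done,
        "never_completed": pick(WRITE_ACTIONS, False),
        "rollbacks": pick({"rollback"}, True),
    }


listing = [
    "20260613090000.commit",
    "20260613100000.commit.requested",
    "20260613100000.commit.inflight",
    "20260613100000.commit",
    "20260613110000.commit.requested",
    "20260613110000.rollback",
    "20260613120000.commit",
]
print(summarize_timeline(listing)["latest_completed"])  # 20260613120000
```

Grouping by `(timestamp, action)` first is the key design choice: an instant is only "completed" when its bare `<ts>.<action>` file exists, no matter how many requested or inflight files sit next to it.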

Worked example — file group anatomy

Detailed explanation.

  • Goal. Visualise what a single file group looks like on disk for each table type.
  • Why this matters. Most Hudi performance bugs are file-group sizing bugs (too few rows per file group, too many log slices before compaction). Knowing the anatomy makes the bug obvious.
  • The layout. Inside partition=<value>/ directories, the file group is identified by a UUID; each base parquet and each log slice carries the fileId in its name plus a commit timestamp.

Question. Given the directory listing below, identify which files belong to the same file group, distinguish base files from log files, and recognise whether the table is CoW or MoR.

Input.

filename | path
f7a2-0_0-100-200_20260613090000.parquet | partition=us/
f7a2-0_0-110-210_20260613100000.parquet | partition=us/
.f7a2-0_20260613100000.log.1_0-120-220 | partition=us/
.f7a2-0_20260613100000.log.2_0-130-230 | partition=us/
9b1c-0_0-101-201_20260613090000.parquet | partition=us/

Code.

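# Classify each filename as a base file or a log file, then group by fileId
import re
from collections import defaultdict

def parse_hudi_file(fn: str) -> dict:
    if fn.startswith("."):
        # log file: .<fileId>_<baseCommit>.log.<version>_<writeToken>
        m = re.match(r"\.([a-f0-9-]+)_(\d+)\.log\.(\d+)", fn)
        return {"kind": "log", "fileId": m.group(1),
                "baseCommit": m.group(2), "version": int(m.group(3))}
    # base file: <fileId>_<writeToken>_<commit>.parquet
    m = re.match(r"([a-f0-9-]+)_[\w-]+_(\d+)\.parquet$", fn)
    return {"kind": "base", "fileId": m.group(1), "commit": m.group(2)}

def group_by_file_id(filenames: list[str]) -> dict:
    # {fileId: {"base": [...], "log": [...]}}
    groups = defaultdict(lambda: {"base": [], "log": []})
    for fn in filenames:
        info = parse_hudi_file(fn)
        groups[info["fileId"]][info["kind"]].append(info)
    return dict(groups)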

Step-by-step explanation.

  1. The first two parquet files share the fileId f7a2. They are two versions of the same file group's base file (commit 0900, then 1000). In a CoW table every commit emits a base version like this; in a MoR table the 1000 base is typically the output of a compaction.
  2. The two .log files share the same fileId f7a2 and have baseCommit=1000 in their names. They are delta log slices written after the 1000 commit landed. Their presence means this is a MoR table.
  3. The 9b1c parquet is a separate file group in the same partition. Each partition can host many file groups; record keys are routed to one and only one fileId.
  4. The leading dot (.f7a2...) marks log files as hidden, so listers that are not Hudi-aware, such as plain parquet readers that skip dot-files, ignore them.

Output.

fileId | base files | log slices | inferred table type
f7a2 | v1 @ 0900, v2 @ 1000 | 2 slices over base 1000 | MoR (because log slices exist)
9b1c | v1 @ 0900 | none | indeterminate (could be CoW or quiet MoR)

Rule of thumb. The presence of .log files in a partition is the on-disk signature of a MoR table. The absence of log files does not prove CoW — it could be a MoR table that has just been compacted.
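When you can read the table's metadata, you can skip the on-disk heuristic entirely, because Hudi records the type as `hoodie.table.type` in `.hoodie/hoodie.properties`. Here is a minimal sketch. It assumes simple `key=value` lines (a subset of the Java properties format), and both the helper name and the sample file contents are made up for illustration.

```python
from typing import Optional


def read_table_type(properties_text: str) -> Optional[str]:
    """Return hoodie.table.type from the text of .hoodie/hoodie.properties."""
    for raw in properties_text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue  # blank line or Java-properties comment
        key, sep, value = line.partition("=")
        if sep and key.strip() == "hoodie.table.type":
            return value.strip()
    return None  # property missing: do not guess from the data files


# Made-up sample contents, for illustration only
sample = """#Properties saved on 2026-06-13
hoodie.table.name=orders
hoodie.table.type=MERGE_ON_READ
"""
print(read_table_type(sample))  # MERGE_ON_READ
```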

Worked example — instants and the REQUESTED → INFLIGHT → COMPLETED states

Detailed explanation.

  • Goal. Trace a single commit's lifecycle through the three states.
  • Why this matters. Most "the writer died and now nothing works" incidents are an inflight instant blocking the next attempt. Knowing the state machine tells you what Hudi will clean up on its own and what to leave alone.
  • The lifecycle. Every action — including compaction and clean — passes through REQUESTED, then INFLIGHT, then COMPLETED. A crash between any two states leaves an artefact for the next writer to inspect.

Question. A writer crashed mid-deltacommit. Which files exist in .hoodie/, what should the next writer do, and what should the next reader do?

Input.

filename in .hoodie/ | meaning
20260613100000.deltacommit.requested | requested file
20260613100000.deltacommit.inflight | inflight file
(no completed file yet) | the commit never finished

Code.

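# Toy model of the next writer's startup probe (illustrative, not Hudi's API)
from dataclasses import dataclass

@dataclass
class Instant:
    ts: str
    action: str
    state: str           # "REQUESTED", "INFLIGHT" or "COMPLETED"
    writer_alive: bool   # stand-in for Hudi's writer heartbeat check

def instants_to_roll_back(timeline: list[Instant]) -> list[Instant]:
    # Any pending instant whose writer is gone must be rolled back
    # before a new instant can commit.
    return [i for i in timeline if i.state != "COMPLETED" and not i.writer_alive]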

Step-by-step explanation.

  1. The next writer opens the table and lists .hoodie/. It sees an inflight deltacommit at timestamp 1000 with no completed file.
  2. Hudi's invariant: any inflight instant whose writer is no longer alive must be rolled back before a new instant can commit. The writer schedules a rollback instant.
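  3. The rollback locates the files the failed deltacommit wrote, using the marker files Hudi records for in-progress writes (log file names carry the base commit, not the deltacommit). Depending on version and storage, it either deletes those files or appends a rollback command block that tells readers to skip the failed blocks.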
  4. The next reader, querying the table during this window, sees only completed instants. The failed inflight is invisible to readers — they never observe a partial state.

Output.

Actor | Sees | Action
Next writer | inflight 1000 | emit rollback
Next reader | latest completed commit (≤ 0900) | read as if 1000 never existed
Operator | orphan log files | wait for rollback or run hudi-cli rollback

Rule of thumb. Never manually delete files in .hoodie/ or in partition directories — Hudi's rollback can do it safely. Hand-deletion will desync the timeline from the data and is the canonical way to corrupt a lakehouse.

Interview question on the Hudi mental model

A senior interviewer often opens with: "Explain the difference between a commit and a deltacommit in Hudi, and walk me through what a single upsert produces on disk for a CoW table vs a MoR table." It blends timeline, file group, and table type into one probe.

Solution Using the three-primitive frame

# Pseudo-code — one upsert against record-key=42, value="X"
def upsert(table, key, value):
    fileId = table.index.lookup_or_assign(key)        # file group routing
    if table.type == "COPY_ON_WRITE":
        base = table.partition.read_base(fileId)
        merged = base.upsert(key, value)
        new_base = base.next_version()
        write_parquet(new_base, merged)
        table.timeline.emit_commit({fileId: new_base})  # commit instant
    else:  # MERGE_ON_READ
        slice_n = table.partition.next_log_slice(fileId)
        append_avro(slice_n, [(key, value, "upsert")])
        table.timeline.emit_deltacommit({fileId: slice_n})  # deltacommit instant

Step-by-step trace.

Step | CoW path | MoR path
1 | Look up record-key=42 in index → fileId=f7a2 | same
2 | Read base file f7a2_..._<prev>.parquet | (no base read; append-only)
3 | Apply upsert to in-memory rows | Encode (key=42, value=X, op=upsert) as Avro
4 | Write new base parquet f7a2_..._<now>.parquet | Append to log slice .f7a2_<baseCommit>.log.<N+1>
5 | Emit commit instant in .hoodie/ | Emit deltacommit instant in .hoodie/
6 | Old base remains until cleaner GCs it | Base remains untouched; compaction will merge later

Output:

Aspect | CoW | MoR
Instant action | commit | deltacommit
Files produced | 1 new base parquet | 1 new log slice (Avro)
Read latency | low (read one parquet) | higher (read base + merge logs)
Write amplification | high (rewrite whole file group) | low (append only)
Cleanup actor | cleaner | compaction, then cleaner

Why this works — concept by concept:

  • File group routing — the index (bloom, simple, bucket, and others) maps every record key to exactly one fileId in its partition. Both table types use the same routing; the difference is what happens after the route is decided.
  • Commit vs deltacommit — the two action names are a deliberate timeline distinction so that downstream tooling can tell at a glance whether log files were touched.
  • Snapshot vs read-optimised views — CoW exposes only a snapshot view (the latest base). MoR exposes both a snapshot view (base + logs merged) and a read-optimised view (base only, stale by compaction interval).
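  • Write amplification trade-off — CoW pays the full O(rows-in-file-group) write cost on every commit. MoR pays O(updated-rows) for each append plus one deferred O(rows-in-file-group) rewrite per compaction, amortised over every deltacommit since the previous compaction. The work is redistributed in time, and it shrinks whenever several deltacommits share one compaction.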
  • Cost — CoW write: O(file_group_size) per commit. MoR write: O(delta) per deltacommit + O(file_group_size) per scheduled compaction. CoW read: O(scanned_rows). MoR snapshot read: O(scanned_rows) + O(log_slices × delta_rows).
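A back-of-the-envelope sketch of these write-cost bullets for a single file group follows. The function is hypothetical and deliberately crude: it ignores file-group growth, log-file overhead, and the bytes compaction has to read.

```python
def bytes_written_mb(table_type: str, commits: int, file_group_mb: float,
                     delta_mb: float, compact_every: int = 1) -> float:
    """Approximate MB written to ONE file group over `commits` writes.

    CoW rewrites the whole base on every commit. MoR appends only the delta
    and rewrites the base once per completed compaction.
    """
    if table_type == "COPY_ON_WRITE":
        return commits * file_group_mb
    if table_type == "MERGE_ON_READ":
        if compact_every < 1:
            raise ValueError("compact_every must be >= 1")
        compactions = commits // compact_every
        return commits * delta_mb + compactions * file_group_mb
    raise ValueError(f"unknown table type: {table_type}")


# 24 hourly writes of 1 MB each against a 256 MB file group
print(bytes_written_mb("COPY_ON_WRITE", 24, 256, 1))                   # 6144
print(bytes_written_mb("MERGE_ON_READ", 24, 256, 1, compact_every=24))  # 280
```

With compaction every 24 deltacommits, the MoR side rewrites the base once instead of 24 times, which is where the saving comes from.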



2. Copy-on-Write — full rewrites, read-heavy workloads

Hudi Copy-on-Write rewrites the touched file group's base parquet on every commit so that readers never merge; the price is write amplification

The mental model in one line: every CoW write reads the affected base file, merges the incoming records, and writes a brand-new base file version; readers always see a single parquet per file group and never run a merge at query time. Once you say "rewrite the file group on write, scan one parquet on read," the entire CoW operational story falls out.


CoW rules in five bullets.

  • One file format on disk. Base parquet, full stop. No log files, no Avro, no row-format secondary store.
  • Every commit produces a new base file version per touched file group. Old versions remain visible to in-flight readers until the cleaner removes them.
  • Readers always do a snapshot read. No merge logic at read time. Vectorised parquet scan, columnar pruning, predicate push-down all behave like a vanilla parquet table.
  • No compaction action. There are no log files to merge into a base, so the compaction instant simply does not exist for CoW tables.
  • Cleaner is the essential background job. It GCs old base versions according to the cleaner policy (clustering is an optional extra). Without the cleaner, your storage cost grows linearly with commit count.

When CoW is the right pick.

  • Daily or hourly batch loads. Each commit touches few file groups but every read is hot. Write amplification is bounded by load frequency.
  • Dashboard / BI tables. Sub-second scan latency matters more than write latency.
  • Tables that downstream Presto / Trino / Athena query directly. The snapshot view is plain parquet, so every engine reads it without any merge logic.
  • Strict GDPR / right-to-be-forgotten deletes. A delete that rewrites the base file genuinely overwrites the rows — easier to certify than MoR, which may carry a delete record in a log slice for days.

Where CoW hurts.

  • High-frequency upserts. Rewriting an entire file group on every micro-batch is wasteful when the average delta is a handful of rows.
  • Streaming sinks. A Kafka → CoW Hudi sink committing every 30 seconds will spend most of its CPU rewriting parquet, not landing data.
  • Big file groups. The bigger the average file group (in MB), the more each rewrite costs per byte of new data. The ratio (file-group-bytes / delta-bytes) is the write amplification factor.

Common interview probes on CoW.

  • "What happens to the old base file when you commit a CoW update?" — it remains on disk for in-flight readers; the cleaner removes it later.
  • "Can two writers commit to a CoW table at the same time?" — only with OCC (optimistic concurrency control) configured, and they must touch disjoint file groups; otherwise one aborts.
  • "How does delete behave in CoW?" — it triggers a rewrite of every file group that holds a target record, producing a new base file with the deleted rows physically absent.
  • "What is the cost of a CoW upsert that updates one row?" — O(file_group_size) in bytes, because the whole base file is rewritten.

Worked example — upsert rewrites the file group

Detailed explanation.

  • Goal. Show what happens on disk when a single update lands in a CoW table.
  • Why this matters. Engineers regularly under-estimate CoW write cost because the logical update is one row. The physical cost is the size of the base file.
  • The mechanic. Hudi reads the affected base parquet, applies the upsert in-memory, and writes a new parquet file with a fresh commit timestamp in its name.

Question. A CoW table has a single file group with a 256 MB base parquet. You upsert one row. What does the disk look like before vs after, and what is the write cost?

Input — file group state before.

fileId | latest base | bytes
f7a2 | f7a2_..._20260613090000.parquet | 256 MB

Code.

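// Spark / Hudi CoW upsert, single-row example
// Assumes a SparkSession val named `spark` (as in spark-shell); the table name is illustrative.
import spark.implicits._

val df = Seq(("k1", "new_value", 200L))
  .toDF("record_key", "value", "updated_at")

df.write.format("hudi")
  .option("hoodie.table.name", "orders")
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "record_key")
  .option("hoodie.datasource.write.precombine.field", "updated_at")  // higher updated_at wins
  // for a partitioned table, also set hoodie.datasource.write.partitionpath.field
  .mode("append")
  .save("s3://lake/orders/")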

Step-by-step explanation.

  1. The driver looks up record_key=k1 in the index and finds it routes to fileId f7a2.
  2. The executor streams through the entire 256 MB base file, holding the incoming updates in memory (spilling to disk if they do not fit).
  3. It applies the upsert: replace the existing row for k1 (or insert if absent).
  4. It writes a new parquet file f7a2_..._20260613100000.parquet containing all rows — the unchanged ones plus the upserted one. The file is roughly 256 MB plus or minus the delta.
  5. Hudi emits a commit instant referencing the new base file. The old file ..._090000.parquet is still readable until the cleaner removes it.

Output — disk state after.

fileId | base files now
f7a2 | ..._090000.parquet (256 MB), ..._100000.parquet (~256 MB)

Metric | Value
Bytes read | 256 MB
Bytes written | 256 MB
Storage (until clean) | 512 MB
Write amplification | 256 MB per row upserted

Rule of thumb. For a CoW table, every commit costs at least the combined size of the file groups it touches. Plan your commit cadence and your batch sizes so that write amplification stays acceptable; a useful rule is "commit when your delta is at least 5% of the file group size."
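You can write the rule of thumb down directly. Both helpers below are hypothetical, and the 5% threshold is this article's heuristic, not a Hudi configuration value.

```python
def write_amplification(file_group_mb: float, delta_mb: float) -> float:
    """Bytes rewritten per byte of new data for one CoW file group."""
    return file_group_mb / delta_mb


def ready_to_commit(delta_mb: float, file_group_mb: float,
                    min_fraction: float = 0.05) -> bool:
    """Heuristic: commit once the pending delta reaches ~5% of the file group."""
    return delta_mb >= min_fraction * file_group_mb


print(write_amplification(256, 0.5))   # 512.0
print(ready_to_commit(0.5, 256))       # False
print(ready_to_commit(25.6, 256))      # True
```

At the 5% threshold the amplification factor is capped at about 20x, which is the point of the rule.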

Worked example — read path is one parquet per file group

Detailed explanation.

  • Goal. Show that a CoW read is identical in shape to reading a plain parquet table.
  • Why this matters. Migrating off CoW rarely makes reads faster, because CoW reads already run at plain-parquet speed. MoR's read-optimised view can at best match that speed, and only by serving data that may be one compaction interval stale.
  • The mechanic. Hudi resolves the latest commit instant, lists the latest base file per file group, and hands the file list to the scan operator.

Question. Given a CoW table with three file groups, what files does a SELECT * read at commit timestamp 1000?

Input — file groups at commit 1000.

fileId | latest base file
f7a2 | f7a2_..._20260613100000.parquet
9b1c | 9b1c_..._20260613090000.parquet
a3d4 | a3d4_..._20260613100000.parquet

Code.

-- A vanilla scan via Spark SQL or Trino
SELECT *
FROM hudi.orders
WHERE event_date >= DATE '2026-06-01';

Step-by-step explanation.

  1. The query engine asks Hudi for the file list at the latest completed commit (= 1000).
  2. Hudi returns exactly one base parquet per file group — the most recent version ≤ commit 1000.
  3. The scan operator reads only these three files, applies partition pruning and parquet predicate push-down, and returns the matching rows.
  4. There is no merge step; there is no log replay; there is no per-row deduplication. The read is identical in cost to reading three plain parquet files.

Output.

File scanned | Rows scanned | Why
f7a2_..._100000.parquet | all rows in f7a2 | latest version at commit 1000
9b1c_..._090000.parquet | all rows in 9b1c | 9b1c had no commit at 1000
a3d4_..._100000.parquet | all rows in a3d4 | latest version at commit 1000

Rule of thumb. A CoW read is "list latest base per file group, scan." If Presto / Trino / Athena performance looks slower than expected, the problem is almost always file-group sizing or partition layout — not Hudi.
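The "list latest base per file group" step can be sketched over a list of base-file names. This is a simplified model with three assumptions: every name belongs to a completed commit, timestamps are fixed-width so string comparison orders them, and replaced or cleaned file groups are ignored. `files_to_scan` is a hypothetical helper; real engines get this list from Hudi's file-system view.

```python
import re

# base file: <fileId>_<writeToken>_<commit>.parquet
BASE_RE = re.compile(r"^([a-f0-9-]+)_[\w-]+_(\d+)\.parquet$")


def files_to_scan(base_files: list[str], as_of: str) -> dict[str, str]:
    """Return {fileId: newest base file whose commit time is <= as_of}."""
    latest: dict[str, tuple[str, str]] = {}
    for fn in base_files:
        m = BASE_RE.match(fn)
        if not m:
            continue
        file_id, commit = m.group(1), m.group(2)
        if commit <= as_of and (file_id not in latest or commit > latest[file_id][0]):
            latest[file_id] = (commit, fn)
    return {fid: fn for fid, (_, fn) in sorted(latest.items())}


listing = [
    "f7a2-0_0-100-200_20260613090000.parquet",
    "f7a2-0_0-110-210_20260613100000.parquet",
    "9b1c-0_0-101-201_20260613090000.parquet",
    "a3d4-0_0-102-202_20260613090000.parquet",
    "a3d4-0_0-112-212_20260613100000.parquet",
]
for fid, fn in files_to_scan(listing, "20260613100000").items():
    print(fid, fn)
```

Passing an older `as_of` returns the older base versions that have not yet been cleaned. That is the same mechanism that lets in-flight readers keep using them.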

Worked example — delete propagation in CoW

Detailed explanation.

  • Goal. Show how a delete physically removes the row.
  • Why this matters. Compliance teams need certainty that a "delete user 42" call results in user 42's row physically absent from disk, not merely logically suppressed. CoW satisfies this contract more directly than MoR.
  • The mechanic. A delete in CoW behaves like an upsert with a tombstone: the affected file group is rewritten without the targeted rows.

Question. In a CoW orders table, the row for user_id=42 lives in file group f7a2, whose base file f7a2_..._090000.parquet is 256 MB. The team issues a delete-by-key. Show the disk state before and after.

Input.

Property | Value
operation | delete
record_key | user_id=42
target fileId | f7a2
size before | 256 MB

Code.

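// Assumes `import spark.implicits._` and a non-partitioned table (or a global index);
// otherwise the DataFrame must also carry the partition-path column.
val deletes = Seq("42").toDF("user_id")

deletes.write.format("hudi")
  .option("hoodie.table.name", "orders")
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
  .option("hoodie.datasource.write.operation", "delete")
  .option("hoodie.datasource.write.recordkey.field", "user_id")
  .mode("append")
  .save("s3://lake/orders/")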

Step-by-step explanation.

  1. Driver routes user_id=42 to fileId f7a2.
  2. Executor reads the base file, drops the row matching user_id=42, and writes a new base file ..._110000.parquet without it.
  3. Commit metadata records the operation as delete. The row is physically absent from the new base, so snapshot and incremental queries from this commit onward no longer return it.
  4. The old base file ..._090000.parquet still contains user_id=42 until the cleaner removes that version, at which point the data is irretrievable.

Output.

Stage | Disk holds row? | Reader sees row?
Before delete | yes (in ..._090000.parquet) | yes
After delete commit | yes (still in ..._090000.parquet) | no (reader uses ..._110000.parquet)
After cleaner | no | no

Rule of thumb. Compliance-grade deletes require both the commit and the next cleaner run. The "retention window" of your cleaner policy is the effective delete latency from a GDPR / right-to-be-forgotten standpoint.
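To size that window roughly under KEEP_LATEST_COMMITS, the sketch below makes two assumptions: the pre-delete base is retained for about `commits_retained` further commits, and it is then removed on the cleaner's next run. The function and its inputs are hypothetical, and the real cleaner's boundary may differ by a commit.

```python
def approx_delete_latency_hours(commit_interval_hours: float,
                                commits_retained: int,
                                clean_every_hours: float) -> float:
    """Rough worst case until a deleted row leaves storage (KEEP_LATEST_COMMITS)."""
    # Time for the old version to fall out of the retained window,
    # plus the wait for the next cleaner run to actually delete it.
    return commits_retained * commit_interval_hours + clean_every_hours


# Hourly commits, 10 retained commits, cleaner running every hour
print(approx_delete_latency_hours(1, 10, 1))  # 11
```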

Interview question on CoW operations

A senior interviewer might frame this as: "You inherit a CoW orders table that commits hourly. The last hour added 200 net new rows. The base files are 256 MB each, and the table has 1000 file groups. Estimate the hourly write cost and how you'd reduce it." It blends file-group sizing, commit cadence, and write amplification.

Solution Using a write-amplification computation

# Estimate hourly bytes written for a CoW table
file_groups_total = 1000
file_groups_touched = 50          # observed from commit metadata
avg_base_file_mb = 256
delta_rows = 200
hours_per_day = 24

bytes_per_commit_mb = file_groups_touched * avg_base_file_mb
bytes_per_day_gb = bytes_per_commit_mb * hours_per_day / 1024
write_amp_per_row = bytes_per_commit_mb * 1024 * 1024 / delta_rows

Step-by-step trace.

Step | Calculation | Value
1 | bytes per commit | 50 * 256 = 12 800 MB ≈ 12.5 GB
2 | bytes per day | 12.5 GB * 24 = 300 GB / day
3 | rows per day | 200 * 24 = 4 800 rows
4 | write amplification per row | 12.5 GB / 200 ≈ 64 MB / row
5 | mitigation: increase commit interval to 4 hours | bytes/day = 12.5 GB * 6 = 75 GB; rows/commit = 800
6 | mitigation: switch to MoR | bytes/commit ≈ delta size; compaction every 24 commits

The trace shows that committing every hour for 200 rows is rewriting 12.5 GB of parquet — a write amplification of 64 MB per row. Either coalesce commits into a longer window or switch to MoR.

Output:

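Strategy | Bytes / day | Read latency change | Notes
Hourly CoW | 300 GB | none | wasteful
4-hour CoW | 75 GB | none | simplest fix
MoR + hourly deltacommit | ~2 GB of log writes, plus 12.5 GB per compaction | +30% read | needs compaction policy
MoR + 6-hour compaction | ~2 GB of log writes + 4 × 12.5 GB ≈ 52 GB | varies | balanced

(Compaction figures assume each compaction rewrites the same 50 touched file groups.)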

Why this works — concept by concept:

  • Write amplification = file-group-size / delta-size — the fundamental CoW knob. Bigger file groups amplify more; bigger deltas amplify less.
  • Commit cadence is a cost lever — every doubled interval roughly halves write cost (assuming delta accumulates linearly). The trade-off is data freshness.
  • Touched file groups are a cardinality signal — if 50 of 1000 file groups are touched, your record keys cluster well. If 950 of 1000 are touched, your keys are uniform and CoW is paying its worst case.
  • MoR is the structural fix — it changes the cost model from O(file_group_size) per commit to O(delta) per commit, paid back at compaction time.
  • Cost — CoW write per commit: O(touched_file_groups × file_group_size). CoW read per scan: O(scanned_rows) with vectorised parquet — identical to a vanilla parquet table.
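You can check the cadence lever with the numbers from this question. The sketch assumes every commit touches the same 50 file groups regardless of interval, which is what makes each doubling halve the cost exactly; in practice, longer windows may touch more file groups. `cow_gb_per_day` is a hypothetical helper, and the interval must divide 24.

```python
def cow_gb_per_day(commit_interval_hours: int, touched_file_groups: int = 50,
                   base_file_mb: int = 256) -> float:
    """Daily CoW rewrite volume, assuming each commit touches the same file groups."""
    if 24 % commit_interval_hours != 0:
        raise ValueError("commit interval must divide 24 hours")
    commits_per_day = 24 // commit_interval_hours
    return commits_per_day * touched_file_groups * base_file_mb / 1024


for hours in (1, 2, 4, 8):
    print(hours, cow_gb_per_day(hours))
# 1 300.0
# 2 150.0
# 4 75.0
# 8 37.5
```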



3. Merge-on-Read — base + delta logs, write-heavy workloads

Hudi Merge-on-Read writes changes to row-based log files and defers the parquet rewrite to compaction: fast writes, costlier reads

The mental model in one line: every MoR write appends a row-based Avro log slice to the affected file group; readers either scan just the base (fast but stale) or scan the base plus replay every log slice on top (fresh but slower); compaction periodically folds the logs back into a new base file. Once you say "append on write, merge at read, compact in the background," the MoR operational story falls out.


MoR rules in five bullets.

  • Two file formats on disk. Base parquet (columnar) plus delta log files (Avro, row-based). Both carry the same schema.
  • Every upsert is a deltacommit. Hudi appends a new log slice referencing the current base file. The base file is not rewritten.
  • Two read views. The read-optimised view reads only the base files (very fast, but stale by up to one compaction interval). The snapshot view reads the base plus merges every log slice on top (fresh, but slower).
  • Compaction is a separate background job. It reads the base + all logs, applies them in order, and writes a new base parquet, then declares the logs obsolete.
  • Cleaner still runs. After compaction, the old base and the merged logs are eligible for GC under the cleaner policy.
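The two read views and compaction can be modelled with plain dictionaries. This is a toy, not Hudi's reader. Each file group is a dict of record key to value, and each log block is a list of `(key, value, op)` tuples. Blocks are applied strictly in commit order, whereas real Hudi also consults the precombine (ordering) field when two versions of a key collide.

```python
from typing import Iterable

# A log block: list of (record_key, value, op) tuples, op in {"upsert", "delete"}
LogBlock = list[tuple[str, str, str]]


def read_optimised(base: dict[str, str]) -> dict[str, str]:
    """Read-optimised view: the base file only (fast, possibly stale)."""
    return dict(base)


def snapshot(base: dict[str, str], log_blocks: Iterable[LogBlock]) -> dict[str, str]:
    """Snapshot view: replay every log block, in commit order, on top of the base."""
    merged = dict(base)
    for block in log_blocks:
        for key, value, op in block:
            if op == "delete":
                merged.pop(key, None)
            else:
                merged[key] = value
    return merged


def compact(base: dict[str, str], log_blocks: list[LogBlock]):
    """Compaction persists the snapshot result as the new base; no logs remain."""
    return snapshot(base, log_blocks), []


base = {"k1": "a", "k2": "b"}
logs = [[("k1", "a2", "upsert")], [("k2", "", "delete"), ("k3", "c", "upsert")]]
print(read_optimised(base))   # {'k1': 'a', 'k2': 'b'}
print(snapshot(base, logs))   # {'k1': 'a2', 'k3': 'c'}
```

After `compact`, the read-optimised view of the new base equals the old snapshot view. That is why the read-optimised view is stale by at most one compaction interval.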

When MoR is the right pick.

  • High-frequency upserts / CDC. A Debezium → Kafka → Hudi sink benefits massively from append-only writes.
  • Streaming sinks with relaxed read freshness. Dashboards that tolerate 5–30 minutes of staleness can serve from the read-optimised view at parquet speeds.
  • Tables with skewed update patterns. If most updates target a small fraction of records, MoR keeps log files small and compaction cheap.
  • Tables that need both real-time consumers and batch consumers. Real-time readers take the snapshot view; batch readers take the read-optimised view. Same data, two cost profiles.

Where MoR hurts.

  • Read-heavy dashboards that need fresh data. Every snapshot read pays the log-merge cost.
  • Tables with very few writes but many reads. The base file is fresh enough that MoR's machinery is unnecessary overhead.
  • Engines that only support read-optimised view. Some older Presto / Trino versions can only read the base, hiding recent deltacommits entirely from analytics.

Common interview probes on MoR.

  • "What is the difference between the read-optimised and snapshot views?" — read-optimised scans the base only; snapshot replays the logs on top.
  • "How are log files encoded?" — Avro row-based, organised into log blocks with header metadata (commit time, schema, block type).
  • "What is a rollback block?" — a special log-block type emitted when an inflight deltacommit is rolled back; it instructs the reader to ignore prior blocks from that commit.
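  • "What governs compaction timing?": the trigger strategy (hoodie.compact.inline.trigger.strategy). NUM_COMMITS compacts every N deltacommits, TIME_ELAPSED after a configured time, NUM_COMMITS_AFTER_LAST_REQUEST counts deltacommits since the last compaction was scheduled, and NUM_AND_TIME / NUM_OR_TIME combine the count and time conditions.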
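The trigger strategies reduce to a predicate over a few counters. The helper below is hypothetical (it is not Hudi's scheduler code) and models the NUM_COMMITS, NUM_COMMITS_AFTER_LAST_REQUEST and TIME_ELAPSED semantics, plus the NUM_AND_TIME and NUM_OR_TIME combinations Hudi also offers; thresholds are in deltacommits and seconds.

```python
def should_schedule_compaction(strategy, deltas_since_compaction, deltas_since_request,
                               seconds_since_compaction, max_delta_commits, max_delta_seconds):
    """Hypothetical model of Hudi's compaction trigger strategies."""
    enough_commits = deltas_since_compaction >= max_delta_commits
    enough_time = seconds_since_compaction >= max_delta_seconds
    if strategy == "NUM_COMMITS":
        return enough_commits
    if strategy == "NUM_COMMITS_AFTER_LAST_REQUEST":
        # count only deltacommits since the last *scheduled* compaction
        return deltas_since_request >= max_delta_commits
    if strategy == "TIME_ELAPSED":
        return enough_time
    if strategy == "NUM_AND_TIME":
        return enough_commits and enough_time
    if strategy == "NUM_OR_TIME":
        return enough_commits or enough_time
    raise ValueError(f"unknown trigger strategy: {strategy}")

# 5 deltacommits and 20 minutes since the last compaction; thresholds: 5 commits / 1 hour
for s in ["NUM_COMMITS", "TIME_ELAPSED", "NUM_AND_TIME", "NUM_OR_TIME"]:
    print(s, should_schedule_compaction(s, 5, 5, 20 * 60, 5, 3600))
```

The same counters give opposite answers under NUM_AND_TIME and NUM_OR_TIME, which is why the strategy choice matters as much as the threshold.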

Worked example — write fast (append to log)

Detailed explanation.

  • Goal. Show what happens on disk when an upsert lands in a MoR table.
  • Why this matters. Engineers picking MoR want to know the per-write cost is bounded by the delta size, not the base file size — that's the whole point.
  • The mechanic. The executor encodes the upsert records as Avro and appends them to a log file that targets the current base file via its baseCommit token.

Question. A MoR table has one file group with a 256 MB base parquet at commit 0900. You deltacommit 200 upserts at 1000. What lands on disk?

Input — state before.

fileId base logs
f7a2 f7a2_..._090000.parquet (256 MB) (none)

Code.

import org.apache.spark.sql.functions.{current_timestamp, lit}
import spark.implicits._

// 200 upserts; updated_at is the precombine field, so the frame must carry it
val df = (1 to 200).toDF("record_key")
  .withColumn("value", lit("X"))
  .withColumn("updated_at", current_timestamp())
df.write.format("hudi")
  .option("hoodie.table.name", "orders")             // table name assumed
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "record_key")
  .option("hoodie.datasource.write.precombine.field", "updated_at")
  .option("hoodie.compact.inline", "false")           // async compaction
  .mode("append")
  .save("s3://lake/orders/")

Step-by-step explanation.

  1. The 200 upserts hash to fileId f7a2.
  2. The executor encodes them as one Avro data block. Its header records the instant time (1000) and the writer schema; the base commit (0900) is encoded in the log file's name rather than in the block.
  3. It appends the block to .f7a2_20260613090000.log.1_... — a new log slice file targeting base commit 0900.
  4. Hudi emits a deltacommit instant. The base file is untouched.
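  5. A subsequent upsert at commit 1100 appends another log block. Whether it lands in the same log file or in a new version (...log.2) depends on the storage and configuration; object stores such as S3 do not support append, so there each write produces a new log file.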
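The link between a log slice and its base file lives in the file name. The parser below assumes the layout `.<fileId>_<baseCommit>.log.<version>_<writeToken>` that the example's file names follow; the regex, the helper names, the sample write tokens (`0-12-34`, `0-15-40`) and the base file name in the listing are illustrative assumptions, not Hudi code.

```python
import re

# Assumed layout: .<fileId>_<baseCommit>.log.<version>[_<writeToken>]
LOG_NAME = re.compile(
    r"^\.(?P<file_id>[^_]+)_(?P<base_commit>\d+)\.log\.(?P<version>\d+)(?:_(?P<token>.+))?$"
)

def parse_log_name(name):
    """Return (file_id, base_commit, version) for a log file name."""
    m = LOG_NAME.match(name)
    if m is None:
        raise ValueError(f"not a log file name: {name}")
    return m.group("file_id"), m.group("base_commit"), int(m.group("version"))

def log_files_for(names, file_id, base_commit):
    """Log files a snapshot reader merges onto one base file, in version order."""
    hits = []
    for name in names:
        if not name.startswith("."):
            continue                          # base parquet files are not hidden files
        fid, base, version = parse_log_name(name)
        if fid == file_id and base == base_commit:
            hits.append((version, name))
    return [name for _, name in sorted(hits)]

listing = [
    "f7a2_0-12-34_20260613090000.parquet",
    ".f7a2_20260613090000.log.2_0-15-40",
    ".f7a2_20260613090000.log.1_0-12-34",
]
print(log_files_for(listing, "f7a2", "20260613090000"))
```

Sorting on the integer version (not the raw string) keeps `log.10` after `log.2`, which matters once a file group accumulates many slices.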

Output — state after.

fileId base logs
f7a2 f7a2_..._090000.parquet (256 MB) .f7a2_...090000.log.1 (~few KB)
Metric Value
Bytes written ~few KB (just the delta)
Bytes read ≈ 0 from the base file (only the index lookup that routes keys to f7a2)
Write amplification ≈ 1× (no rewrite)

Rule of thumb. MoR collapses write amplification from O(file_group_size) to O(delta_size). The cost is moved to compaction, where it is paid once per N deltacommits.
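A back-of-the-envelope version of that rule, using the worked example's numbers. The ~20 bytes per Avro-encoded record is an assumption picked to match the "~few KB" delta, and the CoW branch assumes the rewritten parquet file stays about the size of the original base.

```python
MB = 1024 * 1024

def bytes_written(table_type, file_group_bytes, delta_records, bytes_per_record):
    """Rough bytes written by one upsert batch that hits a single file group."""
    delta_bytes = delta_records * bytes_per_record
    if table_type == "COPY_ON_WRITE":
        # CoW rewrites the whole file group into a new parquet file of similar size.
        return file_group_bytes
    if table_type == "MERGE_ON_READ":
        # MoR appends only the delta as a log block.
        return delta_bytes
    raise ValueError(f"unknown table type: {table_type}")

cow = bytes_written("COPY_ON_WRITE", 256 * MB, 200, 20)
mor = bytes_written("MERGE_ON_READ", 256 * MB, 200, 20)
print(mor)          # 4000 bytes, i.e. ~4 KB
print(cow // mor)   # 67108: CoW writes ~67 000x more bytes for this batch
```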

Worked example — read paths: read-optimised vs snapshot

Detailed explanation.

  • Goal. Show the two read views and the cost difference.
  • Why this matters. Operators frequently configure the wrong view for a workload — dashboards that demand freshness get pointed at the read-optimised view (stale) and batch backfills that don't need freshness pay the snapshot cost.
  • The mechanic. The query specifies the view via a config (hoodie.datasource.query.type=read_optimized or snapshot). Hudi assembles the file list accordingly.

Question. A MoR file group has a 256 MB base at commit 0900 and three log slices over it (at commits 1000, 1100, 1200). Compare the file lists and CPU profiles for read-optimised vs snapshot scans.

Input — file group state.

Component Source
base f7a2_..._090000.parquet (256 MB)
log slice 1 .f7a2_...090000.log.1 (5 MB Avro, 1000 deltas)
log slice 2 .f7a2_...090000.log.2 (3 MB Avro, 600 deltas)
log slice 3 .f7a2_...090000.log.3 (4 MB Avro, 800 deltas)

Code.

// read-optimised view (fastest, stale by up to one compaction interval)
val ro = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "read_optimized")
  .load("s3://lake/orders/")

// snapshot view (fresh, more CPU)
val snap = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "snapshot")
  .load("s3://lake/orders/")

Step-by-step explanation.

  1. The read-optimised scan lists only the latest base file per file group. It ignores log slices entirely. Cost = scan one 256 MB parquet.
  2. The snapshot scan lists the base plus every log slice that targets it. It reads the log blocks in commit order into a map keyed by record key (Hudi spills this map to disk when it outgrows memory), then streams the base file and overlays the merged log records, applying both upserts and deletes.
  3. The deduplication merge is keyed on the record key; the precombine field decides which version wins when a key appears multiple times.
  4. After the merge, the snapshot scan emits the consolidated rows to the downstream operator. The cost = base scan + Avro decode of all logs + merge.
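Steps 2 and 3 can be sketched as a keyed merge. This is a simplified model, not Hudi's merge code: it assumes each record is a dict with `record_key`, the precombine field `updated_at`, and an optional `op` set to `"D"` for deletes, and it uses event-time ordering (a lower `updated_at` never overwrites a higher one). Hudi's real behaviour depends on the configured payload or record merger.

```python
def snapshot_merge(base_rows, log_slices, precombine="updated_at"):
    """Merge base rows with log slices keyed on record_key (simplified model).

    base_rows:  list of row dicts read from the base parquet file
    log_slices: list of (commit_time, [record dicts]); op == "D" marks a delete
    """
    merged = {row["record_key"]: row for row in base_rows}
    # Replay log slices in commit order.
    for _, records in sorted(log_slices, key=lambda s: s[0]):
        for rec in records:
            current = merged.get(rec["record_key"])
            # Event-time ordering: the version with the higher precombine value wins.
            if current is None or rec[precombine] >= current[precombine]:
                merged[rec["record_key"]] = rec      # deletes stay as tombstones for now
    live = [r for r in merged.values() if r.get("op") != "D"]
    return sorted(live, key=lambda r: r["record_key"])

base = [
    {"record_key": 1, "status": "new", "updated_at": 1},
    {"record_key": 2, "status": "new", "updated_at": 1},
]
logs = [
    ("1000", [{"record_key": 1, "status": "paid", "updated_at": 5}]),
    ("1100", [{"record_key": 2, "op": "D", "updated_at": 6}]),
    ("1200", [{"record_key": 1, "status": "late", "updated_at": 3}]),  # out-of-order event
]
print(snapshot_merge(base, logs))
# [{'record_key': 1, 'status': 'paid', 'updated_at': 5}]
```

Keeping deletes as tombstones until the end stops an older, late-arriving version from resurrecting a deleted key.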

Output — cost comparison.

View Files read CPU Freshness
read-optimised 1 (base 256 MB) low (vectorised parquet) stale to 0900
snapshot 1 + 3 (base + 12 MB logs) high (Avro decode + merge) fresh to 1200

Rule of thumb. Default dashboards to read-optimised. Switch to snapshot only when "freshness within minutes" is in the SLA, and budget the extra CPU.

Worked example — log block headers and rollback blocks

Detailed explanation.

  • Goal. Show that log files are not free-form Avro — they have a block-header schema that allows mid-file rollbacks.
  • Why this matters. A failed deltacommit must be erased from the reader's point of view without rewriting the log file. The rollback block accomplishes this.
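  • The mechanic. A log block starts with the 6-byte magic #HUDI#, followed by the block size, log-format version, block type, header, content and footer. Written block types include data blocks (Avro by default), DELETE and COMMAND; CORRUPT is the type the reader assigns to a block it cannot parse, such as a truncated tail.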

Question. A deltacommit at 1100 partially wrote a log block and then crashed. How does the rollback at 1110 invalidate the partial block without rewriting the file?

Input — log file body.

Block Type Commit Status
1 DATA 1000 OK
2 DATA 1100 partially written (truncated; parsed as CORRUPT)
3 COMMAND 1110 rollback target=1100

Code.

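# Simplified model of how Hudi's log scanner honours rollback blocks.
# A COMMAND/ROLLBACK block is written *after* the blocks it cancels, so the
# scanner must know every rollback target before it surfaces any records.
# blocks: list of parsed blocks from one log file, in file order.
def replay_blocks(blocks):
    # Pass 1: collect the instants cancelled by COMMAND/ROLLBACK blocks.
    rolled_back = {
        b.target_commit
        for b in blocks
        if b.type == "COMMAND" and b.subtype == "ROLLBACK"
    }
    # Pass 2: surface records from live DATA blocks only.
    for b in blocks:
        if b.type == "CORRUPT":
            continue                    # truncated or unparseable block
        if b.type == "DATA" and b.commit_time not in rolled_back:
            yield from b.records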

Step-by-step explanation.

  1. The reader walks the blocks in file order. The DATA block from commit 1000 is intact and no rollback targets it, so its records are surfaced.
  2. The block from commit 1100 is truncated, so the reader classifies it as CORRUPT and skips it.
  3. The COMMAND/ROLLBACK block from commit 1110 targets commit 1100, so the reader drops every DATA block from that commit. Because a rollback block is always written after the blocks it cancels, the reader has to account for it before surfacing any 1100 records. This also covers the case where the 1100 block had been fully written and would otherwise parse cleanly.
  4. The reader produces only the records from commit 1000. The rollback semantics are preserved without modifying the log file.
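How does a reader tell a half-written block from a valid one? The sketch below uses a deliberately simplified, hypothetical framing: only the 6-byte `#HUDI#` magic matches Hudi's real format, and the length and type fields stand in for Hudi's fuller layout (version, header, footer). The idea it shows is the relevant one: a block whose declared length runs past the end of the file is reported as CORRUPT instead of raising an error.

```python
import struct

MAGIC = b"#HUDI#"                  # Hudi's 6-byte block magic
FRAME = ">qi"                      # hypothetical: payload length (int64), block type (int32)
TYPES = {1: "DATA", 2: "DELETE", 3: "COMMAND"}

def encode_block(block_type, payload):
    return MAGIC + struct.pack(FRAME, len(payload), block_type) + payload

def read_blocks(buf):
    """Yield (block_type, payload); a truncated tail becomes a CORRUPT block."""
    pos, header = 0, struct.calcsize(FRAME)
    while pos < len(buf):
        start = pos + len(MAGIC)
        if buf[pos:start] != MAGIC or start + header > len(buf):
            yield ("CORRUPT", buf[pos:])
            return
        length, block_type = struct.unpack(FRAME, buf[start:start + header])
        end = start + header + length
        if end > len(buf):             # writer crashed mid-block
            yield ("CORRUPT", buf[pos:])
            return
        yield (TYPES.get(block_type, "CORRUPT"), buf[start + header:end])
        pos = end

log = encode_block(1, b"rows@1000") + encode_block(1, b"rows@1100")[:-4]
print([block_type for block_type, _ in read_blocks(log)])   # ['DATA', 'CORRUPT']
```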

Output — reader semantics.

Commit Block type Reader treats as
1000 DATA live
1100 DATA (corrupt) dropped
1110 COMMAND/ROLLBACK metadata only

Rule of thumb. Hudi's log format makes a rollback O(1): append a COMMAND block rather than rewrite the log file. That keeps failed MoR writes, including ones aborted by optimistic concurrency control, cheap to undo.

Interview question on MoR architecture

A senior interviewer might frame this as: "Walk me through a single CDC event lifecycle in a MoR table from ingest to query. Include the deltacommit, the read-optimised view, the snapshot view, and the eventual compaction."

Solution Using a CDC lifecycle trace

T+0       CDC event: UPDATE orders SET status='paid' WHERE id=42
T+0.1s    Flink sink looks up id=42 in its index → fileId=f7a2
T+0.2s    Encodes (id=42, status='paid', op='U', ts=T) as Avro
T+0.3s    Appends to .f7a2_<baseCommit>.log.<N>
T+0.4s    Emits deltacommit instant 20260613T1234567
T+1m      Read-optimised dashboard scan: misses the update
T+1m      Snapshot dashboard scan: surfaces the update
T+30m     Compaction job triggered (NUM_COMMITS=30 reached)
T+31m     Compaction reads base + all log slices, writes new base
T+31m     Emits commit (compaction) instant; logs marked obsolete
T+31m     Read-optimised and snapshot now both return id=42 paid
T+32m     Cleaner GCs old base file and merged log slices

Step-by-step trace.

Time Actor Disk state on fileId f7a2
T+0 (before) base v0900.parquet only
T+0.4s sink base v0900.parquet + log slice N (with update)
T+31m compactor base v0900.parquet + base v1234.parquet + obsolete logs
T+32m cleaner base v1234.parquet only

The trace shows that a single update is live in the snapshot view as soon as its deltacommit completes, live in the read-optimised view only once compaction writes a fresh base, and physically cleared from the old base and log slices only after the cleaner runs.

Output:

View at T+5m Sees update? CPU
read-optimised no low
snapshot yes high (log merge)
snapshot via Flink incremental yes (sub-second) engine-managed
read-optimised after T+31m yes low

Why this works — concept by concept:

  • Deltacommit decouples ingest from rewrite — the writer's job ends at "append the log slice and emit the instant." The compactor is responsible for the eventual parquet rewrite.
  • Read-optimised vs snapshot is a freshness lever — same table, two query types, two cost profiles. The platform team picks per consumer.
  • Compaction triggers — typically NUM_COMMITS (compact every N deltacommits) or TIME_ELAPSED. A custom strategy can also consider log size or skew.
  • Multiple writers via OCC — Hudi's optimistic concurrency control can serialise concurrent writers on the timeline. Conflicts are detected at file-group granularity.
  • Cost — write: O(delta_size) per deltacommit. Compaction: O(base_size + sum(log_sizes)) per compaction. Read-optimised: O(scanned_rows). Snapshot: O(scanned_rows + log_sizes_to_merge).



4. Compaction and cleaning — inline vs async scheduler

hudi compaction collapses log slices into a new base file; the cleaner GCs obsolete versions — together they bound storage cost and read latency

The mental model in one line: compaction folds delta log slices back into the base parquet to keep MoR read latency bounded; the cleaner GCs file versions older than the retention window to keep storage cost bounded. The two are separate actions on the timeline — different triggers, different policies, different operational levers.

Compaction in detail.

  • Action name. compaction. Like commit and deltacommit, it has its own REQUESTED → INFLIGHT → COMPLETED state machine.
  • What it does. For each file group with at least one log slice, the compactor reads base_v<n>.parquet + all log slices on top, applies the deltas in commit order, and writes base_v<n+1>.parquet. The logs are then declared obsolete.
  • Trigger modes. Inline (after every Nth deltacommit, blocks the writer until compaction completes) or async (a separate process runs continuously, the writer is unblocked).
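  • Trigger strategies. NUM_COMMITS (the default: compact once N deltacommits have accumulated since the last compaction), TIME_ELAPSED (once a configured time has passed since the last compaction), NUM_COMMITS_AFTER_LAST_REQUEST (N deltacommits since the last scheduled compaction, useful when async), plus the combinations NUM_AND_TIME and NUM_OR_TIME.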
  • Concurrency. Compaction can run alongside writers because it operates on the current base + already-committed log slices. New deltacommits land in new log slices that the compactor will pick up next round.

Cleaning in detail.

  • Action name. clean. Same state machine as the other actions.
  • What it does. Deletes obsolete file versions per the cleaner policy. The policy decides "what is obsolete?"
  • Policies.
    • KEEP_LATEST_COMMITS — keep file versions referenced by the latest N commits. This is the recommended policy for incremental query consumers (their last-read timestamp must fall in the retention window).
    • KEEP_LATEST_FILE_VERSIONS — keep the latest N file versions per file group, regardless of commit timestamp. Simpler but does not align with incremental query lag.
    • KEEP_LATEST_BY_HOURS — keep all file versions written in the last T hours.
  • Tension. A long retention window costs storage but supports long-running readers, incremental subscribers, and time-travel queries.
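The difference between the two main policies is easiest to see on one file group's version history. This is a simplified model under stated assumptions: commit times are plain integers, KEEP_LATEST_COMMITS keeps every version written inside the last N commits plus the version that was current when that window opened (so a query at the oldest retained commit still works), and KEEP_LATEST_BY_HOURS is left out. The function name is hypothetical.

```python
def retained_versions(version_commits, policy, n, all_commits):
    """Versions of one file group that survive cleaning (simplified model).

    version_commits: sorted commit times at which this file group was (re)written
    all_commits:     sorted commit times on the table's timeline
    """
    if policy == "KEEP_LATEST_FILE_VERSIONS":
        return version_commits[-n:]
    if policy == "KEEP_LATEST_COMMITS":
        earliest_retained = all_commits[-n:][0]
        inside = [v for v in version_commits if v >= earliest_retained]
        before = [v for v in version_commits if v < earliest_retained]
        # keep the version a reader at the window's first commit would see
        return before[-1:] + inside
    raise ValueError(f"unsupported policy in this sketch: {policy}")

hourly = list(range(1, 25))                    # 24 hourly commits
hot = [2, 10, 18, 20, 22, 23, 24]              # a frequently rewritten file group
print(retained_versions(hot, "KEEP_LATEST_FILE_VERSIONS", 3, hourly))  # [22, 23, 24]
print(retained_versions(hot, "KEEP_LATEST_COMMITS", 10, hourly))       # [10, 18, 20, 22, 23, 24]
```

For a hot file group, the commit-based policy keeps more versions because retention follows the timeline rather than a per-file count.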

Inline vs async — the operational lever.

  • Inline compaction. Simpler to operate. The writer completes a deltacommit, checks the threshold and, when it is reached, runs compaction synchronously before returning. Write latency becomes bimodal: cheap when the call does not trigger compaction, expensive when it does.
  • Async compaction. Better for steady-state ingest. The writer never blocks; a separate process (for example the HoodieCompactor Spark utility, or a streaming writer with async compaction enabled) polls the timeline and runs compaction when conditions are met. Operational cost: one more process to monitor.
  • Hybrid. "Schedule inline, execute async" — the writer marks compaction as REQUESTED but does not run it; the async process picks up the requested instant and runs it. Common pattern for Flink sinks.

Common interview probes on compaction.

  • "Why must compaction never run concurrently with itself on the same file group?" — because the second compactor would read a stale base and produce a wrong merge. Hudi enforces single-compaction-per-file-group via the timeline.
  • "What is a scheduled compaction vs an executed compaction?" — the REQUESTED instant is the schedule decision; the COMPLETED instant is execution. The split lets async services do the work.
  • "How do you size a compaction interval?": pick the largest NUM_COMMITS that keeps snapshot read latency tolerable. A smaller N keeps snapshot reads cheaper and the read-optimised view fresher, at the cost of more compaction CPU.
  • "What's the relationship between the cleaner's KEEP_LATEST_COMMITS and an incremental query consumer?" — the cleaner must retain enough commits that every consumer's "last seen" timestamp is still in the retention window; otherwise the consumer cannot resume.

Worked example — inline compaction every N deltacommits

Detailed explanation.

  • Goal. Configure inline compaction to run every 10 deltacommits.
  • Why this matters. It is the simplest MoR setup to operate — no separate services to monitor.
  • The mechanic. Set hoodie.compact.inline=true and hoodie.compact.inline.max.delta.commits=10. The 10th deltacommit invokes the compactor synchronously and only returns when it completes.

Question. A MoR table is configured with inline compaction every 10 deltacommits. What happens at deltacommit 10, and what is the latency profile of the 10th vs 11th commit?

Input — config.

Key Value
hoodie.compact.inline true
hoodie.compact.inline.max.delta.commits 10
baseline deltacommit latency ~2 s
baseline compaction duration ~60 s

Code.

df.write.format("hudi")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.compact.inline", "true")
  .option("hoodie.compact.inline.max.delta.commits", "10")
  .option("hoodie.compact.inline.trigger.strategy", "NUM_COMMITS")
  // table name, record key and precombine options as in the earlier MoR example
  .mode("append").save("s3://lake/orders/")

Step-by-step explanation.

  1. Deltacommits 1 through 9 each land an Avro log slice in ~2 s and return.
  2. On the 10th deltacommit, after the log slice is written and the deltacommit instant is COMPLETED, Hudi schedules and executes a compaction instant synchronously.
  3. The compactor reads the base + 10 log slices, writes a new base parquet, and emits the compaction COMPLETED instant. This takes ~60 s.
  4. The 10th write call returns ~62 s after invocation (2 s deltacommit + 60 s compaction). Every other call returns in ~2 s.
  5. After compaction, the 11th deltacommit again starts fresh against the new base, accumulating until the next 10-count threshold.

Output — latency observed at the writer.

Commit N Latency Notes
1 2 s normal
5 2 s normal
10 62 s inline compaction fires
11 2 s normal
20 62 s next inline compaction

Rule of thumb. Inline compaction is great for batch sinks where bimodal write latency is acceptable. For real-time / streaming sinks, switch to async.
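The latency table follows from a two-line model. The function is hypothetical and uses the example's assumed numbers (~2 s per deltacommit, ~60 s per compaction); it also shows why a low average hides the spikes.

```python
def writer_latencies(num_commits, max_delta_commits, deltacommit_s=2, compaction_s=60):
    """Seconds per write call when every Nth deltacommit runs compaction inline (toy model)."""
    return [
        deltacommit_s + (compaction_s if i % max_delta_commits == 0 else 0)
        for i in range(1, num_commits + 1)
    ]

lat = writer_latencies(20, 10)
print(lat[0], lat[9], lat[10], lat[19])   # 2 62 2 62  (commits 1, 10, 11, 20)
print(sum(lat) / len(lat), max(lat))      # 8.0 62: the mean hides the 62 s spikes
```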

Worked example — async compaction scheduler

Detailed explanation.

  • Goal. Decouple the compactor from the writer so the streaming sink never blocks.
  • Why this matters. A streaming Flink → Hudi sink that hits inline compaction would see end-to-end latency spike from 2 s to 60+ s every N commits. Async eliminates that.
  • The mechanic. The writer only schedules compaction (writes a REQUESTED instant). A separate Spark job, such as the HoodieCompactor utility, polls for requested-but-not-completed compactions and executes them.

Question. Configure the Flink sink to schedule compactions but not execute them. Show what the compactor service looks like.

Input — Flink sink config.

Key Value
compaction.async.enabled false
compaction.schedule.enabled true
compaction.delta_commits 5

Code.

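// Standalone compactor service (Spark)
import org.apache.hudi.utilities.HoodieCompactor
import org.apache.spark.api.java.JavaSparkContext

val cfg = new HoodieCompactor.Config()
cfg.basePath = "s3://lake/orders/"
cfg.tableName = "orders"             // table name assumed; the utility requires one
cfg.runningMode = "execute"          // execute pre-scheduled compactions only
cfg.parallelism = 4
val jsc = new JavaSparkContext(spark.sparkContext)
new HoodieCompactor(jsc, cfg).compact(1)   // argument = retry count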

Step-by-step explanation.

  1. The Flink sink writes deltacommits as usual. Every 5th deltacommit, it also writes a compaction.requested instant to the timeline.
  2. The Flink sink does not execute the compaction — it returns immediately. Latency stays at ~2 s per commit.
  3. A separate Spark service runs continuously (or on a cron schedule). On each invocation it lists the timeline for REQUESTED-but-not-COMPLETED compactions, picks one, and executes it.
  4. Execution writes a new base parquet per affected file group and emits the compaction COMPLETED instant.
  5. The next reader sees the merged base; old log slices are eligible for cleaning.
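The schedule/execute split can be modelled as two functions sharing a timeline. This is a toy simulation, not Hudi code: instants are plain lists, the instant names (`d001`, `c005`) are hypothetical, and the merge work itself is elided.

```python
def writer(num_deltacommits, schedule_every):
    """Writer side: complete deltacommits; every Nth one also *schedules* a compaction."""
    timeline = []                                    # [instant, action, state]
    for i in range(1, num_deltacommits + 1):
        timeline.append([f"d{i:03d}", "deltacommit", "COMPLETED"])
        if i % schedule_every == 0:
            timeline.append([f"c{i:03d}", "compaction", "REQUESTED"])   # returns immediately
    return timeline

def compactor_service(timeline):
    """Compactor side: execute every REQUESTED compaction, oldest first."""
    executed = []
    for instant in timeline:
        if instant[1] == "compaction" and instant[2] == "REQUESTED":
            instant[2] = "INFLIGHT"
            # ... read base + log slices, write a new base parquet per file group ...
            instant[2] = "COMPLETED"
            executed.append(instant[0])
    return executed

tl = writer(12, 5)
print(compactor_service(tl))   # ['c005', 'c010']
```

The writer never waits on the compactor; a second pass of the service finds nothing pending until the writer schedules more work.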

Output — service architecture.

Process Responsibility Runs
Flink sink deltacommit + schedule compaction continuous
Spark compactor service execute scheduled compactions scheduled / continuous
Spark cleaner service GC obsolete versions scheduled

Rule of thumb. Async compaction is the production-grade default for streaming MoR tables. Pay the "extra service to monitor" cost; gain consistent write latency.

Worked example — cleaner policies

Detailed explanation.

  • Goal. Pick a cleaner policy that bounds storage cost while keeping incremental query consumers safe.
  • Why this matters. A cleaner policy that is too aggressive will silently break incremental consumers ("my last seen commit is no longer in the retention window — error"). A policy that is too generous will balloon storage.
  • The mechanic. Each policy uses a single threshold to decide what file versions are obsolete and can be GC'd.

Question. Compare KEEP_LATEST_COMMITS=10 vs KEEP_LATEST_FILE_VERSIONS=3 for a CoW table with hourly commits and a long-running incremental subscriber.

Input — workload.

Property Value
table type CoW
commit cadence hourly
incremental subscriber lag SLA up to 6 hours
average file size 256 MB per file group
file groups 1 000

Code.

# Policy comparison
KEEP_LATEST_COMMITS=10
  → retain ~10 file versions per file group (if every commit rewrites every file group)
  → consumer with 6h lag: OK (lag < 10h retention)

KEEP_LATEST_FILE_VERSIONS=3
  → retain exactly 3 file versions per file group
  → consumer with 6h lag: at risk if a hot file group is rewritten >3 times

Step-by-step explanation.

  1. With KEEP_LATEST_COMMITS=10, the cleaner keeps file versions referenced by the latest 10 commits per the timeline. A 6 h-lag consumer is safe because at hourly commit cadence, 10 commits is 10 hours of headroom.
  2. With KEEP_LATEST_FILE_VERSIONS=3, the cleaner keeps only the latest 3 file versions per file group. If a hot file group is rewritten 4 or more times in 6 hours, the consumer's expected version is gone — and incremental query fails.
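  3. The storage cost differs. KEEP_LATEST_COMMITS=10 can retain up to ~10 versions of every file group that is rewritten on every commit (up to ~10× the table size in the worst case); KEEP_LATEST_FILE_VERSIONS=3 caps retention at 3 versions per file group (at most 3×) regardless of commit cadence.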
  4. The right policy depends on whether the workload is "uniformly touched file groups" (in which case the two policies converge) or "skewed file groups" (in which case KEEP_LATEST_COMMITS is safer).
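The workload numbers above can be plugged into a rough model. Both helpers are hypothetical and deliberately pessimistic: storage assumes every file group is rewritten often enough to hold the maximum number of versions, and the KEEP_LATEST_COMMITS safety check converts lag hours into commits using the table's commit rate.

```python
MB, GB = 1024 ** 2, 1024 ** 3

def worst_case_storage_gib(file_groups, file_group_bytes, versions_kept):
    """Upper bound on retained bytes, in GiB, if every file group keeps `versions_kept` versions."""
    return file_groups * file_group_bytes * versions_kept / GB

def consumer_safe(policy, threshold, lag_hours, commits_per_hour, rewrites_during_lag):
    """Can an incremental consumer lagging `lag_hours` still resume? (simplified)"""
    if policy == "KEEP_LATEST_COMMITS":
        return lag_hours * commits_per_hour <= threshold
    if policy == "KEEP_LATEST_FILE_VERSIONS":
        return rewrites_during_lag <= threshold
    raise ValueError(f"unsupported policy in this sketch: {policy}")

print(worst_case_storage_gib(1000, 256 * MB, 10))              # 2500.0 GiB
print(worst_case_storage_gib(1000, 256 * MB, 3))               # 750.0 GiB
print(consumer_safe("KEEP_LATEST_COMMITS", 10, 6, 1, 6))       # True
print(consumer_safe("KEEP_LATEST_FILE_VERSIONS", 3, 6, 1, 4))  # False: hot file group
```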

Output — recommendation.

Workload Policy Why
Uniform churn + steady consumer lag KEEP_LATEST_COMMITS aligns retention with consumer lag
Skewed hot file groups KEEP_LATEST_COMMITS every commit advances retention uniformly
Quiet table, bounded storage KEEP_LATEST_FILE_VERSIONS predictable storage
Time-travel SLA in hours KEEP_LATEST_BY_HOURS direct mapping

Rule of thumb. Default to KEEP_LATEST_COMMITS and set hoodie.cleaner.commits.retained to the number of commits written during the maximum incremental consumer lag, plus a safety margin (e.g. 2× that count). Only switch to KEEP_LATEST_FILE_VERSIONS if storage cost is dominant and you have no incremental consumers.
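That rule translates into a one-line calculation. The helper name and the default 2× safety factor are assumptions taken from the rule of thumb, not Hudi defaults.

```python
import math

def commits_to_retain(max_lag_hours, commits_per_hour, safety_factor=2.0):
    """Value for hoodie.cleaner.commits.retained sized from the consumer lag SLA."""
    return math.ceil(max_lag_hours * commits_per_hour * safety_factor)

print(commits_to_retain(6, 1))      # hourly CoW table, 6 h lag SLA: 12
print(commits_to_retain(0.5, 60))   # one commit per minute, 30 min lag SLA: 60
```

Expressing the SLA in commits, not hours, is what makes the setting survive a change in commit cadence: recompute it whenever the cadence changes.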

Interview question on compaction + cleaning

A senior interviewer might frame this as: "You operate a MoR table with hourly deltacommits and NUM_COMMITS=24 inline compaction. Reads are getting slow at 22-23 commits into the cycle, then snap back after compaction. Design a fix that smooths read latency."

Solution Using async compaction with a tighter schedule strategy

// Writer config: schedule compaction inline, execute it elsewhere
val writerOptions = Map(
  "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
  "hoodie.compact.inline" -> "false",
  "hoodie.compact.schedule.inline" -> "true",
  "hoodie.compact.inline.max.delta.commits" -> "6",   // schedule every 6
  "hoodie.cleaner.policy" -> "KEEP_LATEST_COMMITS",
  "hoodie.cleaner.commits.retained" -> "48"
)

// Separate async compactor service runs every 5 minutes:
// the HoodieCompactor utility in "execute" mode, launched by a scheduler (e.g. cron)

Step-by-step trace.

Time Writer action Compactor action Read latency
T+0 deltacommit 1 (idle) base only
T+6h deltacommit 6 + schedule_compaction (idle) base + 6 logs
T+6h05 (deltacommit 7) execute_compaction → new base snap back to ~ base
T+12h deltacommit 12 + schedule_compaction (idle) base + 6 logs
T+12h05 (deltacommit 13) execute → new base base only again

The trace shows that scheduling every 6 commits (rather than 24) caps the log-merge cost on the snapshot read at ~6 log slices and never lets the read view degrade further. The writer never blocks.

Output:

Metric Old (inline every 24) New (async every 6)
Peak log slices before compaction 23 6
Peak snapshot read CPU high bounded
Writer p99 latency bimodal (2 s / 90 s) flat (~2 s)
Compactor service required no yes

Why this works — concept by concept:

  • Tighter compaction interval caps snapshot read cost — the worst-case log-merge work scales with the deltacommit-to-compaction ratio. Smaller ratio = lower peak read CPU.
  • Async eliminates writer blocking — the writer's job ends at "emit deltacommit + schedule compaction." A separate process executes.
  • Cleaner retains enough commits for consumers: KEEP_LATEST_COMMITS=48 gives any incremental consumer up to 48 commits (48 hours at this hourly cadence) of lag headroom.
  • Schedule + execute split lets compaction scale horizontally — multiple compactor instances can pick up scheduled instants in parallel (with single-compactor-per-file-group enforcement).
  • Cost — write per commit: O(delta). Compaction per scheduled run: O(base + sum(log_slices)). Cleaner per run: O(obsolete_files). Snapshot read: O(rows + ≤6 log slices) instead of O(rows + 23 log slices).



5. Picking the type — read latency vs write rate, multi-engine read

The Hudi pick reduces to two questions — how fast must reads be, and how often do writes arrive — and one constraint: which engines query the table

The mental model in one line: MoR for streaming sinks and high-frequency upserts, CoW for daily batch and read-heavy dashboards; the multi-engine read matrix (Spark / Flink / Presto / Trino) decides whether MoR's snapshot view is even available to your consumers. Once you say "two axes, one constraint," the decision falls out.

Iconographic 2x2 quadrant decision matrix — axes are 'read latency' (Y) and 'write rate' (X), with each quadrant carrying a recommendation chip for CoW or MoR; a floating top-right chip recommends MoR for streaming sinks and CoW for daily batch.

The decision matrix in one table.

Read latency Low write rate High write rate
Tight (sub-second) CoW — read one parquet; cheap MoR + frequent compaction — snapshot view for the hot rows, RO for the rest
Relaxed (minutes) CoW — simplest setup, fine for warehouse mirrors MoR + async compaction — streaming sink, CDC, IoT

The multi-engine read matrix.

Engine CoW snapshot MoR snapshot MoR read-optimised Notes
Spark yes yes yes reference implementation; all views supported
Flink yes yes (also streaming read) yes first-class streaming write and read
Presto yes yes (Hudi connector, recent versions) yes older versions: RO only
Trino yes yes (Hudi connector ≥ 360) yes sub-second on the RO view
Hive yes yes yes via InputFormat shims
Athena yes yes yes recent Athena engine ≥ v3

Why the matrix matters.

  • A table that must be readable by every engine in the matrix should default to CoW. Every engine reads CoW snapshot at parquet speed.
  • A MoR table can still be read by every engine in the read-optimised view, but snapshot-view support varies by engine and connector version; where it is missing, consumers only see data as of the last compaction. Confirm that your engine versions support the snapshot view before choosing MoR.
  • Flink is the only engine in the table that also writes MoR with first-class streaming semantics. If your pipeline is Flink-centric, MoR is a natural pick.

Common interview probes on the pick.

  • "When would you pick CoW for a streaming sink?" — almost never, unless the read SLA is so tight (sub-100 ms parquet scan) that any log-merge cost is unacceptable. Even then, MoR + frequent compaction usually wins.
  • "When would you pick MoR for a daily batch table?" — almost never. The write-amplification problem CoW has does not appear in daily batch; the extra MoR complexity is unjustified.
  • "Can you migrate from CoW to MoR (or back) without rewriting the table?" — not directly. You write new data into a fresh table with the new type and switch readers over.
  • "Does Presto see deltacommits in real time?": only when querying the snapshot view (the _rt table in a Hive-synced setup). The read-optimised view (the _ro table) waits for the next compaction.

The 2026 reality.

  • Streaming sinks are 80% MoR. CDC (e.g. Debezium), Kafka and IoT pipelines typically default to MoR + async compaction.
  • Warehouse mirrors / golden tables are 90% CoW. Built once daily, read by every dashboard, no need for log-merge complexity.
  • Hudi vs Iceberg vs Delta. Hudi's explicit MoR/CoW table-type split is its differentiator. Iceberg (format v2) picks copy-on-write or merge-on-read per operation through write.delete.mode, write.update.mode and write.merge.mode, and implements merge-on-read with positional and equality delete files rather than row-based log files, so it is conceptually different. Delta has MERGE INTO but not the explicit MoR/CoW split.

Worked example — daily batch read (CoW)

Detailed explanation.

  • Goal. Configure a CoW table for a nightly warehouse mirror read by 50 BI dashboards.
  • Why this matters. Every dashboard query should run at parquet-scan speed; the writer runs once at 02:00 and rewrites whatever changed.
  • The mechanic. CoW with a daily commit cadence and a wide cleaner retention so that any long-running BI session does not get its snapshot GC'd mid-query.

Question. Pick the table type and key configs for a nightly orders mirror serving 50 dashboards.

Input — workload.

Property Value
commits per day 1 (at 02:00)
reads per day 50 dashboards × ~10 queries each
read SLA < 5 s p99
writer Spark batch

Code.

df.write.format("hudi")
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
  .option("hoodie.datasource.write.operation", "bulk_insert")  // first load
  // .option("hoodie.datasource.write.operation", "upsert")    // incremental
  .option("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")
  .option("hoodie.cleaner.commits.retained", "7")              // 1 week
  .option("hoodie.parquet.small.file.limit", "104857600")      // 100 MB
  .mode("append").save("s3://lake/orders/")

Step-by-step explanation.

  1. CoW is the right pick because writes are infrequent (1/day) and reads are very frequent. No log-merge logic ever runs.
  2. KEEP_LATEST_COMMITS=7 lets a Friday morning BI user keep reading the Monday snapshot if they happened to start the session early in the week.
  3. hoodie.parquet.small.file.limit=104857600 (100 MB) enables Hudi's small-file handling: incoming inserts are routed into existing files under that size (which CoW then rewrites), so file-group sizing stays healthy.
  4. BI queries always read the latest base file per file group — a vanilla parquet scan that Presto / Trino / Athena handle natively.

Output — operational profile.

Metric Value
Storage overhead vs raw parquet ~1× for the current snapshot, plus old versions of rewritten files kept for the 7-commit cleaner window and the .hoodie/ metadata
BI query latency parquet-native
Writer runtime bounded by touched_file_groups × file_group_size
Compaction services needed none

Rule of thumb. Daily batch + read-heavy = CoW with a long cleaner window. The setup is boring and that is the point.

Worked example — streaming Kafka → MoR sink

Detailed explanation.

  • Goal. Configure a MoR table fed by Flink reading Kafka at 50 k events/s.
  • Why this matters. At this write rate a CoW table would have to rewrite whole parquet file groups on every checkpoint, and the writer would fall behind ingest. MoR with async compaction is the textbook fit.
  • The mechanic. Flink writes deltacommits every checkpoint interval; a separate Spark service runs async compaction.

Question. Pick the table type, write operation, and compaction strategy for a Kafka sink at 50 k events/s with a 5-minute freshness SLA.

Input — workload.

Property Value
ingest rate 50 000 events/s
Flink checkpoint interval 60 s
freshness SLA 5 minutes
read SLA 30 s p99
readers Spark + Trino

Code.

-- Flink DDL for a MoR Hudi sink
CREATE TABLE orders_hudi (
    order_id     STRING,
    user_id      STRING,
    amount       DECIMAL(10, 2),
    event_ts     TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector'                            = 'hudi',
    'path'                                 = 's3://lake/orders_hudi/',
    'table.type'                           = 'MERGE_ON_READ',
    'write.operation'                      = 'upsert',
    'precombine.field'                     = 'event_ts',   -- ordering field for the latest version of a key
    'compaction.async.enabled'             = 'false',
    'compaction.schedule.enabled'          = 'true',
    'compaction.delta_commits'             = '5',
    'clean.policy'                         = 'KEEP_LATEST_COMMITS',
    'clean.retain_commits'                 = '48'
);

Step-by-step explanation.

  1. Flink writes one deltacommit per checkpoint (every 60 s). That's ~60 commits/hour, ~3 M events/commit (50 000 × 60).
  2. Every 5 deltacommits (5 minutes), Flink writes a compaction.requested instant. It does not execute — the async compactor will.
  3. A standalone Spark compactor service polls the timeline every minute, finds requested compactions, and executes them. Each compaction merges 5 log slices into a fresh base parquet per affected file group.
  4. The freshness SLA is met because the snapshot view always merges the latest logs in. The read-optimised view trails by about one compaction interval (~5 minutes) plus the compactor's polling and execution time.
  5. The cleaner retains 48 commits ≈ 48 minutes of headroom for any incremental consumer.
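The arithmetic in these steps can be checked in a few lines. The compactor's 1-minute polling interval comes from step 3; the 2-minute compaction runtime used in the staleness estimate is an assumption, so substitute your measured runtime.

```python
EVENTS_PER_SECOND = 50_000
CHECKPOINT_SECONDS = 60            # one deltacommit per Flink checkpoint
COMPACTION_DELTA_COMMITS = 5       # compaction.delta_commits
RETAINED_COMMITS = 48              # cleaner retention, in commits

commits_per_hour = 3600 // CHECKPOINT_SECONDS
events_per_commit = EVENTS_PER_SECOND * CHECKPOINT_SECONDS
compaction_interval_min = COMPACTION_DELTA_COMMITS * CHECKPOINT_SECONDS / 60
cleaner_headroom_min = RETAINED_COMMITS * CHECKPOINT_SECONDS / 60

def read_optimized_staleness_min(poll_min=1.0, compaction_runtime_min=2.0):
    """Worst-case RO staleness: one interval + waiting for the poll + running compaction."""
    return compaction_interval_min + poll_min + compaction_runtime_min

print(commits_per_hour, events_per_commit)              # 60 3000000
print(compaction_interval_min, cleaner_headroom_min)    # 5.0 48.0
print(read_optimized_staleness_min())                   # 8.0
```

Under these assumptions the read-optimised view can trail by more than the 5-minute freshness SLA, which is why the SLA has to be met through the snapshot view.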

Output — service topology.

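| Component | Cadence | Work done |
|---|---|---|
| Flink sink | continuous (one deltacommit per checkpoint) | roughly one row's worth of bytes per event (Avro log append) |
| Spark compactor | every ~5 min | full base rewrite of each touched file group |
| Spark cleaner | every ~10 min | no new writes; only deletes obsolete file slices |
| Trino dashboard | on read | reads the read-optimised view (parquet scan) |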

Rule of thumb. Streaming sink + freshness SLA in minutes = MoR + async compaction. Wire the compactor as a separate service from the start; you'll need it for production.

Worked example — multi-engine read matrix in practice

Detailed explanation.

  • Goal. Predict how Spark, Flink, Presto, and Trino will see the same MoR table.
  • Why this matters. A platform team often inherits a MoR table written by Flink and queried by Trino — they need to know whether Trino sees the same data as Spark.
  • The mechanic. Each engine has a connector that supports a subset of views. The connector version matters.

Question. A MoR orders table has a base file at 09:00 and a deltacommit at 09:30. At 09:31, four engines query "show me the latest status for order_id=42." What do they see?

Input — table state at 09:31.

| fileId | Base | Log slices |
|---|---|---|
| f7a2 | base written at 09:00 | 1 log slice from the 09:30 deltacommit (contains the update for order_id=42) |

Code.

```sql
-- All four engines run the same logical query
SELECT order_id, status, _hoodie_commit_time
FROM orders
WHERE order_id = '42';
```

Step-by-step explanation.

  1. Spark with hoodie.datasource.query.type=snapshot: reads the base file plus the log slice, merges them, and surfaces the 09:30 status.
  2. Spark with hoodie.datasource.query.type=read_optimized: reads the base file only and surfaces the pre-09:30 status (stale).
  3. Flink streaming read (read.streaming.enabled=true): picks up the 09:30 deltacommit on its next poll of the timeline and emits the updated row. Latency is bounded by the poll interval (read.streaming.check-interval, 60 s by default), not sub-second.
  4. Trino (Hudi connector ≥ 360): supports both snapshot and RO modes and defaults to RO; switch to snapshot when freshness is required.
  5. Presto: the same story as Trino, except that snapshot support arrived later than RO support, so older Presto deployments see only the RO view.
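To make the snapshot versus read-optimised split concrete, here is a toy in-memory model of file group f7a2 at 09:31. It is a sketch under simplifying assumptions, not Hudi's reader: the status values `PLACED` and `SHIPPED` are invented, commit times are shortened to `0900`/`0930`, and the merge simply lets the later commit win (no precombine field, no deletes).

```python
from typing import Dict, List, Tuple

Row = Dict[str, str]
LogSlice = Tuple[str, List[Row]]  # (deltacommit time, rows appended in that commit)


def read_optimised(base: Dict[str, Row]) -> Dict[str, Row]:
    """Read-optimised view: scan the base file only; log slices are ignored."""
    return dict(base)


def snapshot(base: Dict[str, Row], log_slices: List[LogSlice]) -> Dict[str, Row]:
    """Snapshot view: start from the base and replay log slices in commit order.
    Simplification: the later commit always wins."""
    merged = dict(base)
    for _commit_time, rows in sorted(log_slices, key=lambda s: s[0]):
        for row in rows:
            merged[row["order_id"]] = row
    return merged


# File group f7a2 at 09:31 (status values are made up for illustration).
base = {"42": {"order_id": "42", "status": "PLACED", "_hoodie_commit_time": "0900"}}
logs = [("0930", [{"order_id": "42", "status": "SHIPPED", "_hoodie_commit_time": "0930"}])]

print(read_optimised(base)["42"]["status"])  # PLACED  (RO view: stale)
print(snapshot(base, logs)["42"]["status"])  # SHIPPED (snapshot view: fresh)
```

Every engine row in the table below is one of these two functions behind a different connector; the only question is which one the connector calls by default.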

Output — observed status at 09:31.

| Engine | Mode | Sees update at 09:30? |
|---|---|---|
| Spark | snapshot | yes |
| Spark | read-optimised | no |
| Flink | streaming | yes |
| Trino ≥ 360 | snapshot | yes |
| Trino ≥ 360 | read-optimised | no |
| Presto ≥ 0.270 | snapshot | yes (if connector supports it) |

Rule of thumb. Pin the engine version and the view mode in your platform docs. The "two views" trade-off is the most common cause of "the Trino dashboard disagrees with the Spark batch job" incidents.

Interview question on the pick

A senior interviewer often closes the loop with: "You're starting a greenfield lakehouse. Pick the table type for (a) the curated orders fact table loaded hourly from a CDC stream, (b) the daily dimensional customers table, and (c) the IoT device_events raw landing table. Justify each pick in one sentence."

Solution Using the two-axis decision frame

```text
(a) orders (hourly CDC loads, queried by Trino + Spark)
    → MERGE_ON_READ
    → write.operation=upsert, async compaction every 6 deltacommits
    → reason: CDC = high write rate; hourly queries can tolerate
      merging up to 6 log slices, and async compaction keeps
      writer latency smooth

(b) customers (daily batch, queried by every dashboard)
    → COPY_ON_WRITE
    → write.operation=bulk_insert for the initial load, then a nightly upsert
    → reason: 1 commit/day = cheap rewrite; dashboard queries
      run at parquet speed with no compaction to operate

(c) device_events (very high ingest, queried by Spark only)
    → MERGE_ON_READ
    → write.operation=insert (no dedup), async compaction every 30 minutes
    → reason: very high write rate; events are append-only (never
      updated), so "insert" beats "upsert" on cost, and the Spark
      snapshot view is fine
```
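The two-axis frame can be written down as a small decision function. This is a hypothetical sketch: the `MAX_COW_COMMITS_PER_DAY` cut-off and the numeric SLAs (60 minutes for orders, 240 for customers, 5 for device_events, and one commit per minute for device_events) are illustrative assumptions, and the freshness axis is used to decide which MoR view readers should hit. For customers it returns the nightly upsert; the initial bulk_insert load sits outside the sketch.

```python
from dataclasses import dataclass
from typing import Dict, Optional

# Assumed cut-off (not a Hudi setting): at or below ~1 commit/day,
# CoW's full file-group rewrite is cheap enough to prefer it.
MAX_COW_COMMITS_PER_DAY = 1.0


@dataclass(frozen=True)
class Workload:
    name: str
    commits_per_day: float                           # write-rate axis
    freshness_sla_min: float                         # read-freshness axis
    compaction_interval_min: Optional[float] = None  # MoR only
    append_only: bool = False                        # rows are never updated


def pick(w: Workload) -> Dict[str, str]:
    operation = "insert" if w.append_only else "upsert"
    if w.commits_per_day <= MAX_COW_COMMITS_PER_DAY:
        return {"type": "COPY_ON_WRITE", "operation": operation, "view": "snapshot"}
    # The MoR read-optimised view lags by up to one compaction interval, so only
    # route readers to it when the freshness SLA tolerates that lag.
    interval = (w.compaction_interval_min
                if w.compaction_interval_min is not None else float("inf"))
    view = "read_optimized" if w.freshness_sla_min >= interval else "snapshot"
    return {"type": "MERGE_ON_READ", "operation": operation, "view": view}


workloads = [
    Workload("orders", commits_per_day=24, freshness_sla_min=60,
             compaction_interval_min=360),
    Workload("customers", commits_per_day=1, freshness_sla_min=240),
    Workload("device_events", commits_per_day=1440, freshness_sla_min=5,
             compaction_interval_min=30, append_only=True),
]
for w in workloads:
    print(w.name, pick(w))
```

All three workloads land on the picks above. Raising `freshness_sla_min` past a table's compaction interval is what moves its MoR readers to the cheaper read-optimised view.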

Step-by-step trace.

| Table | Write rate | Read freshness SLA | Pick | Compaction strategy |
|---|---|---|---|---|
| orders | hourly CDC | hourly | MoR | async every 6 deltacommits |
| customers | daily batch | hours | CoW | (none) |
| device_events | continuous | minutes | MoR | async every 30 min |

Output:

| Table | Storage overhead | Read latency | Write latency |
|---|---|---|---|
| orders | ~1.2× (with logs) | base + ≤6 log slices to merge | ~2 s per deltacommit |
| customers | ~1× | parquet-native | bounded by touched file-group bytes |
| device_events | ~1.3× | base + ≤30 min of log slices | sub-second append |

Why this works — concept by concept:

  • Two-axis frame is sufficient — read-latency SLA and write rate dominate; everything else is a knob.
  • CDC = MoR is a default — high write rate + tolerable read lag is the textbook MoR sweet spot.
  • Daily batch = CoW is a default — no log-merge complexity to operate, parquet-native reads.
  • Insert vs upsert is orthogonal to type — even MoR can use insert when dedup is unnecessary, which lowers write CPU further.
  • Multi-engine constraint surfaces late — confirm Trino / Presto / Spark versions support the snapshot view before defaulting to MoR for tables read by all three.
  • Cost — write per commit: O(touched_file_groups × file_group_size) for CoW, O(delta) for MoR. Compaction: O(base + log_slices) per MoR file group per cycle. Read: O(scanned_rows) for CoW and MoR RO; O(scanned_rows + log_slices) for MoR snapshot.
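The Big-O costs in the last bullet can be turned into a rough per-cycle comparison. This sketch counts bytes written only (compaction's reads of base and log slices are ignored), assumes every deltacommit in the cycle touches the same file groups, and uses hypothetical sizes; the function names are illustrative, not Hudi APIs.

```python
from typing import List


def cow_mb_written(commits: int, touched_fgs: int, fg_size_mb: float) -> float:
    """CoW: every commit rewrites each touched file group's base file in full."""
    return commits * touched_fgs * fg_size_mb


def mor_mb_written(commits: int, delta_mb_per_commit: float,
                   touched_fgs: int, fg_size_mb: float) -> float:
    """MoR: each deltacommit appends only its delta; one compaction per cycle
    writes a fresh base for each touched file group."""
    return commits * delta_mb_per_commit + touched_fgs * fg_size_mb


def mor_snapshot_mb_read(base_mb: float, log_slice_mbs: List[float]) -> float:
    """Snapshot read of one file group: base scan plus every pending log slice."""
    return base_mb + sum(log_slice_mbs)


# Hypothetical cycle: 6 deltacommits, each touching the same 100 file groups
# of 120 MB, with 50 MB of changed rows per commit (0.5 MB per file group).
cow = cow_mb_written(commits=6, touched_fgs=100, fg_size_mb=120.0)
mor = mor_mb_written(commits=6, delta_mb_per_commit=50.0,
                     touched_fgs=100, fg_size_mb=120.0)
read = mor_snapshot_mb_read(120.0, [0.5] * 6)
print(cow, mor, read)  # 72000.0 12300.0 123.0
```

With these inputs CoW writes 72 000 MB over the cycle against MoR's 12 300 MB, while a snapshot reader of one file group pays for 3 MB of log slices on top of the 120 MB base until compaction runs.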



Cheat sheet — MoR vs CoW recipes

  • Default for daily batch tables. COPY_ON_WRITE with the KEEP_LATEST_COMMITS cleaner retaining 7 commits (one week of daily commits). Simple, parquet-native reads, bounded write cost.
  • Default for streaming sinks. MERGE_ON_READ with write.operation=upsert, compaction.schedule.enabled=true, compaction.async.enabled=false, and a separate Spark compactor service.
  • Default for very high ingest with no dedup. MERGE_ON_READ with write.operation=insert — skips the index lookup, lowest write CPU.
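  • Pick the NUM_COMMITS compaction trigger strategy by default (hoodie.compact.inline.trigger.strategy in Spark, compaction.trigger.strategy in Flink). Switch to TIME_ELAPSED when commit cadence is bursty and you want time-bounded log-merge cost.
  • Set hoodie.compact.inline.max.delta.commits (compaction.delta_commits in Flink) to the largest value that keeps your snapshot read latency tolerable. Smaller values give faster snapshot reads and a fresher read-optimised view; larger values mean less compaction work.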
  • Use the read-optimised view for dashboards that tolerate one compaction interval of staleness. Use the snapshot view for anything that needs sub-minute freshness.
  • Pin the engine version (Spark, Flink, Presto, Trino) in your platform docs. The Hudi connector capabilities vary across versions; the snapshot view is the most version-sensitive.
  • KEEP_LATEST_COMMITS=N cleaner is the default. Pick N ≥ max(incremental_consumer_lag_SLA, time-travel_SLA) in commits.
  • KEEP_LATEST_FILE_VERSIONS=N only when storage cost is dominant and you have no incremental consumers.
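  • Compliance-grade deletes need both the delete commit and a later cleaner run: older file slices still hold the deleted rows until they fall out of the retention window and the cleaner removes them (on MoR tables, the base file also keeps the row until compaction rewrites it). The effective delete latency is therefore the retention window plus the cleaner cadence.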
  • For CoW write amplification audits, the formula is touched_file_groups × file_group_size_mb per commit. If the result divided by net rows added is more than ~50 MB/row, switch to MoR or lengthen the commit interval.
  • For MoR read-cost audits, the formula is base_scan_cost + sum(log_slice_sizes_to_merge). If the snapshot scan is at least twice as slow as the read-optimised scan, schedule compaction more frequently.
  • Never delete files manually from .hoodie/ or partition directories. Use hudi-cli rollback and let Hudi reconcile the timeline.
  • Migration from CoW to MoR (or back) is a fresh-table-and-cutover operation, not an in-place config change.
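The audit rules in this cheat sheet reduce to a handful of arithmetic checks. A minimal sketch follows; the function names are hypothetical, the 50 MB/row and 2× thresholds are the cheat sheet's heuristics rather than Hudi defaults, the read audit takes measured scan times instead of estimating them, and the cleaner-retention helper expresses both SLAs in minutes before converting them into commits.

```python
import math

COW_MB_PER_ROW_LIMIT = 50.0    # cheat-sheet heuristic: above this, consider MoR
SNAPSHOT_SLOWDOWN_LIMIT = 2.0  # cheat-sheet heuristic: above this, compact more often


def cow_mb_rewritten_per_row(touched_file_groups: int, file_group_size_mb: float,
                             net_rows_added: int) -> float:
    rewritten_mb = touched_file_groups * file_group_size_mb
    if rewritten_mb == 0:
        return 0.0
    if net_rows_added <= 0:
        return math.inf  # bytes were rewritten but no net rows were added
    return rewritten_mb / net_rows_added


def should_switch_to_mor(touched_file_groups: int, file_group_size_mb: float,
                         net_rows_added: int) -> bool:
    ratio = cow_mb_rewritten_per_row(touched_file_groups, file_group_size_mb,
                                     net_rows_added)
    return ratio > COW_MB_PER_ROW_LIMIT


def should_compact_more_often(snapshot_scan_secs: float, ro_scan_secs: float) -> bool:
    return snapshot_scan_secs >= SNAPSHOT_SLOWDOWN_LIMIT * ro_scan_secs


def commits_to_retain(commit_interval_min: float, consumer_lag_sla_min: float,
                      time_travel_sla_min: float) -> int:
    """KEEP_LATEST_COMMITS retention N, with the larger SLA converted into commits."""
    return math.ceil(max(consumer_lag_sla_min, time_travel_sla_min) / commit_interval_min)


print(should_switch_to_mor(40, 128.0, 50))              # True  (102.4 MB/row)
print(should_switch_to_mor(2, 128.0, 10_000))           # False (0.0256 MB/row)
print(should_compact_more_often(12.0, 5.0))             # True
print(commits_to_retain(1440, 3 * 1440, 7 * 1440))      # 7  (daily batch, one week)
print(commits_to_retain(1, 30, 45))                     # 45 (one commit per minute)
```

An update-only commit adds no net rows, so the sketch reports `math.inf` for the CoW ratio in that case rather than raising a division error.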

Frequently asked questions

What's the difference between Hudi CoW and MoR in one paragraph?

Copy-on-Write rewrites the affected file group's base parquet on every commit, so readers always scan a single up-to-date parquet per file group with zero merge logic — fast reads at the cost of high write amplification. Merge-on-Read appends each upsert as a row-based Avro log slice next to the base parquet and defers the parquet rewrite to a separate compaction action — fast writes at the cost of read-time merging in the snapshot view (or slight read staleness in the read-optimised view). CoW is the textbook pick for daily batch and read-heavy dashboards; MoR is the textbook pick for streaming sinks, CDC pipelines, and high-frequency upserts.

When should I choose Hudi MoR over CoW?

Pick hudi merge on read when the write rate is high relative to the file-group size — typically streaming sinks (Kafka, Debezium CDC, IoT telemetry) and any pipeline with sub-minute commit intervals. The decisive metric is write amplification: if CoW's touched_file_groups × file_group_size per commit is more than ~50 MB per net row added, MoR will pay back its complexity within weeks. The two prerequisites are a tolerable read-freshness SLA (the snapshot view's merge cost or the read-optimised view's compaction-interval staleness must fit your dashboards) and a multi-engine read story that supports both views in your target Spark / Flink / Presto / Trino versions.

How does Hudi compaction work?

Hudi compaction reads each MoR file group's base parquet plus every log slice on top of it, applies the deltas in commit order (using the precombine field to break ties), writes a fresh base parquet, and completes the compaction instant as a commit on the timeline. The action can be triggered inline (synchronous to the writer, after every N deltacommits) or async (scheduled by the writer but executed separately, for example by a Spark job running the HoodieCompactor utility or by a long-running compactor service). The most common configuration is hoodie.compact.inline=false plus hoodie.compact.schedule.inline=true (in Flink, compaction.schedule.enabled=true with compaction.async.enabled=false) plus a separate compactor service polling the timeline, which keeps writer latency consistent while bounding read-merge cost.
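A toy version of that compaction flow, assuming event-time ordering on the precombine field as the paragraph describes: rows are replayed in commit order, a late-arriving row with a smaller precombine value does not overwrite a newer one, and the instant moves through requested, inflight, and a completed commit. Everything here is hypothetical (class and function names, shortened instant times, status values); Hudi's real merge behaviour depends on the configured payload or record-merger class.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


@dataclass
class FileGroup:
    base: Dict[str, Row]                                             # key -> row in base file
    logs: List[Tuple[str, List[Row]]] = field(default_factory=list)  # (deltacommit, rows)


def compact(fg: FileGroup, instant: str, key: str, precombine: str,
            timeline: List[str]) -> FileGroup:
    """Merge base + log slices into a fresh base; the log slices are consumed."""
    timeline.append(f"{instant}.compaction.requested")  # plan written
    timeline.append(f"{instant}.compaction.inflight")   # execution started
    merged = dict(fg.base)
    for _commit, rows in sorted(fg.logs, key=lambda s: s[0]):  # commit order
        for row in rows:
            current = merged.get(row[key])
            # A late row with a smaller precombine value does not overwrite a
            # newer one; ties go to the later commit.
            if current is None or row[precombine] >= current[precombine]:
                merged[row[key]] = row
    timeline.append(f"{instant}.commit")                 # completed as a commit
    return FileGroup(base=merged)


fg = FileGroup(
    base={"42": {"order_id": "42", "status": "PLACED", "event_ts": 1}},
    logs=[
        ("0930", [{"order_id": "42", "status": "SHIPPED", "event_ts": 3}]),
        ("0945", [{"order_id": "42", "status": "PAID", "event_ts": 2}]),  # late arrival
    ],
)
timeline: List[str] = []
new_fg = compact(fg, "1000", key="order_id", precombine="event_ts", timeline=timeline)
print(new_fg.base["42"]["status"])  # SHIPPED
print(timeline)  # ['1000.compaction.requested', '1000.compaction.inflight', '1000.commit']
```

Without the precombine comparison, the 09:45 log slice would win simply because it was committed later, and the late `PAID` event would overwrite the newer `SHIPPED` state.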

How does Hudi compare to Iceberg and Delta Lake?

Hudi vs Iceberg vs Delta is the most-asked lakehouse interview question in 2026, and the honest answer is that the three formats now overlap heavily but differ on their write-path defaults. Hudi alone exposes the explicit MERGE_ON_READ / COPY_ON_WRITE split at table creation, which gives operators the cleanest lever for write amplification trade-offs. Iceberg defaults to copy-on-write for row-level updates and deletes and offers merge-on-read style position and equality deletes for streaming sinks; its strength is multi-engine support and schema evolution. Delta Lake's MERGE INTO collapses inserts, updates, and deletes into one statement and ships with deletion vectors for efficient deletes; it is the strongest pick when Databricks is the primary compute.

How does Hudi support multiple query engines like Spark, Flink, Presto, and Trino?

Hudi ships a per-engine connector that translates the timeline + file-group + log-slice model into the engine's native scan operator. Spark is the reference implementation and supports CoW snapshot, MoR snapshot, and MoR read-optimised views. Flink is the only engine that also writes MoR with first-class streaming semantics, including a streaming read that emits new rows as deltacommits land. Presto and Trino (via the Hudi connector ≥ 360 in 2026) support the CoW snapshot view and both MoR views, though snapshot support arrived later than read-optimised support; pin your engine versions in your platform docs to avoid the "Trino sees stale data" class of incident.

Practice on PipeCode

Pipecode.ai is LeetCode for Data Engineering: every Hudi recipe above ships with hands-on practice rooms where you write the CoW upsert, the MoR deltacommit, and the async compaction wiring against real graded inputs. PipeCode pairs every reading with 450+ DE-focused problems and a real-time scoring engine, so you never have to guess whether your `hudi merge on read` configuration will hold under streaming load.
