ClickHouse is the answer almost every senior data engineering interview eventually circles back to when the question becomes "how do we serve a dashboard that scans billions of rows in under a second?" Row-oriented databases such as Postgres and MySQL flat-line once interactive latency budgets dip below five seconds, and batch-oriented warehouses such as Snowflake are built for throughput rather than sub-second serving. That is the gap a column-store engine built for vectorised aggregation was designed to close.
This guide walks through the four mental models a ClickHouse data engineering interview keeps probing: the columnar storage and vectorised execution model that makes sub-second queries possible; the MergeTree family of table engines and how to pick the variant your write pattern needs; the insert-time aggregation pattern built on materialized views, which turns one logical pipeline into 1-minute, 1-hour, and 1-day pre-aggregations; and the sharding-plus-replication grid that lets a cluster scale horizontally without losing per-node speed. Each section pairs a teaching block with a worked interview answer: code, a step-by-step trace, an output table, then a concept-by-concept breakdown of why it works.
When you want hands-on reps immediately after reading, drill the real-time analytics practice library, rehearse on aggregation problems, and stack the time-series muscles with time-series practice drills.
On this page
- Why ClickHouse for sub-second analytics
- ClickHouse's role in the modern stack
- The MergeTree family — the heart of ClickHouse
- Materialized views — incremental aggregation engine
- Sharding and replication at scale
- Cheat sheet — ClickHouse recipes
- Frequently asked questions
- Practice on PipeCode
1. Why ClickHouse for sub-second analytics
Columnar storage and vectorised execution are the two ideas that make a billion-row aggregation feel instant
The one-sentence invariant: ClickHouse stores every column of a table as an independently compressed file and processes those files in CPU-cache-friendly batches of 65,536 values at a time — so a SELECT sum(amount) FROM events reads only the amount bytes, not the whole row, and crunches them with SIMD instead of one tuple at a time. Once you internalise "columns, not rows; batches, not tuples," every other ClickHouse design choice — MergeTree parts, sort-key skipping, materialized views — falls out as an obvious consequence.
The three places columnar wins.
- Aggregations over a single column. A SUM, AVG, MAX, quantile, or uniq on one column reads exactly that column's bytes from disk, typically 5–20x less I/O than a row-store equivalent on the same table.
- High-cardinality group-by. A GROUP BY user_id, event_type over a billion-row table is bottlenecked by hash-table memory and CPU, not I/O. Vectorised execution gives ClickHouse a 10–100x edge over Postgres on the same hardware.
- Time-range scans. With PARTITION BY toYYYYMM(ts) and ORDER BY (ts, user_id), ClickHouse prunes whole partitions and skips granules inside the surviving parts via the primary-key sparse index, turning a 90-day query against a 5-year table into a read of three or four monthly partitions out of sixty.
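A minimal Python sketch of the partition arithmetic behind the time-range bullet, assuming monthly partitions keyed the way toYYYYMM(ts) keys them. The helper names are hypothetical, and the sketch only models which partition keys a date range overlaps, not ClickHouse's planner.

```python
from datetime import date, timedelta


def partition_key(d: date) -> int:
    """Mimic toYYYYMM: 2026-06-15 -> 202606."""
    return d.year * 100 + d.month


def months_touched(start: date, end: date) -> list[int]:
    """Sorted monthly partition keys that the inclusive range [start, end] overlaps."""
    keys = set()
    d = start
    while d <= end:
        keys.add(partition_key(d))
        d += timedelta(days=1)
    return sorted(keys)


# A rolling 90-day window ending 2026-06-15, against five years of monthly partitions.
end = date(2026, 6, 15)
start = end - timedelta(days=89)  # inclusive window: 90 calendar days
touched = months_touched(start, end)
print(touched)                                   # [202603, 202604, 202605, 202606]
print(len(touched), "of", 5 * 12, "partitions")  # 4 of 60 partitions
```

A 90-day window that happens to align with calendar months (January 1 to March 31, 2026, for example) touches only three partitions; a rolling window usually straddles four.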
Three-line latency budget.
Real-time analytics is usually defined as interactive (humans wait for the answer): the contract is a P95 below 1–2 seconds. Streaming, in contrast, talks about end-to-end latency from event to query-visible. ClickHouse is built to win the interactive contract — it does not by itself ingest from Kafka in milliseconds, but it does serve a 50ms SELECT against the result.
What interviewers listen for.
- Do you say "columnar layout means we read only the columns we project" when asked why ClickHouse is fast? — senior signal.
- Do you mention vectorised execution as a complementary speedup to columnar I/O? — required answer.
- Do you call out append-heavy as the write pattern ClickHouse is optimised for? — required answer.
- Do you flag heavy updates / deletes as the workload to avoid? — senior signal.
The 2026 reality.
- ClickHouse Cloud and self-hosted ClickHouse share the same SQL engine; Cloud keeps data on object storage (through its SharedMergeTree engine) and runs Keeper as a managed service.
- Cloudflare, Uber, ByteDance, Yandex all run ClickHouse at multi-PB scale, often as the serving layer behind log analytics and ad-tech dashboards.
- Druid and Pinot occupy the same niche; ClickHouse is a frequent pick for net-new deployments because its SQL surface is wider and its operational model simpler.
- Snowflake / BigQuery still dominate batch analytics; ClickHouse complements rather than replaces them — the lambda pattern is the common deployment.
Worked example — measuring the columnar speed-up on a single aggregate
Detailed explanation. A team migrates an events table from Postgres to ClickHouse. The headline query is SELECT toStartOfHour(ts) AS hour, count(), uniq(user_id) FROM events WHERE ts >= now() - INTERVAL 24 HOUR GROUP BY hour ORDER BY hour. On Postgres it scans every row; on ClickHouse it touches only the ts and user_id columns, and only the partitions covering the last 24 hours.
Question. Given a 5-billion-row events table with 50 columns, estimate how much data ClickHouse reads vs Postgres for the hourly count + unique-user query above. Show the math, then write the canonical ClickHouse table definition that enables the optimisation.
Input.
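| Column | Rows | Bytes/row (uncompressed) | Total bytes |
|---|---|---|---|
| All 50 cols | 5,000,000,000 | 250 | 1.25 TB |
| ts only | 5,000,000,000 | 8 | 40 GB |
| user_id only | 5,000,000,000 | 8 | 40 GB |
| last-24h ts + user_id | 50,000,000 | 16 | 800 MB |

The estimate budgets 8 bytes for ts, as a Postgres timestamp would use; ClickHouse's DateTime is 4 bytes, so the ClickHouse-side figures are, if anything, slightly high.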
Code.
CREATE TABLE events
(
ts DateTime,
user_id UInt64,
event_type LowCardinality(String),
value Float64,
properties String,
-- ... 45 more columns ...
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(ts)
ORDER BY (ts, user_id);
-- The query interviewers ask about
SELECT
toStartOfHour(ts) AS hour,
count() AS events,
uniq(user_id) AS unique_users
FROM events
WHERE ts >= now() - INTERVAL 24 HOUR
GROUP BY hour
ORDER BY hour;
Step-by-step explanation.
- Postgres on this query reads every row in the time range: even with a B-tree index on ts, the heap fetch pulls all 50 columns. On a 5B-row table, that is roughly 12.5 GB of heap reads for one day of data (250 bytes/row × 50M rows).
- ClickHouse with PARTITION BY toYYYYMMDD(ts) prunes every partition outside the last 24 hours; the planner only touches one or two partition directories.
- Inside the touched partitions, ClickHouse reads only the ts and user_id column files, roughly 800 MB uncompressed for 50M rows. Assuming roughly 4x LZ4 compression on disk, that drops to ~200 MB of actual disk I/O.
- The ORDER BY (ts, user_id) sort key lets the primary-key sparse index skip granules whose ts falls outside the WHERE, so the engine reads only the relevant granules, not the whole column file.
- Vectorised aggregation processes up to 65,536 rows per call, running count() as a tight loop and uniq() as an approximate distinct count (ClickHouse's uniq uses adaptive sampling; uniqHLL12 and uniqCombined are the HyperLogLog-based variants).
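The arithmetic in these steps can be checked with a few lines of Python. This is a back-of-envelope sketch that reuses the input table's figures (including its 8-byte budget per ts value) and assumes a roughly 4x LZ4 compression ratio, which is an assumption rather than a measured number.

```python
# Back-of-envelope read estimate for the hourly count + uniq query.
TOTAL_ROWS = 5_000_000_000
ROW_BYTES = 250             # all 50 columns, uncompressed
TS_BYTES = 8                # the input table's budget for ts
USER_ID_BYTES = 8           # UInt64
ROWS_LAST_24H = 50_000_000
LZ4_RATIO = 4               # assumed; real ratios depend on the data


def gb(n_bytes: float) -> float:
    return n_bytes / 1e9


full_table = TOTAL_ROWS * ROW_BYTES                           # every column of every row
postgres_day = ROWS_LAST_24H * ROW_BYTES                      # heap fetch pulls whole rows
clickhouse_raw = ROWS_LAST_24H * (TS_BYTES + USER_ID_BYTES)   # two column files only
clickhouse_disk = clickhouse_raw / LZ4_RATIO                  # bytes actually read from disk

print(f"full table:         {gb(full_table):,.0f} GB")        # 1,250 GB (1.25 TB)
print(f"Postgres, one day:  {gb(postgres_day):.1f} GB")       # 12.5 GB
print(f"ClickHouse, raw:    {gb(clickhouse_raw):.1f} GB")     # 0.8 GB
print(f"ClickHouse, disk:   {gb(clickhouse_disk):.1f} GB")    # 0.2 GB
print(f"reduction vs Postgres: {postgres_day / clickhouse_disk:.1f}x")  # 62.5x
```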
Output.
| Engine | Data read | Wall time (typical) |
|---|---|---|
| Postgres (B-tree on ts) | ~12.5 GB | 30–90s |
| ClickHouse (MergeTree, partitioned) | ~200 MB | 80–400ms |
Rule of thumb. When the interactive latency budget is under a second on a billion-row table, the question is not "which row store can we tune?" — it is "which column store fits the shape?" ClickHouse is the default answer when the workload is append-heavy and aggregation-dominant.
Worked example — the workloads ClickHouse does NOT love
Detailed explanation. Senior interviewers love the negation question: "When is ClickHouse the wrong tool?" The answer is anywhere the workload demands frequent point updates, multi-statement transactions, or complex many-to-many joins between large tables. ClickHouse can do all three, but each fights the engine's design rather than leaning on it.
Question. Given a workload mix, classify each as "ClickHouse-native," "possible but painful," or "wrong tool." Justify each verdict in one sentence.
Input.
| Workload | Read pattern | Write pattern | Concurrency |
|---|---|---|---|
| Real-time analytics dashboard | aggregate over 100M rows | bulk insert from Kafka | 100 QPS |
| OLTP order entry | single-row lookup by PK | single-row insert + update | 1000 TPS |
| Ad-tech event log | timeseries aggregate over 50B rows | bulk insert from S3 | 10 QPS |
| Audit log | row-level fetch by ID | append-only, then GDPR delete | 1 QPS read, 0.01 delete |
| Star-schema BI fan-out | big fact joined to 6 dim tables | nightly batch | 5 QPS |
Code.
-- ClickHouse-native: real-time aggregation
SELECT toStartOfMinute(ts) AS minute, count()
FROM events
WHERE ts >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;
-- Possible but painful: row-level GDPR delete
ALTER TABLE audit DELETE WHERE user_id = 12345;
-- ^ mutation: rewrites entire affected parts in the background.
-- Fine at low volume (occasional GDPR); fatal at high update volume.
-- Wrong tool: many-to-many join with no shard alignment
SELECT a.id, b.id
FROM big_fact_a a
JOIN big_fact_b b ON a.user_id = b.user_id;
-- ^ unless one side fits in memory or both tables share a shard key,
-- this forces a huge hash-table build or ships one side across the cluster.
Step-by-step explanation.
- The real-time dashboard is the canonical ClickHouse use case — append-only ingest, aggregate-heavy reads, small projection set.
- OLTP order entry is the canonical wrong-tool: ClickHouse has no real row-level update, no MVCC, no per-row transactions. Use Postgres.
- Ad-tech event log at 50B rows is the canonical scale story — Cloudflare runs this exact shape.
- Audit log with occasional GDPR delete is the "possible but painful" middle ground — mutations work, but they rewrite entire parts in the background, so they are batch-friendly and human-frequency-friendly, not event-frequency-friendly.
- Star-schema fan-out is doable in ClickHouse via Dictionary tables for small dimensions or careful shard-key co-location for large ones, but a senior interviewer expects you to call out the friction.
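To see why mutations are batch-friendly rather than event-frequency-friendly, here is a deliberately simplified Python model. It assumes a delete mutation rewrites every part that holds at least one matching row in full and leaves other parts alone; parts are plain lists, and the function name is hypothetical.

```python
def rows_rewritten_by_delete(parts, predicate):
    """Simplified ALTER TABLE ... DELETE: each part containing at least one
    matching row is rewritten in full; parts with no match are left alone."""
    return sum(len(part) for part in parts if any(predicate(row) for row in part))


# Three immutable parts of 1,000 rows each (user_id 0..2999).
parts = [[{"user_id": u} for u in range(i * 1000, (i + 1) * 1000)] for i in range(3)]

# One GDPR delete for one user: 1 row removed, 1,000 rows rewritten.
print(rows_rewritten_by_delete(parts, lambda r: r["user_id"] == 1500))  # 1000

# A user whose events are spread across every part forces a rewrite of all of them.
for part in parts:
    part.append({"user_id": 42})
print(rows_rewritten_by_delete(parts, lambda r: r["user_id"] == 42))  # 3003
```

At GDPR frequency the rewrite cost is tolerable; at one update per event, every event would trigger a full-part rewrite.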
Output.
| Workload | Verdict | Reason |
|---|---|---|
| Real-time analytics dashboard | ClickHouse-native | aggregation over append-only data |
| OLTP order entry | Wrong tool | no row updates, no transactions |
| Ad-tech event log | ClickHouse-native | aggregation at petabyte scale |
| Audit log + occasional delete | Possible but painful | mutations are batch-scale |
| Star-schema BI fan-out | Possible with care | joins need dictionary or shard co-location |
Rule of thumb. Pick ClickHouse when the read pattern is "aggregate over a column" and the write pattern is "append from a stream or a bulk file." Reach for Postgres / a row store the moment the contract is "update this row, transact across rows, or look up one row by primary key 10,000 times a second."
Worked example — vectorised execution by hand
Detailed explanation. Vectorised execution is the often-missed second half of "why ClickHouse is fast." Even with columnar I/O, a row-by-row interpreter would burn cycles on per-tuple function dispatch. ClickHouse processes data in fixed-size column blocks (default 65,536 rows) and dispatches one function call per block — so the inner loop is a tight SIMD-friendly arithmetic kernel.
Question. Walk through how ClickHouse evaluates SELECT sum(value * 1.1) FROM events WHERE event_type = 'click' against a 1-billion-row table. Compare the cost model to a row-at-a-time interpreter.
Input (conceptual block).
| block_row | event_type | value |
|---|---|---|
| 0 | click | 10.0 |
| 1 | view | 0.0 |
| 2 | click | 5.0 |
| ... | ... | ... |
| 65535 | click | 8.0 |
Code.
SELECT sum(value * 1.1) AS total
FROM events
WHERE event_type = 'click';
Step-by-step explanation.
- ClickHouse reads one block (default 65,536 rows) of the event_type and value columns at a time: two separate column files, each compressed with LZ4.
- The WHERE event_type = 'click' filter is evaluated as a vectorised string-equality kernel that produces a filter mask of 65,536 entries (one byte per row).
- The value * 1.1 projection runs as a vectorised float multiplication: one SIMD instruction processes 4 (AVX2) or 8 (AVX-512) doubles at once.
- The sum(...) aggregate folds the masked block into a single double, then accumulates into the running total. One function call processes 65,536 rows.
- A row-at-a-time interpreter would dispatch one function call per row for the filter, one per row for the projection, and one per row for the aggregate: three calls per row, each with its own branch and cache overhead, multiplied by 1B rows.
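The dispatch counts in the output table come from simple arithmetic, and the block-at-a-time shape of the evaluation can be mimicked in pure Python. This is a structural sketch only: the function names are hypothetical, and Python list comprehensions stand in for the compiled SIMD loops ClickHouse actually runs, so it says nothing about real speed.

```python
import math

BLOCK_SIZE = 65_536  # rows per block, the default used in the text


def row_at_a_time_dispatches(n_rows: int, operators: int = 3) -> int:
    """Filter, projection, and aggregate each called once per row."""
    return operators * n_rows


def vectorised_dispatches(n_rows: int, operators: int = 3, block: int = BLOCK_SIZE) -> int:
    """Each operator is called once per block instead of once per row."""
    return operators * math.ceil(n_rows / block)


def sum_clicks_vectorised(event_type: list[str], value: list[float], block: int = BLOCK_SIZE) -> float:
    """Toy block-at-a-time evaluation of sum(value * 1.1) WHERE event_type = 'click'."""
    total = 0.0
    for start in range(0, len(value), block):
        types = event_type[start:start + block]
        vals = value[start:start + block]
        mask = [t == "click" for t in types]                          # filter step: one flag per row
        projected = [v * 1.1 for v, keep in zip(vals, mask) if keep]  # projection on surviving rows
        total += sum(projected)                                       # aggregate folds the block
    return total


print(row_at_a_time_dispatches(1_000_000_000))  # 3000000000
print(vectorised_dispatches(1_000_000_000))     # 45777
print(sum_clicks_vectorised(["click", "view", "click"], [10.0, 0.0, 5.0]))
```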
Output (numbers are illustrative).
| Engine | Function dispatches | Wall time |
|---|---|---|
| Row-at-a-time interpreter | 3 × 1,000,000,000 = 3B | ~30 minutes |
| ClickHouse vectorised | 3 × ~15,260 = ~46K | ~1.5 seconds |
Rule of thumb. When the latency budget is under a second on a billion-row scan, you need both column pruning and vectorisation. Row-at-a-time engines that JIT-compile their expressions (Spark, Postgres) cut interpretation overhead but still pay per-row costs that a vectorised engine amortises over a whole block, and they often end up around 10x slower on the same hardware.
Senior interview question on the ClickHouse latency model
A senior interviewer often opens with: "Explain in 90 seconds why ClickHouse can serve a SELECT count(DISTINCT user_id) GROUP BY day over 30 billion rows in under a second when Postgres on the same hardware would take 20 minutes." This blends columnar storage, partition pruning, the sparse index, and vectorised execution into one answer.
Solution Using the four-layer latency model
-- The reference table that supports the sub-second query
CREATE TABLE events
(
ts DateTime,
user_id UInt64,
event_type LowCardinality(String),
value Float64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(ts)
ORDER BY (toStartOfDay(ts), user_id, ts)
SETTINGS index_granularity = 8192;
-- The interactive query
SELECT
toStartOfDay(ts) AS day,
uniq(user_id) AS dau
FROM events
WHERE ts >= now() - INTERVAL 30 DAY
GROUP BY day
ORDER BY day;
Step-by-step trace.
| Layer | Mechanism | What it skips | Time saved |
|---|---|---|---|
| Partition pruning | PARTITION BY toYYYYMM(ts) | 96% of partitions (only the last 2 months are touched) | minutes → seconds |
| Sparse index skip | ORDER BY (toStartOfDay(ts), user_id, ts) | granules outside the WHERE window | seconds → 500ms |
| Columnar I/O | reads ts and user_id only | every other column's bytes (about half here; 90%+ on a wide production table) | 500ms → 200ms |
| Vectorised + approximate uniq | one dispatch per block, approximate distinct via uniq() | per-tuple dispatch + exact distinct | 200ms → 50ms |
After the trace, you can answer the next interview question in the same breath: "If you needed exact distincts, you'd use uniqExact() and pay the memory cost. For dashboards, uniq() is the right default."
Output:
| day | dau |
|---|---|
| ... | ... |
| 2026-06-12 | 1,201,140 |
| 2026-06-13 | 1,189,420 |
| 2026-06-14 | 1,243,800 |
Why this works — concept by concept:
- Partition pruning: PARTITION BY toYYYYMM(ts) splits the on-disk layout by month. The planner checks the WHERE predicate against partition keys and physically skips entire directories, turning a 30-billion-row scan into a roughly 1-billion-row one.
- Sparse primary-key index: the ORDER BY columns define the on-disk sort order. ClickHouse keeps one index entry per index_granularity (default 8192) rows, so the index stays small (about 1.2 million entries, on the order of 20 MB with a 16-byte key, for a 10B-row table) yet still lets the engine skip entire granules whose ts falls outside the WHERE.
- Columnar I/O: only the ts and user_id column files are read. Each is LZ4-compressed on disk and decompressed in cache-friendly blocks, so the bytes read relative to a row store are roughly bytes_of_projected_columns / bytes_of_all_columns.
- Vectorised execution + approximate uniq: the aggregate runs in 65,536-row blocks with one function dispatch per block, and uniq() keeps a bounded sample of hashed values per group (adaptive sampling, at most 65,536 hashes), so per-group memory stays capped regardless of cardinality.
- Cost: O(filtered_rows) reads, O(blocks) function dispatches, O(groups × uniq_state) memory. The dominant term is I/O on the projected columns within the partitions touched.
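A toy model of the sparse primary-key index makes the "one entry per granule" idea concrete. This sketch assumes a single integer sort key and keeps the first key value of every granule as its mark; the function names are hypothetical, and the real index also handles compound keys and monotonic functions such as toStartOfDay(ts).

```python
import math
from bisect import bisect_left


def build_marks(sorted_keys: list[int], granularity: int) -> list[int]:
    """One index entry per granule: the first key of every `granularity` rows."""
    return sorted_keys[::granularity]


def granules_for_range(marks: list[int], lo: int, hi: int) -> range:
    """Granules that may hold keys with lo <= key < hi (conservative, like the real index)."""
    # Granule i holds keys no larger than marks[i + 1]; start one granule before the
    # first mark >= lo so rows equal to lo at the end of the previous granule are kept.
    first = max(bisect_left(marks, lo) - 1, 0)
    # Granules whose first key is already >= hi cannot contain matching rows.
    last = bisect_left(marks, hi)
    return range(first, last)


def index_entries(rows: int, granularity: int = 8192) -> int:
    return math.ceil(rows / granularity)


keys = list(range(100))                    # 100 rows already sorted by the key
marks = build_marks(keys, granularity=4)   # 25 marks: 0, 4, 8, ..., 96
hit = granules_for_range(marks, 90, 100)   # WHERE key >= 90
print(list(hit), "->", len(hit) * 4, "of", len(keys), "rows read")  # [22, 23, 24] -> 12 of 100 rows read

# Index size for the 10B-row case, assuming a 16-byte key per entry.
entries = index_entries(10_000_000_000)
print(entries, entries * 16)  # 1220704 19531264
```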
2. ClickHouse's role in the modern stack
ClickHouse sits between the stream and the dashboard — the sub-second serving tier that a batch warehouse cannot reach
The mental model in one line: the modern real-time stack is sources → CDC → Kafka → ClickHouse → dashboards, with an optional parallel batch lane to a warehouse — and ClickHouse is the only component on the read path that satisfies an interactive (sub-second) latency budget. Once you can draw that pipeline, every "where does ClickHouse fit?" interview question collapses to "which arrow are you talking about?"
The five-zone reference architecture.
- Zone 1: sources. Postgres / MySQL OLTP, app event firehoses, third-party webhooks. The data is row-oriented and transactional.
- Zone 2: CDC + stream. Debezium tails the source's change log (MySQL binlog, Postgres WAL) and produces a Kafka topic per table. Application events land in Kafka directly. Kafka is the durable buffer.
- Zone 3: ClickHouse. The Kafka table engine subscribes to a topic; a materialized view fans every insert into a downstream MergeTree table that owns the actual storage. The MV is the bridge between the stream and the column store.
- Zone 4: serve. Grafana / Superset query ClickHouse directly. Custom APIs query ClickHouse via the HTTP interface. Internal tools query through the Native protocol.
- Zone 5: batch lane (optional). A parallel Source → DataLake → dbt → Snowflake / BigQuery lane backs the long-tail analytics and finance reports. This is the lambda-style two-engine deployment.
Two architecture patterns side by side.
- Lambda. Sources fan out to both a batch lake and ClickHouse. The batch lane handles correctness (re-processable, idempotent) and long retention. ClickHouse handles latency (sub-second) and the last 30–90 days. The dashboard joins the two only when explicitly needed.
- Kappa. All ingest goes through Kafka. ClickHouse, via the Kafka table engine, is the only consumer of record. Replays come from Kafka's retained log (long retention or an S3-backed tiered-storage layer); log compaction alone is not enough, because it keeps only the latest record per key. There is no batch warehouse for analytics, only ClickHouse and (optionally) a cold S3 archive.
Where ClickHouse fits vs the alternatives.
| Engine | Latency contract | Write pattern | Replaces |
|---|---|---|---|
| ClickHouse | sub-second on aggregates | bulk insert from Kafka / S3 | Druid, Pinot, Vertica |
| Druid | sub-second on time-series | streaming ingest | ClickHouse on time-series |
| Pinot | sub-second on user-facing analytics | streaming ingest | ClickHouse on per-user views |
| Snowflake / BigQuery | seconds to minutes | bulk insert + dbt | Redshift, batch Hive |
| Postgres | milliseconds for OLTP, slow on aggregate | row-level transactions | OLTP MySQL |
Multi-tenant patterns.
- One table per customer. Heavy schema overhead, but isolation is perfect — drop a table to offboard a customer.
- One table partitioned by customer_id. Single table, single MV, but every query needs WHERE customer_id = X to prune.
- One table sharded by customer_id. Cluster-level isolation; large customers can be moved to dedicated shards.
- Per-tenant materialized view fan-out. Source table is shared; pre-aggregated views are per-customer with TTL.
Where the data engineer sits in this stack.
- Owns the Kafka → ClickHouse contract: topic format, MV mapping, schema evolution.
- Owns the MergeTree schema: ORDER BY, PARTITION BY, TTL, codec choices.
- Owns the materialized-view roll-up tree: 1-minute, 1-hour, 1-day aggregates feed the dashboard.
- Owns the sharding key: once chosen, it is expensive to change.
Worked example — the canonical Kafka → ClickHouse → dashboard pipeline
Detailed explanation. A team ships a real-time funnel dashboard. App events flow through Kafka. The dashboard queries hourly counts and unique users by event_type. The team writes three objects in ClickHouse: a Kafka engine table (the consumer), a MergeTree table (the storage), and a materialized view that bridges them.
Question. Build the three-object pipeline that takes JSON events from a Kafka topic events and lands them in a MergeTree table events_local such that an hourly dashboard query is fast. Show the Kafka table, the target table, and the materialized view.
Input — Kafka topic schema (JSON).
| Field | Type | Example |
|---|---|---|
| ts | DateTime | 2026-06-15 09:12:30 |
| user_id | UInt64 | 1029384 |
| event_type | String | click |
| value | Float64 | 1.0 |
Code.
-- 1) The Kafka source table — a consumer, not storage
CREATE TABLE events_queue
(
ts DateTime,
user_id UInt64,
event_type LowCardinality(String),
value Float64
)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
kafka_topic_list = 'events',
kafka_group_name = 'clickhouse-ingest',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 3;
-- 2) The MergeTree storage table the dashboard queries
CREATE TABLE events_local
(
ts DateTime,
user_id UInt64,
event_type LowCardinality(String),
value Float64
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(ts)
ORDER BY (event_type, ts, user_id)
TTL ts + INTERVAL 90 DAY;
-- 3) The materialized view that copies every Kafka insert into storage
CREATE MATERIALIZED VIEW events_mv TO events_local AS
SELECT
ts,
user_id,
event_type,
value
FROM events_queue;
Step-by-step explanation.
- The Kafka engine table is not a storage table. It is a consumer that pulls messages from a Kafka topic, and reading from it consumes them, which is why recent releases block direct SELECTs on it by default (stream_like_engine_allow_direct_select = 0).
- The materialized view fires on every batch the Kafka consumer reads. The MV's SELECT FROM events_queue is the read that advances the Kafka offset.
- The MV writes into events_local via the TO events_local clause; the target table is the on-disk MergeTree.
- events_local is the table dashboards query. Its PARTITION BY (day) lets queries prune by ts; its ORDER BY (event_type, ts, user_id) lets queries filtered by event type skip whole granules.
- The TTL clause expires data older than 90 days automatically: ClickHouse removes expired rows during background merges, or drops whole expired parts when ttl_only_drop_parts = 1. Cold archival to S3 is a separate TTL ... TO DISK / TO VOLUME rule on a storage policy.
Output (after ingest is running for a few minutes).
| Step | Effect |
|---|---|
| Kafka producer writes 100K msgs/s | events_queue advances offsets continuously |
| MV fires every batch | rows land in events_local |
| Dashboard runs GROUP BY toStartOfHour(ts) | scans events_local, not events_queue |
| 90-day TTL | older partitions drop automatically |
Rule of thumb. Never query a Kafka engine table from a dashboard. Always land the data in a MergeTree via a materialized view first. The Kafka table is a moving cursor, not a queryable surface.
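The cursor-versus-storage distinction can be modelled in a few lines of Python. Everything here is hypothetical: the classes stand in for a Kafka topic, a Kafka engine table, and a TO-style materialized view, and the toy advances its offset as soon as it reads, whereas the real engine aims to commit offsets only after the block has been written to the view's target.

```python
class KafkaTopic:
    """Hypothetical stand-in for a Kafka topic: an append-only log."""

    def __init__(self) -> None:
        self.log: list[dict] = []

    def produce(self, msg: dict) -> None:
        self.log.append(msg)


class KafkaEngineTable:
    """Toy Kafka engine table: a cursor over the topic, not storage."""

    def __init__(self, topic: KafkaTopic) -> None:
        self.topic = topic
        self.offset = 0

    def poll(self, max_batch: int = 1000) -> list[dict]:
        batch = self.topic.log[self.offset:self.offset + max_batch]
        self.offset += len(batch)  # reading moves the cursor; the next reader will not see these rows
        return batch


def run_bridge_mv(queue: KafkaEngineTable, storage: list[dict]) -> int:
    """Toy TO-style materialized view: drain batches into MergeTree-like storage."""
    batches = 0
    while batch := queue.poll():
        storage.extend(batch)
        batches += 1
    return batches


topic = KafkaTopic()
for i in range(2500):
    topic.produce({"user_id": i, "event_type": "click"})

queue = KafkaEngineTable(topic)
events_local: list[dict] = []
print(run_bridge_mv(queue, events_local))  # 3 (batches of 1000, 1000, 500)
print(len(events_local))                   # 2500 rows landed in storage
print(queue.poll())                        # [] : a dashboard reading the queue sees nothing
```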
Worked example — choosing between lambda and kappa
Detailed explanation. Senior interviewers love the "do you need a batch lake?" follow-up. The honest answer is "it depends" — but the framing the candidate should bring is: lambda buys correctness, kappa buys simplicity. The right answer is whichever the team's two-week postmortem budget can afford.
Question. Given the requirements list below, decide whether to deploy lambda (ClickHouse + warehouse) or kappa (ClickHouse-only). Justify in one paragraph.
Input.
| Requirement | Value |
|---|---|
| Interactive dashboard latency | < 1s P95 |
| Long-tail analytics retention | 5 years |
| Re-processable on schema change | yes (compliance) |
| Daily event volume | 10B events/day |
| Team size | 4 data engineers |
Code (the two architectures as YAML).
# Lambda — two engines
sources:
- postgres-cdc: debezium
- app-events: kafka
batch_lane:
ingest: s3 (parquet)
transform: dbt-snowflake
retention: 5 years
serves: finance, ml, ad-hoc
speed_lane:
ingest: kafka -> clickhouse Kafka engine
storage: events_local (90d TTL)
serves: real-time dashboards
# Kappa — one engine
sources:
- postgres-cdc: debezium
- app-events: kafka
speed_lane:
ingest: kafka -> clickhouse Kafka engine
storage:
- events_local: 90d hot
- events_cold (s3 disk): 5y warm via storage policy
serves: dashboards, finance, ad-hoc
replays: from kafka tiered storage
Step-by-step explanation.
- The interactive contract (< 1s P95) forces ClickHouse into the speed lane regardless of architecture choice.
- The 5-year retention contract favours lambda if the warehouse is already running, kappa if ClickHouse's S3-tiered storage is acceptable for cold data.
- The "re-processable on schema change" requirement favours lambda — the immutable parquet lake is the canonical replay source. Kappa can do it via Kafka tiered storage but with more operational overhead.
- 10B events/day is well within ClickHouse's single-cluster comfort zone: it averages about 116K events/sec (10,000,000,000 / 86,400), with peaks above that.
- A 4-engineer team usually benefits from kappa's "one fewer engine to operate": with lambda, every schema change, backfill, and on-call rotation has to cover two engines, and a small team feels that overhead most.
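The throughput figure is simple division. A short Python check, with the peak-to-average factor as an explicit assumption you would replace with a value measured on real traffic:

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400


def avg_events_per_sec(events_per_day: int) -> float:
    """Average ingest rate implied by a daily volume."""
    return events_per_day / SECONDS_PER_DAY


def peak_events_per_sec(events_per_day: int, peak_factor: float) -> float:
    """Peak rate under an assumed peak-to-average ratio."""
    return avg_events_per_sec(events_per_day) * peak_factor


avg = avg_events_per_sec(10_000_000_000)
print(round(avg))                                     # 115741
print(round(peak_events_per_sec(10_000_000_000, 3)))  # 347222
```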
Output.
| Architecture | Pros | Cons | Verdict for this team |
|---|---|---|---|
| Lambda | clean re-processing, mature dbt tooling, finance team familiar with Snowflake | two engines, two costs, two pipelines to schema-evolve | strong choice if Snowflake already exists |
| Kappa | one engine, one schema-evolution surface, simpler to operate | replay requires Kafka tiered storage, dbt-on-ClickHouse is newer | strong choice for greenfield |
Rule of thumb. Start kappa if the team is greenfield and small; layer lambda on top only when an explicit batch use case (finance, ML training data) cannot be served by ClickHouse. The "one engine" argument compounds against complexity over years.
Worked example — multi-tenant table layout
Detailed explanation. Multi-tenant ClickHouse usually starts as "one shared table with customer_id in the sort key" and only graduates to per-customer tables or sharding once one customer's volume dominates the rest. The transition is operationally expensive, so the choice of sort key has to anticipate the future.
Question. Given a SaaS analytics product with 200 customers ranging from 1M events/day to 1B events/day, design the ClickHouse table layout that supports per-customer dashboards in sub-second.
Input.
| Customer count | Per-customer event volume |
|---|---|
| 195 | < 50M events/day |
| 4 | 50M – 500M events/day |
| 1 | > 1B events/day |
Code.
-- Shared table for the small/medium customers
CREATE TABLE events_shared
(
customer_id UInt32,
ts DateTime,
user_id UInt64,
event_type LowCardinality(String),
value Float64
)
ENGINE = MergeTree
PARTITION BY (customer_id, toYYYYMM(ts))
ORDER BY (customer_id, toStartOfHour(ts), user_id);
-- Dedicated table for the giant customer
CREATE TABLE events_customer_999
(
ts DateTime,
user_id UInt64,
event_type LowCardinality(String),
value Float64
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(ts)
ORDER BY (toStartOfHour(ts), user_id);
-- Query layer routes by customer_id
-- (application-level routing, NOT a UNION ALL)
Step-by-step explanation.
- PARTITION BY (customer_id, toYYYYMM(ts)) means each customer's data lives in its own physical directory per month. Queries filtered by customer_id prune to one customer's data immediately.
- The ORDER BY starts with customer_id, so each customer's rows are contiguous within a part and the sparse index lets the engine jump straight to that customer's range.
- The giant customer (>1B/day) gets its own table because their volume is larger than any other tenant's by a wide margin. Keeping them in the shared table would let one tenant dominate its parts and background merges, and their own queries could not use the finer daily partitioning the dedicated table gets.
- Routing logic lives in the application: a small lookup table maps customer_id → table_name, and the query layer dispatches accordingly.
- Sharding (next section) is the further evolution when one customer outgrows a single node.
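A minimal sketch of that application-level routing in Python. The mapping, function names, and generated SQL are hypothetical; a real service would pass customer_id as a bound query parameter rather than formatting it into the string.

```python
# Hypothetical routing table: tenants that have a dedicated table.
DEDICATED_TABLES: dict[int, str] = {999: "events_customer_999"}
SHARED_TABLE = "events_shared"


def table_for(customer_id: int) -> str:
    """Route a tenant to its dedicated table, falling back to the shared one."""
    return DEDICATED_TABLES.get(customer_id, SHARED_TABLE)


def hourly_dashboard_sql(customer_id: int) -> str:
    """Build the per-customer hourly query; only the shared table needs a tenant filter."""
    table = table_for(customer_id)
    tenant_filter = ""
    if table == SHARED_TABLE:
        # customer_id leads PARTITION BY and ORDER BY, so this predicate prunes early.
        tenant_filter = f"WHERE customer_id = {int(customer_id)}\n"
    return (
        "SELECT toStartOfHour(ts) AS hour, count() AS events\n"
        f"FROM {table}\n"
        f"{tenant_filter}"
        "GROUP BY hour\n"
        "ORDER BY hour"
    )


print(hourly_dashboard_sql(42))
print(hourly_dashboard_sql(999))
```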
Output (latency contract).
| Customer size | Table | Per-customer dashboard latency |
|---|---|---|
| Small (1M/day) | events_shared | 30–80ms |
| Medium (50M/day) | events_shared | 100–300ms |
| Large (1B/day) | events_customer_999 | 200–500ms |
Rule of thumb. Make customer_id the first column of ORDER BY (or PARTITION BY) on day one. The next decision — dedicated table or dedicated shard — is operationally cheap if the sort key already isolates the tenant. Retrofitting tenant isolation onto a non-tenant-keyed table is painful enough to be the most common reason for a v2 rewrite.
Senior interview question on real-time stack design
A senior interviewer often opens with: "Design the data pipeline for a real-time analytics product that ingests 100K events/sec from Kafka and serves a sub-second dashboard. Where does ClickHouse sit, what does the Kafka contract look like, and how do you handle a downstream schema change?"
Solution Using a four-component pipeline
-- 1) The Kafka source table (consumer)
CREATE TABLE kafka_events
(
ts DateTime, -- no codec: the Kafka engine table stores nothing on disk
user_id UInt64,
event_type LowCardinality(String),
properties String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '...',
kafka_topic_list = 'events',
kafka_group_name = 'ch-prod',
kafka_format = 'JSONEachRow';
-- 2) The MergeTree storage table
CREATE TABLE events
(
ts DateTime CODEC(DoubleDelta, LZ4),
user_id UInt64,
event_type LowCardinality(String),
properties String
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(ts)
ORDER BY (event_type, toStartOfHour(ts), user_id)
TTL ts + INTERVAL 90 DAY;
-- 3) The bridge MV
CREATE MATERIALIZED VIEW events_mv TO events AS
SELECT ts, user_id, event_type, properties
FROM kafka_events;
-- 4) The roll-up MV for the dashboard
CREATE TABLE events_hourly
(
hour DateTime,
event_type LowCardinality(String),
events AggregateFunction(count),
users AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(hour)
ORDER BY (event_type, hour);
CREATE MATERIALIZED VIEW events_hourly_mv TO events_hourly AS
SELECT
toStartOfHour(ts) AS hour,
event_type,
countState() AS events,
uniqState(user_id) AS users
FROM events
GROUP BY hour, event_type;
Step-by-step trace.
| Component | Role | Reads | Writes |
|---|---|---|---|
| kafka_events | Kafka consumer | Kafka topic events | nothing (cursor only) |
| events_mv | bridge | kafka_events | events (raw storage) |
| events | raw storage | queried ad hoc by dashboards | nothing |
| events_hourly_mv | roll-up | events, on insert | events_hourly |
| events_hourly | dashboard surface | queried by the dashboard | nothing |
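When a schema change comes in (e.g. adding a column region), the team first adds it to the storage table with ALTER TABLE events ADD COLUMN region String DEFAULT '', then recreates kafka_events with the new column (Kafka engine tables are generally dropped and recreated rather than altered, with the bridge MV detached while that happens), and finally updates the MV bodies. For views created with a TO target, recent ClickHouse releases can evolve the body in place with ALTER TABLE <view> MODIFY QUERY, without dropping the target table.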
Output:
| Dashboard query | Latency |
|---|---|
| Hourly event count by type (last 30 days) | 40–120ms |
| Unique users per hour by type | 60–200ms |
| Top 10 event types over last day | 30–80ms |
Why this works — concept by concept:
- Separation of concerns: the Kafka engine is the cursor, the MergeTree is the storage, and the AggregatingMergeTree is the dashboard surface. Each component owns one job, so a failure in one does not corrupt the others.
- Bridge MV pattern: CREATE MATERIALIZED VIEW ... TO target AS SELECT ... FROM kafka_events is the canonical bridge. It fires on every insert into the source and lands the transformed rows in the target.
- Roll-up MV with -State functions: countState() and uniqState() produce partial aggregate states that are stored in AggregatingMergeTree. Background merges roll them up further; queries finalize them with countMerge / uniqMerge, grouping by the sort key again because unmerged parts can still hold several rows per key.
- Schema-evolution safety: the Kafka engine table, the storage table, and the MV all need the column added together. Recent ClickHouse releases support ALTER TABLE <view> MODIFY QUERY to evolve TO-style MVs in place.
- Cost: O(events_per_sec) per Kafka batch; O(events × MV_count) for materialized-view fanout; O(unique_groups × state_size) for the aggregating target table. Dashboard cost is O(touched_partitions) on the much smaller roll-up.
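The -State / -Merge split behaves like mergeable partial aggregates. This Python sketch is an analogy, not ClickHouse's implementation: the dataclass and helper names are hypothetical, and the set of user IDs is an exact stand-in for uniqState, whose real state is approximate and bounded.

```python
from dataclasses import dataclass, field


@dataclass
class HourlyState:
    """Partial aggregate for one (hour, event_type) key: count ~ countState, users ~ uniqState."""
    count: int = 0
    users: set[int] = field(default_factory=set)

    def add(self, user_id: int) -> None:
        self.count += 1
        self.users.add(user_id)

    def merge(self, other: "HourlyState") -> "HourlyState":
        return HourlyState(self.count + other.count, self.users | other.users)


def build_states(rows: list[tuple[int, str, int]]) -> dict[tuple[int, str], HourlyState]:
    """What the roll-up MV writes for one INSERT: one partial state per key."""
    states: dict[tuple[int, str], HourlyState] = {}
    for hour, event_type, user_id in rows:
        states.setdefault((hour, event_type), HourlyState()).add(user_id)
    return states


def merge_parts(*parts: dict[tuple[int, str], HourlyState]) -> dict[tuple[int, str], HourlyState]:
    """What a background merge (or a -Merge query) does: combine states key by key."""
    merged: dict[tuple[int, str], HourlyState] = {}
    for part in parts:
        for key, state in part.items():
            merged[key] = merged[key].merge(state) if key in merged else state
    return merged


insert_1 = [(10, "click", 1), (10, "click", 2)]
insert_2 = [(10, "click", 2), (10, "view", 3)]
merged = merge_parts(build_states(insert_1), build_states(insert_2))
state = merged[(10, "click")]
print(state.count, len(state.users))  # 3 2  (countMerge = 3, uniqMerge = 2)
```

Merging the two per-insert states gives the same answer as aggregating all rows at once, which is what lets the roll-up table stay correct no matter how inserts are batched.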
3. The MergeTree family — the heart of ClickHouse
MergeTree is one engine, six personalities — the variant you pick is the variant your write pattern needs
The mental model in one line: MergeTree is a columnar table engine that writes immutable on-disk "parts" and merges them in the background according to the ORDER BY key — and the family variants (ReplacingMergeTree, SummingMergeTree, AggregatingMergeTree, CollapsingMergeTree, ReplicatedMergeTree) layer additional semantics onto the merge step. Once you say "the merge is when the variant's magic happens," the entire family becomes a memorisation exercise.
The family in one table.
| Engine | Merge-step semantic | Common use |
|---|---|---|
| MergeTree | none: just sort and merge parts | base columnar table |
| ReplacingMergeTree | dedupe by sort key, keep latest version | CDC upsert sink |
| SummingMergeTree | sum numeric columns by sort key | pre-aggregated counters |
| AggregatingMergeTree | merge -State aggregate columns by sort key | materialized-view roll-ups |
| CollapsingMergeTree | collapse sign = -1 rows against sign = +1 rows | row-level updates via tombstones |
| VersionedCollapsingMergeTree | same as Collapsing, but with a version column | concurrent CDC streams |
| ReplicatedMergeTree (and variants) | adds Keeper-coordinated replication on top | every production cluster |
PARTITION BY vs ORDER BY — two different concepts.
- PARTITION BY defines the physical directory structure on disk. Each unique partition expression value is a separate directory. The planner prunes whole partitions before reading anything. Use coarse expressions like toYYYYMM(ts); fine-grained partitions (e.g. per-hour) create thousands of tiny directories and crater performance.
- ORDER BY defines the sort order within a part, and by default the sparse primary-key index is built on the same columns (a PRIMARY KEY clause can restrict it to a prefix). The planner uses the index to skip granules (8192-row chunks by default). Lead with the columns your WHERE clauses filter on most, usually lower-cardinality columns first (as in ORDER BY (event_type, ts, user_id)), which helps both granule skipping and compression.
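To make granule skipping concrete, here is a minimal pure-Python model of a sparse index. It assumes a single sort-key column and the default 8192-row granule, and only records the first key of each granule; ClickHouse's real index is per part, can span several columns and lives in its own files, so treat this as a sketch of the idea rather than the engine's algorithm. The function names are illustrative.

```python
from bisect import bisect_left, bisect_right

GRANULE = 8192  # rows per granule, mirroring the default index_granularity


def build_sparse_index(sorted_keys, granule=GRANULE):
    """Keep one entry per granule: the first key stored in that granule."""
    return [sorted_keys[i] for i in range(0, len(sorted_keys), granule)]


def granules_to_read(index, lo, hi):
    """Return the granule numbers that may contain keys in [lo, hi].

    A granule can only hold keys between its own first key and the next
    granule's first key, so every granule outside that window is skipped.
    Starting one granule early (bisect_left - 1) keeps the answer correct
    when a run of equal keys straddles a granule boundary.
    """
    first = max(bisect_left(index, lo) - 1, 0)
    last = bisect_right(index, hi) - 1
    return list(range(first, last + 1)) if last >= first else []


if __name__ == "__main__":
    keys = list(range(100_000))          # a part whose sort key is 0..99,999
    index = build_sparse_index(keys)     # 13 index entries instead of 100,000 rows
    print(len(index), granules_to_read(index, 10_000, 20_000))
```

For the range 10,000 to 20,000 only granules 1 and 2 (rows 8,192 to 24,575) are read out of 13; everything else is skipped without being decompressed.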
Parts and merges in plain words.
- Every INSERT creates one or more new on-disk parts under the partition directory.
- Background merges combine small parts into larger ones, applying the variant-specific semantic during the merge.
- A part is immutable: to "update" a row, you write a new part with the new value and let the variant-specific merge resolve it.
- OPTIMIZE TABLE ... FINAL forces an unscheduled merge of all parts within each partition (or within one partition, with a PARTITION clause). Useful for testing, dangerous in production at scale.
Common interview probes.
- "What does MergeTree actually merge?" Parts. Small parts created by inserts are merged into larger parts in the background to keep the part count low.
- "What is the difference between ReplacingMergeTree and CollapsingMergeTree?" Replacing keeps the latest row per sort key; Collapsing requires the writer to emit a +1 row for "current" and a -1 row for "old", and the two collapse during merge.
- "What is LowCardinality?" A data-type wrapper (not a compression codec) that dictionary-encodes the column. Small distinct sets (event_type, status, region) are stored as small integer keys, typically 1–2 bytes, on disk and in memory.
- "What is index_granularity?" The sparse index granule size (default 8192). Each index entry covers 8192 rows.
Worked example — choosing MergeTree vs ReplacingMergeTree for a CDC sink
Detailed explanation. A team lands Postgres CDC events into ClickHouse. Each event is a full row image with a primary key. The team wants to query "the current state of every order", but ClickHouse has no cheap in-place row update (ALTER TABLE ... UPDATE runs as a mutation that rewrites the affected parts). The fix is ReplacingMergeTree: every insert is an append, but the merge step deduplicates by sort key, keeping the latest version.
Question. Build the CDC sink table. The source emits one row per change with order_id, status, amount, and an updated_at timestamp. Show the table definition and the query that returns the "current state" of orders.
Input (rows arriving over time).
| insert # | order_id | status | amount | updated_at |
|---|---|---|---|---|
| 1 | 1 | placed | 100 | 2026-06-15 09:00 |
| 2 | 1 | shipped | 100 | 2026-06-15 10:00 |
| 3 | 2 | placed | 50 | 2026-06-15 11:00 |
| 4 | 1 | delivered | 100 | 2026-06-15 12:00 |
Code.
CREATE TABLE orders_cdc
(
order_id UInt64,
status LowCardinality(String),
amount Decimal(12, 2),
updated_at DateTime
)
ENGINE = ReplacingMergeTree(updated_at)
ORDER BY order_id;
-- Query for current state — use FINAL or argMax
SELECT order_id, status, amount, updated_at
FROM orders_cdc
FINAL;
-- Or, without FINAL (cheaper, more typing)
SELECT
order_id,
argMax(status, updated_at) AS status,
argMax(amount, updated_at) AS amount,
max(updated_at) AS updated_at
FROM orders_cdc
GROUP BY order_id;
Step-by-step explanation.
- ReplacingMergeTree(updated_at) says "during merges, when two rows share the same ORDER BY key (order_id), keep the one with the greater updated_at."
- Between merges, every version of every row is still on disk. A query without FINAL can see all four rows.
- Deduplication only ever happens between rows of the same partition. This table has no PARTITION BY, so every version of an order is eligible.
- FINAL forces the engine to apply the dedup semantic at query time, at the cost of extra read amplification. Fine for dashboards on small tables; expensive on billion-row tables.
- The argMax(col, updated_at) GROUP BY order_id pattern is the cheaper alternative: it computes the same answer without FINAL, at the cost of a GROUP BY scan.
- For the highest-traffic queries, build a downstream materialized view that pre-aggregates the current state into a smaller table.
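The merge-time rule is easy to model in plain Python. This sketch treats each INSERT as its own part and assumes every part has already been merged, which is the state FINAL produces at read time. The tie rule (equal versions keep the row inserted last) follows ClickHouse's documented behaviour; the Row tuple and the function name are illustrative.

```python
from collections import namedtuple

Row = namedtuple("Row", "order_id status amount updated_at")


def replacing_merge(parts):
    """Collapse parts like ReplacingMergeTree(updated_at).

    For each sorting key (order_id) keep the row with the greatest version
    (updated_at). Parts are visited in insertion order, and ">=" lets the
    later row win a version tie.
    """
    latest = {}
    for part in parts:
        for row in part:
            kept = latest.get(row.order_id)
            if kept is None or row.updated_at >= kept.updated_at:
                latest[row.order_id] = row
    return sorted(latest.values(), key=lambda r: r.order_id)


# One part per INSERT, matching the four inserts in the input table.
# ISO-formatted timestamps compare correctly as strings.
parts = [
    [Row(1, "placed", 100, "2026-06-15 09:00")],
    [Row(1, "shipped", 100, "2026-06-15 10:00")],
    [Row(2, "placed", 50, "2026-06-15 11:00")],
    [Row(1, "delivered", 100, "2026-06-15 12:00")],
]

if __name__ == "__main__":
    print(sum(len(p) for p in parts), "rows before the merge")
    for row in replacing_merge(parts):
        print(row.order_id, row.status, row.amount, row.updated_at)
```

Four rows go in and two come out (order 1 delivered, order 2 placed), which is the current-state table that FINAL or the argMax query returns.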
Output (current state of orders).
| order_id | status | amount | updated_at |
|---|---|---|---|
| 1 | delivered | 100 | 2026-06-15 12:00 |
| 2 | placed | 50 | 2026-06-15 11:00 |
Rule of thumb. Use ReplacingMergeTree for CDC sinks where you only ever care about the latest version. Pair it with argMax(...) GROUP BY pk for hot queries; reserve FINAL for low-QPS dashboards and ad-hoc sanity checks.
Worked example — SummingMergeTree for a pre-aggregated counter table
Detailed explanation. When the read pattern is "give me the running total per key," and the write pattern is "many small increments," SummingMergeTree collapses the per-key rows during merge — the on-disk size shrinks and reads scan fewer rows.
Question. Build a per-day click counter table where each insert is (day, page_id, +1) and the dashboard reads "clicks per page per day." Show the table and the query.
Input.
| day | page_id | clicks |
|---|---|---|
| 2026-06-15 | A | 1 |
| 2026-06-15 | A | 1 |
| 2026-06-15 | B | 1 |
| 2026-06-15 | A | 1 |
Code.
CREATE TABLE clicks_daily
(
day Date,
page_id String,
clicks UInt64
)
ENGINE = SummingMergeTree(clicks)
PARTITION BY toYYYYMM(day)
ORDER BY (day, page_id);
INSERT INTO clicks_daily VALUES
('2026-06-15', 'A', 1),
('2026-06-15', 'A', 1),
('2026-06-15', 'B', 1),
('2026-06-15', 'A', 1);
-- Read pattern
SELECT day, page_id, sum(clicks) AS clicks
FROM clicks_daily
WHERE day = '2026-06-15'
GROUP BY day, page_id
ORDER BY clicks DESC;
Step-by-step explanation.
- SummingMergeTree(clicks) declares that during a merge, rows sharing the same ORDER BY key (day, page_id) collapse into one row whose clicks column is the sum.
- Before merge: 4 rows. After merge: 2 rows (A with 3, B with 1).
- The read pattern still uses sum(clicks) GROUP BY ..., because between merges there may still be multiple rows per key. Always GROUP BY + sum; never trust the row count.
- After enough merges, the on-disk row count approaches the cardinality of (day, page_id), which is ideal for high-volume counters.
- For multi-column counters (e.g. clicks + impressions + revenue), list them all in the engine: SummingMergeTree((clicks, impressions, revenue)).
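A pure-Python model of the SummingMergeTree merge shows why the read query can safely sum again: summing partially merged rows gives the same totals as summing fully merged ones. It assumes a single key tuple and integer counters; dropping a row whose summed columns all end up at zero follows the ClickHouse documentation, and the function name is illustrative.

```python
def summing_merge(rows, key_cols=("day", "page_id"), sum_cols=("clicks",)):
    """Collapse rows that share a key, summing the counter columns.

    Mirrors SummingMergeTree: one output row per key, counters added up, and
    a row is dropped if every summed column ends up as zero.
    """
    totals = {}
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        acc = totals.setdefault(key, dict.fromkeys(sum_cols, 0))
        for c in sum_cols:
            acc[c] += row[c]
    return [
        {**dict(zip(key_cols, key)), **acc}
        for key, acc in sorted(totals.items())
        if any(acc[c] != 0 for c in sum_cols)
    ]


inserts = [
    {"day": "2026-06-15", "page_id": "A", "clicks": 1},
    {"day": "2026-06-15", "page_id": "A", "clicks": 1},
    {"day": "2026-06-15", "page_id": "B", "clicks": 1},
    {"day": "2026-06-15", "page_id": "A", "clicks": 1},
]

if __name__ == "__main__":
    fully_merged = summing_merge(inserts)
    # Only the first two rows have merged so far; the rest are still separate.
    partially_merged = summing_merge(inserts[:2]) + inserts[2:]
    # The read query's sum(clicks) GROUP BY is just one more summing pass.
    print(fully_merged == summing_merge(partially_merged), fully_merged)
```

Whatever stage the background merges have reached, the GROUP BY + sum read lands on A = 3 and B = 1.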
Output.
| day | page_id | clicks |
|---|---|---|
| 2026-06-15 | A | 3 |
| 2026-06-15 | B | 1 |
Rule of thumb. Use SummingMergeTree when every increment is a row and the dashboard wants the sum per key. Pair it with AggregatingMergeTree (next section) when you also need distinct counts, quantiles, or anything beyond plain sum.
Worked example — CollapsingMergeTree for row-level updates
Detailed explanation. CollapsingMergeTree is the "I really do need row updates" answer. The writer emits two rows for every logical update: a sign = -1 "cancel" row that repeats the old state, and a sign = +1 "create" row for the new one. The merge step pairs the cancel row with the old +1 row and drops both, leaving only the latest version.
Question. Track an order's current status using CollapsingMergeTree. Show the insert sequence for a "placed → shipped" transition and the dashboard read.
Input (rows emitted by the application).
| order_id | status | sign |
|---|---|---|
| 1 | placed | +1 |
| 1 | placed | -1 |
| 1 | shipped | +1 |
Code.
CREATE TABLE orders_collapsing
(
order_id UInt64,
status LowCardinality(String),
sign Int8
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY order_id;
-- The writer must emit pair-rows for every update
INSERT INTO orders_collapsing VALUES
(1, 'placed', +1);
-- ... later, when order ships:
INSERT INTO orders_collapsing VALUES
(1, 'placed', -1),
(1, 'shipped', +1);
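-- Read pattern uses the sum(sign) trick: a cancelled version nets to 0,
-- so only the live (order_id, status) pair has sum(sign) > 0.
-- (argMax(status, sign) would be ambiguous: live and cancelled +1 rows tie.)
SELECT
order_id,
status
FROM orders_collapsing
GROUP BY order_id, status
HAVING sum(sign) > 0;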
Step-by-step explanation.
- Three rows enter the table: (1, placed, +1), (1, placed, -1), (1, shipped, +1).
- During merge, the engine pairs the (placed, +1) row with the (placed, -1) row and drops both. Pairs are matched on the ORDER BY key (order_id); the cancel row should repeat the old row's values so that sums cancel cleanly. Only (1, shipped, +1) remains.
- Between merges, all three rows are still on disk. The read pattern uses HAVING sum(sign) > 0 so that cancelled versions, whose +1 and -1 rows net to zero, are filtered out.
- The application must do extra work: read the previous state, emit a cancel row, emit a new row. Often the OLTP source does not know the previous state, which is why Replacing is more common.
- Use Collapsing when the application does know the previous state (e.g. the OLTP writes events as (before, after) pairs), or when you need to delete individual rows without rewriting parts.
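The sum(sign) idea can be checked without a cluster. The sketch below nets the sign of each (order_id, status) pair: a version whose +1 row has been cancelled nets to zero, so the answer is the same before and after the merge collapses the pair. It assumes well-formed writers (every -1 row matches an earlier +1 row), and the function name is illustrative.

```python
from collections import defaultdict


def live_versions(rows):
    """Net the sign of every (order_id, status) pair and keep the positive ones.

    Equivalent to: GROUP BY order_id, status HAVING sum(sign) > 0.
    """
    net = defaultdict(int)
    for order_id, status, sign in rows:
        net[(order_id, status)] += sign
    return sorted(key for key, total in net.items() if total > 0)


before_merge = [(1, "placed", +1), (1, "placed", -1), (1, "shipped", +1)]
after_merge = [(1, "shipped", +1)]  # the placed +1/-1 pair has collapsed

if __name__ == "__main__":
    print(live_versions(before_merge), live_versions(after_merge))
```

Both calls return [(1, 'shipped')], which is why the read query does not care whether the merge has run yet.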
Output.
| order_id | status |
|---|---|
| 1 | shipped |
Rule of thumb. Reach for CollapsingMergeTree only when the application has a clean "before / after" event source. For the more common "I just have the latest version" pattern, ReplacingMergeTree is simpler — the application emits one row, ClickHouse handles the dedup.
Worked example — ReplicatedMergeTree for production HA
Detailed explanation. Every production ClickHouse cluster uses a Replicated*MergeTree engine variant. The replication is coordinated by ZooKeeper or, increasingly, ClickHouse Keeper. Replicas are eventually consistent — a write commits locally, then propagates to peers within milliseconds to seconds.
Question. Convert the single-node events table into a replicated one. Show the engine signature, the Keeper path convention, and how a query reads from any replica.
Input. Single-node table:
CREATE TABLE events (...) ENGINE = MergeTree
ORDER BY (event_type, ts);
Code.
-- Replicated version (per-shard, per-replica)
CREATE TABLE events ON CLUSTER prod
(
ts DateTime,
user_id UInt64,
event_type LowCardinality(String),
value Float64
)
ENGINE = ReplicatedMergeTree(
'/clickhouse/tables/{shard}/events', -- Keeper path: shared per shard
'{replica}' -- replica name: unique per node
)
PARTITION BY toYYYYMMDD(ts)
ORDER BY (event_type, ts, user_id);
Step-by-step explanation.
- ReplicatedMergeTree('/clickhouse/tables/{shard}/events', '{replica}') declares that this table participates in replication. The first argument is the Keeper path shared across all replicas of the same shard. The second is the replica name, unique per node.
- {shard} and {replica} are macros defined in each node's config.xml. On node-1a they might resolve to shard=01, replica=node-1a.
- ON CLUSTER prod runs the DDL on every node in the named cluster, both shards and replicas. Without it, you have to run the CREATE on each node manually.
- After creation, every write to one replica is committed locally, then asynchronously replicated to peers via Keeper-tracked log entries.
- Reads can hit any replica. The load balancer (or the ClickHouse Distributed engine on top) picks one per query.
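How the two macros turn one DDL statement into per-shard Keeper paths can be sketched in a few lines of Python. The node-to-macro mapping below is hypothetical and mirrors the topology table; real servers read these values from their own configuration, and the string substitution is a simplification of ClickHouse's macro expansion.

```python
from collections import defaultdict

PATH_TEMPLATE = "/clickhouse/tables/{shard}/events"

# Hypothetical per-node macro values, as each node's config would define them.
NODE_MACROS = {
    "node-1a": {"shard": "01", "replica": "node-1a"},
    "node-1b": {"shard": "01", "replica": "node-1b"},
    "node-2a": {"shard": "02", "replica": "node-2a"},
    "node-2b": {"shard": "02", "replica": "node-2b"},
}


def expand(template, macros):
    """Replace each {name} placeholder with that node's macro value."""
    for name, value in macros.items():
        template = template.replace("{" + name + "}", value)
    return template


def replicas_by_path(node_macros, template=PATH_TEMPLATE):
    """Group nodes by the Keeper path they register under."""
    groups = defaultdict(list)
    for _node, macros in sorted(node_macros.items()):
        groups[expand(template, macros)].append(expand("{replica}", macros))
    return dict(groups)


if __name__ == "__main__":
    for path, replicas in replicas_by_path(NODE_MACROS).items():
        print(path, replicas)
```

The same CREATE statement yields two Keeper paths, each shared by exactly the two replicas of one shard.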
Output (cluster topology).
| Shard | Replica | Keeper path | Role |
|---|---|---|---|
| 01 | node-1a | /clickhouse/tables/01/events | accept writes, serve reads |
| 01 | node-1b | /clickhouse/tables/01/events | accept writes, serve reads |
| 02 | node-2a | /clickhouse/tables/02/events | accept writes, serve reads |
| 02 | node-2b | /clickhouse/tables/02/events | accept writes, serve reads |
Rule of thumb. Use Replicated*MergeTree for every production table without exception. Single-node MergeTree is for development and ETL scratch space only. The cost of switching from single-node to replicated after a year of writes is rebuilding the table.
Senior interview question on choosing the right MergeTree variant
A senior interviewer often opens with: "We are landing Postgres CDC into ClickHouse and want to (a) get the current state of every row, (b) keep a 30-day audit trail, and (c) survive a node failure. Which MergeTree variants do you use and how do you assemble them?"
Solution Using a ReplicatedReplacingMergeTree plus an argMax current-state view
-- 1) CDC sink: replicated, dedup-on-merge
CREATE TABLE orders_cdc ON CLUSTER prod
(
order_id UInt64,
status LowCardinality(String),
amount Decimal(12, 2),
updated_at DateTime
)
ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/tables/{shard}/orders_cdc',
'{replica}',
updated_at -- version column for Replacing
)
PARTITION BY toYYYYMM(updated_at)
ORDER BY order_id
TTL updated_at + INTERVAL 30 DAY; -- 30-day audit retention
-- 2) Current-state view (logical — uses argMax)
CREATE VIEW orders_current AS
SELECT
order_id,
argMax(status, updated_at) AS status,
argMax(amount, updated_at) AS amount,
max(updated_at) AS updated_at
FROM orders_cdc
GROUP BY order_id;
Step-by-step trace.
| Requirement | Engine choice | Why |
|---|---|---|
| (a) Current state of every row | ReplicatedReplacingMergeTree(updated_at) | merge dedupes by order_id, keeps the latest updated_at |
| (b) 30-day audit trail | TTL updated_at + INTERVAL 30 DAY | older rows drop automatically |
| (c) Survive node failure | Replicated... prefix + Keeper | every write replicated; any replica can serve reads |
| Hot read of current state | argMax(...) GROUP BY order_id | avoids FINAL cost on dashboards |
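After the dedup merge fires, ClickHouse keeps only one row per order_id within each partition. The TTL clause drops anything older than 30 days. Replication keeps both sides, current state and audit, symmetrical across replicas. Two caveats a senior answer should volunteer. First, merges discard superseded versions, so the "audit trail" is only whatever has not been merged away yet, and the row-level TTL also deletes the current row of any order not updated in the last 30 days. Second, deduplication never crosses partitions, so versions of one order that land in different toYYYYMM(updated_at) months stay separate on disk; the argMax view still answers correctly because it groups across the whole table. If both the history and the current state matter, keep the full change log in a separate ReplicatedMergeTree table that carries the TTL, and leave the Replacing table without one.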
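What merges and the row TTL do to this table together can be modelled in plain Python. The sketch assumes all versions have already merged within one partition and evaluates the TTL at a fixed "now"; order 7 and its April timestamp are hypothetical, added to represent an order that simply has not changed in a while.

```python
from datetime import datetime, timedelta

RETENTION = timedelta(days=30)  # TTL updated_at + INTERVAL 30 DAY


def merge_then_ttl(rows, now):
    """Apply the Replacing rule per order_id, then drop rows past their TTL."""
    latest = {}
    for row in rows:  # rows in insertion order
        kept = latest.get(row["order_id"])
        if kept is None or row["updated_at"] >= kept["updated_at"]:
            latest[row["order_id"]] = row
    return sorted(
        (r for r in latest.values() if r["updated_at"] + RETENTION > now),
        key=lambda r: r["order_id"],
    )


history = [
    {"order_id": 1, "status": "placed", "updated_at": datetime(2026, 6, 15, 9)},
    {"order_id": 1, "status": "shipped", "updated_at": datetime(2026, 6, 15, 10)},
    {"order_id": 1, "status": "delivered", "updated_at": datetime(2026, 6, 15, 12)},
    {"order_id": 7, "status": "delivered", "updated_at": datetime(2026, 4, 1, 8)},
]

if __name__ == "__main__":
    for row in merge_then_ttl(history, now=datetime(2026, 6, 15, 13)):
        print(row["order_id"], row["status"])
```

Only "1 delivered" survives: order 1's earlier versions are gone (no audit history after the merge), and order 7 disappears from the current state even though nothing upstream deleted it.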
Output (the orders_current view).
| order_id | status | amount | updated_at |
|---|---|---|---|
| 1 | delivered | 100.00 | 2026-06-15 12:00 |
| 2 | placed | 50.00 | 2026-06-15 11:00 |
| 3 | shipped | 200.00 | 2026-06-15 10:30 |
Why this works — concept by concept:
- ReplicatedReplacingMergeTree: combines the replication contract (every write goes to every replica via Keeper-tracked log entries) with the Replacing semantic (dedup by sort key during merge). One engine, two layered concerns.
- Version column: ReplicatedReplacingMergeTree(..., updated_at) tells the engine which column tiebreaks duplicates: keep the row with the greatest updated_at. Without it, the engine keeps the last-inserted row, which is rarely what a CDC sink wants when events can arrive out of order.
- TTL for retention: TTL updated_at + INTERVAL 30 DAY makes the engine delete expired rows during background merges (or drop whole parts once every row in them has expired, with ttl_only_drop_parts = 1). No cron job, no DELETE statement.
- argMax pattern instead of FINAL: argMax(col, version) GROUP BY pk produces the same answer as SELECT ... FINAL but uses a normal aggregation instead of merging parts at read time. Pay the GROUP BY cost once per query, not the FINAL cost per granule.
- Cost: O(parts) for the dedup merge (background); O(unique_keys) for the argMax aggregation; O(replicas) network amplification for the write. Reads on either side scale with the touched-partition byte count, not the row count.
4. Materialized views — incremental aggregation engine
ClickHouse materialized views are insert-time triggers, not refresh-on-schedule — the difference is the whole story
The mental model in one line: a ClickHouse materialized view is an INSERT INTO target SELECT ... FROM source that fires every time the source table receives a batch — there is no schedule, no refresh, no cron. Once you internalise "MVs are triggers, not refreshes," the entire materialized-view interview surface (POPULATE, -State functions, cascading MVs) becomes a sequence of obvious follow-ons.
The insert-time MV contract.
- The MV is a stored SELECT query plus a target table.
- When a batch of N rows arrives in the source, the MV's SELECT runs over that batch only (not the whole source table) and inserts the result into the target.
- The MV does not maintain incremental state: it sees one batch, produces one output, and forgets.
- "Refreshable" MVs (a 2024+ feature) are a separate construct that runs on a schedule; they are not what an interview means by "materialized view."
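A toy model of the trigger contract makes the "batch only" rule visible. Table and MaterializedView below are illustrative classes, not ClickHouse APIs: each insert forwards just the new batch to the view, rows inserted before the view existed never reach the target, and the target accumulates one partial row per batch and key until a merge or a read-time GROUP BY combines them.

```python
from collections import Counter


class Table:
    """A source table that forwards each inserted batch to its views."""

    def __init__(self):
        self.rows = []
        self.views = []

    def insert(self, batch):
        self.rows.extend(batch)
        for view in self.views:
            view.on_insert(batch)  # the view sees this batch only


class MaterializedView:
    """Runs `transform` over each new batch and appends the result to `target`."""

    def __init__(self, source, transform, target):
        self.transform = transform
        self.target = target
        source.views.append(self)

    def on_insert(self, batch):
        self.target.extend(self.transform(batch))


def count_by_type(batch):
    """Per-batch SELECT event_type, count() ... GROUP BY event_type."""
    return sorted(Counter(event_type for event_type, _user in batch).items())


if __name__ == "__main__":
    events, target = Table(), []
    events.insert([("click", 1)])  # before the view exists: never reaches target
    MaterializedView(events, count_by_type, target)
    events.insert([("click", 1), ("view", 2)])
    events.insert([("click", 3)])
    print(len(events.rows), target)  # 4 source rows, one partial row per batch and key
```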
The two MV idioms.
- TO target_table: the recommended pattern. The MV writes into an existing table you defined separately. Backfill is straightforward (INSERT INTO target SELECT ... FROM source).
- POPULATE at create time: runs the SELECT once over the existing source data, then enables trigger mode. Convenient for one-shot setup, but rows inserted into the source while POPULATE is running are not captured by the view, so it is unsafe on a live table.
-State / -Merge / -MergeState — the aggregate function trinity.
- countState(), uniqState(col), sumState(col), quantileState(col): return a partial-aggregate state object, not a final value. Suitable for storage in an AggregatingMergeTree.
- countMerge(state), uniqMerge(state): finalize a state into a number, typically at query time.
- countMergeState(state), uniqMergeState(state): combine multiple states into one new state. Used in cascading MVs.
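The -State / -Merge / -MergeState split can be mimicked with ordinary Python values. In this sketch a count state is an integer and a uniq state is an exact set of user ids; ClickHouse's uniq state is an approximate, bounded structure instead, so only the shape of the API is being modelled. The last line shows why finalised distinct counts cannot be re-aggregated.

```python
def count_state(values):
    """countState(): a partial count."""
    return len(values)


def uniq_state(values):
    """uniqState(col): modelled as an exact set (ClickHouse keeps an approximate state)."""
    return frozenset(values)


def count_merge_state(states):
    """countMergeState(): combine states into a new state."""
    return sum(states)


def uniq_merge_state(states):
    """uniqMergeState(): combine states into a new state."""
    return frozenset().union(*states)


def count_merge(states):
    """countMerge(): finalise states into a number."""
    return count_merge_state(states)


def uniq_merge(states):
    """uniqMerge(): finalise states into a number."""
    return len(uniq_merge_state(states))


if __name__ == "__main__":
    batches = [[1, 2, 2], [2, 3]]  # user_ids seen in two insert batches
    print(count_merge([count_state(b) for b in batches]))  # 5 events
    print(uniq_merge([uniq_state(b) for b in batches]))    # 3 distinct users
    print(sum(len(set(b)) for b in batches))               # 4: adding finalised uniq values double-counts user 2
```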
Cascading MVs in plain words.
- The 1-minute roll-up MV reads from raw_events on insert and writes to agg_1m.
- A second MV reads from agg_1m on insert and writes to agg_1h, combining 60 minute-states into one hour-state via *MergeState.
- A third MV reads from agg_1h and writes to agg_1d.
- Each MV fires only on its source's inserts, so the cascade is incremental from end to end.
Common interview probes.
- "Are ClickHouse MVs refreshed on a schedule?" No. They fire on every source insert.
- "What is the difference between -State and -Merge?" -State produces a partial state for storage; -Merge finalizes a state into a number for reading.
- "How do you backfill a new MV with historical data?" Either use POPULATE at create time, or create the MV first with a WHERE ts >= cutoff filter (so it captures new inserts) and then run INSERT INTO target SELECT ... FROM source WHERE ts < cutoff.
- "What happens if the source schema changes?" The MV's SELECT must match the new schema; otherwise the trigger fails. Evolve the MV body with ALTER TABLE mv_name MODIFY QUERY ....
Worked example — a 1-minute pre-aggregation MV
Detailed explanation. A dashboard needs "events per minute, per event_type" with sub-second latency over the last 24 hours. A raw events table at 100K events/sec would force the dashboard to scan 8.6B rows (100,000 × 86,400 seconds). An AggregatingMergeTree table fed by an MV reduces that to one row per (minute, event_type) after merges: 1,440 minutes × the number of event types, or roughly 17K rows for a day with a dozen types.
Question. Build the target events_1m table and the MV that maintains it. Show how the dashboard query reads from it.
Input (raw events arriving at high rate).
| ts | user_id | event_type | value |
|---|---|---|---|
| 2026-06-15 09:00:00.123 | 1 | click | 1.0 |
| 2026-06-15 09:00:00.456 | 2 | click | 1.0 |
| 2026-06-15 09:00:05.789 | 1 | view | 1.0 |
Code.
-- Target table for the 1-minute roll-up
CREATE TABLE events_1m
(
minute DateTime,
event_type LowCardinality(String),
events AggregateFunction(count),
users AggregateFunction(uniq, UInt64),
value_sum AggregateFunction(sum, Float64)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMMDD(minute)
ORDER BY (event_type, minute);
-- The MV that fires on every batch in `events`
CREATE MATERIALIZED VIEW events_1m_mv TO events_1m AS
SELECT
toStartOfMinute(ts) AS minute,
event_type,
countState() AS events,
uniqState(user_id) AS users,
sumState(value) AS value_sum
FROM events
GROUP BY minute, event_type;
-- Dashboard query
SELECT
minute,
event_type,
countMerge(events) AS events,
uniqMerge(users) AS users,
sumMerge(value_sum) AS value_sum
FROM events_1m
WHERE minute >= now() - INTERVAL 24 HOUR
GROUP BY minute, event_type
ORDER BY minute, event_type;
Step-by-step explanation.
- The AggregatingMergeTree target stores partial aggregate states, not finalized numbers. Each column is typed as AggregateFunction(name, arg_types).
- The MV's SELECT runs over each insert batch into events. The GROUP BY minute, event_type collapses the batch into one row per (minute, event_type) combination.
- countState() produces a tiny state (an integer); uniqState(user_id) produces an approximate distinct-count state (ClickHouse's uniq uses adaptive sampling and caps its sample at 65,536 hash values, so the state stays bounded; uniqCombined and uniqHLL12 are the HyperLogLog-based alternatives); sumState(value) is a single float.
- Background merges in AggregatingMergeTree combine states for the same ORDER BY key, turning many small states into one state per (minute, event_type).
- The dashboard reads with *Merge functions, which finalize the states into numbers. The GROUP BY minute, event_type in the read is required because between merges multiple rows per key may still exist.
Output (one row per (minute, event_type) after merges).
| minute | event_type | events | users | value_sum |
|---|---|---|---|---|
| 2026-06-15 09:00 | click | 1240 | 980 | 1240.0 |
| 2026-06-15 09:00 | view | 800 | 720 | 800.0 |
| 2026-06-15 09:01 | click | 1310 | 1010 | 1310.0 |
Rule of thumb. Always use *State in the MV body and *Merge in the read query. Mixing them ("can I just store count() instead of countState()?") breaks as soon as two insert batches write rows with the same key: AggregatingMergeTree only combines AggregateFunction columns during a merge, so a plain count column would not be added up and the total would be lost.
Worked example — cascading MVs (1-minute → 1-hour → 1-day)
Detailed explanation. When the dashboard has three zoom levels (last hour at minute resolution, last day at hour resolution, last month at day resolution), the cleanest architecture is a cascade: the 1-minute MV feeds the 1-hour MV, which feeds the 1-day MV. Each MV fires only on its direct source's inserts.
Question. Extend the 1-minute roll-up with 1-hour and 1-day cascade MVs. Show the engine and the chained MV definitions.
Input. The events_1m table from the previous example.
Code.
-- 1-hour target
CREATE TABLE events_1h
(
hour DateTime,
event_type LowCardinality(String),
events AggregateFunction(count),
users AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(hour)
ORDER BY (event_type, hour);
-- Cascade MV: events_1m -> events_1h
CREATE MATERIALIZED VIEW events_1h_mv TO events_1h AS
SELECT
toStartOfHour(minute) AS hour,
event_type,
countMergeState(events) AS events,
uniqMergeState(users) AS users
FROM events_1m
GROUP BY hour, event_type;
-- 1-day target
CREATE TABLE events_1d
(
day Date,
event_type LowCardinality(String),
events AggregateFunction(count),
users AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(day)
ORDER BY (event_type, day);
-- Cascade MV: events_1h -> events_1d
CREATE MATERIALIZED VIEW events_1d_mv TO events_1d AS
SELECT
toDate(hour) AS day,
event_type,
countMergeState(events) AS events,
uniqMergeState(users) AS users
FROM events_1h
GROUP BY day, event_type;
Step-by-step explanation.
- events_1m_mv fires on every insert into events and writes the per-minute aggregate states into events_1m.
- events_1h_mv fires on every insert into events_1m and rolls minute-states into hour-states via countMergeState and uniqMergeState; background merges finish combining the 60 minute-states of each hour.
- events_1d_mv does the same one level up, 24 hour-states into one day-state.
- The MergeState variants take existing states and combine them, producing a new state (not a finalised number). This is what makes the cascade incremental.
- After merges, each level holds far fewer rows than the one below it: 60x fewer from minute to hour and 24x fewer from hour to day, so events_1d is roughly events_1m / 1440.
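That the cascade loses nothing can be checked with a small Python model: roll raw events up to minutes, minutes to hours, hours to days, and compare with a direct raw-to-day roll-up. States are modelled as a (count, set of user ids) pair, an exact stand-in for ClickHouse's countState and approximate uniqState; the timestamps and user ids are made up.

```python
from collections import defaultdict
from datetime import datetime


def to_minute(ts):
    return ts.replace(second=0, microsecond=0)


def to_hour(ts):
    return ts.replace(minute=0, second=0, microsecond=0)


def to_day(ts):
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)


def roll_up(rows, bucket):
    """Group (ts, event_type, count_state, uniq_state) rows by a coarser bucket.

    Combining states by addition and set union plays the role of
    countMergeState / uniqMergeState in the cascade MVs.
    """
    acc = defaultdict(lambda: [0, frozenset()])
    for ts, event_type, count, users in rows:
        state = acc[(bucket(ts), event_type)]
        state[0] += count
        state[1] = state[1] | users
    return sorted((ts, et, c, u) for (ts, et), (c, u) in acc.items())


raw = [
    (datetime(2026, 6, 15, 9, 0, 0), "click", 1),
    (datetime(2026, 6, 15, 9, 0, 30), "click", 2),
    (datetime(2026, 6, 15, 9, 1, 5), "click", 1),
    (datetime(2026, 6, 15, 10, 15, 0), "click", 3),
    (datetime(2026, 6, 15, 10, 15, 0), "view", 1),
]
# Each raw event becomes a one-row state: count 1, a single user id.
base = [(ts, et, 1, frozenset([user])) for ts, et, user in raw]

if __name__ == "__main__":
    minutes = roll_up(base, to_minute)
    hours = roll_up(minutes, to_hour)
    days = roll_up(hours, to_day)
    print(len(minutes), len(hours), len(days))
    print(days == roll_up(base, to_day))
```

The row counts shrink from 4 to 3 to 2, and the cascaded day level matches the direct day roll-up exactly (4 clicks from 3 distinct users, 1 view).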
Output (sketch of disk sizes).
| Table | Rows per day | Storage |
|---|---|---|
| events | 8.6B | ~100 GB |
| events_1m | 1440 × distinct event_types | ~50 MB |
| events_1h | 24 × distinct event_types | ~2 MB |
| events_1d | 1 × distinct event_types | ~100 KB |
Rule of thumb. Cascade MVs trade disk for read latency at each level. Three levels (1m / 1h / 1d) is the sweet spot for most dashboards. Beyond that, the operational complexity of maintaining the cascade outweighs the marginal scan reduction.
Worked example — backfilling a new MV without POPULATE
Detailed explanation. POPULATE is convenient, but rows inserted into the source while it runs are silently missing from the view. A cleaner pattern is to pick one fixed cutoff, then (1) create the MV with a WHERE ts >= cutoff filter so it captures new inserts, and (2) backfill historical data with INSERT INTO target SELECT ... FROM source WHERE ts < cutoff.
Question. A new events_1h MV needs to be backfilled with 90 days of history without blocking the live ingest. Show the two-step backfill.
Input. events has 90 days of history. events_1h_mv is the MV. events_1h is the target.
Code.
-- Step 0: pick one fixed cutoff *before* creating the MV, slightly in the
-- future ('2026-06-16 00:00:00' here is an illustrative value).
-- The MV catches every row with ts >= cutoff via trigger; the backfill
-- covers ts < cutoff. The two ranges neither overlap nor leave a gap.
-- Step 1: create the MV (it starts firing on new inserts immediately)
CREATE MATERIALIZED VIEW events_1h_mv TO events_1h AS
SELECT
toStartOfHour(ts) AS hour,
event_type,
countState() AS events,
uniqState(user_id) AS users
FROM events
WHERE ts >= toDateTime('2026-06-16 00:00:00')
GROUP BY hour, event_type;
-- Step 2: once ingestion has passed the cutoff, backfill history
-- (split into per-partition ranges for very large tables)
INSERT INTO events_1h
SELECT
toStartOfHour(ts) AS hour,
event_type,
countState() AS events,
uniqState(user_id) AS users
FROM events
WHERE ts < toDateTime('2026-06-16 00:00:00')
GROUP BY hour, event_type;
Step-by-step explanation.
- Step 1: create the MV with the WHERE ts >= cutoff filter. From this moment, every new insert into events fires the MV, and rows at or after the cutoff land in events_1h.
- Step 2: after ingestion has moved past the cutoff, explicitly backfill historical rows via INSERT INTO events_1h SELECT ... FROM events WHERE ts < cutoff. Rows that arrived between Step 1 and the cutoff are excluded by the MV's filter and picked up by the backfill, so nothing is counted twice. This assumes no rows with ts < cutoff arrive after the backfill; late data needs a second, narrower backfill.
- Do not rely on AggregatingMergeTree to absorb an overlap: merging two countState values adds them, so any row counted by both paths is double-counted (uniqState tolerates repeats; countState and sumState do not). Equally, a backfill bound that trails the MV's start, such as ts < now() - INTERVAL 24 HOUR, leaves the rows in between uncounted.
- For very large backfills, run Step 2 in batched ranges (e.g. per partition) to avoid one giant query.
- The alternative, CREATE MATERIALIZED VIEW ... POPULATE AS ..., does both steps in one statement, but rows inserted into the source while it runs never reach the view. Unacceptable on a live table.
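The boundary arithmetic is easy to get wrong, so here is a small Python simulation under stated assumptions: the hourly timestamps, the 23:30 MV creation time and the midnight cutoff are invented, no row with ts < cutoff arrives after the backfill, and the trailing variant evaluates now() - INTERVAL 24 HOUR at MV-creation time. The function name is illustrative.

```python
from datetime import datetime, timedelta


def rows_counted(history, live, mv_filter, backfill_filter):
    """Count how many source rows reach the target under one backfill plan.

    history: timestamps already in `events` when the MV is created.
    live: timestamps inserted after the MV exists (only these fire the MV).
    The backfill runs last and reads the whole source table.
    """
    via_mv = sum(1 for ts in live if mv_filter(ts))
    via_backfill = sum(1 for ts in history + live if backfill_filter(ts))
    return via_mv + via_backfill


# 48 hourly rows already stored, then 7 rows that arrive after the MV exists.
history = [datetime(2026, 6, 14) + timedelta(hours=h) for h in range(48)]
live = [datetime(2026, 6, 15, 23, 45)] + [
    datetime(2026, 6, 16) + timedelta(hours=h) for h in range(6)
]
mv_created = datetime(2026, 6, 15, 23, 30)
cutoff = datetime(2026, 6, 16)  # one fixed value shared by both filters

if __name__ == "__main__":
    shared = rows_counted(history, live, lambda ts: ts >= cutoff, lambda ts: ts < cutoff)
    trailing_bound = mv_created - timedelta(hours=24)
    trailing = rows_counted(history, live, lambda ts: True, lambda ts: ts < trailing_bound)
    overlap = rows_counted(history, live, lambda ts: True, lambda ts: ts < cutoff)
    print(len(history) + len(live), shared, trailing, overlap)
```

With 55 source rows, the shared cutoff counts exactly 55; an unfiltered MV plus the trailing bound counts only 31, missing a whole day; an unfiltered MV plus a ts < cutoff backfill counts 56, double-counting the 23:45 row.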
Output.
| Phase | Source covered | Notes |
|---|---|---|
| Step 1: MV created | ts >= cutoff | every new row at or after the cutoff triggers it |
| Step 2: INSERT SELECT | ts < cutoff | one-shot backfill in batches, run after ingestion passes the cutoff |
| Net coverage | full range, no overlap and no gap at the boundary | each row counted exactly once |
Rule of thumb. Never use POPULATE on a live table. The two-step "create MV, then INSERT SELECT" pattern is safer, batchable, and survives the operator pressing Ctrl-C halfway through. Pay the 30 seconds of extra typing.
Worked example — the MV-source-join pitfall
Detailed explanation. A common bug: the MV body joins the source table to another table. The MV only fires on inserts into the source, so the right-hand table of the join is read once, at insert time. If a fact-table insert arrives before its dim-table row, the join misses and the MV writes a row with default dim values (empty strings, or NULLs when join_use_nulls = 1) that never updates.
Question. Diagnose the pitfall in the MV below and propose two fixes.
Input. An events table and a users dim table. The MV joins them.
Code (the broken MV).
CREATE MATERIALIZED VIEW events_enriched_mv TO events_enriched AS
SELECT
e.ts,
e.user_id,
e.event_type,
u.country,
u.tier
FROM events e
LEFT JOIN users u ON u.user_id = e.user_id;
-- ^ bug: the join reads `users` at the time the events batch arrives.
-- If the user row is added later, the joined columns keep their default (or NULL) values forever.
Step-by-step explanation.
- The MV fires on inserts into events. The LEFT JOIN users is evaluated at that moment.
- If events has a row for user_id = 12345 but users does not yet, the LEFT JOIN returns default values (or NULL with join_use_nulls = 1) for country and tier, and the MV writes them into events_enriched.
- Later, when users gets the row for 12345, the existing events_enriched row does not automatically update; there is no recalculation. The data is permanently stale.
- Fix 1: Dictionary. Define users as a Dictionary in ClickHouse and look it up with dictGet at query time on events_enriched, so every read sees the dictionary as of its last reload (controlled by LIFETIME).
- Fix 2: defer the enrichment. Drop the join from the MV; do the enrichment at the dashboard query layer with JOIN users or dictGet(...).
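A few lines of Python reproduce the pitfall. Here users is a plain dict standing in for the dimension table, "" stands in for the default value a ClickHouse LEFT JOIN produces on a miss, and "DE" is a made-up country; dictionary reload timing (LIFETIME) is not modelled, and the function names are illustrative.

```python
def enrich_at_insert(batch, users):
    """Insert-time join: copy whatever the dimension holds right now.

    A miss yields "", standing in for ClickHouse's default value.
    """
    return [(user_id, users.get(user_id, "")) for user_id in batch]


def enrich_at_query(stored, users):
    """Query-time lookup (the dictGet idea): re-read the dimension on every read."""
    return [(user_id, users.get(user_id, "")) for user_id, _country in stored]


if __name__ == "__main__":
    users = {}                                 # dimension row not loaded yet
    stored = enrich_at_insert([12345], users)  # the event arrives first
    users[12345] = "DE"                        # the user row lands later
    print(stored, enrich_at_query(stored, users))
```

The stored row keeps its empty country forever, while the query-time lookup picks up "DE" as soon as the dimension has it.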
Output (the fix using a Dictionary).
CREATE DICTIONARY users_dict
(
user_id UInt64,
country String,
tier String
)
PRIMARY KEY user_id
SOURCE(CLICKHOUSE(TABLE 'users'))
LIFETIME(MIN 300 MAX 600)
LAYOUT(HASHED());
-- The MV now stores raw events, no join
CREATE MATERIALIZED VIEW events_enriched_mv TO events_enriched AS
SELECT
ts, user_id, event_type
FROM events;
-- Enrichment happens at query time, against the live dictionary
SELECT
event_type,
dictGet('users_dict', 'country', user_id) AS country,
count()
FROM events_enriched
WHERE ts >= now() - INTERVAL 1 HOUR
GROUP BY event_type, country;
Rule of thumb. Never join in an MV body if the right side of the join can update independently. Use Dictionaries for small dimension tables (reloaded on their LIFETIME schedule) and defer enrichment to the read query when the dim is large or changes frequently.
Senior interview question on materialized-view design
A senior interviewer often frames this as: "Design the materialized-view tree for a real-time analytics product that needs to serve hourly DAU (distinct user count) over the last 30 days with sub-100ms latency. Walk through the engine choices, the -State / -Merge functions, and the backfill plan."
Solution Using an AggregatingMergeTree roll-up with an approximate uniq state
-- 1) Raw events table
CREATE TABLE events
(
ts DateTime,
user_id UInt64,
event_type LowCardinality(String)
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(ts)
ORDER BY (event_type, ts, user_id)
TTL ts + INTERVAL 90 DAY;
-- 2) Hourly DAU roll-up target
CREATE TABLE dau_hourly
(
hour DateTime,
event_type LowCardinality(String),
users AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(hour)
ORDER BY (event_type, hour);
-- 3) The MV that fires on every batch of `events`
CREATE MATERIALIZED VIEW dau_hourly_mv TO dau_hourly AS
SELECT
toStartOfHour(ts) AS hour,
event_type,
uniqState(user_id) AS users
FROM events
GROUP BY hour, event_type;
-- 4) Dashboard query
SELECT
hour,
event_type,
uniqMerge(users) AS dau
FROM dau_hourly
WHERE hour >= now() - INTERVAL 30 DAY
GROUP BY hour, event_type
ORDER BY hour;
Step-by-step trace.
| Layer | What it does | Latency contribution |
|---|---|---|
| events (raw) | append-only, partitioned, sorted | not on read path |
| dau_hourly_mv (trigger) | fires on each insert batch into events | adds ~1–3% write overhead |
| dau_hourly (target) | AggregatingMergeTree storing uniq states | one row per (hour, event_type) after merges |
| Dashboard read | uniqMerge on ~24×30×N rows | sub-100ms |
After 30 days, dau_hourly holds roughly 720 hours × distinct event_types rows once merges have caught up. With a dozen event types, that is about 8.6K rows, a tiny scan for the dashboard.
Output:
| hour | event_type | dau |
|---|---|---|
| 2026-06-15 08:00 | click | 124,300 |
| 2026-06-15 08:00 | view | 198,420 |
| 2026-06-15 09:00 | click | 143,100 |
| 2026-06-15 09:00 | view | 211,820 |
Why this works — concept by concept:
- Insert-time MV trigger: the MV fires on every batch into events, not on a schedule. The roll-up table is as fresh as the ingestion path feeding events (for a Kafka consumer, typically 1–10 seconds).
- AggregatingMergeTree with uniqState: uniqState(user_id) produces an approximate distinct-count state. The state is bounded in size (uniq caps its hash sample at 65,536 entries), so the roll-up table size is bounded by groups × state_size, not by the input row count.
- -State at write, -Merge at read: -State produces partial states for storage; -Merge finalizes them at query time. This is the only correct pattern; storing uniq(...) directly would forbid further aggregation, because distinct counts from different batches cannot be added.
- Schema-evolution path: add a column with ALTER TABLE events ADD COLUMN ..., then ALTER TABLE dau_hourly ADD COLUMN ..., then ALTER TABLE dau_hourly_mv MODIFY QUERY .... The MV body change is the one that needs care; storage adds are cheap.
- Cost: the roll-up writes far fewer rows than it reads. A billion-row day collapsing into ~24 × 10 groups is roughly a 4-million-to-1 reduction in stored rows after merges (before merges, each insert batch writes at most one row per group it touches). Read cost on the dashboard is O(touched_rows_in_target) × O(uniqMerge cost), measured in milliseconds for 30-day windows.
5. Sharding and replication at scale
Distributed table + Replicated*MergeTree is the production pattern — shards scale write throughput, replicas scale HA and read concurrency
The mental model in one line: a ClickHouse cluster is a grid of shards × replicas, where each shard owns a disjoint slice of the data (by sharding key) and each replica within a shard is a fully synchronized copy — and a Distributed table on top fans queries out across all shards in parallel. Once you can draw the 3×2 grid (3 shards, 2 replicas each), the entire scaling story collapses to "more shards = more write throughput, more replicas = more read HA."
The four building blocks.
- Local table: a `Replicated*MergeTree` on each node. Replicas of the same shard share the same Keeper path; replicas of different shards have different paths.
- Distributed table: a thin "fan-out" engine that lives on every node and routes reads and writes across all shards. It owns no data of its own.
- Sharding key: a deterministic function (typically `cityHash64(user_id)`) that maps each row to a shard. Picked once; expensive to change.
- ClickHouse Keeper (or ZooKeeper): the coordination service that sequences replication log entries and DDL.
Sharding key choice.
- Hash sharding: `cityHash64(user_id)`. Even distribution across shards; the same user always lands on the same shard (useful for per-user joins).
- Random sharding: `rand()`. Statistically even, but per-user joins must hit every shard via `GLOBAL JOIN`.
- Custom sharding: a domain-specific function (e.g. `customer_id % 4`). Useful for multi-tenant setups where you want a specific customer on a specific shard.
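The routing rule can be sketched in Python. This assumes the behaviour described in the Distributed engine documentation: the sharding expression's value is taken modulo the total shard weight, and the remainder selects a shard by weight interval. `blake2b` is a stand-in for `cityHash64`, so the shard a given id lands on here will not match a real cluster; `stand_in_hash64` and `pick_shard` are hypothetical helper names.

```python
import hashlib
from collections import Counter

def stand_in_hash64(value) -> int:
    """Deterministic 64-bit hash. blake2b stands in for cityHash64 here,
    so real shard assignments will differ from this sketch."""
    digest = hashlib.blake2b(str(value).encode(), digest_size=8).digest()
    return int.from_bytes(digest, "little")

def pick_shard(key_hash: int, weights) -> int:
    """Take the remainder modulo the total shard weight, then return the
    shard whose cumulative weight interval contains that remainder."""
    remainder = key_hash % sum(weights)
    upper = 0
    for shard_index, weight in enumerate(weights):
        upper += weight
        if remainder < upper:
            return shard_index
    raise AssertionError("unreachable: remainder is always < sum(weights)")

weights = [1, 1, 1]  # three equally weighted shards
placements = Counter(pick_shard(stand_in_hash64(uid), weights) for uid in range(30_000))
print(sorted(placements.items()))  # about 10,000 user ids per shard
```

Because the hash is deterministic (unlike Python's built-in `hash()`, which is salted per process), the same user id always maps to the same shard, and a shard with weight 2 would receive twice the share of a shard with weight 1.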
Distributed reads.
- A `SELECT` on the Distributed table fans out to every shard. Each shard runs the query locally on one of its replicas (chosen according to the `load_balancing` setting).
- Partial results stream back to the coordinator (the initiating node), which merges them and returns the final result.
- For joins that need cross-shard data, `GLOBAL JOIN` evaluates the right-hand side once on the coordinator and sends the result to every shard as a temporary table.
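A simplified Python model of a distributed read: each shard reduces its own rows to a partial aggregate, and the coordinator merges the partials before finalizing. Real ClickHouse ships intermediate aggregate states between servers rather than Python tuples; the rows and helper names here are hypothetical.

```python
from collections import defaultdict

# Hypothetical per-shard rows: (event_type, value). Each shard holds a disjoint slice.
shards = [
    [("click", 10.0), ("click", 20.0), ("view", 5.0)],
    [("click", 30.0)],
    [("view", 7.0), ("view", 9.0)],
]

def local_partial(rows):
    """What each shard runs locally: a partial aggregate (sum, count) per group."""
    partial = defaultdict(lambda: [0.0, 0])
    for event_type, value in rows:
        partial[event_type][0] += value
        partial[event_type][1] += 1
    return {key: tuple(acc) for key, acc in partial.items()}

def coordinator_merge(partials):
    """What the initiator does: merge partial states, then finalize avg = sum / count."""
    merged = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for event_type, (total, count) in partial.items():
            merged[event_type][0] += total
            merged[event_type][1] += count
    return {key: total / count for key, (total, count) in merged.items()}

result = coordinator_merge(local_partial(rows) for rows in shards)
print(result)  # {'click': 20.0, 'view': 7.0}
```

Averaging the per-shard averages for `click` (15.0 and 30.0) would give 22.5 instead of 20.0; merging sums and counts before finalizing is what keeps the answer exact.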
Distributed writes.
- A write to the Distributed table routes each row to its target shard based on the sharding key.
- The Distributed engine can buffer briefly (`distributed_directory_monitor_sleep_time_ms`) and batch the forwarded writes.
- For high throughput, applications often write directly to the local `Replicated*MergeTree` on a chosen shard, skipping the Distributed table's overhead; the application then has to apply the sharding rule itself.
Replication contract.
- A write on one replica is committed locally, then asynchronously propagated to peers via Keeper-tracked log entries.
- Replication is eventually consistent — readers may see fresher data on one replica than another for a few seconds during catch-up.
- `system.replicas` and `system.replication_queue` show per-replica replication health.
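A toy Python model of the replication contract above, assuming a single in-memory list in place of the Keeper-tracked log and string part names in place of data parts. The fields loosely mirror the `log_pointer` and `queue_size` columns of `system.replicas`; everything else (`Replica`, `insert`, `pull`, `apply_one`) is hypothetical.

```python
from dataclasses import dataclass, field

shared_log = []  # stands in for one shard's Keeper-tracked replication log

@dataclass
class Replica:
    name: str
    log_pointer: int = 0                       # next shared-log entry to copy
    queue: list = field(default_factory=list)  # copied but not yet applied
    parts: list = field(default_factory=list)  # data visible to readers here

def insert(replica, part):
    """A write commits locally first, then is announced in the shared log."""
    replica.parts.append(part)
    shared_log.append(part)

def pull(replica):
    """Copy new log entries into this replica's queue, skipping parts it already has."""
    new_entries = shared_log[replica.log_pointer:]
    replica.queue.extend(p for p in new_entries if p not in replica.parts)
    replica.log_pointer = len(shared_log)

def apply_one(replica):
    """Fetch one queued part from a peer; only then can readers here see it."""
    if replica.queue:
        replica.parts.append(replica.queue.pop(0))

r1, r2 = Replica("node-1a"), Replica("node-1b")
insert(r1, "part_1")
insert(r1, "part_2")
print("entries not yet pulled:", len(shared_log) - r2.log_pointer)  # entries not yet pulled: 2
pull(r2)
print("queue_size:", len(r2.queue), "visible:", r2.parts)  # queue_size: 2 visible: []
apply_one(r2)
apply_one(r2)
print("visible:", r2.parts)  # visible: ['part_1', 'part_2']
```

Between the inserts and the two `apply_one` calls, a reader on node-1b sees fewer parts than a reader on node-1a, which is the eventual-consistency window described above.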
Common interview probes.
- "What is the difference between a Distributed table and a Replicated table?" — Distributed fans queries across shards (horizontal scaling); Replicated synchronizes a single shard's data across replicas (HA).
- "Why does ClickHouse use ClickHouse Keeper instead of ZooKeeper?": Keeper speaks the same client protocol as ZooKeeper but is written in C++ on top of Raft (ZooKeeper itself uses ZAB) and ships with ClickHouse, which simplifies ops.
- "Can you change the sharding key online?" — not easily. The common pattern is to write to a new cluster with the new sharding key and dual-write during migration.
- "What is `GLOBAL IN` and when do you need it?": when a subquery in a Distributed query needs to be evaluated once on the coordinator and broadcast to all shards (rather than executed independently per shard).
Worked example — a 3-shard × 2-replica reference cluster
Detailed explanation. The canonical small-but-real ClickHouse cluster is 3 shards × 2 replicas, 6 nodes total. It is small enough to sketch on a whiteboard yet still demonstrates both write fan-out (sharding) and read HA (replication). Most production clusters scale by adding shards (for write throughput) or replicas (for read concurrency).
Question. Define the events table on a 3-shard × 2-replica cluster. Show the per-node local table, the Distributed table on top, and the cluster configuration sketch.
Input (cluster XML config sketch).
| Shard | Replicas |
|---|---|
| 01 | node-1a, node-1b |
| 02 | node-2a, node-2b |
| 03 | node-3a, node-3b |
Code.
```sql
-- 1) Local table on every node (created ON CLUSTER)
CREATE TABLE events_local ON CLUSTER prod
(
    ts         DateTime,
    user_id    UInt64,
    event_type LowCardinality(String),
    value      Float64
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/events_local',  -- one Keeper path per shard
    '{replica}'                                 -- replica name from macros
)
PARTITION BY toYYYYMMDD(ts)
ORDER BY (event_type, ts, user_id)
TTL ts + INTERVAL 90 DAY;

-- 2) Distributed table on top (also ON CLUSTER)
CREATE TABLE events ON CLUSTER prod AS events_local
ENGINE = Distributed(
    prod,                -- cluster name from config.xml
    default,             -- database
    events_local,        -- local table
    cityHash64(user_id)  -- sharding key
);
```
Step-by-step explanation.
- `events_local` is the storage table. Each node has its own copy keyed by the `{shard}` and `{replica}` macros. Replicas of the same shard share a Keeper path; replicas of different shards do not.
- `events` is the router table. It owns no data; it routes reads and writes across `events_local` on every shard.
- The sharding key `cityHash64(user_id)` deterministically maps each `user_id` to a shard. All events for a given user land on the same shard, which makes per-user joins cheap.
- `ON CLUSTER prod` runs the DDL on every node listed under the cluster `prod` in `config.xml`. Without it, you'd run the CREATE on each node manually.
- Applications can write to `events` (the Distributed table) for convenience or directly to `events_local` on a chosen shard for max throughput.
Output (topology).
| Node | Shard | Replica | Owns |
|---|---|---|---|
| node-1a | 01 | r1 | shard 01 data |
| node-1b | 01 | r2 | shard 01 data |
| node-2a | 02 | r1 | shard 02 data |
| node-2b | 02 | r2 | shard 02 data |
| node-3a | 03 | r1 | shard 03 data |
| node-3b | 03 | r2 | shard 03 data |
Rule of thumb. Start every production deployment as 2 shards × 2 replicas (4 nodes). Scale by adding shards when write throughput is the bottleneck; add replicas when read concurrency is. The 3-shard × 2-replica grid is simply the usual picture to draw in interviews; 2 × 2 already demonstrates both axes.
Worked example — choosing the sharding key
Detailed explanation. The sharding key choice is one of the most consequential decisions in a ClickHouse cluster. A bad choice causes hot shards (skewed write traffic) or cross-shard joins (slow). The default answer is cityHash64(user_id) for user-facing analytics — even distribution and per-user co-location in one expression.
Question. For each workload below, pick the sharding key and explain in one sentence.
Input.
| Workload | Per-row identity | Common query pattern |
|---|---|---|
| User-event log | user_id | per-user funnel |
| Time-series metrics | (metric_name, ts) | metric over time |
| Ad impressions | (campaign_id, user_id) | campaign-level aggregate |
| Multi-tenant SaaS | (customer_id, ...) | per-customer dashboard |
Code.
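```sql
-- Each *_local ReplicatedMergeTree table is assumed to already exist ON CLUSTER prod.
-- 1) User-event log: hash on user_id for even distribution + per-user co-location
CREATE TABLE events ON CLUSTER prod AS events_local
ENGINE = Distributed(prod, default, events_local, cityHash64(user_id));
-- 2) Time-series metrics: hash on metric_name to keep each metric on one shard
CREATE TABLE metrics ON CLUSTER prod AS metrics_local
ENGINE = Distributed(prod, default, metrics_local, cityHash64(metric_name));
-- 3) Ad impressions: hash on campaign_id, since campaign-level aggregates dominate
CREATE TABLE impressions ON CLUSTER prod AS impressions_local
ENGINE = Distributed(prod, default, impressions_local, cityHash64(campaign_id));
-- 4) Multi-tenant SaaS: hash on customer_id so each tenant lives on one shard
--    (tenant_events_local is a hypothetical local table that carries customer_id)
CREATE TABLE tenant_events ON CLUSTER prod AS tenant_events_local
ENGINE = Distributed(prod, default, tenant_events_local, cityHash64(customer_id));
```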
Step-by-step explanation.
- `user_id` hash gives even distribution (assuming `user_id` is roughly random) and co-locates a user's events on one shard, so per-user joins become per-shard joins.
- `metric_name` hash keeps each time series on one shard. Time-range scans become per-shard scans rather than cross-shard. Watch for a "celebrity metric" (one metric with disproportionate traffic), which would create a hot shard.
- `campaign_id` hash is right when campaigns dominate the read pattern. If a single mega-campaign skews traffic, fall back to `cityHash64(campaign_id, user_id)` to spread it, at the cost of per-campaign co-location.
- `customer_id` hash co-locates each tenant's data on a single shard, and small customers share shards. A plain hash cannot choose which shard a tenant gets, so moving a large customer to a dedicated shard needs a custom sharding expression or a cluster reshape.
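To make the "celebrity metric" warning concrete, here is a Python sketch that measures the busiest shard's share of rows. The traffic numbers are invented, `blake2b` stands in for `cityHash64`, and hashing on `(metric_name, ts)` is an illustrative spreading key in the spirit of the `(campaign_id, user_id)` fallback, not a recommendation from the original text.

```python
import hashlib
from collections import Counter

N_SHARDS = 3

def stand_in_hash64(*parts) -> int:
    """Deterministic 64-bit hash; blake2b stands in for cityHash64 here."""
    joined = "|".join(map(str, parts)).encode()
    return int.from_bytes(hashlib.blake2b(joined, digest_size=8).digest(), "little")

# Hypothetical traffic: 100 ordinary metrics x 500 points, plus one "celebrity"
# metric with 100,000 points (two thirds of all rows).
points = [(f"metric_{i}", t) for i in range(100) for t in range(500)]
points += [("celebrity", t) for t in range(100_000)]

def shard_load(key_of):
    """Rows per shard when routing by hash(key_of(point)) % N_SHARDS."""
    return Counter(stand_in_hash64(*key_of(p)) % N_SHARDS for p in points)

def skew(load):
    """Busiest shard's share of all rows; 1 / N_SHARDS would be perfectly even."""
    return max(load.values()) / sum(load.values())

by_metric = shard_load(lambda p: (p[0],))  # like cityHash64(metric_name)
by_metric_ts = shard_load(lambda p: p)     # like cityHash64(metric_name, ts)
print(f"{skew(by_metric):.2f} vs {skew(by_metric_ts):.2f}")
```

With one metric carrying two thirds of the rows, hashing on `metric_name` alone puts at least that share on a single shard, while the two-column key evens the load out again at the cost of scattering each metric across shards.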
Output (trade-off summary).
| Sharding key | Even distribution? | Per-key co-location? | Cross-shard joins? |
|---|---|---|---|
| `cityHash64(user_id)` | yes (if user_id random) | yes, per user | only for cross-user aggregates |
| `cityHash64(metric_name)` | yes (if many metrics) | yes, per metric | only for cross-metric aggregates |
| `cityHash64(campaign_id)` | yes (if many campaigns) | yes, per campaign | for cross-campaign cohorts |
| `cityHash64(customer_id)` | depends on customer mix | yes, per customer | rarely needed |
| `rand()` | yes (statistically) | no | always |
Rule of thumb. The sharding key is "what does my dashboard query group by most often?" If the answer is user_id, hash by user. If the answer is customer_id, hash by customer. Co-location at write time pays off at read time.
Worked example — GLOBAL IN for cross-shard subqueries
Detailed explanation. A non-GLOBAL subquery in a Distributed query is evaluated on each shard rather than once for the whole cluster. When the subquery should produce the same result on every shard (e.g. "the top 100 users globally"), use GLOBAL IN: the coordinator runs the subquery once and broadcasts the result to every shard.
Question. Find every event by the top 100 users (by total event count) globally. Show the naive query and the GLOBAL IN fix.
Input. events is a Distributed table over 3 shards.
Code.
```sql
-- BROKEN: not a cluster-wide top 100. Rejected under the default
-- distributed_product_mode = 'deny'; under 'local', each shard computes
-- its own "top 100 by local count".
SELECT *
FROM events
WHERE user_id IN (
    SELECT user_id
    FROM events
    GROUP BY user_id
    ORDER BY count() DESC
    LIMIT 100
);

-- FIX: GLOBAL IN. The coordinator runs the subquery once and broadcasts the result.
SELECT *
FROM events
WHERE user_id GLOBAL IN (
    SELECT user_id
    FROM events
    GROUP BY user_id
    ORDER BY count() DESC
    LIMIT 100
);
```
Step-by-step explanation.
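- The broken query fans out the outer SELECT to every shard. By default (`distributed_product_mode = 'deny'`) ClickHouse rejects this "double-distributed" subquery outright. With `'local'`, each shard runs the inner subquery against its own `events_local`, producing 3 different "top 100 by local count" lists. With `'allow'`, each shard re-runs the subquery across the whole cluster, which is correct but multiplies the work by the number of shards.
- Under local evaluation, the outer WHERE on each shard checks `user_id IN (local_top_100)`. Because `events` is sharded by `cityHash64(user_id)`, each local list holds that shard's share of the true top 100 plus users who are only locally heavy, so the result covers up to 300 users instead of 100. With a key such as `rand()`, where one user's events are spread across shards, it can also miss true top users.
- The `GLOBAL IN` fix changes the execution: the coordinator runs the inner subquery once (which itself fans out to every shard for the aggregation), collects the top 100 globally, then broadcasts that list to every shard for the outer WHERE.
- Now every shard filters by the same global top-100 list. The result is what the user actually wanted.
- `GLOBAL JOIN` is the analogous fix for joins where the right side needs to be computed once and broadcast.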
Output.
| Query | Result |
|---|---|
| Naive `IN` | each shard's local top 100, no global consistency |
| `GLOBAL IN` | events by the true global top 100 users |
Rule of thumb. Whenever a subquery on a Distributed table should produce a single result for the whole cluster (not per-shard), use GLOBAL IN or GLOBAL JOIN. Without GLOBAL, every shard re-runs the subquery against its own partial data.
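The difference between the two queries can be simulated in Python under simplifying assumptions: each shard is a list of `user_id` values (one per event), users are co-located as with `cityHash64(user_id)` sharding, the naive query is modelled with local evaluation (the `'local'` product mode), and "top 100" is shrunk to "top 2". The data and helper names are hypothetical.

```python
from collections import Counter

K = 2  # "top 100" shrunk to "top 2" so the output stays readable

# Hypothetical cluster sharded by user: each user's events sit on exactly one shard.
# Each list is one shard's user_id column, one entry per event.
shards = [
    ["a"] * 9 + ["b"] * 8 + ["c"] * 1,
    ["d"] * 7 + ["e"] * 2,
    ["f"] * 3 + ["g"] * 2,
]

def top_k(user_ids, k):
    """Users with the most events, like GROUP BY user_id ORDER BY count() DESC LIMIT k."""
    return {user for user, _ in Counter(user_ids).most_common(k)}

# Naive IN, evaluated locally: every shard filters by its own top-K list.
local_tops = [top_k(shard, K) for shard in shards]
naive = {u for shard, local in zip(shards, local_tops) for u in shard if u in local}

# GLOBAL IN: the coordinator computes top-K once over all shards and broadcasts it.
global_top = top_k([u for shard in shards for u in shard], K)
fixed = {u for shard in shards for u in shard if u in global_top}

print(sorted(naive))  # ['a', 'b', 'd', 'e', 'f', 'g']
print(sorted(fixed))  # ['a', 'b']
```

The naive version returns every shard's local heavy hitters, while the `GLOBAL IN` version keeps only the two users who are heavy cluster-wide.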
Worked example — measuring replication lag
Detailed explanation. Replication in ClickHouse is asynchronous — writes commit locally then propagate. Under normal load, lag is sub-second; under heavy bulk inserts, it can climb to seconds or tens of seconds. Monitoring the gap is the first step toward debugging "the dashboard shows stale data on one replica" tickets.
Question. Write a system-table query that reports per-shard replication lag in seconds. Explain the columns it reads.
Input. A running cluster with events_local on every node.
Code.
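```sql
-- clusterAllReplicas() gathers system.replicas from every node in `prod`;
-- a plain FROM system.replicas only reports the node you are connected to.
SELECT
    database,
    table,
    replica_name,
    is_leader,
    absolute_delay,
    queue_size,
    log_max_index,
    log_pointer
FROM clusterAllReplicas('prod', system.replicas)
WHERE table = 'events_local'
ORDER BY absolute_delay DESC;
```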
Step-by-step explanation.
- `system.replicas` is the live view of replication health. It exposes one row per replicated table on the node you query; wrapping it in `clusterAllReplicas('prod', system.replicas)` shows every node at once.
- `absolute_delay` is how far behind this replica is, in seconds. Anything > 30 is worth investigating.
- `queue_size` is the number of pending operations (fetching inserted blocks, merges, and similar actions) waiting for this replica to perform. A queue that keeps growing while new writes arrive means the replica is falling behind.
- `log_max_index` is the index of the newest entry in the shared replication log; `log_pointer` is the next entry this replica will copy into its own queue. A large gap means the replica is not even pulling new entries.
- `is_leader` marks replicas that may schedule background merges. Current versions allow several leaders per shard at once, so it is often 1 on every replica. Routine reads can hit any replica.
Output.
| database | table | replica_name | is_leader | absolute_delay | queue_size |
|---|---|---|---|---|---|
| default | events_local | node-1a | 1 | 0 | 0 |
| default | events_local | node-1b | 0 | 2 | 4 |
| default | events_local | node-2a | 1 | 0 | 0 |
| default | events_local | node-2b | 0 | 15 | 87 |
Rule of thumb. Alert on absolute_delay > 30 seconds per replicated table. Alert on queue_size growing for more than 60 seconds. Both indicate that the replica is not keeping up with writes — either because of network, disk, or merge backlog.
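The two alert rules can be sketched as a small Python evaluator over periodically polled `system.replicas` rows. The `Sample` type, the polling interval, and the polled values are hypothetical; the first rows for node-1b and node-2b reuse the output table above, and the later rows are invented to trigger the rules.

```python
from dataclasses import dataclass

DELAY_LIMIT_S = 30          # alert when absolute_delay exceeds this
QUEUE_GROWTH_LIMIT_S = 60   # alert when queue_size keeps growing longer than this

@dataclass
class Sample:
    ts: float            # poll time in seconds (hypothetical monitoring clock)
    replica: str
    absolute_delay: int  # seconds, from system.replicas
    queue_size: int      # pending operations, from system.replicas

def alerts(samples):
    """Return the (replica, rule) pairs for every rule that fired."""
    series_by_replica = {}
    for sample in sorted(samples, key=lambda x: x.ts):
        series_by_replica.setdefault(sample.replica, []).append(sample)

    fired = set()
    for replica, series in series_by_replica.items():
        if any(s.absolute_delay > DELAY_LIMIT_S for s in series):
            fired.add((replica, "absolute_delay"))
        # Track how long queue_size has been strictly increasing, poll over poll.
        growth_started = series[0].ts
        for prev, cur in zip(series, series[1:]):
            if cur.queue_size > prev.queue_size:
                if cur.ts - growth_started > QUEUE_GROWTH_LIMIT_S:
                    fired.add((replica, "queue_growth"))
            else:
                growth_started = cur.ts
    return fired

samples = [
    Sample(0, "node-1b", 2, 4), Sample(30, "node-1b", 1, 3), Sample(60, "node-1b", 0, 0),
    Sample(0, "node-2b", 15, 87), Sample(30, "node-2b", 22, 140),
    Sample(60, "node-2b", 28, 190), Sample(90, "node-2b", 35, 260),
]
print(sorted(alerts(samples)))
# [('node-2b', 'absolute_delay'), ('node-2b', 'queue_growth')]
```

node-1b is draining its queue, so it stays quiet; node-2b crosses the 30-second delay limit and has grown its queue for 90 seconds straight.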
Senior interview question on cluster scaling
A senior interviewer often asks: "Your ClickHouse cluster is 3 shards × 2 replicas. Write QPS is doubling every quarter and a single shard is now hitting CPU saturation. Walk me through how you scale, what breaks, and how you keep the dashboard online."
Solution Using a four-step horizontal scale-out plan
```sql
-- Step 1: create the v2 local table ON CLUSTER prod_v2
-- (prod_v2 is a new cluster definition with 5 shards: the existing ones plus the
--  new shards 04 and 05; a new table name avoids clashing with events_local on reused nodes)
CREATE TABLE events_local_v2 ON CLUSTER prod_v2
(
    ts         DateTime,
    user_id    UInt64,
    event_type LowCardinality(String),
    value      Float64
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/v2/{shard}/events_local_v2',
    '{replica}'
)
PARTITION BY toYYYYMMDD(ts)
ORDER BY (event_type, ts, user_id);

-- Step 2: new Distributed table over the 5-shard cluster
CREATE TABLE events_v2 ON CLUSTER prod_v2 AS events_local_v2
ENGINE = Distributed(
    prod_v2,
    default,
    events_local_v2,
    cityHash64(user_id)
);

-- Step 3: backfill (or dual-write from the Kafka MV bridge)
-- Option A: backfill from the old cluster with the cluster() table function
-- (assumes `prod` is still defined in remote_servers); inserting through
-- events_v2 re-routes every row with the 5-shard rule
INSERT INTO events_v2
SELECT * FROM cluster('prod', default.events_local)
WHERE ts >= '2026-01-01';

-- Option B: dual-write at the MV bridge layer (Kafka -> both clusters)
-- by adding a second MV that targets the new cluster.

-- Step 4: cut the dashboard over to events_v2 and decommission v1
```
Step-by-step trace.
| Step | Action | Risk |
|---|---|---|
| 1 | Stand up 2 new shards with their replicas | new nodes; no data yet |
| 2 | Define `events_v2` as Distributed over the 5-shard cluster | router only; no traffic yet |
| 3 | Backfill or dual-write to populate all 5 v2 shards | I/O-heavy; do in batches |
| 4 | Cut dashboards to `events_v2`, decommission `events` | requires app-config push |
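The migration is online: the old cluster keeps serving until the dashboard is cut over. The hard part is Step 3. Moving from 3 to 5 equally weighted shards changes the routing from `cityHash64(user_id) % 3` to `cityHash64(user_id) % 5`, so most users map to a different shard and the full history has to be rewritten rather than a slice moved. Inserting through `events_v2` applies the new routing automatically; inserting directly into local tables would require computing the new shard yourself.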
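A quick Python check of how much data moves when the shard count changes. It assumes equally weighted shards (so a row's shard is its hash modulo the shard count) and uniformly distributed hash values, which lets one full cycle of lcm(3, 5) = 15 values stand in for the whole hash space; `fraction_staying` is a hypothetical helper.

```python
def fraction_staying(old_shards: int, new_shards: int, key_space: int) -> float:
    """Share of hash values whose shard index is unchanged when routing moves
    from hash % old_shards to hash % new_shards (equal shard weights)."""
    stay = sum(1 for h in range(key_space) if h % old_shards == h % new_shards)
    return stay / key_space

# Uniform hashes repeat the same pattern every lcm(3, 5) = 15 values.
print(fraction_staying(3, 5, 15))  # 0.2
```

Only one hash value in five keeps its shard when going from 3 to 5 shards, so roughly 80% of the history lands somewhere new. That is why the backfill rewrites everything instead of moving a slice.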
Output (after migration):
| Cluster | Shards | Replicas | Write throughput | Dashboard latency |
|---|---|---|---|---|
| prod (old) | 3 | 2 | 150K events/sec | 200–500ms |
| prod_v2 | 5 | 2 | 250K events/sec | 100–300ms |
Why this works — concept by concept:
- Add shards to scale write throughput: each new shard owns a slice of the hash space. Write throughput scales roughly linearly because each shard handles its own slice's inserts and merges.
- Add replicas to scale read concurrency and HA: each replica can independently serve reads. Within a shard, two replicas tolerate one node failure; three tolerate two.
- Distributed table is a thin router — it owns no data, so reshaping the cluster (adding shards) does not lose any of the cluster's data when done correctly. The migration risk is in the backfill, not in the router.
- Dual-write at the MV bridge — if the Kafka → ClickHouse MV is the only writer, adding a second MV that targets the new cluster gives you dual-write for free during migration. Cut the dashboard, then drop the old MV.
- Cost: every row is stored once per replica, so writes cost O(rows × N_replicas) across a shard's replicas, while each added shard shrinks the data every shard must scan for a query roughly in proportion to 1 / N_shards. The migration itself is O(historical_rows / network_throughput).
Cheat sheet — ClickHouse recipes
- Default time-series schema. `ENGINE = MergeTree PARTITION BY toYYYYMMDD(ts) ORDER BY (entity_id, toStartOfHour(ts), ts)`. Coarse partition, sort by the most-filtered column, then time.
- Real-time roll-up. `AggregatingMergeTree` target + materialized view with `-State` aggregate functions. Read with `*Merge` and `GROUP BY` at query time.
- Dedup on CDC. `ReplacingMergeTree(version_col)` with `argMax(col, version_col) GROUP BY pk` for hot queries; reserve `FINAL` for low-QPS dashboards.
- Distributed table. `ENGINE = Distributed(cluster, db, local_table, cityHash64(shard_key))`. Co-locate the most-grouped column on one shard.
- Backfill a new MV. Two-step: (1) create the MV with a `WHERE ts >= cutoff` filter (it captures new inserts via the trigger), (2) `INSERT INTO target SELECT ... FROM source WHERE ts < cutoff` for history, so the two halves never overlap. Avoid `POPULATE` on live tables.
- Schema evolve an MV. `ALTER TABLE mv MODIFY QUERY ...` after adding the column to source and target with `ADD COLUMN`.
- Test cardinality before partitioning. `SELECT uniq(col) FROM table`. If a candidate partition column has > 1000 distinct values, it is too fine-grained for `PARTITION BY`.
- Compress for time-series. `CODEC(DoubleDelta, LZ4)` on monotonic timestamps; `CODEC(T64, LZ4)` on bounded integers; `CODEC(LZ4HC(9))` for cold data.
- Replication health. `SELECT replica_name, absolute_delay, queue_size FROM system.replicas WHERE absolute_delay > 30`.
- Inspect parts. `SELECT partition, count() FROM system.parts WHERE active AND table = 'x' GROUP BY partition`. Too many parts per partition (>50) indicates merge pressure.
- Force a merge (test only). `OPTIMIZE TABLE x PARTITION 'YYYYMMDD' FINAL`. Never run it unconditionally in production at scale.
- Insert via Kafka engine. Kafka table (`ENGINE = Kafka`) + MV (`TO target`) + MergeTree target: the canonical three-object pipeline.
- GLOBAL IN for cross-shard subqueries. Whenever the subquery should yield one global result, write `WHERE col GLOBAL IN (subquery)`.
- Dictionaries for joins. Define small dimension tables as a `Dictionary` with `LAYOUT(HASHED())` + `LIFETIME(MIN 300 MAX 600)`; read with `dictGet('dict', 'col', key)` for sub-millisecond lookups.
Frequently asked questions
What is ClickHouse used for?
ClickHouse is an open-source columnar OLAP database designed for sub-second interactive analytics over billions of rows. It is the default answer for real-time dashboards, log analytics, event-stream aggregation, ad-tech metrics, and any workload where the read pattern is "aggregate over a column" and the write pattern is "append from a stream or a bulk file." Major deployments at Cloudflare, Uber, ByteDance, and Yandex run ClickHouse at the multi-petabyte scale. It is not a replacement for Postgres / MySQL (no row-level transactions, no point updates) or for Snowflake (slower at heavy multi-join batch). It sits between the stream and the dashboard as the sub-second serving tier.
What is the difference between MergeTree and ReplacingMergeTree?
MergeTree is the base columnar engine: it writes immutable on-disk parts and merges them in the background according to the ORDER BY key. ReplacingMergeTree adds a dedup semantic to the merge: when two rows in the same partition share the same ORDER BY key, the merge keeps only one of them (the one with the greatest value in an optional version column, otherwise the most recently inserted one). Use MergeTree for append-only event streams where every row is unique; use ReplacingMergeTree for CDC sinks where you want "the latest version of every row by primary key." Note that dedup only happens during merges, and never across partitions or shards, so both versions may exist on disk at once; production queries pair ReplacingMergeTree with argMax(col, version) GROUP BY pk or with SELECT ... FINAL for the dedup at read time.
How do materialized views work in ClickHouse?
ClickHouse materialized views are insert-time triggers, not refresh-on-schedule snapshots. When you create a materialized view with CREATE MATERIALIZED VIEW mv TO target AS SELECT ... FROM source, the engine fires the SELECT over each insert batch into the source table and writes the result into the target table. There is no schedule, no cron, no full-table refresh. For real-time roll-ups, the target is typically an AggregatingMergeTree that stores partial aggregate states (countState, uniqState, sumState), and the dashboard reads with the matching *Merge functions to finalize the states. The "Refreshable Materialized View" feature added in 2024 is a separate construct that does run on a schedule — but in interviews "materialized view" almost always refers to the insert-time trigger variant.
How does ClickHouse handle updates and deletes?
ClickHouse does not have OLTP-style row updates. The closest equivalents are (1) ALTER TABLE ... UPDATE / DELETE mutations, which rewrite entire affected on-disk parts in the background — fine at low volume but unsuitable for high-frequency point updates; (2) ReplacingMergeTree with a version column, which lets the writer emit a new row per version and the merge dedupes at the sort-key level; (3) CollapsingMergeTree, which collapses paired +1 / -1 rows during merge; and (4) ALTER TABLE ... DROP PARTITION, which is the cheapest way to delete a coarse range (e.g. GDPR-driven cohort deletion at month granularity). If the workload demands frequent point updates, you are using the wrong tool — Postgres or a key-value store is the right answer, and ClickHouse becomes the analytical mirror downstream via CDC.
ClickHouse vs Snowflake — which one for real-time analytics?
For interactive sub-second dashboards over append-heavy data, ClickHouse is the strong default — its columnar storage, vectorised execution, and materialized-view roll-ups land query latencies in the 50–500ms range that Snowflake's compute-warehouse model cannot match without aggressive caching. For batch ELT, long-tail analytics, complex multi-join workloads, and ad-hoc SQL across many domains, Snowflake is the strong default — its separation of storage and compute, mature dbt integration, and seconds-to-minutes latency budget fit the batch pattern. The most common production deployment is both: ClickHouse for the real-time speed lane (kept hot for 30–90 days), Snowflake for the batch warehouse (kept for 5+ years). Pick the one that matches the latency contract, not the cost contract.
Do I need ZooKeeper to run ClickHouse?
For single-node ClickHouse (development, ETL scratch space, small dashboards) — no. For any production cluster with Replicated*MergeTree tables — yes, you need either ZooKeeper or ClickHouse Keeper as the coordination service. ClickHouse Keeper is a Raft-based, C++-implemented drop-in replacement that ships with ClickHouse and is the recommended choice for new clusters since 2023; it can be deployed standalone or co-located on ClickHouse nodes. ZooKeeper remains supported and is the right choice if your organisation already operates a ZooKeeper ensemble. Either way, the coordination service sequences replication log entries, DDL queries, and distributed leadership — without it, replication, ON CLUSTER DDL, and distributed mutations cannot function.
Practice on PipeCode
- Drill the real-time analytics practice library → for the dashboard-latency and roll-up family of problems.
- Rehearse on aggregation problems → when the interviewer wants `GROUP BY` with multiple aggregates.
- Sharpen the time-axis with time-series practice drills → for `toStartOfHour` / `toStartOfDay` and the partition-pruning patterns.
- Layer the data aggregation library → for materialized-view-style roll-up problems.
- Stack the streaming pipeline library → for Kafka → sink contract questions.
- For the broader surface, read top data engineering interview questions →.
- Stack the prerequisites with the only 5 skills you need to become a data engineer →.
- Sharpen the SQL axis with the SQL for data engineering interviews course →.
- For long-form schema craft, work through data modelling for DE interviews →.
- For the ELT system-design axis, study the ETL system design course →.
Pipecode.ai is LeetCode for Data Engineering: every ClickHouse recipe above ships with hands-on practice rooms where you write the MergeTree table definition, the AggregatingMergeTree roll-up MV, and the Distributed-table sharding key against graded inputs that mirror real production schemas. PipeCode pairs every reading with 450+ DE-focused problems and a real-time scoring engine, so you never have to wonder whether your `cityHash64(user_id)` choice actually balances the shards or whether your `uniqState` / `uniqMerge` pairing returns the correct DAU.




