Apache Flink interview questions dominate the senior streaming round whenever event-time semantics, large state, or low-latency stream processing comes up. Interviewers don't stop at "what is Flink?": they probe whether you understand Flink watermarks as the event-time progress contract, keyed state as the per-key memory model, checkpointing as the durability mechanism, and exactly-once as the composition of checkpoint barriers plus two-phase commit.
This guide walks through the seven Flink primitives that show up most often in data engineering interviews at FAANG and streaming-heavy shops (Uber, Netflix, LinkedIn, Stripe, Databricks). Each section pairs a teaching block with a worked interview solution: code, a step-by-step trace, an output table, then a concept-by-concept breakdown of why it works. By the end you'll be able to defend a bounded-out-of-orderness watermark strategy, walk through a hopping window with allowed lateness, pick RocksDB over the HashMap backend for TB-scale state, and explain how checkpoint barriers plus two-phase-commit sinks give end-to-end exactly-once. That is the shape senior Flink rounds reward, including when Flink vs Spark Structured Streaming and Flink CDC come up.
On this page
- Why Flink shows up in every senior data engineering interview
- The DataStream API and the streaming dataflow graph
- Event time, processing time, and watermarks
- Windows — tumbling, sliding, session, global; allowed lateness
- Keyed state, operator state, and the state backend
- Checkpointing, savepoints, and exactly-once via two-phase commit
- Flink SQL, Table API, and Flink CDC
- Choosing the right Flink primitive (cheat sheet)
- Frequently asked questions
- Practice on PipeCode
1. Why Flink shows up in every senior data engineering interview
Flink is the stateful event-time streaming engine interviewers reach for once Kafka Streams runs out of headroom
The one-sentence invariant: Apache Flink is a distributed, stateful, event-time stream processing engine where every record flows through a directed dataflow graph of operators, state is sharded by key across the cluster, and exactly-once is delivered via checkpoint barriers plus two-phase commit sinks. Once you internalise that (dataflow graph, keyed state, checkpoint barriers, transactional sinks), every Flink interview prompt reduces to "which operator, which window, which state backend."
Where Flink wins vs Kafka Streams vs Spark Structured Streaming.
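- Event time + watermarks: Flink has the richest event-time model (per-source watermark strategies, idleness detection, allowed lateness, late-data side outputs); Kafka Streams supports event time with grace periods but fewer knobs; Spark Structured Streaming supports watermarks via withWatermark but has no late-data side output.
- State at scale: Flink with RocksDB handles TB-scale keyed state; Kafka Streams also uses RocksDB, but state is rebuilt from Kafka changelog topics on failover, which in practice keeps comfortable per-instance state around tens of GB; Spark's state store is snapshotted every micro-batch, with a RocksDB-backed store available since Spark 3.2.
- Low latency: Flink processes records one at a time (no micro-batches); Spark Structured Streaming is micro-batch by default (Continuous Processing mode is still experimental).
- Exactly-once across multiple sinks: Flink's checkpoint-driven two-phase commit (TwoPhaseCommitSinkFunction in the legacy sink API, committers in the newer Sink API) covers Kafka, JDBC (via XA) and Iceberg; Kafka Streams only gets exactly-once for Kafka-to-Kafka.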
Where Flink LOSES vs the alternatives.
- Operational complexity — JobManager + TaskManagers + state backend + checkpoints + savepoints. More moving parts than Kafka Streams.
- JVM lock-in — DataStream API is Java/Scala first; PyFlink exists but is younger.
- Smaller community — Spark is bigger; Kafka Streams has Confluent's marketing weight.
The four primitives every Flink interview opens with.
- DataStream. The base streaming abstraction. Records flow through operators.
- Operator. A node in the dataflow graph — source, map, filter, keyBy, window, reduce, sink.
- State. Per-key (keyed state) or per-operator (operator state) memory the framework persists.
- Checkpoint. A consistent snapshot of all state across the cluster, taken via barrier injection.
What interviewers listen for.
- Do you reach for keyBy() before any keyed stateful operator? Required.
- Do you mention bounded-out-of-orderness watermarks (WatermarkStrategy.forBoundedOutOfOrderness) when event time comes up? Current-default signal.
- Do you bring up two-phase commit (TwoPhaseCommitSinkFunction, or the exactly-once KafkaSink) when "end-to-end exactly-once" is discussed? Senior signal.
- Do you reach for RocksDB when state size grows large? Production-experience signal.
Worked example — design a Flink job for clickstream sessionisation
Detailed explanation. A common opener: "Design a Flink job that reads clickstream events from Kafka, sessionises them per user (30-minute inactivity gap), and writes session summaries to Kafka exactly-once." The senior answer is a four-operator DataStream pipeline with event-time watermarks, session windows, RocksDB state, and a transactional Kafka sink.
Question. Sketch the operator chain and justify each component.
Code (DataStream API, Java).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60s
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // RocksDB + incremental
DataStream<ClickEvent> clicks = env
.fromSource(
KafkaSource.<ClickEvent>builder()
.setBootstrapServers("kafka:9092")
.setTopics("clicks")
.setValueOnlyDeserializer(new ClickDeserializer())
.build(),
WatermarkStrategy
.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((e, ts) -> e.eventTimeMs),
"kafka-clicks");
clicks
.keyBy(ClickEvent::getUserId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.aggregate(new SessionSummaryAggregator())
.sinkTo(
KafkaSink.<SessionSummary>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(...)
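.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-clickstream-")
// keep at or below the broker's transaction.max.timeout.ms (15 min by default)
.setProperty("transaction.timeout.ms", "900000")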
.build());
env.execute("clickstream-sessionisation");
Step-by-step explanation.
- Checkpointing every 60s in EXACTLY_ONCE mode: the JobManager's checkpoint coordinator triggers barriers at the sources, and every operator snapshots its state when the barrier passes.
- RocksDB state backend with incremental checkpoints: state can grow to TB scale, and each checkpoint uploads only the RocksDB files created since the previous one.
- KafkaSource with a bounded-out-of-orderness watermark strategy (5s): event-time watermarks advance as max(event_ts) - 5s (minus 1 ms).
- keyBy(userId): state and events are partitioned by user id; exactly one parallel subtask owns a given user at any time.
- EventTimeSessionWindows (30-min gap): each user's records are grouped into sessions; a session fires once the watermark passes its last event + 30 minutes of event time.
- KafkaSink with EXACTLY_ONCE delivery: writes into a Kafka transaction, pre-commits it when the checkpoint barrier reaches the sink, and commits once the JobManager reports the checkpoint complete (the same two-phase commit protocol that TwoPhaseCommitSinkFunction implements for legacy sinks).
Output.
| Operator | Purpose | State |
|---|---|---|
| KafkaSource | read events + assign timestamps + emit watermarks | operator state (partition offsets) |
| keyBy | partition by userId (a shuffle, not a stateful operator) | none |
| session window + aggregate | group by user with a 30-min gap, compute the session summary | RocksDB keyed state (one accumulator per open session, plus session-merging metadata) |
| KafkaSink (EOS) | write summaries in Kafka transactions | small (pending transactions / transactional ids) |
Rule of thumb. Watermarks → keyBy → window → aggregate → transactional sink. That's the canonical event-time streaming pattern; understanding why each step is necessary is what senior interviewers probe.
Flink interview question on choosing between Flink, Kafka Streams, Spark Streaming
A common opener: "Kafka Streams, Flink, or Spark Structured Streaming?"
Solution Using a workload-shape decision matrix
| Concern | Kafka Streams | Spark Structured Streaming | Apache Flink |
|---|---|---|---|
| Latency | < 100 ms | sub-second to seconds (micro-batch) | < 100 ms |
| State scale | GB to tens of GB (RocksDB + changelog topics) | GB-TB (with RocksDB) | GB-TB (with RocksDB) |
| Event time + watermarks | yes | yes | richest model |
| Exactly-once to non-Kafka sinks | no (Kafka-to-Kafka only) | yes (idempotent / transactional sinks) | strong (2PC) |
| Deployment | embedded in app | YARN/K8s cluster | JobManager + TaskManagers |
| SQL surface | ksqlDB | Spark SQL | Flink SQL + Table API |
| Language | Java/Scala | Scala/Python/Java | Java/Python (PyFlink)/SQL; Scala API deprecated |
| Community size | small | largest | large |
Step-by-step trace.
| Workload | Best fit |
|---|---|
| Topic → Topic transformation, GB state, JVM team | Kafka Streams |
| Batch + streaming on same engine, micro-batch OK | Spark Structured Streaming |
| Event-time-heavy, TB state, exactly-once to non-Kafka sinks | Flink |
| Lakehouse with Iceberg/Hudi as sink | Spark or Flink, depending on existing stack |
| Low-latency CDC fanout | Flink with Flink CDC |
Output:
| Choice | When |
|---|---|
| Kafka Streams | smallish state, JVM app embedded, Kafka in / Kafka out |
| Spark Structured Streaming | unified batch+streaming team, micro-batch acceptable, big state |
| Flink | event-time-heavy, low-latency, big state, multi-sink EOS |
Why this works — concept by concept:
- Latency vs micro-batch: Flink is single-record; Spark waits per micro-batch. For sub-second latency Flink wins.
- State at TB scale: RocksDB-backed state in Flink supports TB-scale keyed state; Kafka Streams effectively caps at tens of GB per instance.
- Event-time semantics: Flink's watermark/allowed-lateness/idle-source machinery is the most expressive of the three.
- Two-phase commit sinks: Flink's checkpoint-driven two-phase commit gives end-to-end EOS to transactional sinks beyond Kafka (JDBC via XA, Iceberg); Spark relies on idempotent or transactional sink writes, and Kafka Streams stops at Kafka.
- Operational cost: Flink has the most moving parts; the productivity wins are concentrated at "low-latency + big state + EOS" workloads.
2. The DataStream API and the streaming dataflow graph
The Flink DataStream API is "operators connected by streams": explicit graph, explicit parallelism
The mental model: a Flink job is a DAG of operators; each operator can be parallelised across slots; records flow through operators along directed edges and state lives on whichever subtask owns the key.
Operators every Flink interview covers.
- Source. Reads from Kafka, Kinesis, files, sockets, JDBC.
- map / flatMap / filter. Stateless per-record transformations.
- keyBy. Re-shuffles records so all records for one key end up on one subtask.
- window. Groups records by time / count / session into finite windows.
- reduce / aggregate. Stateful per-window or per-key aggregations.
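- connect / coMap / coFlatMap. Connects two streams (possibly of different types) into one operator with shared state; the building block for custom joins and control streams.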
- process. Low-level operator with timer access and direct state access.
- sink. Writes to Kafka, S3, JDBC, Iceberg, Elasticsearch.
Parallelism.
- Each operator has a parallelism (number of parallel subtasks).
- Set via setParallelism(N) per operator, or globally via env.setParallelism(N).
- keyBy partitions records by a hash of the key; after keyBy, records for one key always go to the same subtask.
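The key-to-subtask routing can be sketched in plain Java, with no Flink dependency. Flink maps each key to a key group (a hash of the key modulo the max parallelism) and then assigns contiguous ranges of key groups to subtasks; this sketch keeps that two-step shape, but the bit mixer is a stand-in chosen for illustration, not Flink's internal hash, so the concrete subtask numbers will not match a real job. The class name KeyGroupRouting is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java sketch of keyBy routing: key -> key group -> subtask index. */
final class KeyGroupRouting {

    /** Stand-in bit mixer (an assumption for illustration, not Flink's internal hash). */
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        h *= 0xc2b2ae35;
        h ^= (h >>> 16);
        return h & 0x7fffffff; // clear the sign bit so the modulo below is non-negative
    }

    /** Step 1: a key always lands in the same key group in [0, maxParallelism). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    /** Step 2: each subtask owns a contiguous range of key groups. */
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        Map<String, Integer> owner = new LinkedHashMap<>();
        for (String user : new String[] {"u_1", "u_42", "u_7"}) {
            owner.put(user, subtaskFor(user, 128, 4)); // maxParallelism 128, parallelism 4
        }
        System.out.println(owner); // the same user id maps to the same subtask on every run
    }
}
```

Because the number of key groups is fixed by the max parallelism, rescaling moves whole key groups between subtasks; that is why a job's max parallelism cannot simply be changed when restoring its state.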
Operator chaining.
- Adjacent operators with the same parallelism and forward connections are chained into one task by default.
- Chained operators run in the same thread → faster (no buffer copy / serialisation).
- Break the chain with disableChaining() or startNewChain() when you need backpressure isolation or per-operator metrics.
Backpressure.
- Slow downstream operators cause upstream buffers to fill; the source slows down.
- Visible in the Flink UI as "backpressure: HIGH" per operator.
- Diagnose by finding the slowest operator in the chain (usually a sink or a hot keyed state).
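The buffer-fills-then-source-slows loop can be sketched as a tick-based simulation in plain Java. This is not Flink's credit-based network stack; the class name, the buffer size, and the sink's drain rate are made-up parameters chosen only to show the effect: once the bounded buffer is full, the source can emit no faster than the slow sink drains.

```java
import java.util.ArrayDeque;

/** Tick-based sketch of backpressure through a bounded buffer between two tasks. */
final class BackpressureSim {

    /**
     * @param ticks          number of time steps to simulate
     * @param bufferCapacity size of the bounded buffer between source and sink (assumed)
     * @param sinkEveryN     the slow sink drains one record every N ticks
     * @return how many records the source managed to emit
     */
    static int simulate(int ticks, int bufferCapacity, int sinkEveryN) {
        ArrayDeque<Integer> buffer = new ArrayDeque<>();
        int emitted = 0;
        for (int t = 1; t <= ticks; t++) {
            // Downstream operator: consumes one record every sinkEveryN ticks.
            if (t % sinkEveryN == 0 && !buffer.isEmpty()) {
                buffer.poll();
            }
            // Source: wants to emit one record per tick, but blocks while the buffer is full.
            if (buffer.size() < bufferCapacity) {
                buffer.add(emitted);
                emitted++;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("fast sink: " + simulate(30, 4, 1));
        System.out.println("slow sink: " + simulate(30, 4, 3));
    }
}
```

With a sink that drains one record every third tick, the source emits 14 records in 30 ticks instead of 30: after the buffer fills, its rate settles at the sink's rate.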
Worked example — write a windowed word count
Detailed explanation. The canonical Flink "hello world": a windowed word count over a Kafka source.
Question. Build a DataStream pipeline that reads lines from a Kafka topic, tokenises into words, keys by word, and emits a count every 5 seconds (tumbling event-time window).
Code (Java).
DataStream<String> text = env.fromSource(
KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("text")
.setValueOnlyDeserializer(new SimpleStringSchema())
.build(),
WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((line, kafkaTs) -> kafkaTs), // event time = Kafka record timestamp
"text-source");
DataStream<Tuple2<String, Long>> counts = text
.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
for (String word : line.toLowerCase().split("\\s+")) {
if (!word.isEmpty()) out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG))
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1);
counts.print();
env.execute("word-count");
Step-by-step explanation.
- Source reads lines from the Kafka topic text and uses each Kafka record's timestamp as its event time.
- flatMap tokenises each line into (word, 1) tuples.
- keyBy(word) shuffles tuples so all occurrences of the same word land on the same subtask.
- The tumbling 5-second event-time window groups each word's tuples into 5-second buckets.
- .sum(1) sums the second field (the count) per word per window.
- A window fires when the watermark reaches its end; with the 2-second out-of-orderness bound, that happens once a record at least 2 seconds past the window end has been seen.
Output (for the single line "the cat sat on the mat" arriving in the first window).
| word | window | count |
|---|---|---|
| the | [00:00:00 — 00:00:05) | 2 |
| cat | [00:00:00 — 00:00:05) | 1 |
| sat | [00:00:00 — 00:00:05) | 1 |
| on | [00:00:00 — 00:00:05) | 1 |
| mat | [00:00:00 — 00:00:05) | 1 |
Rule of thumb. keyBy precedes every keyed stateful operator. Without keyBy, state is per-subtask (operator state), not per-key (keyed state); you almost certainly want keyed state.
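The same keyed, tumbling event-time count can be reproduced in plain Java to check the output table by hand. This is a sketch under simplifying assumptions: each line carries its own timestamp (standing in for the Kafka record timestamp), windows are end-exclusive [start, start + 5s), and watermarks and late data are deliberately left out. The class name TumblingWordCount is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of a keyed, tumbling event-time word count (no watermarks). */
final class TumblingWordCount {

    /** Start of the tumbling window [start, start + size) that contains ts. */
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    /** Counts words per (window start, word), like flatMap + keyBy(word) + window + sum(1). */
    static Map<Long, Map<String, Long>> count(String[] lines, long[] tsMillis, long sizeMillis) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            long start = windowStart(tsMillis[i], sizeMillis);
            Map<String, Long> perWord = result.computeIfAbsent(start, k -> new TreeMap<>());
            for (String word : lines[i].toLowerCase().split("\\s+")) {
                if (!word.isEmpty()) {
                    perWord.merge(word, 1L, Long::sum); // the .sum(1) step
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] lines = {"the cat sat on the mat", "The dog"};
        long[] ts = {1_000L, 6_000L}; // the second line falls into the [5s, 10s) window
        System.out.println(count(lines, ts, 5_000L));
    }
}
```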
Flink interview question on operator chaining
The probe: "What is operator chaining and when would you disable it?"
Solution Using disableChaining() for backpressure isolation
DataStream<Event> events = env.fromSource(...)
.name("source");
events
.map(new ExpensiveTransform())
.name("expensive")
.disableChaining() // break the chain
.keyBy(Event::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.reduce(new SessionReducer())
.name("session-reducer")
.sinkTo(snowflakeSink);
Step-by-step trace.
| Configuration | Source | expensive | window | sink |
|---|---|---|---|---|
| Default chaining | task 1 (chained with expensive) | task 1 (chained with source) | task 2 (chained with sink) | task 2 (chained with window) |
| disableChaining on expensive | task 1 (standalone) | task 2 (standalone) | task 3 (chained with sink) | task 3 (chained with window) |
Output:
| Scenario | Default chaining | disableChaining |
|---|---|---|
| Per-record overhead | lowest (method call, no network buffer) | extra serialisation + buffer hop between source and expensive |
| Backpressure visibility | one number for the source + expensive chain | per-operator |
| Hot-spot diagnosis | hard | easy |
| Tasks (threads) per slot | 2 | 3 |
Why this works — concept by concept:
- Chaining = same JVM thread: chained operators hand records over by direct method call, with no network buffer and no serialisation to bytes (unless object reuse is enabled, Flink still makes a defensive copy).
- Chained operators share backpressure: the UI reports the chain as one task, so you can't tell which operator is slow.
- Disable chaining to diagnose: when you need per-operator backpressure or metrics, force a task boundary.
- Disable chaining for pipelining: in separate threads the source and the expensive operator run concurrently, and network buffers absorb short bursts; sustained backpressure still propagates upstream.
- Cost: extra threads, extra serialisation. Default chaining is the right choice most of the time.
3. Event time, processing time, and watermarks
Flink watermarks are how Flink tells operators "event time has progressed to here" when events arrive out of order
Figure: Flink watermark progression across an event-time stream. Events arrive out of order (t=10, 12, 11, 15, 14); the watermark WM = max(event_ts) - out-of-orderness bound moves rightward; a tumbling window [0, 10) fires when WM >= 10; a late event arriving after the watermark passed 10 is dropped or routed to a side output.
The mental model: a watermark is a record flowing through the dataflow alongside data records, asserting "event-time has progressed past this point"; operators use it to decide when to fire windows and when to drop late records.
Time domains.
- Processing time: wall-clock time on the operator. Fast, but not deterministic on replay; useful when latency matters more than correctness.
- Event time: timestamp embedded in the record. The right answer for analytics, billing, fraud, A/B tests.
- Ingestion time: timestamp assigned at the source. A compromise; rarely the right choice.
Watermark generation strategies.
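- WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(N)) (backed by BoundedOutOfOrdernessWatermarks): most common; watermark = max(seen_event_ts) - N (minus 1 ms).
- WatermarkStrategy.forMonotonousTimestamps() (backed by AscendingTimestampsWatermarks): watermark = max(seen_event_ts) (minus 1 ms); for guaranteed-in-order streams (rare).
- Custom WatermarkGenerator: for sources with known lateness distributions.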
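The bounded-out-of-orderness rule fits in a few lines. The class below is a plain-Java sketch of the behaviour described above (track the maximum timestamp, report max - bound - 1 ms); it is not Flink's WatermarkGenerator interface, and both the class name and the isBehindWatermark helper are hypothetical simplifications for illustration.

```java
/** Plain-Java sketch of a bounded-out-of-orderness watermark generator. */
final class BoundedOutOfOrdernessSketch {
    private final long boundMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long boundMillis) {
        this.boundMillis = boundMillis;
        // Chosen so the first watermark is exactly Long.MIN_VALUE instead of overflowing.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    /** Per event: only the largest timestamp seen so far matters. */
    void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    /** Periodically: no more events at or below this timestamp are expected. */
    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    /** Simplified check: the event is not newer than the current watermark. */
    boolean isBehindWatermark(long eventTimestampMillis) {
        return eventTimestampMillis <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(5_000);
        for (long ts : new long[] {4_000, 8_000, 6_000}) {
            wm.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + wm.currentWatermark());
        }
    }
}
```

An out-of-order event (6s after 8s) never moves the watermark backwards, which is what lets downstream windows trust it.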
Allowed lateness.
- window.allowedLateness(Duration.ofMinutes(M)): keep the window's state for M after the watermark passes the window end; each late record arriving in that period updates the window and triggers an additional late firing.
- Records that arrive after WM > window_end + allowedLateness are dropped, or sent to a side output (sideOutputLateData) for separate handling.
Idle sources.
- If a parallel source partition stops producing, its watermark stops advancing, which holds back the global watermark (the minimum across all inputs).
- WatermarkStrategy.withIdleness(Duration.ofMinutes(M)) marks a subtask idle after M minutes without records, so the global watermark advances from the active subtasks only.
Worked example — watermark + late-event handling
Detailed explanation. Walk through a tumbling 10-second window with 5-second out-of-orderness and 30-second allowed lateness — show how a late event is routed to a side output.
Question. Trace events with event_ts = 4, 8, 6, 12, 10 through the watermark + window machinery.
Input (events arriving in processing order).
| Event | event_ts (s) | processing_ts (s) |
|---|---|---|
| A | 4 | 10 |
| B | 8 | 11 |
| C | 6 | 12 |
| D | 12 | 13 |
| E | 10 | 14 |
Code.
DataStream<Event> events = env.fromSource(
source,
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((e, ts) -> e.eventTimeMs),
"events");
OutputTag<Event> lateTag = new OutputTag<>("late", TypeInformation.of(Event.class));
SingleOutputStreamOperator<Long> counts = events
.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Duration.ofSeconds(30))
.sideOutputLateData(lateTag)
.process(new CountWindowFn());
DataStream<Event> lateRecords = counts.getSideOutput(lateTag);
lateRecords.sinkTo(lateRecordsSink);
Step-by-step explanation (for readability the watermark is recomputed after every event; Flink actually emits it periodically, every 200 ms by default, and subtracts an extra 1 ms).
- Event A (ts=4) → watermark = max(4) - 5 = -1, i.e. no progress yet. Window [0,10) is open.
- Event B (ts=8) → watermark = max(8) - 5 = 3. Still open.
- Event C (ts=6) → watermark = max(8) - 5 = 3 (the max didn't change). C is out of order but newer than the watermark, so it joins [0,10).
- Event D (ts=12) → watermark = max(12) - 5 = 7. D belongs to [10,20); [0,10) stays open because 7 < 10.
- Event E (ts=10) → watermark stays at 7. Tumbling windows exclude their end, so ts=10 belongs to [10,20), not [0,10).
- Window [0,10) fires once the watermark reaches 10, which needs an event with ts >= 15 (e.g. ts=16 → WM=11). It emits A, B, C.
- Until the watermark passes 10 + 30 = 40, a straggler for [0,10) such as ts=5 is still added and re-fires the window with an updated count; after that, it is routed to the lateRecords side output instead of being dropped silently.
Output (windowed counts, assuming all five events share the same key; [10, 20) fires once the watermark reaches 20).
| Window | Events included | Count |
|---|---|---|
| [0, 10) | A (ts=4), B (ts=8), C (ts=6) | 3 |
| [10, 20) | D (ts=12), E (ts=10) | 2 |
Rule of thumb. BoundedOutOfOrdernessWatermarks with a tolerance of ~2× expected lateness + allowedLateness for outlier handling + side output for late records you still want to count.
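The trace can be replayed with a small plain-Java simulation of one key's tumbling windows. Assumptions: timestamps are in milliseconds, the watermark is recomputed after every event (Flink emits it periodically), and the firing rules are simplified from Flink's default event-time behaviour: fire when the watermark reaches window_end - 1 ms, fire again for each late record within allowed lateness, and drop window state once the watermark passes window_end - 1 ms + allowed lateness. The class name and the extra events at 16s, 7s, 46s and 5s are hypothetical, added to push the watermark past each threshold.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Single-key simulation: 10s tumbling windows, 5s out-of-orderness, 30s allowed lateness. */
final class LatenessTrace {
    static final long SIZE = 10_000, BOUND = 5_000, LATENESS = 30_000;

    private final Map<Long, Integer> windows = new TreeMap<>(); // window start -> count
    private final Set<Long> fired = new HashSet<>();
    final List<String> output = new ArrayList<>();
    final List<Long> sideOutput = new ArrayList<>();
    private long maxTs = Long.MIN_VALUE + BOUND + 1;

    long watermark() {
        return maxTs - BOUND - 1;
    }

    void process(long ts) {
        long start = ts - Math.floorMod(ts, SIZE);
        long windowMaxTs = start + SIZE - 1;
        long wm = watermark();
        if (windowMaxTs + LATENESS <= wm) {
            sideOutput.add(ts); // beyond allowed lateness: route to the side output
        } else {
            int count = windows.merge(start, 1, Integer::sum);
            if (windowMaxTs <= wm) { // window already due: late firing with the updated count
                fired.add(start);
                output.add(label(start, count) + " late");
            }
        }
        maxTs = Math.max(maxTs, ts);
        advance(watermark());
    }

    private void advance(long wm) {
        Iterator<Map.Entry<Long, Integer>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Integer> w = it.next();
            long windowMaxTs = w.getKey() + SIZE - 1;
            if (windowMaxTs <= wm && fired.add(w.getKey())) {
                output.add(label(w.getKey(), w.getValue())); // on-time firing
            }
            if (windowMaxTs + LATENESS <= wm) {
                it.remove(); // allowed lateness is over: window state is cleared
            }
        }
    }

    private static String label(long start, int count) {
        return "[" + start + "," + (start + SIZE) + ")=" + count;
    }

    public static void main(String[] args) {
        LatenessTrace t = new LatenessTrace();
        // A, B, C, D, E from the table, then hypothetical events at 16s, 7s, 46s, 5s.
        for (long ts : new long[] {4_000, 8_000, 6_000, 12_000, 10_000, 16_000, 7_000, 46_000, 5_000}) {
            t.process(ts);
        }
        System.out.println(t.output + " side output: " + t.sideOutput);
    }
}
```

The run fires [0, 10s) with A, B, C once the 16s event lifts the watermark to 10.999s, re-fires it with a count of 4 for the in-lateness straggler at 7s, and sends the 5s straggler to the side output because it arrives after the watermark passed 40s.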
Flink interview question on idle-source watermark advancement
The probe: "Our pipeline has 6 Kafka partitions but one partition rarely sees data. Windows never fire. Why?"
Solution Using WatermarkStrategy.withIdleness(Duration)
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((e, ts) -> e.eventTimeMs)
.withIdleness(Duration.ofMinutes(2)); // mark idle after 2 min
Step-by-step trace.
| Partition | Active? | Last event_ts | Watermark contribution |
|---|---|---|---|
| P0 | yes | 100 | 95 |
| P1 | yes | 102 | 97 |
| P2 | yes | 101 | 96 |
| P3 | yes | 100 | 95 |
| P4 | yes | 103 | 98 |
| P5 | idle 3 min | 50 | 45, ignored once marked idle |
Output:
| Without idleness | With idleness (2 min) |
|---|---|
| Global WM = min(95,97,96,95,98,45) = 45 → windows stuck | Global WM = min(95,97,96,95,98) = 95 → windows fire |
Why this works — concept by concept:
- Global watermark = min across all parallel sources: one slow partition stalls everyone.
- Idleness flag: Flink temporarily ignores a subtask's watermark contribution once it hasn't produced records for the configured duration.
- When the idle source produces again: Flink re-includes its contribution. The combined watermark never moves backwards, so it holds until that partition catches up, and any of its records older than the current watermark are treated as late.
- Reasonable default: set withIdleness to a value larger than your typical low-volume gap but smaller than your largest acceptable window-firing delay.
- Cost: negligible; a little in-memory bookkeeping per source subtask.
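The min-over-inputs rule and the effect of withIdleness can be checked with a plain-Java sketch that uses the numbers from the table. Assumptions: event timestamps and wall-clock times share one unit (seconds), the 1 ms offset is ignored to match the table, and the "all inputs idle" case simply returns Long.MIN_VALUE here, whereas Flink marks the whole stream idle. The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/** Sketch: the combined watermark is the minimum over partitions that are not idle. */
final class IdleAwareWatermark {

    static final class Partition {
        final long lastEventTs;  // largest event timestamp seen on this partition
        final long lastActivity; // wall-clock time of its last record

        Partition(long lastEventTs, long lastActivity) {
            this.lastEventTs = lastEventTs;
            this.lastActivity = lastActivity;
        }
    }

    /** idleTimeout <= 0 disables idleness detection, like omitting withIdleness. */
    static long combined(Map<String, Partition> parts, long bound, long now, long idleTimeout) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Partition p : parts.values()) {
            boolean idle = idleTimeout > 0 && now - p.lastActivity >= idleTimeout;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, p.lastEventTs - bound);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        Map<String, Partition> parts = new TreeMap<>();
        long now = 1_000;
        long[] lastTs = {100, 102, 101, 100, 103};
        for (int i = 0; i < lastTs.length; i++) {
            parts.put("P" + i, new Partition(lastTs[i], now - 10)); // active 10 s ago
        }
        parts.put("P5", new Partition(50, now - 180)); // silent for 3 minutes
        System.out.println("without idleness: " + combined(parts, 5, now, 0));
        System.out.println("with idleness:    " + combined(parts, 5, now, 120));
    }
}
```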
4. Windows — tumbling, sliding, session, global; allowed lateness
Flink windows are how you turn an infinite stream into finite chunks; the right window depends on the report
The mental model: a window assigner decides which window(s) each record belongs to; a trigger decides when each window fires; an evictor (optional) decides which records to discard before firing.
Window types.
- Tumbling — fixed-size, non-overlapping (e.g. every 5 minutes). Most common.
- Sliding (hopping) — fixed-size, with a slide step smaller than the size (e.g. 5-min window every 1 min). More CPU because each record lands in multiple windows.
- Session — variable-size, separated by inactivity gap. Per-user sessionisation pattern.
- Global — one window covering the entire stream, fires on a custom trigger.
Triggers.
- Default: window fires when watermark crosses the window end.
- CountTrigger.of(N): fires every N elements (count windows).
- ContinuousEventTimeTrigger.of(interval): fires repeatedly at a fixed event-time interval while the window is open (early results).
- Custom Trigger: override onElement, onEventTime, onProcessingTime for advanced logic.
Window functions.
- reduce(fn): incremental aggregation with the same input and output type; low memory.
- aggregate(aggFn): also incremental, but more general (input, accumulator, and output types can all differ).
- process(processFn): buffers the full window contents and can emit any output; higher memory.
Worked example — sliding 5-min window every 1 min for rolling counts
Detailed explanation. Rolling counts of clickstream events per user over the last 5 minutes, refreshed every minute.
Question. Build a hopping-window aggregator.
Code.
DataStream<ClickEvent> clicks = ...;
DataStream<UserCount> rolling = clicks
.keyBy(ClickEvent::getUserId)
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.aggregate(new CountAgg(), new EmitWithWindowInfo());
Step-by-step explanation.
- SlidingEventTimeWindows.of(5 min, 1 min): each record belongs to size / slide = 5 windows.
- For each (user, window), CountAgg increments a counter.
- When the watermark crosses a window's end, EmitWithWindowInfo emits one record per (user, window).
- CPU cost: each record updates 5 windows; memory cost: O(active windows × active users).
Output (one user firing several windows).
| user_id | window_start | window_end | count |
|---|---|---|---|
| u_42 | 12:00 | 12:05 | 17 |
| u_42 | 12:01 | 12:06 | 19 |
| u_42 | 12:02 | 12:07 | 22 |
| u_42 | 12:03 | 12:08 | 25 |
Rule of thumb. Sliding windows are useful for smooth time-series dashboards; tumbling windows are simpler and cheaper for billing / per-period reports.
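Which windows a record lands in can be computed directly. The plain-Java sketch below follows the usual sliding-assignment arithmetic (latest window start at or before the timestamp, then step back by the slide while the window still covers the record); it assumes no window offset and is an illustration, not Flink's SlidingEventTimeWindows class. The class name SlidingAssigner is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of sliding (hopping) window assignment, with no offset. */
final class SlidingAssigner {

    /** Starts of every window [start, start + size) that contains ts, newest first. */
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long ts = 723 * minute + 30_000L; // 12:03:30 as milliseconds since midnight
        for (long start : windowStarts(ts, 5 * minute, minute)) {
            long m = start / minute;
            System.out.printf("[%02d:%02d, +5 min)%n", m / 60, m % 60);
        }
    }
}
```

A click at 12:03:30 lands in the five windows starting at 12:03, 12:02, 12:01, 12:00 and 11:59, which is where the size / slide = 5 factor in CPU and state comes from.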
Flink interview question on choosing the window type
The probe: "User-session analytics — tumbling or session windows?"
Solution Using EventTimeSessionWindows.withGap
DataStream<SessionSummary> sessions = clicks
.keyBy(ClickEvent::getUserId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.aggregate(new SessionSummaryAgg());
Step-by-step trace.
| Event for user u_42 | event_ts (s) | session state |
|---|---|---|
| click "/home" | 1000 | open session: [1000, ?) |
| click "/cart" | 1300 | 300 s after the previous click, within the 1800 s gap → still in [1000, ?) |
| click "/checkout" | 2400 | 1100 s after the previous click, within the gap → still in [1000, ?) |
| (no events from u_42 for 2000 s, about 33 min) | - | watermark passes 2400 + 1800 = 4200 → session [1000, 4200) fires |
| click "/home" | 4400 | 2000 s > 1800 s gap → new session: [4400, ?) |
Output:
| user_id | session_start | session_end | events | session_summary |
|---|---|---|---|---|
| u_42 | 1000 | 4200 | 3 | (cart added, checkout completed) |
| u_42 | 4400 | (open) | 1 | (started new session) |
Why this works — concept by concept:
- Session windows = variable size: they grow as new events arrive within the gap.
- Window end is dynamic: only known after the watermark exceeds last_event + gap.
- Late events extend or merge sessions: within allowedLateness, a late event can extend a session or bridge two sessions into one.
- Cost: one accumulator per open session per user (with aggregate), so state grows with active users and open sessions, not with event count.
- Alternative: tumbling 30-min windows lose session semantics; not the right answer when "session" is the unit of analysis.
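Session windows are easiest to reason about as interval merging: every event opens a proto-session [ts, ts + gap), and overlapping proto-sessions collapse into one. The plain-Java sketch below is an illustration, not Flink's merging window assigner (its boundary rule for sessions that merely touch may differ); it reproduces the u_42 trace and then shows a hypothetical late event at 3300 bridging the two sessions. The class name SessionMerger is hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java sketch of session windows as merging [ts, ts + gap) intervals for one key. */
final class SessionMerger {

    static final class Session {
        final long start, end; // end is exclusive: last event + gap
        final int events;

        Session(long start, long end, int events) {
            this.start = start;
            this.end = end;
            this.events = events;
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + ")x" + events;
        }
    }

    private final long gap;
    private final List<Session> sessions = new ArrayList<>(); // sorted, non-overlapping

    SessionMerger(long gap) {
        this.gap = gap;
    }

    void add(long ts) {
        long start = ts, end = ts + gap;
        int events = 1;
        List<Session> kept = new ArrayList<>();
        for (Session s : sessions) {
            if (s.start < end && start < s.end) { // overlaps the growing session: absorb it
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                events += s.events;
            } else {
                kept.add(s);
            }
        }
        kept.add(new Session(start, end, events));
        kept.sort(Comparator.comparingLong(x -> x.start));
        sessions.clear();
        sessions.addAll(kept);
    }

    String describe() {
        return sessions.toString();
    }

    public static void main(String[] args) {
        SessionMerger u42 = new SessionMerger(1_800); // 30-minute gap, in seconds
        for (long ts : new long[] {1_000, 1_300, 2_400, 4_400}) {
            u42.add(ts);
        }
        System.out.println(u42.describe());
        u42.add(3_300); // late event within allowed lateness overlaps both sessions
        System.out.println(u42.describe());
    }
}
```

The first print shows the two sessions from the output table; the late event at 3300 overlaps both and merges them into one five-event session.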
5. Keyed state, operator state, and the state backend
Flink keyed state is per-key memory; the state backend decides where it lives
State primitives.
- ValueState<T>: single value per key.
- ListState<T>: list of values per key.
- MapState<K, V>: map per key.
- ReducingState<T> / AggregatingState<IN, OUT>: pre-aggregated value per key.
Keyed state vs operator state.
- Keyed state: accessible only inside operators that follow a keyBy. State for one key lives on exactly one subtask.
- Operator state: per-subtask state (e.g. Kafka source offsets). No key partitioning.
State backends.
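- HashMapStateBackend: state as objects on the JVM heap. Fastest access (sub-microsecond). GB scale; OOM and GC risk as state grows toward TB.
- EmbeddedRocksDBStateBackend: state serialised in RocksDB on local disk (ideally SSD). TB scale. Slower per access (roughly 10× heap latency). Supports incremental checkpoints.
- MemoryStateBackend: deprecated since Flink 1.13 (replaced by HashMapStateBackend plus JobManagerCheckpointStorage); only for testing.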
TTL (time-to-live) on state.
- StateTtlConfig: automatically expires state entries after a TTL (measured in processing time).
- Required when you have unbounded key spaces (e.g. user_id from a registration stream).
Checkpoint storage.
- JobManager (JobManagerCheckpointStorage): snapshots kept in the JobManager's heap; for tiny state and testing.
- FileSystem (FileSystemCheckpointStorage on S3, HDFS, ABFS): production default; backends snapshot state to remote storage on checkpoint.
Worked example — keyed state for de-duplication
Detailed explanation. A common interview problem: "How would you de-dup a stream of events by event_id, where the same event might arrive multiple times within 24 hours?" The senior answer is keyed state + TTL.
Question. Implement a Flink operator that drops duplicates within 24 hours.
Code.
public class DedupFn extends KeyedProcessFunction<String, Event, Event> {
private transient ValueState<Boolean> seen;
@Override
public void open(Configuration cfg) {
StateTtlConfig ttl = StateTtlConfig
.newBuilder(Time.hours(24))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<Boolean> d =
new ValueStateDescriptor<>("seen", Types.BOOLEAN);
d.enableTimeToLive(ttl);
seen = getRuntimeContext().getState(d);
}
@Override
public void processElement(Event e, Context ctx, Collector<Event> out) throws Exception {
if (seen.value() == null) {
seen.update(true);
out.collect(e);
}
// else drop duplicate
}
}
DataStream<Event> deduped = events
.keyBy(Event::getEventId)
.process(new DedupFn());
Step-by-step explanation.
- keyBy(eventId) routes every event with the same id to the same subtask.
- ValueState<Boolean> stores whether we've seen this event id before.
- A TTL of 24 hours (processing time, counted from the write) purges the entry automatically, preventing unbounded state growth.
- First arrival of an event: seen.value() == null → set it to true and emit.
- Subsequent arrivals within 24h: seen.value() returns true → drop.
- After 24h: the TTL has expired the entry (NeverReturnExpired hides it even before physical cleanup), so a re-arrival is treated as new.
Output (de-dup behaviour).
| Event id | arrival time (h, processing time) | seen state | emitted? |
|---|---|---|---|
| e_001 | 0 | null → true | yes |
| e_001 | 2 | true | no (dropped) |
| e_001 | 23 | true | no |
| e_001 | 25 | (TTL expired) null → true | yes (re-emitted) |
| e_002 | 0 | null → true | yes |
Rule of thumb. Whenever you reach for keyed state, also reach for TTL. Unbounded key spaces eat state backends alive.
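The DedupFn contract can be checked without a cluster. The plain-Java sketch below models the ValueState-plus-TTL entry as a map from event id to write time; the now parameter stands in for processing time, and the expiry rule (expired once now - writtenAt >= ttl) is a simplification of Flink's TTL semantics, which additionally clean up expired entries lazily or in the background. The class name TtlDedup is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of de-duplication with a per-id time-to-live. */
final class TtlDedup {
    private final long ttl;
    private final Map<String, Long> writtenAt = new HashMap<>(); // event id -> time of last write

    TtlDedup(long ttl) {
        this.ttl = ttl;
    }

    /** Returns true if the event should be emitted; now plays the role of processing time. */
    boolean offer(String eventId, long now) {
        Long written = writtenAt.get(eventId);
        boolean expired = written != null && now - written >= ttl; // NeverReturnExpired
        if (written == null || expired) {
            writtenAt.put(eventId, now); // OnCreateAndWrite: the TTL clock restarts on this write
            return true;
        }
        return false; // duplicate inside the TTL; reading does not extend the TTL
    }

    public static void main(String[] args) {
        TtlDedup dedup = new TtlDedup(24); // hours
        String[] ids = {"e_001", "e_002", "e_001", "e_001", "e_001"};
        long[] hours = {0, 0, 2, 23, 25};
        for (int i = 0; i < ids.length; i++) {
            System.out.println(ids[i] + " @" + hours[i] + "h emitted=" + dedup.offer(ids[i], hours[i]));
        }
    }
}
```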
Flink interview question on choosing the state backend
The probe: "Should I use HashMap or RocksDB state backend?"
Solution Using a state-size + latency decision table
| Concern | HashMapStateBackend | EmbeddedRocksDBStateBackend |
|---|---|---|
| State size cap | GB (heap-limited) | TB (SSD-limited) |
| Per-access latency | sub-microsecond | roughly 10× slower (serialisation + possible disk read) |
| Checkpoint mode | full snapshots | incremental (opt-in) or full |
| GC pressure | high at large state | low (state is off-heap) |
| Recovery time | deserialise all state back into heap | mostly downloading RocksDB files (incremental restore) |
| Production default | small jobs | most jobs |
Step-by-step trace.
| State size | Latency / throughput need | Pick |
|---|---|---|
| < 1 GB per TaskManager | sub-ms latency | HashMap |
| GB scale, throughput-sensitive | balanced | benchmark both |
| 10s of GB+ | any | RocksDB |
| > 100 GB | any | RocksDB (no other option) |
Output:
| Workload | Backend |
|---|---|
| Word count, small key cardinality | HashMap |
| User sessions, millions of active users | RocksDB |
| Rolling aggregates over years of data | RocksDB |
| Sub-ms-critical ad-bidding | HashMap (if state fits) |
Why this works — concept by concept:
- Heap state: lives in the JVM heap; sub-microsecond access; bounded by heap size and pressured by GC at scale.
- RocksDB state: lives on local disk (ideally SSD) as an LSM-tree; bounded only by disk; roughly 10× slower per access; supports incremental checkpoints.
- Incremental checkpoints: only the RocksDB SST files created since the last checkpoint are uploaded; massive savings for large state.
- Off-heap memory: RocksDB uses native memory, not JVM heap; state size adds little GC pressure.
- Cost: RocksDB trades per-access latency (serialisation plus possible disk reads) for state capacity bounded by local disk rather than heap. Worth it for most production-scale jobs.
6. Checkpointing, savepoints, and exactly-once via two-phase commit
Flink checkpointing is the durability story; Flink exactly-once is barriers + two-phase commit sinks
The mental model: checkpoints take a consistent snapshot of state across the cluster by injecting barriers into the dataflow; operators snapshot their state when the barrier passes; sinks use those barriers to drive two-phase commit.
Checkpoint mechanics.
- The JobManager's checkpoint coordinator periodically triggers every source, which injects a barrier into its output.
- The barrier flows through the dataflow alongside data records.
- When an operator has received the barrier on all input channels (aligned checkpoints hold back input from channels whose barrier already arrived), it snapshots its state and forwards the barrier downstream.
- Snapshots are written asynchronously to checkpoint storage (S3 / HDFS / ABFS).
- Once every task has acknowledged its snapshot, the JobManager declares the checkpoint complete.
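Barrier alignment at a multi-input operator can be sketched in plain Java. Assumptions: a two-input operator whose only state is a running sum, Integer.MIN_VALUE used as a stand-in barrier marker, and one checkpoint in flight at a time. Flink also offers unaligned checkpoints, which this sketch does not model; the class name BarrierAlignment is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of aligned checkpoint barriers at a multi-input summing operator. */
final class BarrierAlignment {
    static final int BARRIER = Integer.MIN_VALUE; // stand-in for a checkpoint barrier

    long sum = 0;                                   // the operator's state
    final List<Long> snapshots = new ArrayList<>(); // state captured at each checkpoint
    private final boolean[] barrierSeen;
    private final List<ArrayDeque<Integer>> held = new ArrayList<>();
    private int barriersPending;

    BarrierAlignment(int channels) {
        barrierSeen = new boolean[channels];
        barriersPending = channels;
        for (int i = 0; i < channels; i++) {
            held.add(new ArrayDeque<>());
        }
    }

    void receive(int channel, int value) {
        if (barrierSeen[channel]) {
            held.get(channel).add(value); // post-barrier input waits until alignment completes
            return;
        }
        if (value != BARRIER) {
            sum += value; // normal record processing
            return;
        }
        barrierSeen[channel] = true;
        if (--barriersPending > 0) {
            return; // still waiting for barriers on other channels
        }
        snapshots.add(sum); // aligned: snapshot here, then (in Flink) forward the barrier
        Arrays.fill(barrierSeen, false);
        barriersPending = barrierSeen.length;
        for (int c = 0; c < held.size(); c++) { // release held input in arrival order
            List<Integer> pending = new ArrayList<>(held.get(c));
            held.get(c).clear();
            for (int v : pending) {
                receive(c, v);
            }
        }
    }

    public static void main(String[] args) {
        BarrierAlignment op = new BarrierAlignment(2);
        op.receive(0, 1);
        op.receive(1, 10);
        op.receive(0, BARRIER);
        op.receive(0, 100); // after channel 0's barrier: held back
        op.receive(1, 20);
        op.receive(1, BARRIER); // alignment completes here
        System.out.println("snapshot=" + op.snapshots + " sum=" + op.sum);
    }
}
```

Here the snapshot captures 31 (1 + 10 + 20): the 100 that arrived on channel 0 after its barrier belongs to the next checkpoint and is only processed once alignment completes.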
Two-phase commit sinks (TwoPhaseCommitSinkFunction).
- beginTransaction(): the sink starts a transaction with the external system (Kafka, JDBC, Iceberg).
- invoke(): records are written into the open transaction.
- preCommit(): fired when the barrier reaches the sink. Pre-commit the transaction (Kafka: producer.flush() + retain the transaction id), then begin a new transaction for records after the barrier.
- commit(): fired when the JobManager declares the global checkpoint complete. The sink finalises the transaction; commit must be idempotent because it can be retried after recovery.
- abort(): fired on failure; the sink rolls back the transaction.
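The lifecycle above can be sketched as an in-memory class. This is not Flink's TwoPhaseCommitSinkFunction API: the class name and recover method are hypothetical, the external system is a list, "visible to read_committed readers" means "in that list", and recovery is simplified to re-committing whatever the restored checkpoint covers and aborting everything else.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory sketch of a checkpoint-driven two-phase-commit sink. */
final class TwoPhaseCommitSketch {
    /** Stand-in for the external system: only committed records are visible. */
    final List<String> committed = new ArrayList<>();

    private List<String> openTxn = new ArrayList<>();                           // beginTransaction()
    private final Map<Long, List<String>> preCommitted = new LinkedHashMap<>(); // checkpoint id -> txn

    /** invoke(): write into the open transaction. */
    void invoke(String record) {
        openTxn.add(record);
    }

    /** preCommit(): the barrier reached the sink; park the txn and begin a new one. */
    void preCommit(long checkpointId) {
        preCommitted.put(checkpointId, openTxn);
        openTxn = new ArrayList<>();
    }

    /** commit(): checkpoint complete; commit every parked txn up to that id (repeat-safe). */
    void commit(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it = preCommitted.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> txn = it.next();
            if (txn.getKey() <= checkpointId) {
                committed.addAll(txn.getValue());
                it.remove();
            }
        }
    }

    /** Failure: keep what the restored checkpoint covers, abort everything else. */
    void recover(long lastCompletedCheckpoint) {
        commit(lastCompletedCheckpoint);
        preCommitted.clear();        // txns of checkpoints that never completed: abort()
        openTxn = new ArrayList<>(); // the in-flight txn: abort()
    }

    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.invoke("R1");
        sink.preCommit(1);
        System.out.println("before commit: " + sink.committed);
        sink.commit(1);
        System.out.println("after commit:  " + sink.committed);
    }
}
```

Replaying a record after recovery produces exactly one committed copy, because its first write belonged to a transaction that was aborted.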
Savepoints vs checkpoints.
- Checkpoint — automatic, periodic, primarily for failure recovery.
- Savepoint — manually triggered, used for job upgrades, A/B testing, parallelism changes, version migrations.
Recovery.
- On failure, the JobManager restarts the job from the last completed checkpoint.
- All operators rebuild their state from the snapshot; sources rewind to their checkpointed offsets.
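A minimal sketch shows why recovery neither loses nor double-counts records. Each snapshot pairs the source offset with the operator state taken at the same barrier. Restoring both together therefore replays exactly the records whose effects were lost. The word-count job, the Snapshot class, and the simulated crash are hypothetical and run in plain Java.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consistent snapshot: source offset and operator state captured at the same barrier.
final class Snapshot {
    final int sourceOffset;
    final Map<String, Integer> counts;

    Snapshot(int sourceOffset, Map<String, Integer> counts) {
        this.sourceOffset = sourceOffset;
        this.counts = new TreeMap<>(counts); // copy, so later updates do not leak into the snapshot
    }
}

// Hypothetical word-count job over a replayable log (a stand-in for a Kafka partition).
class CountingJob {
    private final List<String> log;
    private final int checkpointEvery;
    Snapshot lastCompleted; // stand-in for durable checkpoint storage

    CountingJob(List<String> log, int checkpointEvery) {
        this.log = log;
        this.checkpointEvery = checkpointEvery;
    }

    // Runs from the last completed checkpoint (or from scratch). Returns null if the job
    // "crashes" just before reading crashAtOffset; pass -1 for a run without a crash.
    Map<String, Integer> run(int crashAtOffset) {
        int offset = 0;
        Map<String, Integer> counts = new TreeMap<>();
        if (lastCompleted != null) {           // recovery: rewind the source and reload the state
            offset = lastCompleted.sourceOffset;
            counts.putAll(lastCompleted.counts);
        }
        while (offset < log.size()) {
            if (offset == crashAtOffset) return null; // state since the last checkpoint is lost
            counts.merge(log.get(offset), 1, Integer::sum);
            offset++;
            if (offset % checkpointEvery == 0) lastCompleted = new Snapshot(offset, counts);
        }
        return counts;
    }

    public static void main(String[] args) {
        CountingJob job = new CountingJob(List.of("a", "b", "a", "c", "a", "b"), 2);
        System.out.println(job.run(5));  // crashes after the checkpoint at offset 4
        System.out.println(job.run(-1)); // recovers from offset 4 with the matching state
    }
}
```

The record at offset 4 is processed twice in total. However, its first effect lived only in the lost in-memory state, so the final counts are exact.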
Worked example — end-to-end exactly-once Kafka → Flink → Kafka
Detailed explanation. Walk a record through an EOS pipeline showing how the barrier drives the two-phase commit.
Question. Trace one record through KafkaSource → map → KafkaSink (EOS).
Code.
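import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class EosPipeline {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpoint every 60 s; each completed checkpoint commits the sink's open Kafka transaction.
    env.enableCheckpointing(60_000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    DataStream<String> in = env.fromSource(
      KafkaSource.<String>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("in")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build(),
      WatermarkStrategy.noWatermarks(),
      "in");
    DataStream<String> mapped = in.map(s -> s.toUpperCase());
    mapped.sinkTo(
      KafkaSink.<String>builder()
        .setBootstrapServers("kafka:9092")
        .setRecordSerializer(
          KafkaRecordSerializationSchema.builder()
            .setTopic("out")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // Required with EXACTLY_ONCE: prefix for this job's Kafka transactional ids.
        .setTransactionalIdPrefix("flink-eos-job-")
        // Must not exceed the broker's transaction.max.timeout.ms (15 min by default).
        .setProperty("transaction.timeout.ms", "900000")
        .build());
    env.execute("eos-pipeline");
  }
}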
Step-by-step trace.
| t | Event | Source | Map | Sink (transactional) |
|---|---|---|---|---|
| 0 | record R arrives | reads R, emits | upcases | beginTransaction → invoke(R-up) |
| 60s | barrier B1 injected | snapshot offset; forward B1 | snapshot state (none for a stateless map); forward B1 | barrier arrives → preCommit |
| 60.5s | barrier reaches all sinks | n/a | n/a | preCommit complete |
| 60.6s | JobManager declares checkpoint 1 complete | n/a | n/a | commit(): Kafka transaction finalised |
| 60.6s | downstream consumer with read_committed | n/a | n/a | sees R-up (now COMMITTED) |
Output:
| Concern | Without EOS | With EOS |
|---|---|---|
| Duplicate on restart | possible | none visible to read_committed consumers |
| Downstream consumer | sees in-flight records | only sees committed records |
| Latency added by 2PC | none | up to one checkpoint interval (records become visible only when their checkpoint commits) |
| Storage overhead | none | tiny — transaction state in Kafka |
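Rule of thumb. EOS = checkpointing in EXACTLY_ONCE mode + a two-phase-commit sink (here KafkaSink with DeliveryGuarantee.EXACTLY_ONCE; TwoPhaseCommitSinkFunction in the legacy SinkFunction API) + downstream consumers with isolation.level=read_committed. All three must be in place; missing any one breaks the guarantee.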
Flink interview question on savepoint-based job upgrades
The probe: "How do you upgrade a Flink job to a new version without losing state?"
Solution Using savepoint + new-job-from-savepoint
# 1. Stop the running job with a savepoint (stop-with-savepoint); the CLI prints the savepoint path
flink stop --savepointPath s3://flink-savepoints/ <jobId>
# 2. Deploy the new job version, starting from the savepoint
flink run -s s3://flink-savepoints/savepoint-<uuid> new-job.jar
Step-by-step trace.
| Step | Action | Effect |
|---|---|---|
| 1 | flink stop --savepointPath triggers a savepoint and stops the job cleanly | all operators flush, snapshot state to S3 |
| 2 | The new JAR is deployed | new business logic, same state schema |
| 3 | flink run -s restores the new job from the savepoint | all operators rebuild state from S3 |
| 4 | Sources resume from the offsets stored in the savepoint | no data is dropped or duplicated |
Output:
| Concern | Stop + restart | Savepoint-based upgrade |
|---|---|---|
| State preserved | yes, if the last checkpoint is retained on cancellation | yes (from explicit savepoint) |
| Mid-flight records | reprocessed | reprocessed exactly-once |
| Operator UID stability | required | required |
| Schema compatibility | optional | optional but recommended |
| Parallelism change | only at savepoint | yes — savepoint supports rescaling |
Why this works — concept by concept:
- Savepoint = explicit, named, durable checkpoint: it survives operator code changes as long as each stateful operator's uid() (or setUidHash()) stays stable.
- Operator UIDs: every stateful operator should have an explicit .uid("..."); without it, savepoint restoration is brittle across code refactors.
- Rescaling: savepoints are the standard way to restart a job with a different parallelism.
- Schema evolution: type serializers can evolve as long as the new types are compatible with the state written in the savepoint.
- Cost: one explicit savepoint write to durable storage per upgrade; trivial.
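A toy model illustrates the UID advice. In it, a savepoint is just a map from operator ID to state. SavepointModel and its auto-generated IDs are hypothetical stand-ins. Flink derives its generated IDs from the structure of the job graph, and that is the property the sketch imitates. A refactor that shifts an operator changes its generated ID and orphans its state, while an explicit uid() keeps the lookup key stable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model (hypothetical, not Flink's implementation): a savepoint maps operator IDs to state.
class SavepointModel {
    // Stand-in for an auto-generated ID; it depends on where the operator sits in the
    // job graph, so inserting or reordering operators changes it.
    static String autoId(int positionInGraph) { return "auto-" + positionInGraph; }

    // An explicit uid("...") wins; otherwise fall back to the auto-generated ID.
    static String operatorId(String explicitUid, int positionInGraph) {
        return explicitUid != null ? explicitUid : autoId(positionInGraph);
    }

    // Restore is a pure lookup by operator ID.
    static Optional<String> restore(Map<String, String> savepoint, String operatorId) {
        return Optional.ofNullable(savepoint.get(operatorId));
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = new HashMap<>();
        savepoint.put(operatorId("dedup-by-user", 2), "dedup state");
        savepoint.put(operatorId(null, 3), "window state");
        // v2 adds a filter upstream, so both operators move one position down the graph.
        System.out.println(restore(savepoint, operatorId("dedup-by-user", 3))); // found
        System.out.println(restore(savepoint, operatorId(null, 4)));            // orphaned
    }
}
```

In real Flink, a restore fails if the savepoint contains state that no operator claims. Passing --allowNonRestoredState (-n) to flink run lets it proceed, and that state is skipped.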
7. Flink SQL, Table API, and Flink CDC
flink sql is the SQL surface over the DataStream API; Flink CDC connectors stream database changes natively
The mental model: Flink SQL and the Table API are higher-level abstractions over the same execution engine; they trade some flexibility for huge developer productivity wins, especially for declarative analytical workloads.
Flink SQL.
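- ANSI-SQL-ish dialect with streaming-oriented features: TUMBLE, HOP, and SESSION window functions, plus MATCH_RECOGNIZE for pattern matching.
- Stateful SQL: a GROUP BY user_id aggregation keeps one state entry per user; set a state TTL (table.exec.state.ttl) so idle keys are eventually cleared.
- Joins: regular, interval, temporal table.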
- Examples:
-- Tumbling 5-min count per user
SELECT
user_id,
TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
COUNT(*) AS clicks
FROM clicks
GROUP BY
user_id,
TUMBLE(event_time, INTERVAL '5' MINUTE);
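The computation behind the TUMBLE query can be sketched in plain Java. Each event goes into the window that starts at the largest multiple of the window size not after its timestamp, and clicks are counted per (user_id, window start). Click and TumblingCount are hypothetical names. The sketch assumes epoch-millisecond timestamps and ignores watermarks, late data, and the time-zone handling Flink applies to TIMESTAMP_LTZ.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical click event: user id plus event time in epoch milliseconds.
final class Click {
    final String userId;
    final long eventTimeMs;
    Click(String userId, long eventTimeMs) { this.userId = userId; this.eventTimeMs = eventTimeMs; }
}

class TumblingCount {
    static final long WINDOW_MS = 5 * 60 * 1000L; // INTERVAL '5' MINUTE

    // Start of the tumbling window that contains ts, aligned to multiples of WINDOW_MS.
    static long windowStart(long ts) { return ts - Math.floorMod(ts, WINDOW_MS); }

    // COUNT(*) per (user_id, TUMBLE_START); keys look like "user@windowStartMs".
    static Map<String, Long> count(List<Click> clicks) {
        Map<String, Long> result = new TreeMap<>();
        for (Click c : clicks) {
            result.merge(c.userId + "@" + windowStart(c.eventTimeMs), 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(count(List.of(
                new Click("u1", 10_000L), new Click("u1", 290_000L),
                new Click("u1", 310_000L), new Click("u2", 20_000L))));
    }
}
```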
Table API.
- Java/Scala/Python DSL with the same semantics as Flink SQL.
- Mix freely with the DataStream API via tableEnv.fromDataStream(...) and tableEnv.toChangelogStream(...).
Flink CDC.
- Native connectors that turn database WAL/binlog into Flink DataStreams (PostgreSQL, MySQL, MongoDB, Oracle).
- Sources emit +I, -U, +U, -D operations (Debezium-style) directly into Flink without a Kafka hop.
- Useful when CDC consumers are Flink jobs themselves.
Changelog vs append streams.
- Append-only stream: every record is new; no updates or deletes.
- Changelog stream: records can be inserts, updates, or deletes (the +I / -U / +U / -D semantics).
- Flink SQL handles both; the planner chooses operators based on the source's changelog mode.
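A map keyed by primary key is enough to sketch how a consumer turns a changelog stream into a current-state table. The Op enum mirrors the +I / -U / +U / -D kinds, but both it and ChangelogTable are hypothetical. Flink's own RowKind handling lives inside the planner and runtime.

```java
import java.util.Map;
import java.util.TreeMap;

// Mirrors the four changelog kinds (+I, -U, +U, -D); hypothetical enum, not Flink's RowKind.
enum Op { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Hypothetical materialisation of a changelog stream into a table keyed by primary key.
class ChangelogTable {
    final Map<Long, String> rows = new TreeMap<>(); // primary key -> current row

    void apply(Op op, long key, String row) {
        switch (op) {
            case INSERT:        // +I: a new row
            case UPDATE_AFTER:  // +U: the new image of an updated row
                rows.put(key, row);
                break;
            case UPDATE_BEFORE: // -U: retract the old image of an updated row
            case DELETE:        // -D: the row is gone
                rows.remove(key);
                break;
        }
    }

    public static void main(String[] args) {
        ChangelogTable customers = new ChangelogTable();
        customers.apply(Op.INSERT, 1L, "Austin");
        customers.apply(Op.INSERT, 2L, "Denver");
        customers.apply(Op.UPDATE_BEFORE, 1L, "Austin");
        customers.apply(Op.UPDATE_AFTER, 1L, "Seattle");
        customers.apply(Op.DELETE, 2L, "Denver");
        System.out.println(customers.rows);
    }
}
```

An append-only stream is the special case in which every record is an INSERT and nothing is ever retracted.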
Worked example — Flink SQL with a temporal join on a CDC source
Detailed explanation. Join a stream of orders against a slowly-changing customer table sourced via Flink CDC.
Question. Build the join in Flink SQL.
Code.
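-- Customers as a changelog source (Flink CDC)
CREATE TABLE customers (
customer_id BIGINT,
name STRING,
city STRING,
-- Change timestamp from CDC metadata: the versioned side of an event-time temporal join needs a rowtime
op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
WATERMARK FOR op_ts AS op_ts,
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'postgres-primary',
'port' = '5432',
'username' = 'cdc_user',
-- placeholder credential
'password' = '<password>',
'database-name' = 'production',
'schema-name' = 'public',
'table-name' = 'customers',
-- logical replication slot name (illustrative)
'slot.name' = 'flink_customers',
'decoding.plugin.name' = 'pgoutput'
);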
-- Orders as an append-only stream
CREATE TABLE orders (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(12,2),
event_time TIMESTAMP_LTZ(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka:9092',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'json'
);
-- Temporal join — orders enriched with the customer's city AS OF event_time
SELECT
o.order_id,
o.amount,
c.city AS city_at_order_time
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.event_time AS c
ON o.customer_id = c.customer_id;
Step-by-step explanation.
- The customers table is a Flink CDC source: it materialises the Postgres customers table as a changelog stream inside Flink.
- The orders table is an append-only Kafka stream with an event-time watermark.
- FOR SYSTEM_TIME AS OF o.event_time makes this a temporal join: for each order, Flink looks up the customer's row as it was at the order's event time.
- Flink keeps the customer versions in keyed state (RocksDB, if that backend is configured), and the join is event-time-aware.
- End result: every order is enriched with the customer's city at the time the order happened.
Output.
| order_id | amount | city_at_order_time |
|---|---|---|
| 7001 | 99.00 | Austin |
| 7002 | 149.00 | Seattle |
Rule of thumb. Use Flink SQL whenever the logic is declarative and you don't need custom timers or low-level state access. Drop into the DataStream API for everything else.
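The AS OF lookup from the step-by-step explanation can be sketched with one version history per key. Each history is a TreeMap from version timestamp to row, and floorEntry(eventTime) returns the latest version at or before the order's event time. VersionedTable is hypothetical. Flink additionally uses watermarks to decide when a lookup can be answered and to expire versions that can no longer be joined; this sketch omits both.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical versioned table: customer_id -> (version timestamp -> city).
class VersionedTable {
    private final Map<Long, TreeMap<Long, String>> versions = new HashMap<>();

    // Apply a change (e.g. a +I or +U row from the CDC source) stamped with its version time.
    void upsert(long customerId, long versionTs, String city) {
        versions.computeIfAbsent(customerId, id -> new TreeMap<>()).put(versionTs, city);
    }

    // FOR SYSTEM_TIME AS OF eventTs: the latest version at or before eventTs,
    // or null when no version exists yet (an inner join emits nothing for that order).
    String cityAsOf(long customerId, long eventTs) {
        TreeMap<Long, String> history = versions.get(customerId);
        if (history == null) return null;
        Map.Entry<Long, String> version = history.floorEntry(eventTs);
        return version == null ? null : version.getValue();
    }

    public static void main(String[] args) {
        VersionedTable customers = new VersionedTable();
        customers.upsert(42L, 1_000L, "Austin");
        customers.upsert(42L, 5_000L, "Seattle");             // customer moves at t = 5 s
        System.out.println(customers.cityAsOf(42L, 4_999L));  // order placed before the move
        System.out.println(customers.cityAsOf(42L, 6_000L));  // order placed after the move
    }
}
```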
Flink interview question on when to use Flink CDC vs Kafka Connect Debezium
The probe: "We're moving from Postgres to Snowflake. Flink CDC or Debezium → Kafka → Sink?"
Solution Using a workload-shape decision
| Scenario | Kafka Connect Debezium | Flink CDC |
|---|---|---|
| Multiple consumers downstream | yes — Kafka is the fan-out | one Flink job per consumer |
| Stream-processing transformations | extra Kafka hop | native — same job |
| Replay window | Kafka retention | re-snapshot the source table, or restore from a checkpoint/savepoint |
| Operational components | Debezium + Kafka + sink | one Flink job |
| Schema evolution | Schema Registry | Flink type system |
Step-by-step trace.
| Use case | Pick |
|---|---|
| CDC fanout to 5 destinations | Debezium → Kafka |
| Single Flink job that does CDC + windowed aggregation + sink | Flink CDC |
| Existing Kafka-centric stack | Debezium |
| Sub-second latency, lowest hop count | Flink CDC |
Output:
| Concern | Kafka-centric | Flink-centric |
|---|---|---|
| Hops | 3 (DB → Kafka → Sink) | 2 (DB → Flink → Sink) |
| Components | Debezium + Kafka + Sink | Flink |
| Latency | ~1–2s | ~0.5s |
| Fanout | excellent | one job per consumer |
Why this works — concept by concept:
- Flink CDC eliminates the Kafka hop — fewer components, lower latency.
- Kafka-centric fanout — when multiple downstream consumers need the same CDC feed, Kafka is the right buffer.
- Flink's state machinery — gives you full event-time semantics over the CDC stream directly.
- Operational cost — Flink CDC = one job to monitor; Debezium = multiple components.
- Cost — pick based on the topology, not the technology hype.
Choosing the right Flink primitive (cheat sheet)
- Need stateful streaming with event-time semantics? Flink with keyBy + watermarks + RocksDB.
- Need TB-scale state? EmbeddedRocksDBStateBackend with incremental checkpoints.
- Need sub-microsecond state access? HashMapStateBackend (only if state fits in heap).
- Need bounded out-of-order tolerance? WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(N)).
- Need to mark idle partitions? .withIdleness(Duration.ofMinutes(M)).
- Need to handle late records? .allowedLateness(Duration) + .sideOutputLateData(tag).
- Need exactly-once to Kafka? KafkaSink with DeliveryGuarantee.EXACTLY_ONCE + setTransactionalIdPrefix(...).
- Need exactly-once to another transactional sink? Implement TwoPhaseCommitSinkFunction (legacy SinkFunction API; newer connectors use the Sink V2 two-phase-commit interfaces).
- Need to upgrade a job without losing state? flink stop --savepointPath, then flink run -s against the new JAR.
- Need declarative SQL over streams? Flink SQL with TUMBLE / HOP / SESSION window functions.
- Need CDC from a DB into a Flink job? Flink CDC connector (skip the Kafka hop).
- Need to drop duplicates? Keyed state with TTL on a ValueState<Boolean>.
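The dedup entry can be sketched in plain Java. A per-key timestamp recording when the "seen" flag was written stands in for a ValueState<Boolean> with a TTL, and a key whose flag has expired counts as new again. TtlDeduplicator is hypothetical and takes the current time from the caller. Note that Flink's StateTtlConfig measures TTL in processing time. With its default update type, only writes (not reads) restart the TTL clock, and the sketch mirrors this by not refreshing on duplicates.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for keyed ValueState<Boolean> with a TTL, used for deduplication.
class TtlDeduplicator {
    private final long ttlMs;
    private final Map<String, Long> flagWrittenAt = new HashMap<>(); // key -> time the flag was written

    TtlDeduplicator(long ttlMs) { this.ttlMs = ttlMs; }

    // Returns true if the record should be emitted (no live "seen" flag for its key).
    boolean shouldEmit(String key, long nowMs) {
        Long writtenAt = flagWrittenAt.get(key);
        boolean live = writtenAt != null && nowMs - writtenAt < ttlMs;
        if (live) {
            return false;              // duplicate within the TTL: drop it; no write, so no refresh
        }
        flagWrittenAt.put(key, nowMs); // like value.update(true): (re)starts the TTL clock
        return true;
    }

    public static void main(String[] args) {
        TtlDeduplicator dedup = new TtlDeduplicator(60_000L); // 1-minute TTL
        System.out.println(dedup.shouldEmit("evt-1", 0L));      // first occurrence
        System.out.println(dedup.shouldEmit("evt-1", 30_000L)); // duplicate inside the TTL
        System.out.println(dedup.shouldEmit("evt-1", 60_000L)); // flag expired, treated as new
    }
}
```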
Frequently asked questions
What is Apache Flink and where does it win over Kafka Streams or Spark Structured Streaming?
Apache Flink is a distributed, stateful, event-time stream processing engine where records flow through a directed dataflow graph of operators and state is sharded by key across the cluster. It wins over Kafka Streams when state grows beyond ~GBs (Flink with RocksDB handles TB) or you need exactly-once to non-Kafka sinks (TwoPhaseCommitSinkFunction). It wins over Spark Structured Streaming on sub-second latency (Flink is single-record streaming, Spark is micro-batch by default) and richer event-time semantics. Spark wins for unified batch+streaming on one engine and broader community.
How do Flink watermarks work and what is bounded out-of-orderness?
A watermark is a record flowing through the dataflow asserting "event-time has progressed past this point"; operators use it to fire windows and identify late records. Bounded out-of-orderness is the most common strategy: WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(N)) sets the watermark to max(seen_event_ts) - N seconds (minus 1 ms in Flink's implementation). This tolerates up to N seconds of out-of-order arrival. A record whose event_time is at or below the current watermark is "late". In a window, a late record still updates its window until the watermark passes the window end plus allowedLateness; after that it is dropped, unless it is routed to a side output with sideOutputLateData.
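Here is a plain-Java sketch of the bounded-out-of-orderness rule from the answer above. BoundedOutOfOrdernessSketch is a hypothetical name. Its arithmetic follows Flink's built-in BoundedOutOfOrdernessWatermarks, which subtracts an extra 1 ms and starts from a value that makes the initial watermark Long.MIN_VALUE. Flink emits watermarks periodically, whereas the sketch recomputes the watermark after every record. A record counts as late when its timestamp is at or below the watermark in force when it arrives.

```java
// Hypothetical sketch of bounded-out-of-orderness watermarking (arithmetic as in Flink's built-in generator).
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Chosen so the first watermark is exactly Long.MIN_VALUE, with no overflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    // "No more events at or before this time are expected."
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMs - 1;
    }

    // Returns true if the event is late relative to the watermark in force when it arrives.
    boolean onEvent(long eventTs) {
        boolean late = eventTs <= currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, eventTs); // the watermark never moves backwards
        return late;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(5_000L); // N = 5 s
        for (long ts : new long[] {10_000L, 6_000L, 4_000L}) {
            boolean late = wm.onEvent(ts);
            System.out.println("ts=" + ts + " late=" + late + " watermark=" + wm.currentWatermark());
        }
    }
}
```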
What's the difference between HashMap and RocksDB state backends in Flink?
HashMapStateBackend keeps state in the JVM heap — sub-microsecond access, but bounded by heap size and pressured by GC at scale (typically caps around a few GB per TaskManager). EmbeddedRocksDBStateBackend keeps state on local SSD via RocksDB — ~10× slower per access but scales to TBs and supports incremental checkpoints (only changed SSTable files are uploaded). RocksDB is the production default for anything but the smallest jobs. Use HashMap only when state fits in heap and sub-ms latency is critical.
How does Flink achieve exactly-once semantics end-to-end?
End-to-end EOS in Flink is the composition of three things: (1) checkpointing in EXACTLY_ONCE mode — the JobManager injects barriers into the dataflow, operators snapshot state when the barrier passes, and the JobManager only declares the checkpoint complete when all operators acknowledge; (2) TwoPhaseCommitSinkFunction — the sink does beginTransaction → invoke → preCommit (on barrier) → commit (on global ack) / abort (on failure); (3) downstream consumers with isolation.level=read_committed — they only see records from committed transactions. Miss any one and the guarantee breaks.
What are checkpoints vs savepoints in Flink?
Checkpoints are automatic, periodic, durable snapshots of all state across the cluster, used primarily for failure recovery: after a crash, the job restarts from the last completed checkpoint. Savepoints are manually triggered, named checkpoints used for job upgrades, parallelism changes, A/B testing, and version migrations. Always set an explicit .uid("...") on every stateful operator so savepoints can re-bind state across code refactors. Use flink stop --savepointPath s3://... <jobId> to take a savepoint and stop the job; restart the new job version with flink run -s s3://.../savepoint-<uuid> new-job.jar.
When should I use Flink CDC vs Debezium + Kafka + Sink Connector?
Use Flink CDC when the entire pipeline is a single Flink job that consumes the database changelog and does its own stream-processing (windowed aggregations, joins, transformations) — fewer components, lower latency, no Kafka hop. Use Debezium + Kafka + Sink Connector when you need fan-out to multiple downstream destinations (5 different sinks all reading the same CDC feed) — Kafka becomes the durable buffer and the replay surface. The decision is about topology: single-job vs multi-consumer.
Practice on PipeCode
- Drill the streaming practice library → for end-to-end Flink pipeline questions.
- Rehearse real-time analytics drills → for windowed aggregations and CDC-driven analytics.
- Sharpen Python streaming problems → when the interviewer wants PyFlink code, not Java.
- For the broader interview surface, read top data engineering interview questions →.
- Stack the prerequisites with the only 5 skills you need to become a data engineer →.
- Reinforce the compute side with Apache Spark internals for DE interviews →.
- For the design-round muscles, work through ETL system design for DE interviews →.
- To pair Flink with table modelling, browse data modelling for DE interviews →.
Pipecode.ai is Leetcode for Data Engineering — every Flink concept above ships with hands-on practice rooms where you design watermarks, tune state backends, and trace two-phase commit. Start with the streaming library and work outward; PipeCode pairs every reading with 450+ DE-focused problems and a real-time scoring engine.