Stripe data engineering interview questions lean Python-heavy with a payments-platform edge: six Python primitives (sort + greedy for tiered shipping or fee schedules, hash-table aggregation for per-merchant transaction-fee rollups, set-based deduplication for idempotent event apply, collections.deque for bounded producer-consumer queues with back-pressure, event-time watermarks for late-record drop, and an end-to-end CSV ETL with validation and a logging summary) plus a single SQL primitive that tests JSONB introspection (top-level key extraction from event_payload with explicit type casting). The framings are payments-platform analytics—shipping fees, transaction fees, idempotency tokens, bounded ingestion buffers, event-time vs processing-time, ETL run logs, and JSONB event payloads.
This guide walks through the seven topic clusters Stripe actually tests, each with a detailed topic explanation, per-sub-topic explanation with a worked example and its solution, and an interview-style problem with a full solution that explains why it works. The mix matches the curated 7-problem Stripe set (1 easy, 5 medium, 1 hard)—a medium-heavy hub that rewards Python state-management fluency, streaming intuition, and one carefully scoped SQL JSONB query, rather than algorithm-puzzle Python.
Top Stripe data engineering interview topics
From the Stripe data engineering practice set, the seven numbered sections below follow this topic map (one row per H2):
| # | Topic (sections 1–7) | Why it shows up at Stripe |
|---|---|---|
| 1 | Python sorting and greedy for tiered pricing | Shipping Cost Calculator with Tiered Pricing—sort tiers once, walk weight greedily, sum per-tier slabs. |
| 2 | Python hash tables for transaction-fee aggregation | Transaction Fee Aggregation by Merchant—defaultdict(float) keyed by merchant_id, single-pass over rows. |
| 3 | Python sets for idempotent event apply | Idempotent Event Apply with Token Cache—seen-set short-circuit so replays don't double-charge. |
| 4 | Python deques for bounded producer-consumer queues | Bounded Producer-Consumer Queue—collections.deque(maxlen=N) with explicit back-pressure policy. |
| 5 | Python streaming with event-time watermarks | Drop Late Records Past Watermark—watermark = max(event_time) − allowed_lateness, drop on event_time < watermark. |
| 6 | Python end-to-end ETL with validation and logging | End-to-End ETL: Ingest, Validate, Load with Logging Summary—csv.DictReader → per-row validate → counters → final summary. |
| 7 | SQL JSONB filtering with type casting | Extract Top-Level Keys from event_payload JSONB—jsonb_object_keys, ->>, and explicit :: casts. |
Payments-platform framing rule: Stripe's prompts span payments-platform engineering—shipping fees, transaction fees, idempotency tokens, bounded ingestion buffers, event-time vs processing-time, ETL run logs, JSONB event payloads. The interviewer is grading whether you map each business framing to the right primitive: tiered fee schedule → sort + greedy walk; per-merchant rollup → hash-table aggregation; replay safety → seen-set + token cache; bounded buffer → `deque(maxlen=N)` + explicit overflow; late events → watermark filter; reliable ingest → ETL counters + summary log; flexible event payloads → JSONB introspection with explicit casts. State the mapping out loud.
1. Python Sorting and Greedy for Tiered Pricing
Sort-then-greedy walk for tiered pricing in Python for data engineering
Tiered pricing—shipping fees, payout fees, marketplace commissions—is the canonical "sort once, walk once" interview prompt. The mental model: sort the tiers by their lower bound, then walk an input weight (or amount) tier-by-tier and sum the per-tier slabs. The invariant is greedy: once you've consumed a tier's interval, you never come back to it. Three sub-skills carry the section: representing tiers as (threshold, rate) pairs, walking them in one pass, and being explicit about whether each threshold is inclusive or exclusive.
Pro tip: Watch units. A tiered shipping schedule mixes "first 1 kg = $5" with "each additional 0.5 kg = $0.50". Confusing flat-fee tiers with per-unit tiers is the #1 way candidates lose this question. Read the schedule out loud and underline whether each tier is flat-fee or per-unit.
Tier table: a list of (threshold, rate) intervals
A tier table is a sorted list of (upper_threshold, rate) pairs, optionally with a flat-fee column. The invariant: tier i covers the interval (threshold[i-1], threshold[i]]—the lower bound is the previous threshold, and the upper bound is threshold[i]. Sorted by upper_threshold ascending, you can walk weight greedily without scanning the entire table on every step.
- `tiers: list[tuple[float, float]]` — `[(threshold, rate), ...]` sorted by `threshold` ascending.
- Lower bound is implicit — tier `i`'s lower bound is `tiers[i-1].threshold` (or 0 for the first tier).
- Final tier is unbounded — represent it with `threshold = math.inf` so the walk terminates cleanly.
- Worked example: shipping schedule with three tiers — up to 1 kg @ $5 flat, then up to 5 kg @ $1/kg, beyond @ $0.50/kg.
| tier | upper (kg) | shape | rate |
|---|---|---|---|
| 1 | 1.0 | flat | $5.00 |
| 2 | 5.0 | per-kg | $1.00/kg |
| 3 | inf | per-kg | $0.50/kg |
```python
import math

# (upper_threshold_kg, shape, rate)
tiers = [
    (1.0, "flat", 5.00),
    (5.0, "per_kg", 1.00),
    (math.inf, "per_kg", 0.50),
]
```
Sort once, walk once: greedy interval consumption
The walk is one pass: at each tier, consume min(remaining_weight, tier_width) of the input, charge it at the tier rate, decrement the remaining weight, advance. The invariant: each tier is visited at most once, and the loop terminates as soon as remaining_weight == 0.
- `tier_width = upper - prev_upper` — slab width for that tier (use `math.inf` only on the final tier).
- `consumed = min(remaining, tier_width)` — never overshoot the tier; saturate with `min`.
- Flat tiers add the rate as-is; per-unit tiers multiply `consumed * rate`.
- Loop guard: `if remaining <= 0: break` — stops the walk early once the input is fully covered.
- Worked example: ship a 4 kg package on the schedule above.
| step | tier | width (kg) | consumed (kg) | charge | remaining (kg) |
|---|---|---|---|---|---|
| 1 | 1 (flat) | 1.0 | 1.0 | $5.00 | 3.0 |
| 2 | 2 (per_kg) | 4.0 | 3.0 | $3.00 | 0.0 |
| 3 | (loop ends — remaining is 0) | — | — | — | — |
Total: $8.00.
```python
def shipping_cost(weight_kg: float, tiers: list[tuple[float, str, float]]) -> float:
    total, prev_upper, remaining = 0.0, 0.0, weight_kg
    for upper, shape, rate in tiers:
        if remaining <= 0:
            break
        width = upper - prev_upper
        consumed = min(remaining, width)
        if shape == "flat":
            total += rate
        else:
            total += consumed * rate
        remaining -= consumed
        prev_upper = upper
    return round(total, 2)
```
Boundary handling: inclusive vs. exclusive thresholds
The most common bug is at the tier boundary. A schedule that says "up to 1 kg costs $5" usually means weight <= 1.0 falls in the first tier; "more than 1 kg up to 5 kg" means the second tier starts at strictly above 1 kg. The invariant: be consistent—pick one convention and apply it everywhere.
- Half-open `(prev, upper]` — what the walk above implements; `weight = 1.0` lands in tier 1, not tier 2.
- Half-open `[prev, upper)` — alternative; `weight = 1.0` would jump to tier 2's first kilogram.
- Inclusive both sides — never use; ties become ambiguous.
- Floating-point danger — `0.1 + 0.2 != 0.3`; convert to `Decimal` or work in cents/grams for money/weight comparisons.
- Worked example: at exactly 1 kg with the half-open `(prev, upper]` convention, the package is fully consumed in tier 1 (1.0 kg of 1.0 kg width), charged $5 flat, and the loop exits with `remaining == 0`.
```python
from decimal import Decimal, getcontext

# When money is involved, switch to Decimal to avoid float drift.
getcontext().prec = 28
tiers_dec = [
    (Decimal("1.0"), "flat", Decimal("5.00")),
    (Decimal("5.0"), "per_kg", Decimal("1.00")),
    (Decimal("Infinity"), "per_kg", Decimal("0.50")),
]
```
Common beginner mistakes
- Re-scanning the tier table for every kilogram of input — turns a single-pass O(K) walk into O(K·N).
- Forgetting `min(remaining, tier_width)` — charges the full tier width even when the input ends mid-tier.
- Not bounding the final tier with `math.inf` — the loop terminates before the heaviest packages are charged.
- Mixing flat-fee and per-unit tiers without a `shape` field — silently double-counts or skips charges.
- Comparing money or weight as `float` — rounding bugs at exact boundaries; use `Decimal` or work in integer base units.
Python interview question on tiered shipping cost
Implement shipping_cost(weight_kg, tiers) for the schedule below:
- 0–1 kg: $5 flat.
- Over 1 kg, up to 5 kg: $1 per kg (charged on the portion in this tier only).
- Over 5 kg: $0.50 per kg (on the portion above 5 kg).
Return the total in dollars rounded to 2 decimals. Tiers may not be passed in sorted order—sort them first.
Solution: using sort plus a greedy single-pass walk
```python
import math
from typing import List, Tuple

Tier = Tuple[float, str, float]  # (upper_threshold, shape, rate)

def shipping_cost(weight_kg: float, tiers: List[Tier]) -> float:
    tiers = sorted(tiers, key=lambda t: t[0])
    total, prev_upper, remaining = 0.0, 0.0, weight_kg
    for upper, shape, rate in tiers:
        if remaining <= 0:
            break
        width = upper - prev_upper
        consumed = min(remaining, width)
        total += rate if shape == "flat" else consumed * rate
        remaining -= consumed
        prev_upper = upper
    return round(total, 2)
```
Step-by-step trace (input: weight_kg = 7.5, tiers as in the question):
| weight_kg | tiers (after sort) |
|---|---|
| 7.5 | [(1.0, "flat", 5.00), (5.0, "per_kg", 1.00), (inf, "per_kg", 0.50)] |
- Sort tiers — already ascending by upper threshold; no reorder.
- Initialize — `total = 0.0`, `prev_upper = 0.0`, `remaining = 7.5`.
- Iteration 1 — tier `(1.0, "flat", 5.00)`: `width = 1.0 - 0.0 = 1.0`; `consumed = min(7.5, 1.0) = 1.0`; flat shape, so `total += 5.00 → 5.00`; `remaining = 7.5 - 1.0 = 6.5`; `prev_upper = 1.0`.
- Iteration 2 — tier `(5.0, "per_kg", 1.00)`: `width = 5.0 - 1.0 = 4.0`; `consumed = min(6.5, 4.0) = 4.0`; per-kg, so `total += 4.0 * 1.00 = 4.00 → 9.00`; `remaining = 6.5 - 4.0 = 2.5`; `prev_upper = 5.0`.
- Iteration 3 — tier `(inf, "per_kg", 0.50)`: `width = inf - 5.0 = inf`; `consumed = min(2.5, inf) = 2.5`; per-kg, `total += 2.5 * 0.50 = 1.25 → 10.25`; `remaining = 0.0`.
- Next iteration check — `remaining <= 0`, loop breaks.
- Return — `round(10.25, 2) = 10.25`.
Output:
| weight_kg | total ($) |
|---|---|
| 7.5 | 10.25 |
Why this works — concept by concept:
- Sort by threshold — guarantees the greedy walk visits tiers in ascending order; without it, `prev_upper` would jump backward and `width` could go negative.
- Half-open intervals `(prev_upper, upper]` — each tier owns a unique slab; no double-counting at the boundary.
- `min(remaining, tier_width)` — saturates consumption at whichever runs out first—the input weight or the tier width—so the walk handles inputs that fall mid-tier as well as inputs that span all tiers.
- Flat vs. per-kg shape field — an explicit `shape` discriminates "tier rate is added once" from "tier rate is multiplied by the consumed portion", eliminating the most common tiered-pricing bug.
- `remaining <= 0` guard — short-circuits the loop on light packages so we don't iterate the unbounded final tier when there's nothing left to charge.
- Cost — `O(K log K)` for the sort + `O(K)` for the walk, where `K` is the number of tiers; `K` is small in practice (≤10), so this is effectively constant time.
2. Python Hash Tables for Transaction-Fee Aggregation
Per-key rollups with defaultdict in Python for data engineering
Per-merchant transaction-fee rollups are the canonical "single-pass aggregation" interview prompt. The mental model: walk the rows once, key by merchant_id, accumulate fees. The invariant is idempotent on the key: dict[key] += amount is associative and commutative, so row order doesn't matter and partial reruns are safe. Three sub-skills carry the section: choosing the right dict flavor (defaultdict vs. Counter vs. plain dict), running the single-pass aggregation, and parsing the row before keying.
Pro tip: Reach for `collections.defaultdict(float)`—not `Counter`—when the values are floating-point money. `Counter` is integer-flavored (it has `most_common`, `subtract`, and `+`/`-` operators that drop zero/negative entries) and will silently drop merchants whose net fees are zero.
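A minimal demonstration of the silent drop (merchant names here are illustrative): `Counter` addition keeps only positive totals, while `defaultdict(float)` keeps the zero-net merchant as an explicit entry:

```python
from collections import Counter, defaultdict

# Counter's + operator keeps only positive totals: a merchant whose refund
# exactly cancels its fee vanishes from the result.
net = Counter({"M1": 1.0}) + Counter({"M1": -1.0, "M2": 0.5})

# defaultdict(float) keeps the zero-net merchant as an explicit 0.0 entry.
totals = defaultdict(float)
for merchant, fee in [("M1", 1.0), ("M1", -1.0), ("M2", 0.5)]:
    totals[merchant] += fee
```

After this runs, `"M1"` is absent from `net` but present (with total `0.0`) in `totals`.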
defaultdict(float) as a per-key running total
collections.defaultdict(float) returns 0.0 for any missing key on first access, so d[key] += amount always works without an explicit if key not in d guard. The invariant: the default factory is called only on a missing-key read, not on a missing-key write—d[key] = ... never invokes the factory.
- `defaultdict(float)` — running total per key, default `0.0`.
- `defaultdict(int)` — running counter per key, default `0`.
- `defaultdict(list)` — append-per-key bucket, default `[]`.
- `Counter` — integer-flavored counter; has handy `most_common(n)` but its arithmetic operators drop zero and negative totals.
- Worked example: three rows — `(M1, 1.20), (M2, 0.80), (M1, 0.50)` — keyed by merchant.
| event | dict state after |
|---|---|
| `d["M1"] += 1.20` | `{"M1": 1.20}` |
| `d["M2"] += 0.80` | `{"M1": 1.20, "M2": 0.80}` |
| `d["M1"] += 0.50` | `{"M1": 1.70, "M2": 0.80}` |
```python
from collections import defaultdict

fees = defaultdict(float)
for merchant, fee in [("M1", 1.20), ("M2", 0.80), ("M1", 0.50)]:
    fees[merchant] += fee
# defaultdict(<class 'float'>, {'M1': 1.70, 'M2': 0.80})
```
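The factory-only-on-missing-read rule can be checked directly; this quick sketch (not from the original set) shows which operations do and do not invoke the factory:

```python
from collections import defaultdict

d = defaultdict(float)
d["a"]        # read of a missing key: the factory runs and inserts 0.0
d["b"] = 1.5  # plain write: the factory is never called
_ = "c" in d  # membership test does NOT invoke the factory either
```

Only the bare read of `"a"` materializes a default; `"c"` never becomes a key.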
Single-pass aggregation with dict[key] += amount
The aggregation is one pass: read each row, normalize the key, add the amount. The invariant: dict[key] += amount is O(1) average per row, so the whole pass is O(N) over N rows. Two passes over the same input never beats one—if the interviewer asks "what's the time complexity?" the answer is O(N).
- Single-pass guarantee — no second loop for "now compute totals"; the totals are the dict.
- Order independence — `+=` is commutative and associative, so `(a, b, c)` and `(c, a, b)` produce the same dict.
- Negative amounts — refunds work naturally; the running total can go down.
- Final shape — convert to `dict(d)` if the consumer expects a plain dict (some serializers don't like `defaultdict`).
- Worked example: five rows, three distinct merchants, one refund.
| row | merchant | fee |
|---|---|---|
| 1 | M1 | 1.20 |
| 2 | M2 | 0.80 |
| 3 | M1 | 0.50 |
| 4 | M3 | 2.00 |
| 5 | M1 | -0.40 |
After the single pass: {"M1": 1.30, "M2": 0.80, "M3": 2.00}.
```python
from collections import defaultdict

def aggregate_fees(rows: list[tuple[str, float]]) -> dict[str, float]:
    totals = defaultdict(float)
    for merchant, fee in rows:
        totals[merchant] += fee
    return dict(totals)
```
Parse-then-key: merchant_id, amount = line.split(",")
Real interview inputs arrive as text—CSV lines, log entries, or pasted dictionaries. The invariant: parse the row first, validate the parts, then key the dict. Mixing parse and aggregate in one tangled expression is the #1 way to ship bugs.
- `line.split(",")` — naive CSV split; fails if `merchant_id` itself contains a comma.
- `csv.reader([line])` — handles quoting; the right tool for any non-trivial CSV.
- `float(amount_str)` — convert before keying; never `+=` a string.
- `merchant_id.strip()` — handle whitespace from log scrapes; `"M1 "` and `"M1"` should aggregate together.
- Worked example: the line `"M1, 1.20"` — naive `split(",")` gives `["M1", " 1.20"]`; `float(" 1.20")` accepts the leading space, but `int` would not.
| raw line | parse step | parsed |
|---|---|---|
| `"M1, 1.20"` | `parts = line.split(",")` | `["M1", " 1.20"]` |
| | `merchant = parts[0].strip()` | `"M1"` |
| | `fee = float(parts[1])` | `1.20` |
```python
from collections import defaultdict

def aggregate_csv_lines(lines: list[str]) -> dict[str, float]:
    totals = defaultdict(float)
    for line in lines:
        if not line.strip() or line.startswith("#"):
            continue  # skip blanks and comments
        merchant_str, fee_str = line.split(",", maxsplit=1)
        totals[merchant_str.strip()] += float(fee_str)
    return dict(totals)
```
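When merchant names can themselves contain commas, a variant built on `csv.reader` handles the quoting that a naive `split(",")` would break; the helper name `aggregate_quoted_lines` is illustrative:

```python
import csv
from collections import defaultdict

def aggregate_quoted_lines(lines: list[str]) -> dict[str, float]:
    totals = defaultdict(float)
    # csv.reader honors quotes, so '"Acme, Inc",1.20' parses as two fields;
    # skipinitialspace drops the blank after each delimiter.
    for merchant, fee in csv.reader(lines, skipinitialspace=True):
        totals[merchant.strip()] += float(fee)
    return dict(totals)
```

A quoted line like `'"Acme, Inc",1.20'` aggregates under the single key `"Acme, Inc"` instead of crashing the two-field unpack.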
Common beginner mistakes
- Using `Counter` for floating-point fees — loses zero-net merchants and shows surprising behavior on negatives.
- Forgetting `defaultdict(float)` and writing `if k in d: d[k] += v else: d[k] = v` — works, but verbose and slower.
- Calling `d.setdefault(k, 0.0); d[k] += v` — correct but allocates a fresh default every iteration; slower than `defaultdict`.
- Splitting CSV lines with `.split(",")` when the merchant name itself can contain a comma — use the `csv` module.
- Concatenating amounts as strings (`d[k] += amount_str`) — Python doesn't auto-cast; you'll get a `TypeError` once the key already holds a `float` value.
Python interview question on transaction-fee aggregation
You receive a list of CSV strings, each formatted merchant_id,amount. Aggregate the fees per merchant_id and return a dict[str, float] with totals rounded to 2 decimals. Skip blank lines and lines starting with #. Whitespace around either field is allowed and must be stripped before keying.
Example input:
```
M1, 1.20
M2, 0.80
M1, 0.50
# comment, ignored
M3, 2.00
M1, -0.40
```
Expected output: {"M1": 1.30, "M2": 0.80, "M3": 2.00}.
Solution: using defaultdict(float) and a single-pass walk
```python
from collections import defaultdict

def aggregate_transaction_fees(lines: list[str]) -> dict[str, float]:
    totals: defaultdict[str, float] = defaultdict(float)
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        merchant_str, fee_str = line.split(",", maxsplit=1)
        totals[merchant_str.strip()] += float(fee_str)
    return {k: round(v, 2) for k, v in totals.items()}
```
Step-by-step trace (input: the six lines from the question):
| line# | content | parse | dict state after |
|---|---|---|---|
| 1 | `M1, 1.20` | `("M1", 1.20)` | `{M1: 1.20}` |
| 2 | `M2, 0.80` | `("M2", 0.80)` | `{M1: 1.20, M2: 0.80}` |
| 3 | `M1, 0.50` | `("M1", 0.50)` | `{M1: 1.70, M2: 0.80}` |
| 4 | `# comment...` | (skipped) | (unchanged) |
| 5 | `M3, 2.00` | `("M3", 2.00)` | `{M1: 1.70, M2: 0.80, M3: 2.00}` |
| 6 | `M1, -0.40` | `("M1", -0.40)` | `{M1: 1.30, M2: 0.80, M3: 2.00}` |
- Iterate lines — strip leading/trailing whitespace.
- Skip blanks and comments — the `# comment...` line is dropped before parsing.
- Parse with `split(",", maxsplit=1)` — guards against accidental commas in the amount; the merchant id is the first field, the rest is the amount string.
- Strip merchant whitespace and `float()` the amount — defensive parsing.
- `totals[merchant] += amount` — `defaultdict(float)` provides `0.0` on first key access; subsequent reads accumulate.
- Round on the way out — the final dict comprehension applies `round(v, 2)` once per key, not once per row.
- Return a plain dict — `dict()` (or the comprehension result) plays nicely with JSON serializers.
Output:
| merchant_id | total ($) |
|---|---|
| M1 | 1.30 |
| M2 | 0.80 |
| M3 | 2.00 |
Why this works — concept by concept:
- `defaultdict(float)` — supplies `0.0` on the first read for any key, so the `+=` write is always valid; eliminates the `if k in d` guard and the `setdefault` allocation.
- Single-pass `+=` — `O(1)` average per row, totaling `O(N)` over `N` rows; commutative and associative, so the order of input lines doesn't change the result.
- Defensive parse: strip + split + float — catches whitespace from log scraping and rejects malformed amounts at parse time, before they corrupt the running total.
- Comment / blank-line skip — preserves the contract that the input may include human-friendly markers (`# header row`); aggregating those would `ValueError` at the `float()` conversion.
- Round once, at the end — applying `round` per row accumulates rounding error; rounding the final total once yields stable cents.
- Cost — `O(N)` time over `N` lines; `O(K)` space for `K` distinct merchants.
3. Python Sets for Idempotent Event Apply
Idempotency with a seen-set token cache in Python for data engineering
Idempotency is the canonical correctness invariant for any payments platform: applying the same event twice should produce the same end state as applying it once. The mental model: assign each event a unique idempotency token; before applying, check a seen-set; if the token is already present, skip; otherwise apply and record the token. The invariant is f(f(x)) == f(x). Three sub-skills carry the section: defining idempotency precisely, using a set (or LRU cache) as the dedup primitive, and bounding memory with eviction so the seen-set doesn't grow forever.
Pro tip: A seen-set is fine when the token universe is small enough to fit in memory. Production systems use a bounded cache (LRU via `OrderedDict` or `functools.lru_cache`-style primitives, or a Redis TTL set) so memory doesn't grow unbounded. Always state the bounded variant out loud, even if your minimal-LOC solution uses `set()`.
Idempotency: f(f(x)) == f(x)
An operation is idempotent when applying it twice gives the same result as applying it once. The invariant: state changes are conditional on a token check, not unconditional. A naive balance += amount is not idempotent—every duplicate doubles the change. A token-guarded if token not in seen: balance += amount; seen.add(token) is.
- Network-safe — clients can retry an HTTP POST without fear of double-charging.
- Restart-safe — a consumer that crashes mid-batch can replay from offset 0 without corrupting state.
- Replay-safe — a downstream system can re-emit yesterday's events for backfill without duplicating effects.
- Out-of-order-safe — applying the same event at two different consumers in different orders still converges.
- Worked example: apply three events — `(t1, +50), (t2, +30), (t1, +50)` — the second `t1` is a duplicate.
| event | token | seen before? | applied? | balance after |
|---|---|---|---|---|
| 1 | t1 | no | yes | 50 |
| 2 | t2 | no | yes | 80 |
| 3 | t1 | yes | no | 80 |
```python
def apply_events(events: list[tuple[str, int]]) -> int:
    seen: set[str] = set()
    balance = 0
    for token, delta in events:
        if token in seen:
            continue
        balance += delta
        seen.add(token)
    return balance
```
Seen-set short-circuit: skip if token in seen
The dedup primitive is a set of tokens. The invariant: O(1) average lookup and insert, both backed by hashing. The short-circuit pattern is if token in seen: return early—no balance update, no log write, no downstream effect.
- `token in seen` — `O(1)` average; uses the token's `__hash__` and `__eq__`.
- `seen.add(token)` — `O(1)` average; idempotent for the set itself (adding the same value twice is a no-op).
- Membership-only `set` — when you only need "yes/no", don't use a `dict`; sets are smaller and faster.
- Hashable tokens — use `str` UUIDs, integer event IDs, or tuples; never mutable types like `list`.
- Worked example: four events — `(a, +10), (b, +5), (a, +10), (c, +3)` — starting balance 0.
| event | token | check | action | balance |
|---|---|---|---|---|
| 1 | a | `a in seen → False` | apply +10 | 10 |
| 2 | b | `b in seen → False` | apply +5 | 15 |
| 3 | a | `a in seen → True` | skip | 15 |
| 4 | c | `c in seen → False` | apply +3 | 18 |
```python
def apply_with_seen(events: list[tuple[str, int]]) -> int:
    seen, balance = set(), 0
    for token, delta in events:
        if token in seen:
            continue
        balance += delta
        seen.add(token)
    return balance
```
Bounded-cache eviction with collections.OrderedDict
Production token caches don't grow forever—they evict the oldest entries when the cache size exceeds a configured limit. The invariant: most-recent tokens stay in cache; least-recent are evicted in FIFO or LRU order. collections.OrderedDict gives you move_to_end + popitem(last=False) for clean LRU semantics.
- `OrderedDict` — preserves insertion order; `popitem(last=False)` removes the oldest in `O(1)`.
- `move_to_end(key)` — promotes a key to most-recent on read; converts FIFO to LRU.
- `maxsize` — the cap; evict when `len(cache) > maxsize`.
- TTL-based caches — Redis with `EXPIRE`; the same idea, time-bounded instead of size-bounded.
- Worked example: `maxsize = 3` with token sequence `a, b, c, d, a`.
| step | inserted | cache (oldest → newest) |
|---|---|---|
| 1 | a | `[a]` |
| 2 | b | `[a, b]` |
| 3 | c | `[a, b, c]` |
| 4 | d | `[b, c, d]` (a evicted) |
| 5 | a | `[c, d, a]` (b evicted; a treated as new) |
```python
from collections import OrderedDict

class BoundedTokenCache:
    def __init__(self, maxsize: int) -> None:
        self.maxsize = maxsize
        self.cache: OrderedDict[str, None] = OrderedDict()

    def __contains__(self, token: str) -> bool:
        if token in self.cache:
            self.cache.move_to_end(token)  # LRU bump on read
            return True
        return False

    def add(self, token: str) -> None:
        self.cache[token] = None
        if len(self.cache) > self.maxsize:
            self.cache.popitem(last=False)
```
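The two `OrderedDict` primitives the cache relies on can be exercised in isolation; a small sketch:

```python
from collections import OrderedDict

od = OrderedDict.fromkeys(["a", "b", "c"])  # insertion order: a, b, c
od.move_to_end("a")                         # LRU bump: order becomes b, c, a
oldest_key, _ = od.popitem(last=False)      # evicts "b", now the least-recent
```

After the bump, `"a"` is safe at the most-recent end and `"b"` is the eviction victim.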
Common beginner mistakes
- Using `list.append` + `if token in list` — `O(N)` lookup; degrades to `O(N²)` over the whole stream.
- Forgetting to `seen.add(token)` after applying — the very next replay re-applies and the balance doubles.
- Storing the event payload in the seen-set instead of a small token — blows memory; tokens should be short strings or integers.
- Letting the seen-set grow unbounded in production — works in interviews; OOM-kills the consumer in real systems.
- Mutable tokens (lists, dicts) — `TypeError: unhashable type`; always use `str`, `int`, or `tuple`.
Python interview question on idempotent event apply
Implement apply_events(events, balance=0, maxsize=10_000) that walks a list of (token, delta) events and returns the final integer balance. Each token may appear multiple times; the second and later occurrences must be skipped. Use a bounded LRU token cache (maxsize tokens) so memory stays bounded; an evicted token may legitimately re-apply if it returns later (this is the documented trade-off). Return the final balance.
Solution: using a seen-set with an OrderedDict LRU cache
```python
from collections import OrderedDict

def apply_events(
    events: list[tuple[str, int]],
    balance: int = 0,
    maxsize: int = 10_000,
) -> int:
    seen: OrderedDict[str, None] = OrderedDict()
    for token, delta in events:
        if token in seen:
            seen.move_to_end(token)  # LRU bump
            continue
        balance += delta
        seen[token] = None
        if len(seen) > maxsize:
            seen.popitem(last=False)  # evict LRU
    return balance
```
Step-by-step trace (input: events = [("a", 10), ("b", 5), ("a", 10), ("c", 3), ("b", 5)], maxsize = 3):
| step | event | check | action | balance | cache (oldest → newest) |
|---|---|---|---|---|---|
| 1 | `(a,10)` | `a in seen → False` | apply | 10 | [a] |
| 2 | `(b,5)` | `b in seen → False` | apply | 15 | [a, b] |
| 3 | `(a,10)` | `a in seen → True` | skip + LRU bump | 15 | [b, a] |
| 4 | `(c,3)` | `c in seen → False` | apply | 18 | [b, a, c] |
| 5 | `(b,5)` | `b in seen → True` | skip + LRU bump | 18 | [a, c, b] |
- Initialize — `seen` is an empty `OrderedDict`, `balance = 0`.
- Iteration 1 (`a, 10`) — `a` not in cache; balance becomes 10; insert `a`.
- Iteration 2 (`b, 5`) — `b` not in cache; balance becomes 15; insert `b`.
- Iteration 3 (`a, 10`) — `a` in cache → skip; `move_to_end("a")` promotes it to most-recent.
- Iteration 4 (`c, 3`) — `c` not in cache; balance becomes 18; insert `c`. Cache size is 3, still within `maxsize`.
- Iteration 5 (`b, 5`) — `b` in cache → skip; `move_to_end("b")` promotes it.
- Eviction never triggers in this trace because the cache stayed at `maxsize = 3`; if a fourth distinct token arrived, the LRU entry would be evicted.
Output:
| final_balance |
|---|
| 18 |
Why this works — concept by concept:
- Idempotency invariant `f(f(x)) == f(x)` — guarded by a token check, not by the operation itself; replays of the same token leave the balance unchanged.
- `OrderedDict` as a set + LRU — `__contains__` is `O(1)` average; `move_to_end` and `popitem(last=False)` give clean LRU semantics in `O(1)`.
- LRU bump on read — `move_to_end` after a positive `in` check converts FIFO to LRU; tokens that keep arriving stay hot and aren't evicted.
- Bounded memory — `if len(seen) > maxsize: popitem(last=False)` caps memory at `maxsize` entries regardless of stream length; the evicted-and-re-arrives trade-off is documented.
- Order independence on the kept tokens — once a token is in the cache, every subsequent replay is a no-op, so input order does not affect the final balance among the in-cache tokens.
- Cost — `O(N)` time for `N` events; `O(maxsize)` space for the bounded cache.
4. Python Deques for Bounded Producer-Consumer Queues
Bounded queues with collections.deque(maxlen=N) in Python for data engineering
Bounded producer-consumer queues are the canonical "ingest with back-pressure" interview prompt. The mental model: producers append to one end of a fixed-capacity queue; consumers pop from the other end; when full, the producer must block, drop, or overwrite—never silently grow. The invariant is len(queue) <= maxlen always. Three sub-skills carry the section: choosing collections.deque(maxlen=N) over list, declaring the back-pressure policy explicitly, and respecting the single-producer / single-consumer simplification when one is offered.
Pro tip: Stripe and other payments platforms grade you not just on "does it work" but on what happens when it's full. Always state the back-pressure policy out loud—`block`, `drop-newest`, `drop-oldest`, or `overflow-into-disk`—before typing the first line.
collections.deque(maxlen=N) for O(1) ends
`collections.deque` is a double-ended queue with `O(1)` `append`, `appendleft`, `pop`, and `popleft`. The `maxlen` parameter caps capacity. The invariant: once the deque is at capacity, every `append` discards the oldest item automatically. That makes `deque(maxlen=N)` the one-line drop-oldest queue.
- `d.append(x)` — push right, `O(1)`; if full, the oldest is silently popped from the left.
- `d.popleft()` — pop left (FIFO consume), `O(1)`.
- `d.appendleft(x)` — push left, `O(1)` (rare in producer-consumer; useful for un-pop).
- `d.maxlen` — read-only int (or `None`) once constructed; you cannot resize a `deque` in place.
- Worked example: `deque(maxlen=3)` with the sequence `1, 2, 3, 4`.
| step | op | state |
|---|---|---|
| 1 | `append(1)` | `[1]` |
| 2 | `append(2)` | `[1, 2]` |
| 3 | `append(3)` | `[1, 2, 3]` |
| 4 | `append(4)` | `[2, 3, 4]` (1 evicted) |
```python
from collections import deque

q = deque(maxlen=3)
for x in (1, 2, 3, 4):
    q.append(x)
print(list(q))  # [2, 3, 4]
```
Back-pressure: drop, block, or overwrite-oldest
The interesting design decision is what happens at capacity. deque(maxlen=N)'s default is drop-oldest—convenient but not always correct. The invariant: back-pressure policy is a contract with the producer, and the four common variants behave very differently under load.
- drop-oldest — `deque(maxlen=N).append(x)` semantics; convenient but loses the oldest event, which is often the most important for an audit trail.
- drop-newest — explicit `if len(q) >= N: return`; preserves the oldest events.
- block — `queue.Queue(maxsize=N).put(x)` blocks the producer until a slot frees; back-pressure propagates upstream.
- overflow-to-disk — spill to an append-only log when full; preserves everything at the cost of latency.
- Worked example: capacity 2, sequence `a, b, c` — four policies, four outcomes.
| policy | after `append('c')` | dropped | producer behavior |
|---|---|---|---|
| drop-oldest | `[b, c]` | `a` | non-blocking |
| drop-newest | `[a, b]` | `c` | non-blocking |
| block | `[a, b]` (until consumer `popleft`) | none | producer blocks |
| spill-to-disk | `[a, b]` + `c` to log | none (delayed) | non-blocking |
```python
from collections import deque

class DropNewestQueue:
    def __init__(self, maxlen: int) -> None:
        self.q: deque = deque()
        self.maxlen = maxlen
        self.dropped = 0

    def put(self, item) -> bool:
        if len(self.q) >= self.maxlen:
            self.dropped += 1
            return False
        self.q.append(item)
        return True

    def get(self):
        return self.q.popleft() if self.q else None
```
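For contrast, the block policy row maps onto the standard library's `queue.Queue`; this sketch (the thread setup is illustrative) shows the producer's third `put` blocking until the consumer frees a slot:

```python
import queue
import threading

q: "queue.Queue[str]" = queue.Queue(maxsize=2)

def producer() -> None:
    for item in ("a", "b", "c"):
        q.put(item)  # the third put blocks until the consumer calls get()

t = threading.Thread(target=producer)
t.start()
consumed = [q.get() for _ in range(3)]  # draining unblocks the producer
t.join()
```

Nothing is dropped under the block policy; back-pressure simply stalls the producer until the consumer catches up.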
Single-producer/single-consumer invariant
Many interview prompts add the simplification "single producer, single consumer" (SPSC). The invariant: without concurrency, no lock or threading.Event is required. SPSC + deque is one of the few cases where the GIL is enough; multi-producer or multi-consumer needs queue.Queue, asyncio.Queue, or an explicit lock.
- SPSC, single thread — `deque` is fine; no synchronization needed.
- MPMC, multi-thread — `queue.Queue` (thread-safe, blocking `put`/`get`).
- Async coroutines — `asyncio.Queue` with `await q.put(...)` / `await q.get()`.
- Cross-process — `multiprocessing.Queue`, with serialization overhead.
- Worked example: an SPSC `put`/`get` sequence with the drop-newest queue from above.
| step | producer op | consumer op | state | notes |
|---|---|---|---|---|
| 1 | `put('a')` | — | `[a]` | OK |
| 2 | `put('b')` | — | `[a, b]` | full |
| 3 | `put('c')` | — | `[a, b]` | dropped |
| 4 | — | `get() → 'a'` | `[b]` | drain |
| 5 | `put('c')` | — | `[b, c]` | OK now |
```python
from collections import deque

# SPSC: a single-threaded producer-consumer loop using a bounded buffer.
# Note: a bare deque(maxlen=2) would apply drop-oldest; the explicit length
# check below implements the drop-newest policy used in the trace above.
q = deque()
MAXLEN = 2
for x in ['a', 'b', 'c']:
    if len(q) >= MAXLEN:
        continue  # drop-newest: 'c' is rejected
    q.append(x)
# After the loop: q is deque(['a', 'b']); 'c' was dropped on the third append.
```
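For contrast with the single-threaded loop, the blocking policy from the multi-thread bullet can be sketched with `queue.Queue` (a minimal illustration; `put_nowait` raises `queue.Full` instead of blocking, which makes the capacity limit visible without spawning threads):

```python
import queue

q = queue.Queue(maxsize=2)
q.put('a')
q.put('b')
try:
    q.put_nowait('c')  # a blocking put() would wait here until get() frees a slot
except queue.Full:
    print("queue full; a real producer would block and back-pressure upstream")
print(q.get())  # 'a' — FIFO order, same as deque.popleft()
```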
Common beginner mistakes
- Using `list` with `pop(0)` — O(N) per pop; turns the queue into a quadratic bottleneck.
- Forgetting to declare the back-pressure policy — silently relying on `deque(maxlen)`'s drop-oldest, which is rarely what you want for audit-critical events.
- Resizing a `deque` after construction — `deque.maxlen` is read-only; you have to allocate a new one and migrate.
- Using `deque(maxlen=N)` from multiple threads without a lock — `append` and `popleft` are individually thread-safe, but compound operations (`if not full: put`) are not.
- Storing payloads inline that would blow the memory budget — queue items should be small (IDs, references), not full event payloads, when capacity is large.
Python interview question on a bounded producer-consumer queue
Implement BoundedQueue(maxlen, overflow="drop_newest") with three methods:
- `put(item) -> bool` — push to the right; return `True` if accepted, `False` if dropped.
- `get() -> item | None` — pop from the left; return `None` if empty.
- `__len__() -> int` — current size.
overflow must support "drop_oldest", "drop_newest", and "block_no_op" (return False without dropping the existing data). Track and expose a .dropped counter for visibility. Single-producer / single-consumer; no thread synchronization required.
Solution: using collections.deque with an explicit overflow policy
```python
from collections import deque
from typing import Any, Optional

class BoundedQueue:
    def __init__(self, maxlen: int, overflow: str = "drop_newest") -> None:
        if overflow not in {"drop_oldest", "drop_newest", "block_no_op"}:
            raise ValueError(f"unknown overflow policy: {overflow}")
        self._q: deque = deque()
        self.maxlen = maxlen
        self.overflow = overflow
        self.dropped = 0

    def put(self, item: Any) -> bool:
        if len(self._q) < self.maxlen:
            self._q.append(item)
            return True
        # at capacity
        if self.overflow == "drop_oldest":
            self._q.popleft()   # evict the oldest to make room
            self._q.append(item)
            self.dropped += 1   # count the evicted item
            return True
        if self.overflow == "drop_newest":
            self.dropped += 1   # count the rejected incoming item
            return False
        # block_no_op: reject the put, keep existing data; count it for visibility
        self.dropped += 1
        return False

    def get(self) -> Optional[Any]:
        return self._q.popleft() if self._q else None

    def __len__(self) -> int:
        return len(self._q)
```
Step-by-step trace (input: BoundedQueue(maxlen=2, overflow="drop_newest") with calls put('a'), put('b'), put('c'), get(), put('c')):
| step | call | state before | branch taken | state after | dropped | return |
|---|---|---|---|---|---|---|
| 1 | put('a') | [] | len < maxlen → append | ['a'] | 0 | True |
| 2 | put('b') | ['a'] | len < maxlen → append | ['a','b'] | 0 | True |
| 3 | put('c') | ['a','b'] | full + drop_newest | ['a','b'] | 1 | False |
| 4 | get() | ['a','b'] | non-empty → popleft | ['b'] | 1 | 'a' |
| 5 | put('c') | ['b'] | len < maxlen → append | ['b','c'] | 1 | True |
- Construction — `deque` empty, `maxlen=2`, `overflow="drop_newest"`, `dropped=0`.
- Steps 1–2 — both `put` calls succeed; the queue grows to `['a', 'b']`.
- Step 3 — at capacity, the policy is `drop_newest`, so `'c'` is rejected, the counter increments, and the return is `False`.
- Step 4 — the consumer drains one slot; `'a'` is returned, the queue is `['b']`.
- Step 5 — capacity is available again; `'c'` is accepted, the queue is `['b', 'c']`.
- Audit — `q.dropped == 1` and `len(q) == 2`; the consumer can decide whether to alert based on the dropped counter.
Output:
| call | return |
|---|---|
| put('a') | True |
| put('b') | True |
| put('c') | False |
| get() | 'a' |
| put('c') | True |
| q.dropped | 1 |
Why this works — concept by concept:
- `collections.deque` for O(1) ends — both `append` and `popleft` are constant-time; `list.pop(0)` would be O(N) and ruin throughput.
- Explicit overflow policy — `drop_oldest` / `drop_newest` / `block_no_op` are different contracts with the producer; the constructor validates the value to fail fast on typos.
- Drop counters as observability — `.dropped` lets the consumer or a metrics scraper alert on back-pressure without changing the queue's hot path.
- SPSC simplification — single producer / single consumer means no locks; `deque`'s `append` and `popleft` are atomic with respect to each other under the GIL for the SPSC pattern.
- Boolean return on `put` — the producer learns immediately whether the item was accepted, so it can degrade gracefully (sample, log, or cancel upstream).
- Cost — O(1) per `put` and per `get`; O(maxlen) space.
5. Python Streaming with Event-Time Watermarks
Event-time watermarks and late-record drop in Python for data engineering
Streaming correctness lives or dies on the distinction between event time (when something happened) and processing time (when your system saw it). The mental model: events arrive out of order; you maintain a watermark = max(event_time) − allowed_lateness and drop anything with event_time < watermark. The invariant: once the watermark advances past a timestamp, that timestamp is "closed" and contributing events are policy-dropped (or routed to a side output). Three sub-skills carry the section: defining the two clocks, computing the watermark, and choosing what to do with late records.
Pro tip: State the allowed-lateness budget out loud. "I'm choosing 60 seconds of allowed lateness because the upstream Kafka cluster is configured to retry for up to 30 seconds, and I want a 2× safety margin." Interviewers grade the justification more than the value.
Event time vs. processing time
Event time is the timestamp embedded in the event payload (when the user's tap happened). Processing time is time.time() inside your consumer (when your code is about to handle it). The invariant: event time is monotonic per producer but not globally; processing time is monotonic per process but unrelated to the real world.
- Event time — drives correctness; "what happened at 09:55?".
- Processing time — drives latency; "how long until I saw the 09:55 event?".
- Out-of-order — events with `event_time = 09:55` can arrive at `processing_time = 10:02` because of network lag.
- Skew — clock skew across producer machines; pin to a single trusted timestamp source if possible.
- Worked example: six events arriving at six processing-time ticks, with their event-time timestamps shown below.
| processing_time | event_id | event_time |
|---|---|---|
| 10:00:00 | e1 | 10:00:00 |
| 10:01:00 | e2 | 10:01:00 |
| 10:02:00 | e3 | 10:02:00 |
| 10:02:30 | e4 | 09:58:00 |
| 10:03:00 | e5 | 10:03:00 |
| 10:03:30 | e6 | 09:55:00 |
e4 and e6 are late—their event time is older than recently seen event times.
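The late arrivals in the table can be flagged with a running maximum over event time (a condensed sketch using the six events above, with times kept as `HH:MM:SS` strings so the comparison stays readable):

```python
events = [("e1", "10:00:00"), ("e2", "10:01:00"), ("e3", "10:02:00"),
          ("e4", "09:58:00"), ("e5", "10:03:00"), ("e6", "09:55:00")]

max_seen = ""
late = []
for event_id, event_time in events:
    if event_time < max_seen:   # lexicographic compare works for zero-padded HH:MM:SS
        late.append(event_id)   # arrived after a newer event was already seen
    else:
        max_seen = event_time

print(late)  # ['e4', 'e6']
```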
Watermark = max(event_time) - allowed_lateness
A watermark is a monotonic timestamp that asserts "I won't accept events with event_time < watermark". The simplest form: watermark = max_observed_event_time − allowed_lateness, advanced after every event. The invariant: the watermark only ever moves forward; never backward.
- `max_seen_event_time` — running max over all observed events.
- `allowed_lateness` — slack budget; tunable per pipeline.
- Monotonic — `watermark = max(watermark, max_seen_event_time − allowed_lateness)`; never reset on the way down.
- Per-key watermarks — for fairness, sometimes maintained per partition (per `merchant_id`).
- Worked example: apply `allowed_lateness = 2 minutes` to the 6-event stream above.
| step | event_time | max_seen | watermark before | watermark after | drop? |
|---|---|---|---|---|---|
| 1 | 10:00:00 | 10:00:00 | -inf | 09:58:00 | no |
| 2 | 10:01:00 | 10:01:00 | 09:58:00 | 09:59:00 | no |
| 3 | 10:02:00 | 10:02:00 | 09:59:00 | 10:00:00 | no |
| 4 | 09:58:00 | 10:02:00 | 10:00:00 | 10:00:00 | yes (09:58:00 < 10:00:00) |
| 5 | 10:03:00 | 10:03:00 | 10:00:00 | 10:01:00 | no |
| 6 | 09:55:00 | 10:03:00 | 10:01:00 | 10:01:00 | yes (09:55:00 < 10:01:00) |
```python
class Watermark:
    def __init__(self, allowed_lateness_s: float) -> None:
        self.allowed = allowed_lateness_s
        self.max_seen = float("-inf")
        self.value = float("-inf")

    def observe(self, event_time_s: float) -> None:
        if event_time_s > self.max_seen:
            self.max_seen = event_time_s
        # monotonic: never rewind
        self.value = max(self.value, self.max_seen - self.allowed)
```
Drop, side-output, or update — late-data policy
Three policies are common: drop (cheapest, lossy), side-output (route to a separate sink for offline reconciliation), and update (re-open the closed window and patch downstream aggregates). The invariant: the policy is a business decision, not a coding decision. State it explicitly.
- Drop — ignore late records; metrics show `dropped_count` for alerting.
- Side-output — route late records to a Kafka DLQ or S3 path; reconcile in a batch job.
- Update — most expensive; requires idempotent downstream state and an "update event" channel.
- Tombstone — for "event was wrong, retract it" semantics; common in CDC pipelines.
- Worked example: apply the `drop` policy to the 6-event stream; the final accepted set is `{e1, e2, e3, e5}`, late count is 2.
```python
def filter_by_watermark(
    events: list[tuple[str, float]],
    allowed_lateness_s: float,
) -> tuple[list[str], int]:
    wm = Watermark(allowed_lateness_s)
    accepted, dropped = [], 0
    for event_id, event_time in events:
        wm.observe(event_time)
        if event_time < wm.value:
            dropped += 1
            continue
        accepted.append(event_id)
    return accepted, dropped
```
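The drop policy is a one-line swap away from the side-output policy: route late records to a second list instead of discarding them. A self-contained sketch (`split_by_watermark` is a hypothetical variant; the watermark logic is restated inline so the snippet runs on its own):

```python
def split_by_watermark(events, allowed_lateness_s):
    """Like the drop filter, but late records go to a side output for reconciliation."""
    accepted, side_output = [], []
    max_seen = watermark = float("-inf")
    for event_id, event_time in events:
        max_seen = max(max_seen, event_time)  # late events can't rewind the max
        watermark = max(watermark, max_seen - allowed_lateness_s)
        if event_time < watermark:
            side_output.append((event_id, event_time))  # e.g., write to a DLQ
            continue
        accepted.append(event_id)
    return accepted, side_output

# The six-event stream as Unix-style seconds (10:00:00 → 36000), lateness 120 s:
evts = [("e1", 36000), ("e2", 36060), ("e3", 36120),
        ("e4", 35880), ("e5", 36180), ("e6", 35700)]
print(split_by_watermark(evts, 120))
# (['e1', 'e2', 'e3', 'e5'], [('e4', 35880), ('e6', 35700)])
```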
Common beginner mistakes
- Treating `processing_time` as `event_time` — loses correctness as soon as one event is late.
- Letting the watermark go backward when a late event arrives — `watermark = max_seen − allowed` without the `max(watermark, ...)` guard.
- Updating `max_seen` by plain assignment from a late event — drags the watermark backward with stale values.
- Forgetting per-key watermarks when one merchant is much slower than the rest — a globally lagging watermark blocks fast merchants.
- Setting `allowed_lateness = 0` — rejects every out-of-order event, including ones that are only milliseconds late.
Python interview question on event-time watermark drop
Implement drop_late_records(events, allowed_lateness_s) that takes a list of (event_id, event_time_s) tuples (event_time as Unix seconds), maintains a monotonic watermark with the policy watermark = max(watermark, max_seen_event_time − allowed_lateness_s), and returns (accepted_ids, dropped_count). An event is dropped if event_time_s < watermark after the watermark is updated for the current event.
Solution: using a monotonic watermark with an explicit drop policy
```python
from typing import List, Tuple

def drop_late_records(
    events: List[Tuple[str, float]],
    allowed_lateness_s: float,
) -> Tuple[List[str], int]:
    accepted: list[str] = []
    dropped = 0
    max_seen = float("-inf")
    watermark = float("-inf")
    for event_id, event_time in events:
        if event_time > max_seen:
            max_seen = event_time
        # monotonic: never rewind
        watermark = max(watermark, max_seen - allowed_lateness_s)
        if event_time < watermark:
            dropped += 1
            continue
        accepted.append(event_id)
    return accepted, dropped
```
Step-by-step trace (input: the six events from the worked example, allowed_lateness_s = 120):
| step | event_id | event_time | max_seen | watermark | event < wm? | result |
|---|---|---|---|---|---|---|
| 1 | e1 | 10:00:00 | 10:00:00 | 09:58:00 | no | accept |
| 2 | e2 | 10:01:00 | 10:01:00 | 09:59:00 | no | accept |
| 3 | e3 | 10:02:00 | 10:02:00 | 10:00:00 | no | accept |
| 4 | e4 | 09:58:00 | 10:02:00 | 10:00:00 | yes | drop |
| 5 | e5 | 10:03:00 | 10:03:00 | 10:01:00 | no | accept |
| 6 | e6 | 09:55:00 | 10:03:00 | 10:01:00 | yes | drop |
- Initialize — `max_seen = -inf`, `watermark = -inf`, `accepted = []`, `dropped = 0`.
- Steps 1–3 — every event sets a new `max_seen` and advances the watermark by exactly 60 seconds (because the events themselves arrive 60 seconds apart in event time).
- Step 4 (`e4`) — event time 09:58:00 does not update `max_seen` (still 10:02:00); the watermark stays at 10:00:00 from the previous tick; the check `09:58:00 < 10:00:00` is true → drop.
- Step 5 (`e5`) — event time 10:03:00 advances `max_seen`; the watermark advances to 10:01:00.
- Step 6 (`e6`) — event time 09:55:00 doesn't move `max_seen`; the watermark stays at 10:01:00; the check `09:55:00 < 10:01:00` is true → drop.
- Return — `(["e1", "e2", "e3", "e5"], 2)`.
Output:
| accepted_ids | dropped |
|---|---|
| ["e1", "e2", "e3", "e5"] | 2 |
Why this works — concept by concept:
- Event time vs. processing time — the filter is decided by the embedded `event_time`, not by wall-clock arrival; this is the reason watermarks exist.
- Monotonic watermark — `watermark = max(watermark, max_seen - allowed_lateness_s)` guarantees the watermark never rewinds; downstream stages can safely close windows.
- Allowed-lateness slack — the 120-second budget absorbs minor out-of-order arrivals; events more than 2 minutes late are dropped, alerted on, or side-outputted.
- Late records don't move `max_seen` — the line `if event_time > max_seen: max_seen = event_time` ignores stale events, so the watermark isn't dragged backward.
- Single explicit drop policy — `dropped += 1; continue` is readable, observable, and easy to swap for a side-output if the business requirement changes.
- Cost — O(N) time over N events; O(K) space for the K accepted events.
6. Python End-to-End ETL with Validation and Logging
CSV ingest, per-row validate, and a logging summary in Python for data engineering
End-to-end ETL is the canonical Hard-tier prompt: read a CSV, validate per row, load valid rows somewhere, log a summary at the end. The mental model is three lanes—ingest, validate, load—plus counters that survive the full run. The invariant: no malformed row crashes the pipeline; bad rows are counted, logged, and quarantined. Three sub-skills carry the section: ingest with csv.DictReader, per-row validation with explicit error counters, and an end-of-run summary that's both human-readable and machine-parseable.
Pro tip: The Hard-tier ETL prompts grade you on observability, not just correctness. Always return a structured summary — `{rows_seen, rows_loaded, rows_dropped, errors_by_type}`. Interviewers want to see that you build pipelines you can debug at 3 AM.
CSV ingest with csv.DictReader
csv.DictReader returns each row as a dict[str, str] keyed by the header. The invariant: DictReader handles quoting, embedded commas, and CRLF for you—line.split(",") does not. Always reach for csv over manual splitting on any CSV that comes from the wild.
- `csv.DictReader(file)` — the first row becomes the header; each row is a dict.
- `row[col]` — string by default; cast explicitly to `int` / `float` / `datetime`.
- Encoding — open with `encoding="utf-8-sig"` to swallow the BOM that Excel exports include.
- `newline=""` — open files with `newline=""` so `csv` controls line termination cross-platform.
- Worked example: a 3-row CSV with a header.
| line | content |
|---|---|
| 1 | merchant_id,fee |
| 2 | M1,1.20 |
| 3 | M2,0.80 |
| 4 | M3,2.00 |
```python
import csv

with open("fees.csv", encoding="utf-8-sig", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row)  # e.g., {"merchant_id": "M1", "fee": "1.20"}
```
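The same parse can be exercised without a file on disk via `io.StringIO`, which is handy for unit-testing ingest code (a sketch using the 3-row CSV above):

```python
import csv
import io

raw = "merchant_id,fee\nM1,1.20\nM2,0.80\nM3,2.00\n"
rows = list(csv.DictReader(io.StringIO(raw)))
print(rows[0])                 # {'merchant_id': 'M1', 'fee': '1.20'}
print(float(rows[0]["fee"]))   # values arrive as strings; cast explicitly
```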
Per-row validation and error counters
Validation runs per row. The invariant: a row either passes all checks (loaded) or fails at least one (counted by category). Counters by error category make the post-mortem fast: "we dropped 137 rows, 130 of which had bad_amount."
- `Counter` for error categories — `errors["bad_amount"] += 1`.
- Pure-function validator — returns `(ok, error_kind)` tuples; no side effects.
- Skip-and-continue — never `raise` on a bad row inside the loop; aggregate, then decide.
- Defensive cast — `try: float(row["fee"]) except ValueError: ...`.
- Worked example: four rows, one missing field, one bad amount.
| row | content | result |
|---|---|---|
| 1 | {merchant_id: M1, fee: 1.20} | ok |
| 2 | {merchant_id: M2, fee: } | error: bad_amount |
| 3 | {merchant_id: , fee: 2.00} | error: missing_merchant |
| 4 | {merchant_id: M3, fee: 2.00} | ok |
```python
def validate(row: dict) -> tuple[bool, str | None]:
    if not (row.get("merchant_id") or "").strip():
        return False, "missing_merchant"
    fee_str = (row.get("fee") or "").strip()
    if not fee_str:
        return False, "bad_amount"
    try:
        fee = float(fee_str)
    except ValueError:
        return False, "bad_amount"
    if fee < 0:
        return False, "negative_amount"
    return True, None
```
Aggregated logging summary at end-of-run
The end-of-run summary is the deliverable. The invariant: the summary is structured (a dict) and exhaustive (every counter accounted for). Don't print ad-hoc lines; return the dict and let the caller decide whether to log, push to Datadog, or POST to a webhook.
- `{rows_seen, rows_loaded, rows_dropped, errors_by_type}` — minimum viable summary.
- `Counter.most_common(3)` — surface the top error categories for human eyeballs.
- `elapsed_s` — wall-clock duration; helps spot slow runs.
- JSON-serializable — dicts of primitives only; downstream alerting expects to `json.dumps` it.
- Worked example: a 4-row run with 2 ok and 2 dropped.
```python
import json

summary = {
    "rows_seen": 4,
    "rows_loaded": 2,
    "rows_dropped": 2,
    "errors_by_type": {"missing_merchant": 1, "bad_amount": 1},
    "elapsed_s": 0.012,
}
print(json.dumps(summary, indent=2))
```
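The bullets above combine into a few lines: a `Counter` feeds `errors_by_type`, `most_common` surfaces the worst offenders, and the whole summary stays `json.dumps`-able. A sketch with hypothetical error counts:

```python
import json
import time
from collections import Counter

start = time.monotonic()
errors = Counter({"bad_amount": 130, "missing_merchant": 5, "negative_amount": 2})

summary = {
    "rows_seen": 1000,
    "rows_loaded": 1000 - sum(errors.values()),
    "rows_dropped": sum(errors.values()),
    "errors_by_type": dict(errors),
    "top_errors": errors.most_common(3),   # worst offenders, for human eyeballs
    "elapsed_s": round(time.monotonic() - start, 3),
}
print(json.dumps(summary))  # dicts of primitives only; safe to ship to alerting
```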
Common beginner mistakes
- Letting one bad row crash the whole pipeline — batch ETL must aggregate and continue.
- Writing the summary as ad-hoc `print` lines instead of a dict — not greppable, not machine-parseable.
- Validating after loading — now your destination has bad data and you're playing cleanup instead of triage.
- Forgetting `encoding="utf-8-sig"` — Excel exports start with a BOM that mangles the first column header.
- Using `line.split(",")` instead of `csv.DictReader` — breaks the moment a merchant name contains a quoted comma.
Python interview question on end-to-end ETL with logging summary
Implement run_etl(rows) that:
- Iterates a list of `dict` rows (think of these as already coming out of `csv.DictReader`).
- Validates each row: `merchant_id` non-empty (after strip), `fee` parseable as float, `fee >= 0`.
- Loads the valid rows by appending them (with `fee` as a `float`) to an output list.
- Returns `(loaded, summary)` where `summary` is a dict with keys `rows_seen`, `rows_loaded`, `rows_dropped`, and `errors_by_type` (a dict of error category → count).
Solution: using a per-row validator and a Counter for error categories
```python
from collections import Counter
from typing import Any

def _validate(row: dict[str, Any]) -> tuple[bool, str | None, float | None]:
    merchant = (row.get("merchant_id") or "").strip()
    if not merchant:
        return False, "missing_merchant", None
    fee_str = str(row.get("fee", "")).strip()
    if not fee_str:
        return False, "bad_amount", None
    try:
        fee = float(fee_str)
    except ValueError:
        return False, "bad_amount", None
    if fee < 0:
        return False, "negative_amount", None
    return True, None, fee

def run_etl(rows: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    loaded: list[dict[str, Any]] = []
    errors: Counter[str] = Counter()
    seen = 0
    for row in rows:
        seen += 1
        ok, kind, fee = _validate(row)
        if not ok:
            errors[kind] += 1
            continue
        loaded.append({"merchant_id": row["merchant_id"].strip(), "fee": fee})
    summary = {
        "rows_seen": seen,
        "rows_loaded": len(loaded),
        "rows_dropped": sum(errors.values()),
        "errors_by_type": dict(errors),
    }
    return loaded, summary
```
Step-by-step trace (input: five rows below):
| row | content | validator returns |
|---|---|---|
| 1 | {merchant_id: "M1", fee: "1.20"} | (True, None, 1.20) |
| 2 | {merchant_id: " ", fee: "0.80"} | (False, "missing_merchant", None) |
| 3 | {merchant_id: "M2", fee: ""} | (False, "bad_amount", None) |
| 4 | {merchant_id: "M3", fee: "abc"} | (False, "bad_amount", None) |
| 5 | {merchant_id: "M4", fee: "-0.10"} | (False, "negative_amount", None) |
- Initialize — `loaded = []`, `errors = Counter()`, `seen = 0`.
- Row 1 — passes all checks; append `{"merchant_id": "M1", "fee": 1.20}`; `seen = 1`.
- Row 2 — empty merchant after strip; `errors["missing_merchant"] += 1`; `seen = 2`.
- Row 3 — empty fee string; `errors["bad_amount"] += 1`; `seen = 3`.
- Row 4 — `float("abc")` raises `ValueError`; counted as `bad_amount`; `seen = 4`.
- Row 5 — fee is negative; `errors["negative_amount"] += 1`; `seen = 5`.
- Build summary — `rows_seen = 5`, `rows_loaded = 1`, `rows_dropped = 4`, `errors_by_type = {"missing_merchant": 1, "bad_amount": 2, "negative_amount": 1}`.
- Return — the loaded list (one entry) and the structured summary dict.
Output:
| key | value |
|---|---|
| rows_seen | 5 |
| rows_loaded | 1 |
| rows_dropped | 4 |
| errors_by_type.missing_merchant | 1 |
| errors_by_type.bad_amount | 2 |
| errors_by_type.negative_amount | 1 |
Why this works — concept by concept:
- Pure validator — `_validate(row)` returns `(ok, kind, fee)` with no side effects; trivially unit-testable in isolation.
- Counter for error categories — `Counter` keeps `errors_by_type` exhaustive and lets `most_common(N)` surface the top failure modes for alerting.
- Skip-and-continue — invalid rows increment a counter and `continue`; the pipeline never throws on bad data, which is the whole point of batch ETL.
- Structured summary — the returned dict is JSON-serializable; the caller can `json.dumps` it for logs, push it to Datadog, or POST it to a webhook without further processing.
- Separation of concerns — `_validate` decides correctness; `run_etl` does the bookkeeping; the load step is a single `loaded.append(...)`. Each lane changes independently.
- Cost — O(N) time over N rows; O(K) space for K distinct error categories plus the loaded output list.
7. SQL JSONB Filtering with Type Casting
Top-level key extraction from JSONB columns in SQL for data engineering
JSONB is the workhorse for flexible event payloads in payments platforms—webhook bodies, dispute metadata, fraud-signal blobs. The mental model: a JSONB column is a binary, indexed, deduped dictionary; you query it with ->, ->>, and helper functions like jsonb_object_keys. The invariant: -> returns JSONB; ->> returns text; both require explicit casts before numeric or temporal predicates. Three sub-skills carry the section: choosing JSONB over JSON, extracting top-level keys, and casting safely with ::int, ::numeric, and ::timestamptz.
Pro tip: For "extract the top-level keys of a JSONB column", reach for `jsonb_object_keys(col)` (set-returning) or `jsonb_each(col)` (key + value tuples). Don't try to roll your own with regex on the text representation — you will lose to the database planner and to JSONB's binary deduplication.
JSONB vs. JSON: binary, indexable, deduped keys
json is a text type; jsonb is a binary, normalized type. The invariant: JSONB drops duplicate keys, normalizes whitespace, and supports GIN indexes; JSON preserves the source bytes verbatim. For analytics, JSONB is almost always the right choice.
- `jsonb` — binary, deduped keys, GIN-indexable, slightly slower to insert.
- `json` — text, preserves bytes, no deduplication, faster insert, no GIN.
- `->` — extracts the value as JSONB; chainable (`payload -> 'a' -> 'b'`).
- `->>` — extracts the value as text; use this when you need a SQL string or a target for a `::cast`.
- Worked example: the same payload stored as `jsonb` deduplicates the repeated `amount` key.
| input JSON | stored as jsonb |
|---|---|
| {"amount": 10, "amount": 20, "merchant": "M1"} | {"amount": 20, "merchant": "M1"} |
```sql
-- JSON preserves duplicates as raw text:
SELECT '{"amount": 10, "amount": 20}'::json;
-- returns: {"amount": 10, "amount": 20}

-- JSONB deduplicates and keeps the last value:
SELECT '{"amount": 10, "amount": 20}'::jsonb;
-- returns: {"amount": 20}
```
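The last-value-wins behavior has a direct parallel in Python that can help build intuition: `json.loads` also keeps the last occurrence of a duplicate key (an aside about Python's `json` module, not a Postgres feature):

```python
import json

payload = '{"amount": 10, "amount": 20, "merchant": "M1"}'
print(json.loads(payload))  # {'amount': 20, 'merchant': 'M1'}, like ::jsonb
```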
jsonb_object_keys(event_payload) for top-level keys
jsonb_object_keys(jsonb) is a set-returning function: it returns one row per top-level key. The invariant: only top-level keys—nested objects' keys are not flattened. Pair it with DISTINCT to dedupe across rows.
- `jsonb_object_keys(col)` — one text row per top-level key.
- Set-returning — wrap it in a subquery or `LATERAL`-join it into the parent table.
- Top-level only — for a fully recursive walk, use `jsonb_path_query(col, '$.**')`.
- Pair with `DISTINCT` — when the goal is "the universe of keys ever observed".
- Worked example: three event payloads, three different shapes.
| event_id | event_payload (JSONB) |
|---|---|
| 1 | {"amount": 10, "merchant": "M1"} |
| 2 | {"amount": 20, "merchant": "M2", "currency": "USD"} |
| 3 | {"merchant": "M3", "currency": "USD"} |
jsonb_object_keys over each row produces:
| event_id | key |
|---|---|
| 1 | amount |
| 1 | merchant |
| 2 | amount |
| 2 | merchant |
| 2 | currency |
| 3 | merchant |
| 3 | currency |
SELECT DISTINCT key over that subquery yields {amount, merchant, currency}.
```sql
SELECT DISTINCT jsonb_object_keys(event_payload) AS key
FROM events;
```
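The same distinct-key extraction is easy to sanity-check in Python against a list of payload dicts (a sketch mirroring the SQL above, not a replacement for it):

```python
payloads = [
    {"amount": 10, "merchant": "M1"},
    {"amount": 20, "merchant": "M2", "currency": "USD"},
    {"merchant": "M3", "currency": "USD"},
    {},  # an empty object contributes no keys, matching jsonb_object_keys
]

keys = sorted({k for p in payloads for k in p})
print(keys)  # ['amount', 'currency', 'merchant']
```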
->> extraction + ::int / ::timestamptz casts
`->>` returns text. Almost every interesting predicate needs an explicit cast: `(payload ->> 'amount')::numeric > 100`, `(payload ->> 'created_at')::timestamptz < now() - interval '1 day'`. The invariant: cast first, predicate second — Postgres won't auto-cast text in arithmetic comparisons.
- `(col ->> 'k')::int` — text → int; raises on bad data.
- `(col ->> 'k')::numeric` — text → numeric; safer for fees / amounts.
- `(col ->> 'k')::timestamptz` — text → timestamp with time zone; standard for event times.
- `(col -> 'k')` — JSONB → JSONB; use this if the value is itself an object.
- `COALESCE` for missing keys — `COALESCE((col ->> 'k')::numeric, 0)`.
- Worked example: filter to events where `amount > 15` from the table above.
| event_id | event_payload ->> 'amount' | cast to numeric | passes > 15? |
|---|---|---|---|
| 1 | '10' | 10 | no |
| 2 | '20' | 20 | yes |
| 3 | NULL | NULL | NULL → no |
```sql
SELECT event_id
FROM events
WHERE (event_payload ->> 'amount')::numeric > 15;
-- Returns event_id = 2.
```
Common beginner mistakes
- Using `->` where `->>` was needed — `(col -> 'amount')::numeric` errors on string-typed values (and on any value before PostgreSQL 11, which introduced direct jsonb-to-numeric casts); `->>` plus an explicit cast is the portable form.
- Skipping `COALESCE` when the key is optional — missing keys return NULL and break aggregates that don't tolerate NULL.
- Using `json` when `jsonb` is available — loses GIN indexing, loses deduplication, loses query speed.
- Casting `(col ->> 'amount')::int` on text that contains `1.20` — `::int` rejects decimals; use `::numeric` or `::float`.
- Trying to query nested keys with `jsonb_object_keys` alone — it is top-level only; chain with `-> 'parent'` or use `jsonb_path_query` for deep walks.
SQL interview question on extracting top-level keys from JSONB
Given events(event_id INT, event_payload JSONB), return the distinct set of top-level keys ever seen across all event_payload values, sorted alphabetically. Output a single column key of type text. Empty objects ('{}'::jsonb) contribute zero rows; missing keys never appear.
Solution: using jsonb_object_keys with DISTINCT and ORDER BY
```sql
SELECT DISTINCT key
FROM events,
     LATERAL jsonb_object_keys(event_payload) AS key
ORDER BY key;
```
Step-by-step trace (input: the three-row table from the worked example, plus a fourth row with '{}'::jsonb):
| event_id | event_payload |
|---|---|
| 1 | {"amount": 10, "merchant": "M1"} |
| 2 | {"amount": 20, "merchant": "M2", "currency": "USD"} |
| 3 | {"merchant": "M3", "currency": "USD"} |
| 4 | {} |
- `FROM events, LATERAL jsonb_object_keys(...)` — for each row in `events`, the lateral function emits one row per top-level key.
- Row 1 — emits `(1, "amount")`, `(1, "merchant")`.
- Row 2 — emits `(2, "amount")`, `(2, "merchant")`, `(2, "currency")`.
- Row 3 — emits `(3, "merchant")`, `(3, "currency")`.
- Row 4 (`{}`) — emits zero rows (an empty object has no keys); the row drops out of the output.
- `SELECT DISTINCT key` — collapses duplicates: `{amount, merchant, currency}`.
- `ORDER BY key` — alphabetical: `amount`, `currency`, `merchant`.
Output:
| key |
|---|
| amount |
| currency |
| merchant |
Why this works — concept by concept:
- JSONB binary representation — keys are deduplicated and normalized at insert time; we don't have to worry about `"amount"` vs. `" amount"` or duplicate keys.
- `jsonb_object_keys` is set-returning — it emits one row per top-level key; `LATERAL` joins it into each input row, expanding the universe of `(event_id, key)` pairs.
- `LATERAL` join — lets the function reference a column from the outer table (`events.event_payload`); for function calls in `FROM`, Postgres treats the item as lateral even without the keyword, but writing `LATERAL` makes the dependency explicit.
- `DISTINCT key` — collapses to the universe of keys; equivalent to `GROUP BY key`, but `DISTINCT` is idiomatic for set-extraction queries.
- Empty objects contribute zero rows — `jsonb_object_keys('{}'::jsonb)` is the empty set; the corresponding event silently drops out, which is the desired semantics.
- Cost — O(N · K) where N is the row count and K is the average number of keys per row, plus the sort; for a typical events table with 1–2 dozen keys this is comfortably linear, and a GIN index on `event_payload` keeps `@>` containment queries fast.
Tips to crack Stripe data engineering interviews
These are habits that move the needle in real Stripe loops—not a re-statement of the topics above.
Python preparation
Stripe tilts Python-heavy. Drill these patterns until they're muscle memory:
- `defaultdict` and `Counter` — single-pass aggregation by key (transaction-fee rollups).
- `set` and `OrderedDict` for dedup — idempotent event apply; LRU bump on read; `popitem(last=False)` to evict.
- `collections.deque(maxlen=N)` — bounded queues with an explicit overflow policy (drop-newest, drop-oldest, block).
- Sort + greedy walk — tiered pricing schedules with `min(remaining, tier_width)` saturation.
- Watermark filters — `max(watermark, max_seen − allowed_lateness)`; never let it rewind.
- ETL with structured summaries — `{rows_seen, rows_loaded, rows_dropped, errors_by_type}`; return the dict, don't print it.
Topic-page drills: hash table, set, queue, streaming, ETL.
SQL preparation
Stripe's single SQL question on the curated set is JSONB introspection with type-casting—very payments-platform flavored:
- `jsonb_object_keys` and `jsonb_each` — top-level key extraction; pair with `LATERAL` and `DISTINCT`.
- `->` vs `->>` and explicit `::cast` — cast first, predicate second; `::numeric` over `::int` for money.
- `COALESCE` for missing keys — `COALESCE((p ->> 'k')::numeric, 0)` keeps aggregates honest.
- GIN indexes on JSONB — `CREATE INDEX ON events USING gin(event_payload)` for `@>` containment queries.
- `jsonb_path_query` for nested walks — when keys are deep; otherwise `->` chains.
Entry points: type casting, filtering, Stripe Python practice.
Hard-tier ETL fluency
Stripe's only Hard-tier curated problem is the end-to-end ETL with logging summary. Get fluent with the family:
- `csv.DictReader` — handles quoting and embedded commas; always with `encoding="utf-8-sig"` and `newline=""`.
- Pure validators — return `(ok, kind, value)`; no side effects; trivial to unit-test.
- `Counter` for error categories — `errors[kind] += 1`; surface `most_common(3)` in the summary.
- Structured summary dict — `{rows_seen, rows_loaded, rows_dropped, errors_by_type, elapsed_s}`; JSON-serializable for downstream alerting.
- Skip-and-continue — never `raise` on a bad row inside the loop; aggregate, then decide.
End-to-end ETL is the highest-ROI Hard-tier prep target for Stripe-style loops.
Payments-platform domain framing
Stripe's prompts are payments-platform flavored: shipping fees (tiered pricing), transaction fees (per-merchant rollups), idempotency tokens (replay safety), bounded buffers (back-pressure), event-time vs processing-time (out-of-order arrivals), ETL run logs (observability), JSONB event payloads (flexible schemas). State the mapping out loud: "this is a tiered fee schedule via sort + greedy"; "this is a per-merchant rollup via defaultdict"; "this is replay-safety via a seen-set + token cache"; "this is a deque(maxlen=N) with drop-newest"; "this is a watermark filter"; "this is csv.DictReader + per-row validation + a summary dict"; "this is jsonb_object_keys with a LATERAL join."
Where to practice on PipeCode
| Skill lane | Practice path |
|---|---|
| Curated Stripe practice set | /explore/practice/company/stripe |
| Stripe Python practice | /explore/practice/company/stripe/python |
| Stripe medium-difficulty practice | /explore/practice/company/stripe/difficulty/medium |
| Sorting | /explore/practice/topic/sorting |
| Greedy | /explore/practice/topic/greedy |
| Hash table | /explore/practice/topic/hash-table |
| Set | /explore/practice/topic/set |
| Queue | /explore/practice/topic/queue |
| Streaming | /explore/practice/topic/streaming |
| ETL | /explore/practice/topic/etl |
| Type casting | /explore/practice/topic/type-casting |
| All practice topics | /explore/practice/topics |
| Interview courses | /explore/courses |
Communication under time pressure
Three things to say out loud before, during, and after each problem — in this order:
- Assumptions — before typing. "I'll assume `events` are at-least-once delivered, so I need an idempotency token"; "I'll assume the tier table fits in memory and is small (≤10 entries)"; "I'll assume `event_payload` is `jsonb`, not `json`."
- Invariants — after key code blocks. "`watermark` is monotonic — I never let it rewind"; "`deque(maxlen=N)` drops the oldest by default"; "`jsonb_object_keys` is top-level only."
- Complexity — at the end. "Single-pass `O(N)` over the events"; "tier walk is `O(K)` where `K ≤ 10`"; "the JSONB query is `O(N · K)` with a GIN index keeping containment fast."
Interviewers reward clear reasoning over silent, perfect code.
Frequently Asked Questions
What is the Stripe data engineering interview process like?
The Stripe data engineering interview typically includes a phone screen (mostly Python warm-up around aggregation, dedup, or basic streaming), one or two coding rounds focused on Python data-engineering primitives (idempotency, bounded queues, watermarks, ETL), at least one round with SQL JSONB introspection, plus a system-design conversation around payments-data pipelines and behavioral interviews. The curated 7-problem Stripe practice set on PipeCode mirrors what you will see on the technical rounds.
What Python topics does Stripe test for data engineers?
Stripe emphasizes Python data-engineering primitives, not algorithm puzzles: hash-table aggregation (defaultdict, Counter), set-based deduplication for idempotency, collections.deque(maxlen=N) for bounded producer-consumer queues, event-time watermarks for streaming, and end-to-end CSV ETL with validation and a logging summary. Drill these on the hash table, set, queue, streaming, and ETL topic pages.
How important is SQL for a Stripe data engineering interview?
SQL is roughly 15–20% of the technical interview at Stripe—much lighter than at SQL-heavy shops, but the SQL you do see is payments-platform-specific: JSONB introspection (jsonb_object_keys, ->>, ::cast) over flexible event payloads. The type casting and filtering topic pages plus the curated Stripe set are the right drilling targets.
How hard are Stripe data engineering interview questions?
Stripe's curated set has 1 easy + 5 medium + 1 hard = a medium-heavy hub. The Hard-tier problem is the end-to-end ETL with logging summary—an exercise in observability, not algorithmic cleverness. Most candidates underestimate idempotency and watermarks; those are where Stripe's loops separate "knows Python" from "ships payments-grade Python."
Are streaming and watermark questions common in Stripe interviews?
Yes—out-of-order arrivals and late-record drop are core to payments-platform engineering, and the Easy-tier curated problem is exactly this pattern. Get comfortable computing watermark = max(watermark, max_seen_event_time − allowed_lateness), picking a sensible allowed_lateness budget, and choosing among drop / side-output / update policies. The same primitives apply to fraud-signal pipelines, dispute-event processing, and webhook delivery.
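A minimal sketch of that update and a drop policy (the event times, `allowed_lateness=5`, and helper name are illustrative):

```python
def advance_watermark(watermark, event_time, allowed_lateness):
    """Monotonic watermark: never rewinds, lags the max seen event time by the lateness budget."""
    return max(watermark, event_time - allowed_lateness)

wm = 0
kept, late_dropped = [], []
for t in [100, 105, 96, 110, 98]:  # out-of-order event times
    wm = advance_watermark(wm, t, allowed_lateness=5)
    (kept if t >= wm else late_dropped).append(t)
```

Because the update folds `max` per event, it is equivalent to `max_seen_event_time − allowed_lateness` while staying monotonic by construction.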
How many Stripe practice problems should I solve before the interview?
Aim for 20–30 problems spanning all seven topic clusters above—not 100 of the same kind. Solve every problem in the Stripe-tagged practice set, then back-fill weak areas using the topic pages linked throughout this guide. Idempotency, bounded queues, and ETL with summaries are the topics worth over-investing in for Stripe specifically.
Start practicing Stripe data engineering problems
Reading patterns is not the same as typing them under time pressure. PipeCode pairs company-tagged Stripe problems with tests, AI feedback, and a coding environment so you can drill the exact Python and SQL patterns Stripe asks—without the noise of generic algorithm prep that doesn't apply to this loop.
Pipecode.ai is Leetcode for Data Engineering.