Gowtham Potureddi

Stripe Data Engineering Interview Questions

Stripe data engineering interview questions lean heavily toward Python, with a payments-platform edge: six Python primitives (sort + greedy for tiered shipping or fee schedules, hash-table aggregation for per-merchant transaction-fee rollups, set-based deduplication for idempotent event apply, collections.deque for bounded producer-consumer queues with back-pressure, event-time watermarks for late-record drop, and an end-to-end CSV ETL with validation and a logging summary) plus a single SQL primitive that tests JSONB introspection (top-level key extraction from event_payload with explicit type casting). The framings are payments-platform analytics—shipping fees, transaction fees, idempotency tokens, bounded ingestion buffers, event-time vs processing-time, ETL run logs, and JSONB event payloads.

This guide walks through the seven topic clusters Stripe actually tests, each with a detailed topic explanation, per-sub-topic explanation with a worked example and its solution, and an interview-style problem with a full solution that explains why it works. The mix matches the curated 7-problem Stripe set (1 easy, 5 medium, 1 hard)—a medium-heavy hub that rewards Python state-management fluency, streaming intuition, and one carefully scoped SQL JSONB query, rather than algorithm-puzzle Python.



Top Stripe data engineering interview topics

From the Stripe data engineering practice set, the seven numbered sections below follow this topic map (one row per H2):

| # | Topic (sections 1–7) | Why it shows up at Stripe |
| --- | --- | --- |
| 1 | Python sorting and greedy for tiered pricing | Shipping Cost Calculator with Tiered Pricing—sort tiers once, walk weight greedily, sum per-tier slabs. |
| 2 | Python hash tables for transaction-fee aggregation | Transaction Fee Aggregation by Merchant—defaultdict(float) keyed by merchant_id, single-pass over rows. |
| 3 | Python sets for idempotent event apply | Idempotent Event Apply with Token Cache—seen-set short-circuit so replays don't double-charge. |
| 4 | Python deques for bounded producer-consumer queues | Bounded Producer-Consumer Queue—collections.deque(maxlen=N) with explicit back-pressure policy. |
| 5 | Python streaming with event-time watermarks | Drop Late Records Past Watermark—watermark = max(event_time) − allowed_lateness, drop on event_time < watermark. |
| 6 | Python end-to-end ETL with validation and logging | End-to-End ETL: Ingest, Validate, Load with Logging Summary—csv.DictReader → per-row validate → counters → final summary. |
| 7 | SQL JSONB filtering with type casting | Extract Top-Level Keys from event_payload JSONB—jsonb_object_keys, ->>, and explicit :: casts. |

Payments-platform framing rule: Stripe's prompts span payments-platform engineering—shipping fees, transaction fees, idempotency tokens, bounded ingestion buffers, event-time vs processing-time, ETL run logs, JSONB event payloads. The interviewer is grading whether you map each business framing to the right primitive: tiered fee schedule → sort + greedy walk; per-merchant rollup → hash-table aggregation; replay safety → seen-set + token cache; bounded buffer → deque(maxlen=N) + explicit overflow; late events → watermark filter; reliable ingest → ETL counters + summary log; flexible event payloads → JSONB introspection with explicit casts. State the mapping out loud.


1. Python Sorting and Greedy for Tiered Pricing

Sort-then-greedy walk for tiered pricing in Python for data engineering

Tiered pricing—shipping fees, payout fees, marketplace commissions—is the canonical "sort once, walk once" interview prompt. The mental model: sort the tiers by their lower bound, then walk an input weight (or amount) tier-by-tier and sum the per-tier slabs. The invariant is greedy: once you've consumed a tier's interval, you never come back to it. Three sub-skills carry the section: representing tiers as (threshold, rate) pairs, walking them in one pass, and being explicit about whether each threshold is inclusive or exclusive.

Pro tip: Watch units. A tiered shipping schedule mixes "first 1 kg = $5" with "each additional 0.5 kg = $0.50". Confusing flat-fee tiers with per-unit tiers is the #1 way candidates lose this question. Read the schedule out loud and underline whether each tier is flat-fee or per-unit.

Tier table: a list of (threshold, rate) intervals

A tier table is a sorted list of (upper_threshold, rate) pairs, optionally with a flat-fee column. The invariant: tier i covers the interval (threshold[i-1], threshold[i]]—the lower bound is the previous threshold, and the upper bound is threshold[i]. Sorted by upper_threshold ascending, you can walk weight greedily without scanning the entire table on every step.

  • tiers: list[tuple[float, float]] — [(threshold, rate), ...] sorted by threshold ascending.
  • Lower bound is implicit — tier i's lower bound is tiers[i-1].threshold (or 0 for the first tier).
  • Final tier is unbounded — represent it with threshold = math.inf so the walk terminates cleanly.
  • Worked example: shipping schedule with three tiers — up to 1 kg @ $5 flat, +up to 5 kg @ $1/kg, beyond @ $0.50/kg.
| tier | upper (kg) | shape | rate |
| --- | --- | --- | --- |
| 1 | 1.0 | flat | $5.00 |
| 2 | 5.0 | per-kg | $1.00/kg |
| 3 | inf | per-kg | $0.50/kg |
import math

# (upper_threshold_kg, shape, rate)
tiers = [
    (1.0,       "flat",   5.00),
    (5.0,       "per_kg", 1.00),
    (math.inf,  "per_kg", 0.50),
]

Sort once, walk once: greedy interval consumption

The walk is one pass: at each tier, consume min(remaining_weight, tier_width) of the input, charge it at the tier rate, decrement the remaining weight, advance. The invariant: each tier is visited at most once, and the loop terminates as soon as remaining_weight == 0.

  • tier_width = upper - prev_upper — slab width for that tier (use math.inf only on the final tier).
  • consumed = min(remaining, tier_width) — never overshoot the tier; saturate with min.
  • Flat tiers add the rate as-is; per-unit tiers multiply consumed * rate.
  • Loop guard: if remaining <= 0: break—stops the walk early once the input is fully covered.
  • Worked example: ship a 4 kg package on the schedule above.
| step | tier | width (kg) | consumed (kg) | charge | remaining (kg) |
| --- | --- | --- | --- | --- | --- |
| 1 | 1 (flat) | 1.0 | 1.0 | $5.00 | 3.0 |
| 2 | 2 (per_kg) | 4.0 | 3.0 | $3.00 | 0.0 |
| 3 | (loop ends — remaining is 0) | | | | |

Total: $8.00.

def shipping_cost(weight_kg: float, tiers: list[tuple[float, str, float]]) -> float:
    total, prev_upper, remaining = 0.0, 0.0, weight_kg
    for upper, shape, rate in tiers:
        if remaining <= 0:
            break
        width = upper - prev_upper
        consumed = min(remaining, width)
        if shape == "flat":
            total += rate
        else:
            total += consumed * rate
        remaining -= consumed
        prev_upper = upper
    return round(total, 2)

Boundary handling: inclusive vs. exclusive thresholds

The most common bug is at the tier boundary. A schedule that says "up to 1 kg costs $5" usually means weight <= 1.0 falls in the first tier; "more than 1 kg up to 5 kg" means the second tier starts at strictly above 1 kg. The invariant: be consistent—pick one convention and apply it everywhere.

  • Half-open (prev, upper] — what the walk above implements; weight = 1.0 lands in tier 1, not tier 2.
  • Half-open [prev, upper) — alternative; weight = 1.0 would jump to tier 2's first kilogram.
  • Inclusive both sides — never use; ties become ambiguous.
  • Floating-point danger — 0.1 + 0.2 != 0.3; convert to Decimal or work in cents/grams for money/weight comparisons.
  • Worked example: at exactly 1 kg with the half-open (prev, upper] convention, the package is fully consumed in tier 1 (1.0 kg of 1.0 kg width), charged $5 flat, and the loop exits with remaining == 0.
from decimal import Decimal, getcontext

# When money is involved, switch to Decimal to avoid float drift.
getcontext().prec = 28
tiers_dec = [
    (Decimal("1.0"), "flat",   Decimal("5.00")),
    (Decimal("5.0"), "per_kg", Decimal("1.00")),
    (Decimal("Infinity"), "per_kg", Decimal("0.50")),
]
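The integer-base-units alternative from the bullet list can be sketched as follows — weights in grams, money in cents, so every boundary comparison is exact integer arithmetic. The schedule mirrors the worked shipping example; names like TIERS_CENTS are illustrative:

```python
# Tier table in integer base units: (upper_g, shape, rate_cents).
# per_kg rates are cents per 1_000 g; None marks the unbounded final tier.
TIERS_CENTS = [
    (1_000, "flat",   500),   # up to 1 kg: $5.00 flat
    (5_000, "per_kg", 100),   # over 1 kg up to 5 kg: $1.00/kg
    (None,  "per_kg",  50),   # beyond 5 kg: $0.50/kg
]

def shipping_cost_cents(weight_g: int) -> int:
    total, prev_upper, remaining = 0, 0, weight_g
    for upper, shape, rate in TIERS_CENTS:
        if remaining <= 0:
            break
        width = remaining if upper is None else upper - prev_upper
        consumed = min(remaining, width)
        # // truncates sub-cent remainders on per-kg tiers
        total += rate if shape == "flat" else consumed * rate // 1_000
        remaining -= consumed
        if upper is not None:
            prev_upper = upper
    return total

print(shipping_cost_cents(4_000))   # 800 cents = $8.00
print(shipping_cost_cents(7_500))   # 1025 cents = $10.25
```

At exactly 1_000 g the boundary check is an exact integer comparison, so the package lands in tier 1 with no float drift.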

Common beginner mistakes

  • Re-scanning the tier table for every kilogram of input—turns a single-pass O(K) walk into O(K·N).
  • Forgetting min(remaining, tier_width)—charges the full tier width even when the input ends mid-tier.
  • Not bounding the final tier with math.inf—loop terminates before the heaviest packages are charged.
  • Mixing flat-fee and per-unit tiers without a shape field—silently double-counts or skips charges.
  • Comparing money or weight as float—rounding bugs at exact boundaries; use Decimal or work in integer base units.

Python interview question on tiered shipping cost

Implement shipping_cost(weight_kg, tiers) for the schedule below:

  • 0–1 kg: $5 flat.
  • Over 1 kg, up to 5 kg: $1 per kg (charged on the portion in this tier only).
  • Over 5 kg: $0.50 per kg (on the portion above 5 kg).

Return the total in dollars rounded to 2 decimals. Tiers may not be passed in sorted order—sort them first.

Solution — using sort plus a greedy single-pass walk

import math
from typing import List, Tuple

Tier = Tuple[float, str, float]  # (upper_threshold, shape, rate)

def shipping_cost(weight_kg: float, tiers: List[Tier]) -> float:
    tiers = sorted(tiers, key=lambda t: t[0])
    total, prev_upper, remaining = 0.0, 0.0, weight_kg
    for upper, shape, rate in tiers:
        if remaining <= 0:
            break
        width = upper - prev_upper
        consumed = min(remaining, width)
        total += rate if shape == "flat" else consumed * rate
        remaining -= consumed
        prev_upper = upper
    return round(total, 2)

Step-by-step trace (input: weight_kg = 7.5, tiers as in the question):

| weight_kg | tiers (after sort) |
| --- | --- |
| 7.5 | [(1.0, "flat", 5.00), (5.0, "per_kg", 1.00), (inf, "per_kg", 0.50)] |
  1. Sort tiers — already ascending by upper threshold; no reorder.
  2. Initialize — total = 0.0, prev_upper = 0.0, remaining = 7.5.
  3. Iteration 1 — tier (1.0, "flat", 5.00): width = 1.0 - 0.0 = 1.0; consumed = min(7.5, 1.0) = 1.0; flat shape, so total += 5.00 → 5.00; remaining = 7.5 - 1.0 = 6.5; prev_upper = 1.0.
  4. Iteration 2 — tier (5.0, "per_kg", 1.00): width = 5.0 - 1.0 = 4.0; consumed = min(6.5, 4.0) = 4.0; per-kg, so total += 4.0 * 1.00 = 4.00 → 9.00; remaining = 6.5 - 4.0 = 2.5; prev_upper = 5.0.
  5. Iteration 3 — tier (inf, "per_kg", 0.50): width = inf - 5.0 = inf; consumed = min(2.5, inf) = 2.5; per-kg, total += 2.5 * 0.50 = 1.25 → 10.25; remaining = 0.0.
  6. Next iteration check — remaining <= 0, loop breaks.
  7. Return — round(10.25, 2) = 10.25.

Output:

| weight_kg | total ($) |
| --- | --- |
| 7.5 | 10.25 |

Why this works — concept by concept:

  • Sort by threshold — guarantees the greedy walk visits tiers in ascending order; without it, prev_upper would jump backward and width could go negative.
  • Half-open intervals (prev_upper, upper] — each tier owns a unique slab; no double-counting at the boundary.
  • min(remaining, tier_width) — saturates consumption at whichever runs out first—the input weight or the tier width—so the walk handles inputs that fall mid-tier as well as inputs that span all tiers.
  • Flat vs. per-kg shape field — explicit shape discriminates "tier rate is added once" from "tier rate is multiplied by the consumed portion", eliminating the most common tiered-pricing bug.
  • remaining <= 0 guard — short-circuits the loop on light packages so we don't iterate the unbounded final tier when there's nothing left to charge.
  • Cost — O(K log K) for the sort + O(K) for the walk, where K is the number of tiers; K is small in practice (≤10), so this is effectively constant time.



2. Python Hash Tables for Transaction-Fee Aggregation

Per-key rollups with defaultdict in Python for data engineering

Per-merchant transaction-fee rollups are the canonical "single-pass aggregation" interview prompt. The mental model: walk the rows once, key by merchant_id, accumulate fees. The invariant is order independence: dict[key] += amount is associative and commutative, so row order doesn't matter and a full rerun from scratch is deterministic (note that += alone is not idempotent — replaying individual rows safely needs the dedup from section 3). Three sub-skills carry the section: choosing the right dict flavor (defaultdict vs. Counter vs. plain dict), running the single-pass aggregation, and parsing the row before keying.

Pro tip: Reach for collections.defaultdict(float)—not Counter—when the values are floating-point money. Counter is integer-flavored (most_common, subtract, and the +/- operators all assume counts; + and - keep only strictly positive entries), so combining Counters can silently drop merchants whose net fees are zero or negative.
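A quick sketch of the drop behavior the tip warns about (merchant IDs invented for illustration):

```python
from collections import Counter, defaultdict

fees = Counter({"M1": 1.0, "M2": 0.0})
refunds = Counter({"M1": 1.0})

# Counter's - operator keeps only strictly positive totals:
# M1 nets to 0.0 and M2 was already 0.0, so both merchants vanish.
print(dict(fees - refunds))  # {}

# defaultdict(float) keeps every key, zero-net or negative included.
totals = defaultdict(float, {"M1": 1.0, "M2": 0.0})
totals["M1"] -= 1.0
print(dict(totals))  # {'M1': 0.0, 'M2': 0.0}
```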

defaultdict(float) as a per-key running total

collections.defaultdict(float) returns 0.0 for any missing key on first access, so d[key] += amount always works without an explicit if key not in d guard. The invariant: the default factory is called only on a missing-key read, not on a missing-key writed[key] = ... never invokes the factory.

  • defaultdict(float) — running total per key, default 0.0.
  • defaultdict(int) — running counter per key, default 0.
  • defaultdict(list) — append-per-key bucket, default [].
  • Counter — integer-only counter; has handy most_common(n) but loses negative deltas.
  • Worked example: three rows—(M1, 1.20), (M2, 0.80), (M1, 0.50)—keyed by merchant.
| event | dict state after |
| --- | --- |
| d["M1"] += 1.20 | {"M1": 1.20} |
| d["M2"] += 0.80 | {"M1": 1.20, "M2": 0.80} |
| d["M1"] += 0.50 | {"M1": 1.70, "M2": 0.80} |
from collections import defaultdict

fees = defaultdict(float)
for merchant, fee in [("M1", 1.20), ("M2", 0.80), ("M1", 0.50)]:
    fees[merchant] += fee
# defaultdict(<class 'float'>, {'M1': 1.70, 'M2': 0.80})

Single-pass aggregation with dict[key] += amount

The aggregation is one pass: read each row, normalize the key, add the amount. The invariant: dict[key] += amount is O(1) average per row, so the whole pass is O(N) over N rows. Two passes over the same input never beats one—if the interviewer asks "what's the time complexity?" the answer is O(N).

  • Single-pass guarantee — no second loop for "now compute totals"; the totals are the dict.
  • Order independence — += is commutative and associative, so (a, b, c) and (c, a, b) produce the same dict.
  • Negative amounts — refunds work naturally; the running total can go down.
  • Final shape — convert to dict(d) if the consumer expects a plain dict (some serializers don't like defaultdict).
  • Worked example: five rows, three distinct merchants, one refund.
| row | merchant | fee |
| --- | --- | --- |
| 1 | M1 | 1.20 |
| 2 | M2 | 0.80 |
| 3 | M1 | 0.50 |
| 4 | M3 | 2.00 |
| 5 | M1 | -0.40 |

After the single pass (rounding the display to 2 decimals): {"M1": 1.30, "M2": 0.80, "M3": 2.00}.

from collections import defaultdict

def aggregate_fees(rows: list[tuple[str, float]]) -> dict[str, float]:
    totals = defaultdict(float)
    for merchant, fee in rows:
        totals[merchant] += fee
    return dict(totals)

Parse-then-key: merchant_id, amount = line.split(",")

Real interview inputs arrive as text—CSV lines, log entries, or pasted dictionaries. The invariant: parse the row first, validate the parts, then key the dict. Mixing parse and aggregate in one tangled expression is the #1 way to ship bugs.

  • line.split(",") — naive CSV split; fails if merchant_id itself contains a comma.
  • csv.reader([line]) — handles quoting; the right tool for any non-trivial CSV.
  • float(amount_str) — convert before keying; never += a string.
  • merchant_id.strip() — handle whitespace from log scrape; "M1 " and "M1" should aggregate.
  • Worked example: the line "M1, 1.20" — naive split(",") gives ["M1", " 1.20"]; float(" 1.20") tolerates the surrounding whitespace, so the conversion succeeds.
| raw line | parse step | parsed |
| --- | --- | --- |
| "M1, 1.20" | parts = line.split(",") | ["M1", " 1.20"] |
| | merchant = parts[0].strip() | "M1" |
| | fee = float(parts[1]) | 1.20 |
from collections import defaultdict

def aggregate_csv_lines(lines: list[str]) -> dict[str, float]:
    totals = defaultdict(float)
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments (even when indented)
        merchant_str, fee_str = line.split(",", maxsplit=1)
        totals[merchant_str.strip()] += float(fee_str)
    return dict(totals)
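When merchant names can themselves contain commas, the csv module from the bullet list is the right parser. A sketch under that assumption (the quoted merchant name is invented):

```python
import csv
from collections import defaultdict

def aggregate_quoted_lines(lines: list[str]) -> dict[str, float]:
    totals: defaultdict[str, float] = defaultdict(float)
    # csv.reader un-quotes fields, so '"Acme, Inc", 1.20' yields two fields,
    # not three; skipinitialspace drops the blank after each delimiter.
    for merchant, fee in csv.reader(lines, skipinitialspace=True):
        totals[merchant.strip()] += float(fee)
    return {k: round(v, 2) for k, v in totals.items()}

rows = ['"Acme, Inc", 1.20', 'M2, 0.80', '"Acme, Inc", 0.50']
print(aggregate_quoted_lines(rows))  # {'Acme, Inc': 1.7, 'M2': 0.8}
```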

Common beginner mistakes

  • Using Counter for floating-point fees—loses zero-net merchants and shows surprising behavior on negatives.
  • Forgetting defaultdict(float) and writing if k in d: d[k] += v else: d[k] = v—works, but verbose and slower.
  • Calling d.setdefault(k, 0.0); d[k] += v—correct, but setdefault evaluates its default argument on every call even when the key exists; harmless for 0.0, wasteful for mutable defaults like []—defaultdict is cleaner.
  • Splitting CSV lines with .split(",") when the merchant name itself can contain a comma—use the csv module.
  • Adding amounts as strings (d[k] += amount_str)—Python doesn't auto-cast; with defaultdict(float) the very first += raises TypeError (float + str).

Python interview question on transaction-fee aggregation

You receive a list of CSV strings, each formatted merchant_id,amount. Aggregate the fees per merchant_id and return a dict[str, float] with totals rounded to 2 decimals. Skip blank lines and lines starting with #. Whitespace around either field is allowed and must be stripped before keying.

Example input:

M1, 1.20
M2, 0.80
M1, 0.50
# comment, ignored
M3, 2.00
M1, -0.40

Expected output: {"M1": 1.30, "M2": 0.80, "M3": 2.00}.

Solution — using defaultdict(float) and a single-pass walk

from collections import defaultdict

def aggregate_transaction_fees(lines: list[str]) -> dict[str, float]:
    totals: defaultdict[str, float] = defaultdict(float)
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        merchant_str, fee_str = line.split(",", maxsplit=1)
        totals[merchant_str.strip()] += float(fee_str)
    return {k: round(v, 2) for k, v in totals.items()}

Step-by-step trace (input: the six lines from the question):

| line# | content | parse | dict state after |
| --- | --- | --- | --- |
| 1 | M1, 1.20 | ("M1", 1.20) | {M1: 1.20} |
| 2 | M2, 0.80 | ("M2", 0.80) | {M1: 1.20, M2: 0.80} |
| 3 | M1, 0.50 | ("M1", 0.50) | {M1: 1.70, M2: 0.80} |
| 4 | # comment... | (skipped) | (unchanged) |
| 5 | M3, 2.00 | ("M3", 2.00) | {M1: 1.70, M2: 0.80, M3: 2.00} |
| 6 | M1, -0.40 | ("M1", -0.40) | {M1: 1.30, M2: 0.80, M3: 2.00} |
  1. Iterate lines — strip leading/trailing whitespace.
  2. Skip blanks and comments — the # comment... line is dropped before parsing.
  3. Parse with split(",", maxsplit=1) — guards against accidental commas in the amount; merchant id is the first field, the rest is the amount string.
  4. Strip merchant whitespace and float() the amount — defensive parsing.
  5. totals[merchant] += amount — defaultdict(float) provides 0.0 on first key access; subsequent reads accumulate.
  6. Round on the way out — final dict comprehension applies round(v, 2) once per key, not once per row.
  7. Return a plain dict — dict() (or the comprehension result) plays nicely with JSON serializers.

Output:

| merchant_id | total ($) |
| --- | --- |
| M1 | 1.30 |
| M2 | 0.80 |
| M3 | 2.00 |

Why this works — concept by concept:

  • defaultdict(float) — supplies 0.0 on the first read for any key, so the += write is always valid; eliminates the if k in d guard and the setdefault allocation.
  • Single-pass += — O(1) average per row, totaling O(N) over N rows; commutative + associative, so the order of input lines doesn't change the result.
  • Defensive parse: strip + split + float — catches whitespace from log scraping and rejects malformed amounts at parse time, before they corrupt the running total.
  • Comment / blank-line skip — preserves the contract that the input may include human-friendly markers (# header row); aggregating those would raise ValueError on float("# comment ...").
  • Round once, at the end — applying round per row accumulates rounding error; rounding the final total once yields stable cents.
  • Cost — O(N) time over N lines; O(K) space for K distinct merchants.



3. Python Sets for Idempotent Event Apply

Idempotency with a seen-set token cache in Python for data engineering

Idempotency is the canonical correctness invariant for any payments platform: applying the same event twice should produce the same end state as applying it once. The mental model: assign each event a unique idempotency token; before applying, check a seen-set; if the token is already present, skip; otherwise apply and record the token. The invariant is f(f(x)) == f(x). Three sub-skills carry the section: defining idempotency precisely, using a set (or LRU cache) as the dedup primitive, and bounding memory with eviction so the seen-set doesn't grow forever.

Pro tip: A seen-set is fine when the token universe is small enough to fit in memory. Production systems use a bounded cache (LRU via OrderedDict or functools.lru_cache-style primitives, or a Redis TTL set) so memory doesn't grow unbounded. Always state the bounded variant out loud, even if your minimal-loc solution uses set().

Idempotency: f(f(x)) == f(x)

An operation is idempotent when applying it twice gives the same result as applying it once. The invariant: state changes are conditional on a token check, not unconditional. A naive balance += amount is not idempotent—every duplicate doubles the change. A token-guarded if token not in seen: balance += amount; seen.add(token) is.

  • Network-safe — clients can retry an HTTP POST without fear of double-charging.
  • Restart-safe — a consumer that crashes mid-batch can replay from offset 0 without corrupting state.
  • Replay-safe — a downstream system can re-emit yesterday's events for backfill without duplicating effects.
  • Out-of-order-safe — applying the same event at two different consumers in different orders still converges.
  • Worked example: apply three events—(t1, +50), (t2, +30), (t1, +50)—the second t1 is a duplicate.
| event | token | seen before? | applied? | balance after |
| --- | --- | --- | --- | --- |
| 1 | t1 | no | yes | 50 |
| 2 | t2 | no | yes | 80 |
| 3 | t1 | yes | no | 80 |
def apply_events(events: list[tuple[str, int]]) -> int:
    seen: set[str] = set()
    balance = 0
    for token, delta in events:
        if token in seen:
            continue
        balance += delta
        seen.add(token)
    return balance

Seen-set short-circuit: skip if token in seen

The dedup primitive is a set of tokens. The invariant: O(1) average lookup and insert, both backed by hashing. The short-circuit pattern is if token in seen: return early—no balance update, no log write, no downstream effect.

  • token in seen — O(1) average; uses the token's __hash__ and __eq__.
  • seen.add(token) — O(1) average; idempotent for the set itself (adding the same value twice is a no-op).
  • Membership-only set — when you only need "yes/no", don't use a dict; sets are smaller and faster.
  • Hashable tokens — use str UUIDs, integer event IDs, or tuples; never mutable types like list.
  • Worked example: four events—(a, +10), (b, +5), (a, +10), (c, +3)—starting balance 0.
| event | token | check | action | balance |
| --- | --- | --- | --- | --- |
| 1 | a | a in seen → False | apply +10 | 10 |
| 2 | b | b in seen → False | apply +5 | 15 |
| 3 | a | a in seen → True | skip | 15 |
| 4 | c | c in seen → False | apply +3 | 18 |
def apply_with_seen(events: list[tuple[str, int]]) -> int:
    seen, balance = set(), 0
    for token, delta in events:
        if token in seen:
            continue
        balance += delta
        seen.add(token)
    return balance

Bounded-cache eviction with collections.OrderedDict

Production token caches don't grow forever—they evict the oldest entries when the cache size exceeds a configured limit. The invariant: most-recent tokens stay in cache; least-recent are evicted in FIFO or LRU order. collections.OrderedDict gives you move_to_end + popitem(last=False) for clean LRU semantics.

  • OrderedDict — preserves insertion order; popitem(last=False) removes the oldest in O(1).
  • move_to_end(key) — promotes a key to most-recent on read; converts FIFO to LRU.
  • maxsize — the cap; evict when len(cache) > maxsize.
  • TTL-based caches — Redis with EXPIRE; the same idea, time-bounded instead of size-bounded.
  • Worked example: maxsize = 3 with token sequence a, b, c, d, a.
| step | inserted | cache (oldest → newest) |
| --- | --- | --- |
| 1 | a | [a] |
| 2 | b | [a, b] |
| 3 | c | [a, b, c] |
| 4 | d | [b, c, d] (a evicted) |
| 5 | a | [c, d, a] (b evicted; a treated as new) |
from collections import OrderedDict

class BoundedTokenCache:
    def __init__(self, maxsize: int) -> None:
        self.maxsize = maxsize
        self.cache: OrderedDict[str, None] = OrderedDict()

    def __contains__(self, token: str) -> bool:
        if token in self.cache:
            self.cache.move_to_end(token)  # LRU bump on read
            return True
        return False

    def add(self, token: str) -> None:
        self.cache[token] = None
        if len(self.cache) > self.maxsize:
            self.cache.popitem(last=False)

Common beginner mistakes

  • Using list.append + if token in list — O(N) lookup; degrades to O(N²) over the whole stream.
  • Forgetting to seen.add(token) after applying—the very next replay re-applies and the balance doubles.
  • Storing the event payload in the seen-set instead of a small token—blows memory; tokens should be short strings or integers.
  • Letting the seen-set grow unbounded in production—works in interviews; OOM-kills the consumer in real systems.
  • Mutable tokens (lists, dicts) — TypeError: unhashable type; always use str, int, or tuple.
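The unhashable-token bullet is easy to verify directly; a minimal sketch:

```python
seen: set = set()

seen.add(("charge", 42))       # tuple: hashable, works as a composite token
print(("charge", 42) in seen)  # True

try:
    seen.add(["charge", 42])   # list: mutable, therefore unhashable
except TypeError as exc:
    print(exc)                 # unhashable type: 'list'
```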

Python interview question on idempotent event apply

Implement apply_events(events, balance=0, maxsize=10_000) that walks a list of (token, delta) events and returns the final integer balance. Each token may appear multiple times; the second and later occurrences must be skipped. Use a bounded LRU token cache (maxsize tokens) so memory stays bounded; an evicted token may legitimately re-apply if it returns later (this is the documented trade-off). Return the final balance.

Solution — using a seen-set with an OrderedDict LRU cache

from collections import OrderedDict

def apply_events(
    events: list[tuple[str, int]],
    balance: int = 0,
    maxsize: int = 10_000,
) -> int:
    seen: OrderedDict[str, None] = OrderedDict()
    for token, delta in events:
        if token in seen:
            seen.move_to_end(token)  # LRU bump
            continue
        balance += delta
        seen[token] = None
        if len(seen) > maxsize:
            seen.popitem(last=False)  # evict LRU
    return balance

Step-by-step trace (input: events = [("a", 10), ("b", 5), ("a", 10), ("c", 3), ("b", 5)], maxsize = 3):

| step | event | check | action | balance | cache (oldest → newest) |
| --- | --- | --- | --- | --- | --- |
| 1 | (a,10) | a in seen → False | apply | 10 | [a] |
| 2 | (b,5) | b in seen → False | apply | 15 | [a, b] |
| 3 | (a,10) | a in seen → True | skip + LRU bump | 15 | [b, a] |
| 4 | (c,3) | c in seen → False | apply | 18 | [b, a, c] |
| 5 | (b,5) | b in seen → True | skip + LRU bump | 18 | [a, c, b] |
  1. Initialize — seen empty OrderedDict, balance = 0.
  2. Iteration 1 (a, 10) — a not in cache; balance becomes 10; insert a.
  3. Iteration 2 (b, 5) — b not in cache; balance becomes 15; insert b.
  4. Iteration 3 (a, 10) — a in cache → skip; move_to_end("a") promotes it to most-recent.
  5. Iteration 4 (c, 3) — c not in cache; balance becomes 18; insert c. Cache size is 3, still within maxsize.
  6. Iteration 5 (b, 5) — b in cache → skip; move_to_end("b") promotes it.
  7. Eviction never triggers in this trace because the cache stayed at maxsize = 3; if a fourth distinct token arrived, the LRU entry would be evicted.

Output:

| final_balance |
| --- |
| 18 |

Why this works — concept by concept:

  • Idempotency invariant f(f(x)) == f(x) — guarded by a token check, not by the operation itself; replays of the same token leave the balance unchanged.
  • OrderedDict as a set + LRU — __contains__ is O(1) average; move_to_end and popitem(last=False) give clean LRU semantics in O(1).
  • LRU bump on read — move_to_end after a positive in check converts FIFO to LRU; tokens that keep arriving stay hot and aren't evicted.
  • Bounded memory — if len(seen) > maxsize: popitem(last=False) caps memory at maxsize entries regardless of stream length; the evicted-and-re-arrives trade-off is documented.
  • Order independence on the kept tokens — once a token is in the cache, every subsequent replay is a no-op, so input order does not affect the final balance among the in-cache tokens.
  • Cost — O(N) time for N events; O(maxsize) space for the bounded cache.



4. Python Deques for Bounded Producer-Consumer Queues

Bounded queues with collections.deque(maxlen=N) in Python for data engineering

Bounded producer-consumer queues are the canonical "ingest with back-pressure" interview prompt. The mental model: producers append to one end of a fixed-capacity queue; consumers pop from the other end; when full, the producer must block, drop, or overwrite—never silently grow. The invariant is len(queue) <= maxlen always. Three sub-skills carry the section: choosing collections.deque(maxlen=N) over list, declaring the back-pressure policy explicitly, and respecting the single-producer / single-consumer simplification when one is offered.

Pro tip: Stripe and other payments platforms grade you not just on "does it work" but on what happens when it's full. Always state the back-pressure policy out loud—block, drop-newest, drop-oldest, or overflow-into-disk—before typing the first line.

(Diagram: a producer pushing into a bounded deque, the queue at capacity, and back-pressure options labelled block, drop-newest, and drop-oldest.)

collections.deque(maxlen=N) for O(1) ends

collections.deque is a double-ended queue with O(1) append, appendleft, pop, and popleft. The maxlen parameter caps capacity. The invariant: once the deque is at capacity, every append discards the oldest item automatically. That makes deque(maxlen=N) the one-line drop-oldest queue.

  • d.append(x) — push right, O(1); if full, oldest is silently popped from the left.
  • d.popleft() — pop left (FIFO consume), O(1).
  • d.appendleft(x) — push left, O(1) (rare in producer-consumer; useful for un-pop).
  • d.maxlen — read-only int (or None) once constructed; you cannot resize a deque in place.
  • Worked example: deque(maxlen=3) with the sequence 1, 2, 3, 4.
| step | op | state |
| --- | --- | --- |
| 1 | append(1) | [1] |
| 2 | append(2) | [1, 2] |
| 3 | append(3) | [1, 2, 3] |
| 4 | append(4) | [2, 3, 4] (1 evicted) |
from collections import deque

q = deque(maxlen=3)
for x in (1, 2, 3, 4):
    q.append(x)
print(list(q))  # [2, 3, 4]

Back-pressure: drop, block, or overwrite-oldest

The interesting design decision is what happens at capacity. deque(maxlen=N)'s default is drop-oldest—convenient but not always correct. The invariant: back-pressure policy is a contract with the producer, and the four common variants behave very differently under load.

  • drop-oldest — deque(maxlen=N).append(x) semantics; convenient but loses the oldest event, which is often the most important for an audit trail.
  • drop-newest — explicit if len(q) >= N: return; preserves the oldest events.
  • block — queue.Queue(maxsize=N).put(x) blocks the producer until a slot frees; back-pressure propagates upstream.
  • overflow-to-disk — spill to an append-only log when full; preserves everything at the cost of latency.
  • Worked example: capacity 2, sequence a, b, c—four policies, four outcomes.
| policy | after append('c') | dropped | producer behavior |
| --- | --- | --- | --- |
| drop-oldest | [b, c] | a | non-blocking |
| drop-newest | [a, b] | c | non-blocking |
| block | [a, b] (until consumer popleft) | none | producer blocks |
| spill-to-disk | [a, b] + c to log | none (delayed) | non-blocking |
```python
from collections import deque

class DropNewestQueue:
    def __init__(self, maxlen: int) -> None:
        self.q: deque = deque()
        self.maxlen = maxlen
        self.dropped = 0

    def put(self, item) -> bool:
        if len(self.q) >= self.maxlen:
            self.dropped += 1
            return False
        self.q.append(item)
        return True

    def get(self):
        return self.q.popleft() if self.q else None
```
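The block policy from the table above can be sketched with the standard-library queue.Queue, whose put blocks when the queue is at maxsize (or raises queue.Full once a timeout expires). A minimal single-threaded sketch of the back-pressure signal, not a full producer-consumer loop:

```python
import queue

q = queue.Queue(maxsize=2)
q.put('a')
q.put('b')

try:
    # At capacity: put() blocks until a consumer frees a slot;
    # with a timeout it raises queue.Full instead of waiting forever.
    q.put('c', timeout=0.05)
except queue.Full:
    print("producer saw back-pressure")

q.get()      # consumer drains 'a', freeing a slot
q.put('c')   # now succeeds
print(list(q.queue))  # ['b', 'c']
```

In a real pipeline the blocked put is what propagates back-pressure upstream: the producer simply cannot outrun the consumer.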

Single-producer/single-consumer invariant

Many interview prompts add the simplification "single producer, single consumer" (SPSC). The invariant: without concurrency, no lock or threading.Event is required. SPSC + deque is one of the few cases where the GIL is enough; multi-producer or multi-consumer needs queue.Queue, asyncio.Queue, or an explicit lock.

  • SPSC, single thread — deque is fine; no synchronization needed.
  • MPMC, multi-thread — queue.Queue (thread-safe, blocking put/get).
  • Async coroutines — asyncio.Queue with await q.put(...) / await q.get().
  • Cross-process — multiprocessing.Queue with serialization overhead.
  • Worked example: SPSC append/popleft sequence with the drop-newest queue from above.
| step | producer op | consumer op | state | notes |
| --- | --- | --- | --- | --- |
| 1 | put('a') | | [a] | OK |
| 2 | put('b') | | [a, b] | full |
| 3 | put('c') | | [a, b] | dropped |
| 4 | | get() → 'a' | [b] | drain |
| 5 | put('c') | | [b, c] | OK now |
```python
from collections import deque

# SPSC: a single-threaded producer-consumer loop with explicit
# drop-newest semantics, matching the trace above.
MAXLEN = 2
q = deque()
for x in ['a', 'b', 'c']:
    if len(q) >= MAXLEN:
        continue  # drop-newest: 'c' is rejected while the queue is full
    q.append(x)
# q is ['a', 'b']; the consumer drains one slot, then 'c' fits.
q.popleft()    # → 'a'
q.append('c')  # q is now ['b', 'c']
```

Common beginner mistakes

  • Using list with pop(0) — O(N) per pop, turns the queue into a quadratic bottleneck.
  • Forgetting to declare the back-pressure policy—silently relying on deque(maxlen)'s drop-oldest, which is rarely what you want for audit-critical events.
  • Resizing a deque after construction—deque.maxlen is read-only; you have to allocate a new one and migrate.
  • Using deque(maxlen=N) from multiple threads without a lock—append and popleft are individually thread-safe, but compound operations (if not full: put) are not.
  • Storing payloads inline that would blow the memory budget—queue items should be small (IDs, references), not full event payloads, when capacity is large.
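The thread-safety pitfall above has a standard fix: make the check-then-append pair atomic under one lock. A minimal multi-producer-safe sketch (the class name is illustrative, not a standard API):

```python
from collections import deque
from threading import Lock

class LockedDropNewestQueue:
    """Drop-newest queue safe for multiple producers: the capacity check
    and the append run under a single lock, so they can't interleave."""

    def __init__(self, maxlen: int) -> None:
        self._q: deque = deque()
        self._maxlen = maxlen
        self._lock = Lock()

    def put(self, item) -> bool:
        with self._lock:
            if len(self._q) >= self._maxlen:
                return False  # drop-newest: reject while full
            self._q.append(item)
            return True

    def get(self):
        with self._lock:
            return self._q.popleft() if self._q else None
```

The single-threaded behavior is identical to the unlocked version; the lock only pays off once a second producer or consumer thread appears.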

Python interview question on a bounded producer-consumer queue

Implement BoundedQueue(maxlen, overflow="drop_newest") with three methods:

  • put(item) -> bool — push to the right; return True if accepted, False if dropped.
  • get() -> item | None — pop from the left; return None if empty.
  • __len__() -> int — current size.

overflow must support "drop_oldest", "drop_newest", and "block_no_op" (return False without dropping the existing data). Track and expose a .dropped counter for visibility. Single-producer / single-consumer; no thread synchronization required.

Solution: Using collections.deque with an explicit overflow policy

```python
from collections import deque
from typing import Any, Optional

class BoundedQueue:
    def __init__(self, maxlen: int, overflow: str = "drop_newest") -> None:
        if overflow not in {"drop_oldest", "drop_newest", "block_no_op"}:
            raise ValueError(f"unknown overflow policy: {overflow}")
        self._q: deque = deque()
        self.maxlen = maxlen
        self.overflow = overflow
        self.dropped = 0

    def put(self, item: Any) -> bool:
        if len(self._q) < self.maxlen:
            self._q.append(item)
            return True
        # at capacity
        if self.overflow == "drop_oldest":
            self._q.popleft()
            self._q.append(item)
            self.dropped += 1
            return True
        if self.overflow == "drop_newest":
            self.dropped += 1
            return False
        # block_no_op
        self.dropped += 1
        return False

    def get(self) -> Optional[Any]:
        return self._q.popleft() if self._q else None

    def __len__(self) -> int:
        return len(self._q)
```

Step-by-step trace (input: BoundedQueue(maxlen=2, overflow="drop_newest") with calls put('a'), put('b'), put('c'), get(), put('c')):

| step | call | state before | branch taken | state after | dropped | return |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | put('a') | [] | len < maxlen → append | ['a'] | 0 | True |
| 2 | put('b') | ['a'] | len < maxlen → append | ['a','b'] | 0 | True |
| 3 | put('c') | ['a','b'] | full + drop_newest | ['a','b'] | 1 | False |
| 4 | get() | ['a','b'] | non-empty → popleft | ['b'] | 1 | 'a' |
| 5 | put('c') | ['b'] | len < maxlen → append | ['b','c'] | 1 | True |
  1. Construction — deque empty, maxlen=2, overflow="drop_newest", dropped=0.
  2. Steps 1–2 — both put calls succeed; queue grows to ['a', 'b'].
  3. Step 3 — at capacity, policy is drop_newest, so 'c' is rejected, the counter increments, and the return is False.
  4. Step 4 — consumer drains one slot; 'a' returned, queue is ['b'].
  5. Step 5 — capacity available again; 'c' accepted, queue is ['b', 'c'].
  6. Audit — q.dropped == 1 and len(q) == 2; the consumer can decide whether to alert based on the dropped counter.

Output:

| call | return |
| --- | --- |
| put('a') | True |
| put('b') | True |
| put('c') | False |
| get() | 'a' |
| put('c') | True |
| q.dropped | 1 |

Why this works — concept by concept:

  • collections.deque for O(1) ends — both append and popleft are constant-time; list.pop(0) would be O(N) and ruin throughput.
  • Explicit overflow policy — drop_oldest / drop_newest / block_no_op are different contracts with the producer; the constructor validates the value to fail fast on typos.
  • Drop counters as observability — .dropped lets the consumer or a metrics scraper alert on back-pressure without changing the queue's hot path.
  • SPSC simplification — single producer / single consumer means no locks; deque's append and popleft are atomic w.r.t. each other under the GIL for the SPSC pattern.
  • Boolean return on put — the producer learns immediately whether the item was accepted, so it can degrade gracefully (sample, log, or cancel upstream).
  • Cost — O(1) per put and per get; O(maxlen) space.



5. Python Streaming with Event-Time Watermarks

Event-time watermarks and late-record drop in Python for data engineering

Streaming correctness lives or dies on the distinction between event time (when something happened) and processing time (when your system saw it). The mental model: events arrive out of order; you maintain a watermark = max(event_time) − allowed_lateness and drop anything with event_time < watermark. The invariant: once the watermark advances past a timestamp, that timestamp is "closed" and contributing events are policy-dropped (or routed to a side output). Three sub-skills carry the section: defining the two clocks, computing the watermark, and choosing what to do with late records.

Pro tip: State the allowed-lateness budget out loud. "I'm choosing 60 seconds of allowed lateness because the upstream Kafka cluster is configured to retry for up to 30 seconds, and I want a 2× safety margin." Interviewers grade the justification more than the value.

Diagram showing out-of-order events arriving along a processing-time axis with a watermark line and late events past the line marked as dropped.

Event time vs. processing time

Event time is the timestamp embedded in the event payload (when the user's tap happened). Processing time is time.time() inside your consumer (when your code is about to handle it). The invariant: event time is monotonic per producer but not globally; processing time is monotonic per process but says nothing about when the event actually happened.

  • Event time — drives correctness; "what happened at 09:55?".
  • Processing time — drives latency; "how long until I saw the 09:55 event?".
  • Out-of-order — events with event_time = 09:55 can arrive at processing_time = 10:02 because of network lag.
  • Skew — clock skew across producer machines; pin to a single trusted timestamp source if possible.
  • Worked example: six events arriving at six processing-time ticks, with their event-time timestamps shown below.
| processing_time | event_id | event_time |
| --- | --- | --- |
| 10:00:00 | e1 | 10:00:00 |
| 10:01:00 | e2 | 10:01:00 |
| 10:02:00 | e3 | 10:02:00 |
| 10:02:30 | e4 | 09:58:00 |
| 10:03:00 | e5 | 10:03:00 |
| 10:03:30 | e6 | 09:55:00 |

e4 and e6 are late—their event time is older than recently seen event times.
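Detecting the late arrivals is mechanical: an event is out-of-order whenever its event time is older than the maximum event time seen so far. A small sketch over the six events above (HH:MM:SS strings compare correctly as plain strings):

```python
events = [
    ("e1", "10:00:00"), ("e2", "10:01:00"), ("e3", "10:02:00"),
    ("e4", "09:58:00"), ("e5", "10:03:00"), ("e6", "09:55:00"),
]

max_seen = ""
late = []
for event_id, event_time in events:
    if event_time < max_seen:
        late.append(event_id)   # arrived after a newer event was seen
    else:
        max_seen = event_time
print(late)  # ['e4', 'e6']
```

This is the zero-lateness view; the watermark in the next sub-section adds a slack budget so mildly late events still pass.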

Watermark = max(event_time) - allowed_lateness

A watermark is a monotonic timestamp that asserts "I won't accept events with event_time < watermark". The simplest form: watermark = max_observed_event_time − allowed_lateness, advanced after every event. The invariant: the watermark only ever moves forward; never backward.

  • max_seen_event_time — running max over all observed events.
  • allowed_lateness — slack budget; tunable per pipeline.
  • Monotonic — watermark = max(watermark, max_seen_event_time − allowed_lateness); never reset on the way down.
  • Per-key watermarks — for fairness, sometimes maintained per partition (per merchant_id).
  • Worked example: apply allowed_lateness = 2 minutes to the 6-event stream above.
| step | event_time | max_seen | watermark before | watermark after | drop? |
| --- | --- | --- | --- | --- | --- |
| 1 | 10:00:00 | 10:00:00 | -inf | 09:58:00 | no |
| 2 | 10:01:00 | 10:01:00 | 09:58:00 | 09:59:00 | no |
| 3 | 10:02:00 | 10:02:00 | 09:59:00 | 10:00:00 | no |
| 4 | 09:58:00 | 10:02:00 | 10:00:00 | 10:00:00 | yes (09:58:00 < 10:00:00) |
| 5 | 10:03:00 | 10:03:00 | 10:00:00 | 10:01:00 | no |
| 6 | 09:55:00 | 10:03:00 | 10:01:00 | 10:01:00 | yes (09:55:00 < 10:01:00) |
```python
class Watermark:
    def __init__(self, allowed_lateness_s: float) -> None:
        self.allowed = allowed_lateness_s
        self.max_seen = float("-inf")
        self.value = float("-inf")

    def observe(self, event_time_s: float) -> None:
        if event_time_s > self.max_seen:
            self.max_seen = event_time_s
        # monotonic: never rewind
        self.value = max(self.value, self.max_seen - self.allowed)
```
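The per-key bullet above can be sketched as a dict of per-key maxima: one watermark per merchant_id, so a lagging merchant never stalls the rest. PerKeyWatermark is a hypothetical helper mirroring the single-key class, not a library API:

```python
class PerKeyWatermark:
    def __init__(self, allowed_lateness_s: float) -> None:
        self.allowed = allowed_lateness_s
        self.max_seen: dict[str, float] = {}
        self.value: dict[str, float] = {}

    def observe(self, key: str, event_time_s: float) -> None:
        seen = max(self.max_seen.get(key, float("-inf")), event_time_s)
        self.max_seen[key] = seen
        # monotonic per key: never rewind
        prev = self.value.get(key, float("-inf"))
        self.value[key] = max(prev, seen - self.allowed)

    def is_late(self, key: str, event_time_s: float) -> bool:
        return event_time_s < self.value.get(key, float("-inf"))
```

Each merchant's watermark advances independently, so window closure for a fast merchant is never blocked by a slow one.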

Drop, side-output, or update — late-data policy

Three policies are common: drop (cheapest, lossy), side-output (route to a separate sink for offline reconciliation), and update (re-open the closed window and patch downstream aggregates). The invariant: the policy is a business decision, not a coding decision. State it explicitly.

  • Drop — ignore late records; metrics show dropped_count for alerting.
  • Side-output — route late records to a Kafka DLQ or S3 path; reconcile in a batch job.
  • Update — most expensive; requires idempotent downstream state and an "update event" channel.
  • Tombstone — for "event was wrong, retract it" semantics; common in CDC pipelines.
  • Worked example: apply the drop policy to the 6-event stream; final accepted set is {e1, e2, e3, e5}, late count is 2.
```python
def filter_by_watermark(
    events: list[tuple[str, float]],
    allowed_lateness_s: float,
) -> tuple[list[str], int]:
    wm = Watermark(allowed_lateness_s)  # Watermark class from the previous snippet
    accepted, dropped = [], 0
    for event_id, event_time in events:
        wm.observe(event_time)
        if event_time < wm.value:
            dropped += 1
            continue
        accepted.append(event_id)
    return accepted, dropped
```
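Swapping drop for side-output is a small change to the same loop: route the late record to a second collection instead of a bare counter. A self-contained sketch; in production the side output would be a DLQ topic or an S3 path:

```python
def split_late_records(
    events: list[tuple[str, float]],
    allowed_lateness_s: float,
) -> tuple[list[str], list[tuple[str, float]]]:
    max_seen = watermark = float("-inf")
    accepted: list[str] = []
    side_output: list[tuple[str, float]] = []
    for event_id, event_time in events:
        max_seen = max(max_seen, event_time)
        watermark = max(watermark, max_seen - allowed_lateness_s)
        if event_time < watermark:
            side_output.append((event_id, event_time))  # reconcile offline
            continue
        accepted.append(event_id)
    return accepted, side_output
```

The watermark math is unchanged; only the destination of late records differs, which is why the policy choice is a business decision rather than a coding one.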

Common beginner mistakes

  • Treating processing_time as event_time—loses correctness as soon as one event is late.
  • Letting the watermark go backward when a late event arrives—watermark = max_seen − allowed without the max(watermark, ...) guard.
  • Dropping the late event and updating max_seen from it—pollutes the watermark with stale values.
  • Forgetting per-key watermarks when one merchant is much slower than the rest—globally lagging watermark blocks fast merchants.
  • Setting allowed_lateness = 0—rejects every out-of-order event, including ones that are only milliseconds late.

Python interview question on event-time watermark drop

Implement drop_late_records(events, allowed_lateness_s) that takes a list of (event_id, event_time_s) tuples (event_time as Unix seconds), maintains a monotonic watermark with the policy watermark = max(watermark, max_seen_event_time − allowed_lateness_s), and returns (accepted_ids, dropped_count). An event is dropped if event_time_s < watermark after the watermark is updated for the current event.

Solution: Using a monotonic watermark with explicit drop policy

```python
from typing import List, Tuple

def drop_late_records(
    events: List[Tuple[str, float]],
    allowed_lateness_s: float,
) -> Tuple[List[str], int]:
    accepted: list[str] = []
    dropped = 0
    max_seen = float("-inf")
    watermark = float("-inf")
    for event_id, event_time in events:
        if event_time > max_seen:
            max_seen = event_time
        # monotonic: never rewind
        watermark = max(watermark, max_seen - allowed_lateness_s)
        if event_time < watermark:
            dropped += 1
            continue
        accepted.append(event_id)
    return accepted, dropped
```

Step-by-step trace (input: the six events from the worked example, allowed_lateness_s = 120):

| step | event_id | event_time | max_seen | watermark | event < wm? | result |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | e1 | 10:00:00 | 10:00:00 | 09:58:00 | no | accept |
| 2 | e2 | 10:01:00 | 10:01:00 | 09:59:00 | no | accept |
| 3 | e3 | 10:02:00 | 10:02:00 | 10:00:00 | no | accept |
| 4 | e4 | 09:58:00 | 10:02:00 | 10:00:00 | yes | drop |
| 5 | e5 | 10:03:00 | 10:03:00 | 10:01:00 | no | accept |
| 6 | e6 | 09:55:00 | 10:03:00 | 10:01:00 | yes | drop |
  1. Initialize — max_seen = -inf, watermark = -inf, accepted = [], dropped = 0.
  2. Steps 1–3 — every event sets a new max_seen and advances the watermark by exactly 60 seconds (because the events themselves arrive 60 seconds apart in event-time).
  3. Step 4 (e4) — event time 09:58:00 does not update max_seen (still 10:02:00); the watermark stays at 10:00:00 from the previous tick; the check 09:58:00 < 10:00:00 is true → drop.
  4. Step 5 (e5) — event time 10:03:00 advances max_seen; the watermark advances to 10:01:00.
  5. Step 6 (e6) — event time 09:55:00 doesn't move max_seen; the watermark stays at 10:01:00; the check 09:55:00 < 10:01:00 is true → drop.
  6. Return — (["e1", "e2", "e3", "e5"], 2).

Output:

| accepted_ids | dropped |
| --- | --- |
| ["e1", "e2", "e3", "e5"] | 2 |

Why this works — concept by concept:

  • Event time vs. processing time — the filter is decided by the embedded event_time, not by clock-on-the-wall arrival; this is the reason watermarks exist.
  • Monotonic watermark — watermark = max(watermark, max_seen - allowed_lateness_s) guarantees the watermark never rewinds; downstream stages can safely close windows.
  • Allowed-lateness slack — the 120-second budget absorbs minor out-of-order arrivals; events more than 2 minutes late are dropped, alerted on, or side-outputted.
  • Late record does not move max_seen — the guard if event_time > max_seen: max_seen = event_time ignores stale events, so the watermark isn't dragged backward.
  • Single explicit drop policy — dropped += 1; continue is readable, observable, and easy to swap out for a side-output if the business requirement changes.
  • Cost — O(N) time over N events; O(K) space for the K accepted events.



6. Python End-to-End ETL with Validation and Logging

CSV ingest, per-row validate, and a logging summary in Python for data engineering

End-to-end ETL is the canonical Hard-tier prompt: read a CSV, validate per row, load valid rows somewhere, log a summary at the end. The mental model is three lanes—ingest, validate, load—plus counters that survive the full run. The invariant: no malformed row crashes the pipeline; bad rows are counted, logged, and quarantined. Three sub-skills carry the section: ingest with csv.DictReader, per-row validation with explicit error counters, and an end-of-run summary that's both human-readable and machine-parseable.

Pro tip: The Hard-tier ETL prompts grade you on observability, not just correctness. Always return a structured summary—{rows_seen, rows_loaded, rows_dropped, errors_by_type}. Interviewers want to see that you build pipelines you can debug at 3 AM.

CSV ingest with csv.DictReader

csv.DictReader returns each row as a dict[str, str] keyed by the header. The invariant: DictReader handles quoting, embedded commas, and CRLF for you—line.split(",") does not. Always reach for csv over manual splitting on any CSV that comes from the wild.

  • csv.DictReader(file) — first row becomes the header; each row is a dict.
  • row[col] — string by default; cast explicitly to int/float/datetime.
  • Encoding — open with encoding="utf-8-sig" to swallow the BOM that Excel exports include.
  • newline="" — open files with newline="" so csv controls line termination cross-platform.
  • Worked example: a 3-row CSV with a header.
| line | content |
| --- | --- |
| 1 | merchant_id,fee |
| 2 | M1,1.20 |
| 3 | M2,0.80 |
| 4 | M3,2.00 |
```python
import csv

with open("fees.csv", encoding="utf-8-sig", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row)  # e.g., {"merchant_id": "M1", "fee": "1.20"}
```

Per-row validation and error counters

Validation runs per row. The invariant: a row either passes all checks (loaded) or fails at least one (counted by category). Counters by error category make the post-mortem fast: "we dropped 137 rows, 130 of which had bad_amount."

  • Counter for error categories — errors["bad_amount"] += 1.
  • Pure-function validator — returns (ok, error_kind) tuples; no side effects.
  • Skip-and-continue — never raise on a bad row inside the loop; aggregate, then decide.
  • Defensive cast — try: float(row["fee"]) except ValueError: ....
  • Worked example: four rows, one missing field, one bad amount.
| row | content | result |
| --- | --- | --- |
| 1 | {merchant_id: M1, fee: 1.20} | ok |
| 2 | {merchant_id: M2, fee: } | error: bad_amount |
| 3 | {merchant_id: , fee: 2.00} | error: missing_merchant |
| 4 | {merchant_id: M3, fee: 2.00} | ok |
```python
def validate(row: dict) -> tuple[bool, str | None]:
    if not (row.get("merchant_id") or "").strip():
        return False, "missing_merchant"
    fee_str = (row.get("fee") or "").strip()
    if not fee_str:
        return False, "bad_amount"
    try:
        fee = float(fee_str)
    except ValueError:
        return False, "bad_amount"
    if fee < 0:
        return False, "negative_amount"
    return True, None
```

Aggregated logging summary at end-of-run

The end-of-run summary is the deliverable. The invariant: the summary is structured (a dict) and exhaustive (every counter accounted for). Don't print ad-hoc lines; return the dict and let the caller decide whether to log, push to Datadog, or POST to a webhook.

  • {rows_seen, rows_loaded, rows_dropped, errors_by_type} — minimum viable summary.
  • Counter.most_common(3) — surface top error categories for human eyeballs.
  • elapsed_s — wall-clock duration; helps spot slow runs.
  • JSON-serializable — dicts of primitives only; downstream alerting expects to json.dumps it.
  • Worked example: a 4-row run with 2 ok and 2 dropped.
```python
import json

summary = {
    "rows_seen": 4,
    "rows_loaded": 2,
    "rows_dropped": 2,
    "errors_by_type": {"missing_merchant": 1, "bad_amount": 1},
    "elapsed_s": 0.012,
}
print(json.dumps(summary, indent=2))
```

Common beginner mistakes

  • Letting one bad row crash the whole pipeline—batch ETL must aggregate and continue.
  • Writing the summary as ad-hoc print lines instead of a dict—not greppable, not machine-parseable.
  • Validating after loading—now your destination has bad data and you're playing cleanup instead of triage.
  • Forgetting encoding="utf-8-sig"—Excel exports start with a BOM that mangles the first column header.
  • Using line.split(",") instead of csv.DictReader—breaks the moment a merchant name contains a quoted comma.
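The last bullet is easy to demonstrate with a quoted field containing a comma: naive splitting produces three fields, while csv parses two. io.StringIO stands in for a real file here:

```python
import csv
import io

raw = 'merchant_id,name\nM1,"Acme, Inc."\n'

naive = raw.splitlines()[1].split(",")
print(naive)  # ['M1', '"Acme', ' Inc."'] — wrong: three fields

rows = list(csv.DictReader(io.StringIO(raw)))
print(rows[0]["name"])  # Acme, Inc. — csv respects the quotes
```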

Python interview question on end-to-end ETL with logging summary

Implement run_etl(rows) that:

  1. Iterates a list of dict rows (you can think of these as already coming out of csv.DictReader).
  2. Validates each row: merchant_id non-empty (after strip), fee parseable as float, fee >= 0.
  3. Loads the valid rows by appending them (with fee as a float) to an output list.
  4. Returns (loaded, summary) where summary is a dict with keys rows_seen, rows_loaded, rows_dropped, and errors_by_type (a dict of error category → count).

Solution: Using a per-row validator and a Counter for error categories

```python
from collections import Counter
from typing import Any

def _validate(row: dict[str, Any]) -> tuple[bool, str | None, float | None]:
    merchant = (row.get("merchant_id") or "").strip()
    if not merchant:
        return False, "missing_merchant", None
    fee_str = str(row.get("fee", "")).strip()
    if not fee_str:
        return False, "bad_amount", None
    try:
        fee = float(fee_str)
    except ValueError:
        return False, "bad_amount", None
    if fee < 0:
        return False, "negative_amount", None
    return True, None, fee

def run_etl(rows: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    loaded: list[dict[str, Any]] = []
    errors: Counter[str] = Counter()
    seen = 0
    for row in rows:
        seen += 1
        ok, kind, fee = _validate(row)
        if not ok:
            errors[kind] += 1
            continue
        loaded.append({"merchant_id": row["merchant_id"].strip(), "fee": fee})
    summary = {
        "rows_seen": seen,
        "rows_loaded": len(loaded),
        "rows_dropped": sum(errors.values()),
        "errors_by_type": dict(errors),
    }
    return loaded, summary
```

Step-by-step trace (input: five rows below):

| row | content | validator returns |
| --- | --- | --- |
| 1 | {merchant_id: "M1", fee: "1.20"} | (True, None, 1.20) |
| 2 | {merchant_id: " ", fee: "0.80"} | (False, "missing_merchant", None) |
| 3 | {merchant_id: "M2", fee: ""} | (False, "bad_amount", None) |
| 4 | {merchant_id: "M3", fee: "abc"} | (False, "bad_amount", None) |
| 5 | {merchant_id: "M4", fee: "-0.10"} | (False, "negative_amount", None) |
  1. Initialize — loaded = [], errors = Counter(), seen = 0.
  2. Row 1 — passes all checks; append {"merchant_id": "M1", "fee": 1.20}; seen = 1.
  3. Row 2 — empty merchant after strip; errors["missing_merchant"] += 1; seen = 2.
  4. Row 3 — empty fee string; errors["bad_amount"] += 1; seen = 3.
  5. Row 4 — float("abc") raises ValueError; counted as bad_amount; seen = 4.
  6. Row 5 — fee is negative; errors["negative_amount"] += 1; seen = 5.
  7. Build summary — rows_seen = 5, rows_loaded = 1, rows_dropped = 4, errors_by_type = {"missing_merchant": 1, "bad_amount": 2, "negative_amount": 1}.
  8. Return — the loaded list (one entry) and the structured summary dict.

Output:

| key | value |
| --- | --- |
| rows_seen | 5 |
| rows_loaded | 1 |
| rows_dropped | 4 |
| errors_by_type.missing_merchant | 1 |
| errors_by_type.bad_amount | 2 |
| errors_by_type.negative_amount | 1 |

Why this works — concept by concept:

  • Pure validator — _validate(row) returns (ok, kind, fee) with no side effects; trivially unit-testable in isolation.
  • Counter for error categories — Counter keeps errors_by_type exhaustive and lets most_common(N) surface the top failure modes for alerting.
  • Skip-and-continue — invalid rows increment a counter and continue; the pipeline never throws on bad data, which is the whole point of batch ETL.
  • Structured summary — the returned dict is JSON-serializable; the caller can json.dumps it for logs, push to Datadog, or POST to a webhook without further processing.
  • Separation of concerns — _validate decides correctness; run_etl decides bookkeeping; the load step is a single loaded.append(...). Each lane changes independently.
  • Cost — O(N) time over N rows; O(K) space for K distinct error categories plus the loaded output list.



7. SQL JSONB Filtering with Type Casting

Top-level key extraction from JSONB columns in SQL for data engineering

JSONB is the workhorse for flexible event payloads in payments platforms—webhook bodies, dispute metadata, fraud-signal blobs. The mental model: a JSONB column is a binary, indexed, deduped dictionary; you query it with ->, ->>, and helper functions like jsonb_object_keys. The invariant: -> returns JSONB; ->> returns text; both require explicit casts before numeric or temporal predicates. Three sub-skills carry the section: choosing JSONB over JSON, extracting top-level keys, and casting safely with ::int, ::numeric, and ::timestamptz.

Pro tip: For "extract the top-level keys of a JSONB column", reach for jsonb_object_keys(col) (set-returning) or jsonb_each(col) (key + value tuples). Don't try to roll your own with regex on the text representation—you will lose to the database planner and to JSONB's binary deduplication.

JSONB vs. JSON: binary, indexable, deduped keys

json is a text type; jsonb is a binary, normalized type. The invariant: JSONB drops duplicate keys, normalizes whitespace, and supports GIN indexes; JSON preserves the source bytes verbatim. For analytics, JSONB is almost always the right choice.

  • jsonb — binary, deduped keys, GIN-indexable, slightly slower to insert.
  • json — text, preserves bytes, no deduplication, faster insert / no GIN.
  • -> — extract value as JSONB; chainable (payload -> 'a' -> 'b').
  • ->> — extract value as text; use this when you need a SQL string or a target for ::cast.
  • Worked example: the same payload stored as jsonb deduplicates the repeated amount key.
| input JSON | stored as jsonb |
| --- | --- |
| {"amount": 10, "amount": 20, "merchant": "M1"} | {"amount": 20, "merchant": "M1"} |
```sql
-- JSON preserves duplicates as raw text:
SELECT '{"amount": 10, "amount": 20}'::json;
-- returns: {"amount": 10, "amount": 20}

-- JSONB deduplicates and keeps the last value:
SELECT '{"amount": 10, "amount": 20}'::jsonb;
-- returns: {"amount": 20}
```

jsonb_object_keys(event_payload) for top-level keys

jsonb_object_keys(jsonb) is a set-returning function: it returns one row per top-level key. The invariant: only top-level keys—nested objects' keys are not flattened. Pair it with DISTINCT to dedupe across rows.

  • jsonb_object_keys(col) — one text row per top-level key.
  • Set-returning, so wrap it in a subquery or LATERAL-join into the parent table.
  • Top-level only — for a fully recursive walk, use jsonb_path_query(col, '$.**').
  • Pair with DISTINCT — when the goal is "the universe of keys ever observed".
  • Worked example: three event payloads, three different shapes.
| event_id | event_payload (JSONB) |
| --- | --- |
| 1 | {"amount": 10, "merchant": "M1"} |
| 2 | {"amount": 20, "merchant": "M2", "currency": "USD"} |
| 3 | {"merchant": "M3", "currency": "USD"} |

jsonb_object_keys over each row produces:

| event_id | key |
| --- | --- |
| 1 | amount |
| 1 | merchant |
| 2 | amount |
| 2 | merchant |
| 2 | currency |
| 3 | merchant |
| 3 | currency |

SELECT DISTINCT key over that subquery yields {amount, merchant, currency}.

```sql
SELECT DISTINCT jsonb_object_keys(event_payload) AS key
FROM events;
```

->> extraction + ::int / ::timestamptz casts

->> returns text. Almost every interesting predicate needs an explicit cast: (payload ->> 'amount')::numeric > 100, (payload ->> 'created_at')::timestamptz < now() - interval '1 day'. The invariant: cast first, predicate second—Postgres won't auto-cast text in arithmetic comparisons.

  • (col ->> 'k')::int — text → int; raises on bad data.
  • (col ->> 'k')::numeric — text → numeric; safer for fees / amounts.
  • (col ->> 'k')::timestamptz — text → timestamp with TZ; standard for event times.
  • (col -> 'k') — JSONB → JSONB; use this if the value is itself an object.
  • COALESCE for missing keys — COALESCE((col ->> 'k')::numeric, 0).
  • Worked example: filter to events where amount > 15 from the table above.
| event_id | event_payload ->> 'amount' | cast to numeric | passes > 15? |
| --- | --- | --- | --- |
| 1 | '10' | 10 | no |
| 2 | '20' | 20 | yes |
| 3 | NULL | NULL | NULL → no |
```sql
SELECT event_id
FROM events
WHERE (event_payload ->> 'amount')::numeric > 15;
-- Returns event_id = 2.
```

Common beginner mistakes

  • Using -> where ->> was needed—(col -> 'amount') yields JSONB, not text; casting JSONB to numeric only works on newer Postgres versions and only for JSON numbers, so prefer (col ->> 'amount')::numeric.
  • Skipping COALESCE when the key is optional—missing keys return NULL and break aggregates that don't tolerate NULL.
  • Using json when jsonb is available—loses GIN, loses deduplication, loses query speed.
  • Casting (col ->> 'amount')::int on text that contains 1.20—::int rejects decimals; use ::numeric or ::float.
  • Trying to query nested keys with jsonb_object_keys alone—jsonb_object_keys is top-level only; chain with -> 'parent' or use jsonb_path_query for deep walks.

SQL interview question on extracting top-level keys from JSONB

Given events(event_id INT, event_payload JSONB), return the distinct set of top-level keys ever seen across all event_payload values, sorted alphabetically. Output a single column key of type text. Empty objects ('{}'::jsonb) contribute zero rows; missing keys never appear.

Solution: Using jsonb_object_keys with DISTINCT and ORDER BY

```sql
SELECT DISTINCT key
FROM events,
     LATERAL jsonb_object_keys(event_payload) AS key
ORDER BY key;
```

Step-by-step trace (input: the three-row table from the worked example, plus a fourth row with '{}'::jsonb):

| event_id | event_payload |
| --- | --- |
| 1 | {"amount": 10, "merchant": "M1"} |
| 2 | {"amount": 20, "merchant": "M2", "currency": "USD"} |
| 3 | {"merchant": "M3", "currency": "USD"} |
| 4 | {} |
  1. FROM events, LATERAL jsonb_object_keys(...) — for each row in events, the lateral function emits one row per top-level key.
  2. Row 1 — emits (1, "amount"), (1, "merchant").
  3. Row 2 — emits (2, "amount"), (2, "merchant"), (2, "currency").
  4. Row 3 — emits (3, "merchant"), (3, "currency").
  5. Row 4 ({}) — emits zero rows (empty object has no keys); the row is dropped from the output.
  6. SELECT DISTINCT key — collapses duplicates: {amount, merchant, currency}.
  7. ORDER BY key — alphabetical: amount, currency, merchant.

Output:

| key |
| --- |
| amount |
| currency |
| merchant |

Why this works — concept by concept:

  • JSONB binary representation — keys are deduplicated and normalized at insert time; we don't have to worry about "amount" vs. " amount" or duplicate keys.
  • jsonb_object_keys is set-returning — emits one row per top-level key; LATERAL joins it into each input row, expanding the universe of (event_id, key) pairs.
  • LATERAL join — required because jsonb_object_keys references a column from the outer table (events.event_payload); a plain CROSS JOIN wouldn't have visibility.
  • DISTINCT key — collapses to the universe of keys; equivalent to GROUP BY key, but DISTINCT is idiomatic for set-extraction queries.
  • Empty objects contribute zero rows — jsonb_object_keys('{}'::jsonb) is the empty set; the corresponding event silently drops out, which is the desired semantics.
  • CostO(N · K) where N is the row count and K is average keys per row, plus the sort; for a typical events table with 1–2 dozen keys, this is comfortably linear and easily indexable with a GIN index on event_payload.



Tips to crack Stripe data engineering interviews

These are habits that move the needle in real Stripe loops—not a re-statement of the topics above.

Python preparation

Stripe tilts Python-heavy. Drill these patterns until they're muscle memory:

  • defaultdict and Counter — single-pass aggregation by key (transaction-fee rollups).
  • set and OrderedDict for dedup — idempotent event apply; LRU bump on read; popitem(last=False) to evict.
  • collections.deque(maxlen=N) — bounded queues with explicit overflow policy (drop-newest, drop-oldest, block).
  • Sort + greedy walk — tiered pricing schedules with min(remaining, tier_width) saturation.
  • Watermark filters — max(watermark, max_seen − allowed_lateness); never let it rewind.
  • ETL with structured summaries — {rows_seen, rows_loaded, rows_dropped, errors_by_type}; return the dict, don't print it.

Topic-page drills: hash table, set, queue, streaming, ETL.
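Two of those patterns — hash-table aggregation and set-based dedup — compose naturally into one single-pass loop. A minimal sketch, assuming each event carries an idempotency `token`, a `merchant_id`, and a `fee` (field names are illustrative, not Stripe's schema):

```python
from collections import defaultdict

def apply_events(events):
    seen = set()                       # idempotency tokens already applied
    totals = defaultdict(float)        # per-merchant fee rollup
    for e in events:
        if e["token"] in seen:         # replayed delivery: skip, never double-count
            continue
        seen.add(e["token"])
        totals[e["merchant_id"]] += e["fee"]
    return dict(totals)
```

One pass, O(N) time, O(M + T) space for M merchants and T distinct tokens — exactly the shape Stripe's aggregation and idempotency prompts reward.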

SQL preparation

Stripe's single SQL question on the curated set is JSONB introspection with type-casting—very payments-platform flavored:

  • jsonb_object_keys and jsonb_each — top-level key extraction; pair with LATERAL and DISTINCT.
  • -> vs ->> and explicit ::cast — cast first, predicate second; ::numeric over ::int for money.
  • COALESCE for missing keys — COALESCE((p ->> 'k')::numeric, 0) keeps aggregates honest.
  • GIN indexes on JSONB — CREATE INDEX ON events USING gin(event_payload) for @> containment queries.
  • jsonb_path_query for nested walks — when keys are deep; otherwise -> chains.

Entry points: type casting, filtering, Stripe Python practice.
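The "cast first, default the missing key" idea also shows up on the Python side when validating payloads before load. A hedged sketch of the same semantics as `COALESCE((p ->> 'amount')::numeric, 0)`, using `Decimal` rather than `float` for money (the key name is an assumption):

```python
from decimal import Decimal, InvalidOperation

def amount_or_zero(payload, key="amount"):
    raw = payload.get(key)
    if raw is None:                    # missing key -> 0, like COALESCE
        return Decimal("0")
    try:
        return Decimal(str(raw))       # cast first; Decimal, not float, for money
    except InvalidOperation:
        return Decimal("0")            # unparseable value -> 0 as well
```

Folding the cast and the default into one helper keeps downstream aggregates honest the same way the SQL expression does.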

Hard-tier ETL fluency

Stripe's only Hard-tier curated problem is the end-to-end ETL with logging summary. Get fluent with the family:

  • csv.DictReader — handles quoting and embedded commas; always with encoding="utf-8-sig" and newline="".
  • Pure validators — return (ok, kind, value); no side effects; trivial to unit-test.
  • Counter for error categories — errors[kind] += 1; surface most_common(3) in the summary.
  • Structured summary dict — {rows_seen, rows_loaded, rows_dropped, errors_by_type, elapsed_s}; JSON-serializable for downstream alerting.
  • Skip-and-continue — never raise on a bad row inside the loop; aggregate, then decide.

End-to-end ETL is the highest-ROI Hard-tier prep target for Stripe-style loops.

Payments-platform domain framing

Stripe's prompts are payments-platform flavored: shipping fees (tiered pricing), transaction fees (per-merchant rollups), idempotency tokens (replay safety), bounded buffers (back-pressure), event-time vs processing-time (out-of-order arrivals), ETL run logs (observability), JSONB event payloads (flexible schemas). State the mapping out loud: "this is a tiered fee schedule via sort + greedy"; "this is a per-merchant rollup via defaultdict"; "this is replay-safety via a seen-set + token cache"; "this is a deque(maxlen=N) with drop-newest"; "this is a watermark filter"; "this is csv.DictReader + per-row validation + a summary dict"; "this is jsonb_object_keys with a LATERAL join."
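The first of those mappings, "tiered fee schedule via sort + greedy," can be sketched in a few lines. Here tiers are assumed to be `(width, rate)` slabs in schedule order — a made-up shape for illustration, not a Stripe fee table:

```python
def tiered_fee(weight, tiers):
    """tiers: list of (tier_width, rate_per_unit) in schedule order."""
    remaining, fee = weight, 0.0
    for width, rate in tiers:
        slab = min(remaining, width)   # greedy saturation per tier
        fee += slab * rate
        remaining -= slab
        if remaining <= 0:
            break
    return fee
```

The walk is O(K) in the number of tiers, and stating that K is tiny (≤10) out loud is part of the expected answer.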

Where to practice on PipeCode

Every pattern above has a drill target: the hash table, set, queue, streaming, and ETL topic pages for Python; the type casting and filtering pages for SQL; and the company-tagged Stripe set for the full seven-problem loop.

Communication under time pressure

Three things to say out loud before, during, and after each problem — in this order:

  • Assumptions — before typing. "I'll assume events are at-least-once delivered, so I need an idempotency token"; "I'll assume the tier table fits in memory and is small (≤10 entries)"; "I'll assume event_payload is jsonb, not json."
  • Invariants — after key code blocks. "watermark is monotonic—I never let it rewind"; "deque(maxlen=N) drops the oldest by default"; "jsonb_object_keys is top-level only."
  • Complexity — at the end. "Single-pass O(N) over the events"; "tier walk is O(K) where K ≤ 10"; "the JSONB query is O(N · K) with a GIN index keeping containment fast."

Interviewers grade clear reasoning above silent-and-perfect.


Frequently Asked Questions

What is the Stripe data engineering interview process like?

The Stripe data engineering interview typically includes a phone screen (mostly Python warm-up around aggregation, dedup, or basic streaming), one or two coding rounds focused on Python data-engineering primitives (idempotency, bounded queues, watermarks, ETL), at least one round with SQL JSONB introspection, plus a system-design conversation around payments-data pipelines and behavioral interviews. The curated 7-problem Stripe practice set on PipeCode mirrors what you will see on the technical rounds.

What Python topics does Stripe test for data engineers?

Stripe emphasizes Python data-engineering primitives, not algorithm puzzles: hash-table aggregation (defaultdict, Counter), set-based deduplication for idempotency, collections.deque(maxlen=N) for bounded producer-consumer queues, event-time watermarks for streaming, and end-to-end CSV ETL with validation and a logging summary. Drill these on the hash table, set, queue, streaming, and ETL topic pages.

How important is SQL for a Stripe data engineering interview?

SQL is roughly 15–20% of the technical interview at Stripe—much lighter than at SQL-heavy shops, but the SQL you do see is payments-platform-specific: JSONB introspection (jsonb_object_keys, ->>, ::cast) over flexible event payloads. The type casting and filtering topic pages plus the curated Stripe set are the right drilling targets.

How hard are Stripe data engineering interview questions?

Stripe's curated set has 1 easy + 5 medium + 1 hard = a medium-heavy hub. The Hard-tier problem is the end-to-end ETL with logging summary—an exercise in observability, not algorithmic cleverness. Most candidates underestimate idempotency and watermarks; those are where Stripe's loops separate "knows Python" from "ships payments-grade Python."

Are streaming and watermark questions common in Stripe interviews?

Yes—out-of-order arrivals and late-record drop are core to payments-platform engineering, and the Easy-tier curated problem is exactly this pattern. Get comfortable computing watermark = max(watermark, max_seen_event_time − allowed_lateness), picking a sensible allowed_lateness budget, and choosing among drop / side-output / update policies. The same primitives apply to fraud-signal pipelines, dispute-event processing, and webhook delivery.
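The formula above fits in a few lines; this sketch implements the "drop" policy, with the two invariants called out in comments (the closure-over-dict shape is just one way to hold state):

```python
def make_watermark_filter(allowed_lateness):
    state = {"max_seen": float("-inf"), "watermark": float("-inf")}
    def accept(event_time):
        state["max_seen"] = max(state["max_seen"], event_time)
        # monotonic: the watermark never rewinds, even on late arrivals
        state["watermark"] = max(state["watermark"],
                                 state["max_seen"] - allowed_lateness)
        return event_time >= state["watermark"]   # drop policy: late -> False
    return accept
```

Swapping the final `return` for a side-output or an in-place update is how the other two policies fall out of the same skeleton.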

How many Stripe practice problems should I solve before the interview?

Aim for 20–30 problems spanning all seven topic clusters above—not 100 of the same kind. Solve every problem in the Stripe-tagged practice set, then back-fill weak areas using the topic pages linked throughout this guide. Idempotency, bounded queues, and ETL with summaries are the topics worth over-investing in for Stripe specifically.


Start practicing Stripe data engineering problems

Reading patterns is not the same as typing them under time pressure. PipeCode pairs company-tagged Stripe problems with tests, AI feedback, and a coding environment so you can drill the exact Python and SQL patterns Stripe asks—without the noise of generic algorithm prep that doesn't apply to this loop.

Pipecode.ai is Leetcode for Data Engineering.

Browse Stripe practice →
View all practice →
