DEV Community

Gowtham Potureddi


Kafka Connect Deep Dive: Source, Sink, SMTs, Schema Registry & Idempotent Writes

Kafka Connect looks like "just another Kafka client" to a junior engineer; senior engineers know it is actually a full distributed framework that replaces an entire category of bespoke producer-and-consumer code with a few JSON configs. The result is one of the most under-appreciated levers in a modern ingestion stack: a runtime that turns "write a custom MySQL → S3 pipeline in 800 lines of Java" into "POST a 30-line connector config and watch tasks ship change data within seconds."

This guide is the deep dive you wanted the first time a Kafka Connect tutorial glossed over Debezium, sink semantics, or schema evolution and left you to fight production alone. It walks through the cluster anatomy (workers, tasks, connectors, REST API), the source vs sink connector contract, Debezium log-based CDC, the Single Message Transform (SMT) chain (ExtractField, Cast, RegexRouter, MaskField, ReplaceField), the Schema Registry compatibility ladder for Avro / Protobuf / JSON Schema, and the idempotent producer plus exactly-once delivery semantics. Each section pairs a teaching block with a Solution-Tail interview answer: code, a step-by-step trace, an output table, then a concept-by-concept breakdown of why it works.


When you want hands-on reps immediately after reading, drill the streaming practice library →, rehearse on ETL pipeline problems →, and stack the streaming-in-Python muscles with streaming Python drills →.




1. Why Kafka Connect exists — declarative ingestion

Kafka Connect replaces the "two custom apps per pipeline" tax with a config-driven framework that handles parallelism, offsets, and restarts for you

The one-sentence invariant: Kafka Connect is a distributed framework for moving data between Kafka and other systems by configuration, not by code — every common source (databases, files, message queues) and sink (warehouses, blob stores, search indexes) has a reusable connector that you instantiate with JSON instead of writing a producer or consumer from scratch. Once you internalise that "Connect is what stands between every external system and Kafka," every line of bespoke ingestion code becomes a candidate for deletion.

The three places hand-rolled ingestion bites.

  • Producers. A custom producer needs partition strategy, key serialisation, retry policy, batching, and back-pressure handling. Forget any one of them and you ship duplicates, hot partitions, or 4 AM pages.
  • Consumers. A custom consumer needs offset management, rebalance handling, idempotent writes, schema parsing, and dead-letter queues. Skip any one and the next deploy drops or replays messages.
  • Operations. Both apps need their own deployment, monitoring, scaling, and on-call rotation — for every pipeline.

Connect collapses all of that into a single runtime. In one sentence:

A Kafka Connect cluster is a pool of JVM workers that run connectors; each connector is a JSON config that names a connector class (e.g. io.debezium.connector.mysql.MySqlConnector) plus its options (host, topic, credentials, transforms); the worker shards the connector into tasks and distributes them across the cluster — and you never write a producer or consumer line again.

What interviewers listen for.

  • Do you say "Connect is declarative ingestion, not a Kafka client library" when asked what it is? — senior signal.
  • Do you reach for a connector first when the question is "how do I move data from X to Kafka?" — required answer.
  • Do you mention distributed mode plus the REST API as the production stance? — senior signal.
  • Do you recognise that Connect is the same framework for both directions (source and sink) — required answer.

The 2026 reality.

  • Debezium is still the default log-based CDC source for MySQL, Postgres, MongoDB, SQL Server, and Oracle.
  • Confluent maintains a large catalogue of certified connectors (S3, JDBC, Snowflake, BigQuery, Elasticsearch); independent maintainers ship many more.
  • Schema Registry is the de-facto wire-format hub for Avro / Protobuf / JSON Schema and is integrated end-to-end with Connect.
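  • Kafka producers have enabled idempotence by default since Apache Kafka 3.0, and since Kafka 3.3 source connectors can opt into exactly-once delivery through transactions (KIP-618); sinks reach exactly-once results through idempotent or offset-tracking writes in the destination.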

Worked example — the JDBC source hello-world in one config

Detailed explanation. Before going deep on Debezium, the cheapest demonstration of Connect's "declarative ingestion" value proposition is a JDBC source pulling a Postgres orders table into Kafka with incrementing-PK polling. The whole pipeline is a single REST POST.

Question. Given a Postgres database with an orders(id BIGSERIAL, customer_id INT, amount NUMERIC, created_at TIMESTAMPTZ) table, write the JSON connector config that ships every new row to a Kafka topic named pg.orders, requesting five parallel tasks (tasks.max = 5). Show that no producer code is needed.

Input.

| Connector config field | Value |
|---|---|
| name | pg-orders-source |
| connector.class | io.confluent.connect.jdbc.JdbcSourceConnector |
| connection.url | jdbc:postgresql://pg:5432/shop |
| table.whitelist | orders |
| mode | incrementing |
| incrementing.column.name | id |
| topic.prefix | pg. |
| tasks.max | 5 |

Code.

{
  "name": "pg-orders-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://pg:5432/shop",
    "connection.user": "kafka_reader",
    "connection.password": "${file:/secrets/pg.properties:pwd}",
    "table.whitelist": "orders",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "pg.",
    "tasks.max": "5",
    "poll.interval.ms": "5000",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
# Submit the connector via the Connect REST API
curl -X POST -H "Content-Type: application/json" \
     --data @pg-orders-source.json \
     http://connect:8083/connectors

Step-by-step explanation.

  1. The config names the connector (pg-orders-source) and points the framework at the JdbcSourceConnector class shipped in the JDBC plugin. No custom Java is loaded.
  2. mode = incrementing plus incrementing.column.name = id tells the connector to remember the last id it saw and only emit rows with a strictly greater id on the next poll. The "last id" is persisted in the connect-offsets topic.
  3. topic.prefix = pg. produces the topic name pg.orders (prefix plus table name). The broker auto-creates that topic on first publish only if auto.create.topics.enable is on; otherwise pre-create it with an explicit partition count, or let Connect create it through the topic.creation.default.* connector settings (Kafka 2.6+).
  4. tasks.max = 5 is the requested parallelism. The framework asks the connector to split its work, and the JDBC source splits by table, so this single-table connector runs exactly one task no matter what tasks.max says. Multi-table connectors fan out to min(tasks.max, number of tables).
  5. Submitting the config to POST /connectors is the entire deployment. The Connect worker validates the config, persists it in connect-configs, and assigns tasks to workers.
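Step 2 is easiest to see in a toy model. The Python sketch below is not the connector's code: an in-memory list stands in for the orders table, a plain dict stands in for the connect-offsets topic, and the starting offset of -1 is an assumption of this sketch.

```python
from typing import Dict, List


def poll_incrementing(table: List[dict], offsets: Dict[str, int],
                      table_name: str, column: str = "id") -> List[dict]:
    """Return rows whose incrementing column is strictly greater than the stored offset."""
    last_seen = offsets.get(table_name, -1)  # -1: nothing read yet (sketch assumption)
    new_rows = sorted((r for r in table if r[column] > last_seen),
                      key=lambda r: r[column])
    if new_rows:
        # Persist the highest id emitted so the next poll starts after it.
        offsets[table_name] = new_rows[-1][column]
    return new_rows


# Hypothetical stand-ins for the orders table and the offset store.
orders = [{"id": 1, "amount": 10}, {"id": 2, "amount": 25}]
offsets: Dict[str, int] = {}

first = poll_incrementing(orders, offsets, "orders")    # emits ids 1 and 2
orders.append({"id": 3, "amount": 7})
second = poll_incrementing(orders, offsets, "orders")   # emits only id 3
third = poll_incrementing(orders, offsets, "orders")    # nothing new
```

The strictly-greater rule is also why incrementing mode never re-emits updated rows: it suits append-only tables, and tables that receive updates need a timestamp-based mode or log-based CDC instead.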

Output.

| Step | Producer code written | Consumer code written | Tasks running |
|---|---|---|---|
| 0: submit config | 0 lines | 0 lines | 0 |
| 1: connector starts | 0 lines | 0 lines | 1 (single table) |
| 2: rows arrive | 0 lines | 0 lines | 1 |
| 3: downstream consumer reads pg.orders | 0 lines | usual consumer code | 1 |

Rule of thumb. Every time a teammate proposes "let's write a small producer to push X into Kafka," check whether a connector exists first. Connect's worst case is a 100-line custom connector class; its best case is a 30-line JSON file that ships the same day.

Worked example — sink hello-world: S3 Parquet from a Kafka topic

Detailed explanation. The mirror image of the source hello-world is the sink: take a Kafka topic and ship every record into S3 as date-partitioned Parquet files. Like the source, the whole pipeline is one connector config — no consumer code.

Question. Given a Kafka topic pg.orders (Avro-encoded), write the S3 sink connector that lands records as Parquet under s3://lake/raw/pg.orders/yyyy=2026/mm=06/dd=12/... (the sink builds keys as topics.dir/topic/partition path), flushed every 1,000 records or 5 minutes.

Input.

| Sink config field | Value |
|---|---|
| topics | pg.orders |
| s3.bucket.name | lake |
| s3.region | us-east-1 |
| format.class | io.confluent.connect.s3.format.parquet.ParquetFormat |
| partitioner.class | TimeBasedPartitioner |
| path.format | 'yyyy'=YYYY/'mm'=MM/'dd'=dd |
| flush.size | 1000 |
| rotate.interval.ms | 300000 |

Code.

{
  "name": "s3-orders-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "topics": "pg.orders",
    "s3.bucket.name": "lake",
    "s3.region": "us-east-1",
    "topics.dir": "raw",
    "flush.size": "1000",
    "rotate.interval.ms": "300000",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'yyyy'=YYYY/'mm'=MM/'dd'=dd",
    "partition.duration.ms": "86400000",
    "timestamp.extractor": "Record",
    "locale": "en-US",
    "timezone": "UTC",
    "tasks.max": "4",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Step-by-step explanation.

  1. topics = pg.orders tells the sink connector which Kafka topic(s) to subscribe to. Multiple topics can be listed comma-separated.
  2. format.class selects the wire format on the S3 side — Parquet is the analytics-friendly default; Avro and JSON are also supported.
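  3. partitioner.class = TimeBasedPartitioner plus path.format produces a Hive-style (key=value) date-partitioned prefix under topics.dir/topic. Spark discovers this layout automatically, and Athena or Snowflake external tables can map it to partition columns for pruning. The time-based partitioner also requires the locale and timezone settings.
  4. flush.size = 1000 and rotate.interval.ms = 300000 define the dual flush triggers: whichever fires first closes the current file and uploads it to S3. rotate.interval.ms is measured in the timestamp extractor's time (record timestamps here) and is checked as records arrive; rotate.schedule.interval.ms is the wall-clock alternative for topics that go quiet.
  5. Assuming pg.orders has four partitions, tasks.max = 4 gives one task per partition, and the framework spreads those tasks across the available workers. Each task owns a subset of partitions and writes its own files, so every file is partition-local and there is no cross-task contention.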
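The path layout and the two flush triggers can be modelled in a few lines of Python. This is a simplified sketch, assuming record timestamps in UTC and ignoring the file name the connector appends under the prefix; it mirrors steps 3 and 4 rather than reproducing the connector's actual writer code.

```python
from datetime import datetime, timezone
from typing import List, Optional, Tuple


def partition_prefix(topics_dir: str, topic: str, ts_ms: int) -> str:
    """Object-key prefix, mirroring path.format 'yyyy'=YYYY/'mm'=MM/'dd'=dd in UTC."""
    dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return f"{topics_dir}/{topic}/yyyy={dt:%Y}/mm={dt:%m}/dd={dt:%d}/"


def simulate_rotation(timestamps_ms: List[int], flush_size: int,
                      rotate_interval_ms: int) -> List[Tuple[int, str]]:
    """Return (record_count, trigger) for every file this toy writer commits."""
    committed: List[Tuple[int, str]] = []
    count, first_ts = 0, None  # type: int, Optional[int]
    for ts in timestamps_ms:
        # Time-based rotation is only evaluated when a later record arrives.
        if count and ts - first_ts >= rotate_interval_ms:
            committed.append((count, "rotate.interval.ms"))
            count, first_ts = 0, None
        if first_ts is None:
            first_ts = ts
        count += 1
        # Size-based rotation fires as soon as the file holds flush_size records.
        if count >= flush_size:
            committed.append((count, "flush.size"))
            count, first_ts = 0, None
    # Trailing records stay in an open file until the next trigger fires.
    return committed
```

With flush_size=3 and rotate_interval_ms=5000, the timestamps [0, 1000, 2000, 3000, 10000, 11000] commit one full file by size and then a single-record file by time, while the last two records remain open.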

Output.

| Trigger | Files produced | Average size | Partition path |
|---|---|---|---|
| flush.size hit at 09:12 | 1 | ~6 MB | yyyy=2026/mm=06/dd=12/ |
| rotate timer at 09:17 | 1 | ~1.4 MB | yyyy=2026/mm=06/dd=12/ |
| flush.size hit at 09:23 | 1 | ~6 MB | yyyy=2026/mm=06/dd=12/ |
| End of day | 96 files | ~5 MB avg | yyyy=2026/mm=06/dd=12/ |

Rule of thumb. Sink connectors are usually quieter than sources because they only need to fight the external system's write semantics. The S3 sink writes whole files; the JDBC sink batches inserts; the Elasticsearch sink upserts by document id. Pick the sink whose config maps cleanly to the destination's write contract.

Worked example — standalone mode vs distributed mode

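Detailed explanation. Connect runs in two modes: standalone (one process, connector configs read from local files, source offsets in a local file, no cluster) and distributed (a pool of workers sharing configs, offsets, and status through Kafka internal topics, controlled via REST). Standalone also exposes the REST API, but anything submitted through it lives only in that process's memory. Standalone is the laptop / single-machine integration-test mode; distributed is the only mode you should ever run in production.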

Question. Given a production CDC pipeline that needs to survive worker crashes, scale tasks horizontally, and accept config changes via API, which Connect mode is correct — standalone or distributed — and why?

Input.

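| Requirement | Standalone | Distributed |
|---|---|---|
| Survives worker crash | no | yes (tasks rebalance) |
| Scale tasks across machines | no | yes |
| Config via REST API | yes, but kept in memory only | yes, persisted in connect-configs |
| Config in a file | yes (passed at startup) | no, stored in the connect-configs topic |
| HA | no | yes |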

Code.

# Standalone — single process, config from a file
connect-standalone.sh \
    config/connect-standalone.properties \
    config/jdbc-source.properties

# Distributed — N workers, same group.id, same internal topics
connect-distributed.sh config/connect-distributed.properties
# (run on each node; workers find each other via group.id)

Step-by-step explanation.

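  1. Standalone reads connector configs from the properties files passed on the command line at startup and keeps source offsets in a local file (offset.storage.file.filename). Its REST API can add or update connectors at runtime, but those changes are not persisted; edit the files and restart the process to change config durably.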
  2. Distributed mode uses the group.id to form a worker pool: every worker with the same group.id joins the same Connect cluster and shares the three internal topics (connect-configs, connect-offsets, connect-statuses).
  3. In distributed mode, the REST API on any worker accepts POST /connectors and PUT /connectors/:name/config — the change propagates through connect-configs so every worker sees it.
  4. When a worker crashes in distributed mode, the cluster rebalances and surviving workers pick up the failed worker's tasks. Standalone has no failover — the pipeline stops.
  5. Tasks in standalone are bounded by one process; distributed scales tasks horizontally across machines until tasks.max is reached.
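What workers share in distributed mode is mostly a handful of properties. The helper below renders them; the keys are standard Connect worker settings, while the hostnames (kafka:9092, schema-registry:8081), the group.id value, and the replication factor of 3 are assumptions for illustration.

```python
from typing import Dict


def distributed_worker_properties(bootstrap: str, group_id: str,
                                  replication: int = 3) -> str:
    """Render the settings every worker in one Connect cluster must agree on."""
    props: Dict[str, str] = {
        "bootstrap.servers": bootstrap,
        # Same group.id on every node means the same Connect cluster.
        "group.id": group_id,
        # The three internal topics; these names are conventional, not mandatory.
        "config.storage.topic": "connect-configs",
        "offset.storage.topic": "connect-offsets",
        "status.storage.topic": "connect-statuses",
        "config.storage.replication.factor": str(replication),
        "offset.storage.replication.factor": str(replication),
        "status.storage.replication.factor": str(replication),
        # Default converters for connectors that do not override them.
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "listeners": "http://0.0.0.0:8083",
    }
    return "\n".join(f"{key}={value}" for key, value in props.items()) + "\n"


properties_text = distributed_worker_properties("kafka:9092", "connect-cluster")
```

Every node gets the same rendered file; a worker started with a different group.id forms a separate cluster, so the file belongs in config management rather than being hand-edited per node.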

Output.

| Mode | When to use |
|---|---|
| Standalone | Dev laptop, integration tests, single-process demos |
| Distributed | Every production pipeline, always |

Rule of thumb. Treat standalone mode as "the CLI tool for learning Connect" and never run it in production. The first time the host of a production standalone worker fails, the pipeline stops with no failover, and the offsets file on its local disk may go with it.

Kafka interview question on Connect's design intent

A senior interviewer often opens with: "Walk me through what Kafka Connect is, why it exists, and how it differs from writing a producer and consumer pair for the same job. When would you not use it?" It blends design-intent reasoning with a practical "when do you reach for Connect" answer.

Solution Using Connect's declarative + distributed framing

Kafka Connect is a distributed framework for moving data between Kafka and
other systems via configuration — JSON connector configs sent to a REST
API, not custom producer / consumer Java code.

It exists because every team that hand-rolled ingestion ended up rewriting
the same five concerns: partition strategy, offset management, retries,
schema handling, and operational scaling. Connect captures all five inside
the framework, leaves the "what to do per record" to a reusable Connector
class, and exposes the "how to instantiate" surface as JSON.

I use Connect when:
  • The source or sink has an off-the-shelf connector (Debezium, JDBC, S3,
    Elasticsearch, Snowflake, BigQuery, …).
  • The pipeline is logically "table → topic" or "topic → store" without
    multi-record state.
  • Operations (HA, rebalance, restart, monitoring) matter as much as the
    code that processes the records.

I do NOT use Connect when:
  • The transform is stateful or multi-record — that's Kafka Streams or
    ksqlDB territory, not SMT territory.
  • The source emits a fundamentally non-Kafka-like stream (e.g. WebSockets
    where each event needs custom auth per message).
  • The team only has one tiny one-shot ingestion and wants to avoid running
    a Connect cluster — a small custom producer might be lighter.

Step-by-step trace.

| Decision step | Hand-rolled Java producer / consumer | Kafka Connect |
|---|---|---|
| 1. Wire up partition strategy | ~50 lines | 0 lines (config) |
| 2. Offset tracking | ~120 lines + DB | 0 lines (connect-offsets topic) |
| 3. Idempotent writes | ~80 lines | 0 lines (built-in producer) |
| 4. Schema serialisation | ~60 lines | 0 lines (Schema Registry converter) |
| 5. Operational HA | custom orchestration | distributed mode (built-in) |
| Total | ~310+ LOC plus ops glue | ~30 lines of JSON, no ops glue |

After the comparison, the architect chooses Connect for every "table → topic" or "topic → store" pipeline and keeps custom Java only for the two or three pipelines where the source / sink has no usable connector and the volume justifies the maintenance cost.

Output:

| Pipeline type | Recommended stack |
|---|---|
| MySQL CDC → Kafka → S3 Parquet | Connect (Debezium source + S3 sink) |
| Postgres bulk → Kafka | Connect (JDBC source, bulk mode) |
| Topic → Snowflake | Connect (Snowflake sink) |
| Stateful join across 3 topics | Kafka Streams / Flink, not Connect |
| One-shot CSV → Kafka, ad-hoc | small custom producer |

Why this works — concept by concept:

  • Declarative ingestion — Connect captures the cross-cutting concerns (offsets, retries, parallelism, schema) inside the framework and asks the operator only "what to move and how to authenticate." Configuration replaces code.
  • Distributed workers — multiple JVMs in the same group.id form a cluster. Failover is automatic, scaling is horizontal, and the REST API is uniform across all workers.
  • Reusable connectors — every connector class implements the same Connector / Task interface; once written, it serves every team forever. The ecosystem amortises engineering cost across the industry.
  • SMTs at the boundary — Single Message Transforms handle "reshape this record" without leaving Connect. Multi-record state escapes upward to Streams / Flink, by design.
  • Cost — Connect adds one JVM cluster to your stack. The savings: dozens of hand-rolled ingestion apps that would otherwise need their own deployment, monitoring, and on-call rotation. The break-even is usually two non-trivial pipelines.



2. Connect cluster anatomy — workers, tasks, REST API

Kafka Connect has four layers, Worker (JVM), Connector (logical), Task (parallel), and REST API (control), while three internal topics keep state durable

The mental model in one line: a Connect cluster is a pool of Worker JVMs that share three Kafka internal topics; each Worker hosts Connector instances; each Connector spawns up to tasks.max Tasks that do the actual record-by-record work; the REST API is the only control plane. Once you can recite "Workers run, Connectors own, Tasks parallelise, REST controls," the entire operational surface of Connect resolves into the right mental boxes.

Figure: anatomy of a Kafka Connect distributed cluster. Three Worker JVMs hold task slots, one Connector spans all three and dispatches tasks, the REST API receives POST /connectors, and the three internal topics (connect-configs, connect-offsets, connect-statuses) sit underneath.

The four layers in one table.

| Layer | What it is | Lifetime | Lives where |
|---|---|---|---|
| Worker | A JVM process running the Connect runtime | process lifetime | each node in distributed mode |
| Connector | A logical instance of a connector class | until deleted via REST | persisted in connect-configs |
| Task | A parallel unit of work for a Connector | until the Connector is deleted or a rebalance moves it | runs inside a Worker |
| REST API | Control plane (CRUD over connectors) | worker lifetime | port 8083 (default) on every Worker |

Three internal topics — the durable state.

  • connect-configs — compacted topic that stores every connector config. Workers replay it on startup so they all agree on the cluster's connectors.
  • connect-offsets — compacted topic that stores source-connector offsets (the "where did I leave off" pointer). Sink-connector offsets live in the regular consumer-group offsets topic.
  • connect-statuses: compacted topic with the current state (RUNNING, FAILED, PAUSED, and so on) of every connector and task. The REST API reads it to answer GET /connectors/:name/status.
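Workers can rebuild the cluster's state from these topics because log compaction keeps the latest record per key, and a null value (tombstone) removes the key. The toy replay below shows only that rule; the keys and payloads are illustrative, not the exact records Connect writes.

```python
from typing import Dict, Iterable, Optional, Tuple


def replay_compacted(log: Iterable[Tuple[str, Optional[dict]]]) -> Dict[str, dict]:
    """Rebuild state from a compacted topic: the last value per key wins, None deletes."""
    state: Dict[str, dict] = {}
    for key, value in log:
        if value is None:          # tombstone: the key was deleted
            state.pop(key, None)
        else:
            state[key] = value     # a later record for the same key overwrites
    return state


# Illustrative records only; real connect-configs payloads differ.
config_log = [
    ("connector-pg-orders-source", {"tasks.max": "5"}),
    ("connector-s3-orders-sink", {"tasks.max": "4"}),
    ("connector-pg-orders-source", {"tasks.max": "2"}),   # later update
    ("connector-s3-orders-sink", None),                   # connector deleted
]
cluster_view = replay_compacted(config_log)
```

Because every worker replays the same log, every worker arrives at the same view, which is why a fresh worker can join without any local state.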

Tasks vs Workers — the parallelism story.

  • Connector specifies tasks.max: the framework asks "how many tasks can split this work?" and the connector answers with up to tasks.max task configs, one per slice of partitions or shards.
  • The Worker pool decides assignment — tasks are distributed across workers via a rebalance protocol. Each worker runs a slice.
  • Connect 2.3+ cooperative rebalance: only impacted tasks pause during a rebalance, rather than the old stop-the-world behaviour that paused every task in the cluster.
  • One task per partition (rule of thumb) — exceeding partition_count on a single-topic sink wastes tasks; matching partition_count saturates parallelism.
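The parallelism rules above reduce to two small functions. In this sketch, effective_tasks applies the min(tasks.max, splittable units) bound and round_robin shows the one-owner-per-task invariant; Connect's real assignor is the incremental cooperative protocol, so actual placements will differ. The worker names are hypothetical.

```python
from typing import Dict, List


def effective_tasks(tasks_max: int, splittable_units: int) -> int:
    """Tasks actually created: never more than the units of work the connector can split."""
    return min(tasks_max, splittable_units)


def round_robin(task_ids: List[int], workers: List[str]) -> Dict[str, List[int]]:
    """Give every task exactly one owning worker (a simplification of Connect's assignor)."""
    assignment: Dict[str, List[int]] = {w: [] for w in workers}
    for i, task_id in enumerate(task_ids):
        assignment[workers[i % len(workers)]].append(task_id)
    return assignment


# A 6-partition sink topic with tasks.max = 8 still yields 6 tasks.
n_tasks = effective_tasks(tasks_max=8, splittable_units=6)
plan = round_robin(list(range(n_tasks)), ["worker-a", "worker-b", "worker-c"])
```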

REST API — the only control plane you should script against.

  • POST /connectors — submit a new connector config.
  • GET /connectors/:name/status — current state of the connector and each task.
  • PUT /connectors/:name/config — update config (causes a restart).
  • POST /connectors/:name/restart — restart a failed connector.
  • POST /connectors/:name/tasks/:id/restart — restart a single failed task.
  • DELETE /connectors/:name — delete a connector (tasks stop; offsets remain in their topic).
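If you script against these endpoints, a thin wrapper keeps paths consistent. The sketch below uses only the Python standard library and builds urllib.request.Request objects without sending them; the base URL http://connect:8083 is the assumed worker address from the curl examples.

```python
import json
import urllib.parse
import urllib.request
from typing import Optional

BASE_URL = "http://connect:8083"  # assumed worker address, as in the curl examples


def _request(method: str, path: str, body: Optional[dict] = None) -> urllib.request.Request:
    """Build (but do not send) a request against the Connect REST API."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {"Content-Type": "application/json"} if body is not None else {}
    return urllib.request.Request(BASE_URL + path, data=data, headers=headers, method=method)


def create_connector(name: str, config: dict) -> urllib.request.Request:
    return _request("POST", "/connectors", {"name": name, "config": config})


def connector_status(name: str) -> urllib.request.Request:
    return _request("GET", f"/connectors/{urllib.parse.quote(name)}/status")


def restart_task(name: str, task_id: int) -> urllib.request.Request:
    return _request("POST", f"/connectors/{urllib.parse.quote(name)}/tasks/{task_id}/restart")


def delete_connector(name: str) -> urllib.request.Request:
    return _request("DELETE", f"/connectors/{urllib.parse.quote(name)}")
```

Send one with urllib.request.urlopen(req); 4xx responses, such as the 400 returned for an invalid config, surface as urllib.error.HTTPError.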

Common interview probes on the cluster model.

  • "What happens when a worker dies?" — the cluster rebalances; surviving workers pick up the dead worker's tasks. The internal topics keep config and offsets durable.
  • "Can two workers run the same task?" — no. Each task is owned by exactly one worker at a time. Rebalance moves ownership.
  • "Where do source-connector offsets live?" — in the connect-offsets topic, keyed by (connector-name, source-partition). Sink offsets live in the standard consumer group offsets.
  • "What's the difference between tasks.max and partitions?" — tasks.max is the upper bound you give the framework; partitions (of the source / sink topic) is the natural bound. Effective tasks = min(tasks.max, splittable-units).

Worked example — submitting a connector via REST

Detailed explanation. Every production connector starts life as a JSON file submitted to the REST API. The framework validates it synchronously and returns the persisted config; failure modes (bad class name, missing required field) come back as 4xx with a JSON error body.

Question. Submit a Debezium MySQL connector named mysql-shop-cdc with tasks.max = 1 (Debezium is single-task per database) and verify it is RUNNING.

Input.

| Field | Value |
|---|---|
| name | mysql-shop-cdc |
| connector.class | io.debezium.connector.mysql.MySqlConnector |
| database.hostname | mysql |
| database.port | 3306 |
| database.user | debezium |
| database.server.id | 184054 (any ID unique among the server's replicas and binlog clients) |
| topic.prefix | shop |
| table.include.list | shop.orders, shop.line_items |
| tasks.max | 1 |

Code.

# 1) Submit the connector (Debezium 2.x: topic.prefix replaces database.server.name)
curl -X POST -H "Content-Type: application/json" \
     --data '{
       "name": "mysql-shop-cdc",
       "config": {
         "connector.class": "io.debezium.connector.mysql.MySqlConnector",
         "database.hostname": "mysql",
         "database.port": "3306",
         "database.user": "debezium",
         "database.password": "${file:/secrets/dbz.properties:pwd}",
         "database.server.id": "184054",
         "topic.prefix": "shop",
         "database.include.list": "shop",
         "table.include.list": "shop.orders,shop.line_items",
         "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
         "schema.history.internal.kafka.topic": "schema-history.shop",
         "tasks.max": "1"
       }
     }' \
     http://connect:8083/connectors

# 2) Check status
curl http://connect:8083/connectors/mysql-shop-cdc/status

# 3) List all connectors in the cluster
curl http://connect:8083/connectors

Step-by-step explanation.

  1. POST /connectors validates the JSON against the connector class's config definition. Missing required fields (e.g. forgetting database.hostname) return 400 with the offending field name.
  2. The framework persists the config to connect-configs. Every worker replays that topic on startup, so the connector survives the cluster being fully restarted.
  3. The cluster assigns one task to one worker (Debezium is single-task per database). The worker starts the task, which opens a binlog reader and begins emitting events.
  4. GET /connectors/:name/status returns connector: { state: RUNNING } plus tasks: [ { id, state, worker_id } ]. If a task is FAILED, the JSON includes a stack trace.
  5. GET /connectors returns every connector in the cluster as a JSON array. Useful for inventory scripts and dashboards.
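A health-check script usually just reads the status body described in step 4. The helpers below assume that shape (connector.state plus a tasks list with id and state); the sample worker_id value is made up.

```python
from typing import Dict, List


def summarise_status(status: dict) -> Dict[str, List[int]]:
    """Group task ids by state from a GET /connectors/:name/status body."""
    by_state: Dict[str, List[int]] = {}
    for task in status.get("tasks", []):
        by_state.setdefault(task["state"], []).append(task["id"])
    return by_state


def is_healthy(status: dict) -> bool:
    """Healthy means the connector and every one of its (at least one) tasks are RUNNING."""
    tasks = status.get("tasks", [])
    return (status["connector"]["state"] == "RUNNING" and bool(tasks)
            and all(t["state"] == "RUNNING" for t in tasks))


sample_status = {
    "name": "mysql-shop-cdc",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.5:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.0.0.5:8083"}],
    "type": "source",
}
```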

Output.

| Step | Response | Meaning |
|---|---|---|
| 1: POST | 201 Created + config echo | persisted in connect-configs |
| 2: GET status | { connector: { state: RUNNING }, tasks: [{ id: 0, state: RUNNING }] } | one task running on a worker |
| 3: list | ["mysql-shop-cdc", "s3-orders-sink", "pg-orders-source"] | three connectors in the cluster |

Rule of thumb. Store every connector config in version control as a .json file alongside the application code that depends on it. CI/CD then PUTs the latest config to the cluster on merge — the same pattern you would use for Kubernetes manifests.
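One wrinkle for that CI/CD step: PUT /connectors/:name/config takes the flat config map, not the {"name", "config"} wrapper used by POST /connectors. A minimal sketch of the conversion, assuming the stored file uses the same wrapper shape as the examples in this article:

```python
import json
from typing import Tuple


def put_request_from_file(text: str) -> Tuple[str, dict]:
    """Turn a version-controlled {"name": ..., "config": {...}} file into a PUT path and body."""
    doc = json.loads(text)
    name, config = doc["name"], dict(doc["config"])
    # PUT /connectors/:name/config takes the flat config map (no wrapper) and
    # creates the connector if it is missing, so CI can call it on every merge.
    if config.get("name", name) != name:
        raise ValueError(f"config name {config['name']!r} does not match {name!r}")
    return f"/connectors/{name}/config", config
```

Because the same PUT both creates and updates, the pipeline needs no "does it exist yet?" branch.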

Worked example — scaling tasks for a multi-table Debezium connector

Detailed explanation. Debezium runs a single task per database, a constraint that surprises engineers used to scaling consumers by partition count. The way to parallelise CDC is not "increase tasks.max"; it is "split tables across multiple Debezium connectors." Newer Debezium releases can parallelise parts of the initial snapshot, but streaming from one replication slot stays serial.

Question. A Postgres database has several large tables and CDC throughput is saturating a single Debezium task. Show how to fan out to four parallel connectors, one per table group, and how tasks.max and topic.prefix are configured.

Input.

table group tables
Group A orders, order_items
Group B customers, addresses
Group C inventory, sku_prices
Group D events_clickstream (high-volume)

Code.

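[
  {
    "name": "pg-cdc-group-a",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "pg",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "${file:/secrets/dbz.properties:pwd}",
      "database.dbname": "shop",
      "plugin.name": "pgoutput",
      "table.include.list": "public.orders,public.order_items",
      "topic.prefix": "cdc.a",
      "slot.name": "dbz_slot_a",
      "publication.name": "dbz_pub_a",
      "publication.autocreate.mode": "filtered",
      "tasks.max": "1"
    }
  },
  {
    "name": "pg-cdc-group-b",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "pg",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "${file:/secrets/dbz.properties:pwd}",
      "database.dbname": "shop",
      "plugin.name": "pgoutput",
      "table.include.list": "public.customers,public.addresses",
      "topic.prefix": "cdc.b",
      "slot.name": "dbz_slot_b",
      "publication.name": "dbz_pub_b",
      "publication.autocreate.mode": "filtered",
      "tasks.max": "1"
    }
  }
]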

Step-by-step explanation.

  1. tasks.max stays at 1 per connector — Debezium reads the WAL serially and cannot fan out a single replication slot across multiple tasks.
  2. The fan-out happens at the connector layer: four separate Debezium connectors share the cluster, but each owns a distinct table set (groups C and D follow the same pattern as A and B). The JSON array is just for reading; POST /connectors accepts one connector object per call.
  3. Each connector needs its own slot.name (Postgres) or its own database.server.id (MySQL); otherwise two connectors compete for the same replication state and one fails.
  4. The topic.prefix keeps each group's topics distinct (cdc.a.public.orders, cdc.b.public.customers). Downstream sinks can subscribe to all of them with a pattern through the sink's topics.regex setting (for example cdc\..*).
  5. CPU and Postgres write-ahead-log slots are now the binding constraints. Each Debezium connector keeps one replication slot open and one consumer thread per task.
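Generating the four connector configs from one table-group map makes the uniqueness rule in step 3 checkable before anything reaches the cluster. This is a hypothetical helper; connection settings (hostname, user, database) are left out for brevity and would be merged in from a shared template.

```python
from typing import Dict, List

# Table groups from the example above.
GROUPS: Dict[str, List[str]] = {
    "a": ["orders", "order_items"],
    "b": ["customers", "addresses"],
    "c": ["inventory", "sku_prices"],
    "d": ["events_clickstream"],
}


def group_config(suffix: str, tables: List[str]) -> dict:
    """One Debezium Postgres connector per table group, each with its own slot and prefix."""
    return {
        "name": f"pg-cdc-group-{suffix}",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "table.include.list": ",".join(f"public.{t}" for t in tables),
            "topic.prefix": f"cdc.{suffix}",
            "slot.name": f"dbz_slot_{suffix}",
            "publication.name": f"dbz_pub_{suffix}",
            "tasks.max": "1",
        },
    }


def topics_for(cfg: dict) -> List[str]:
    """Debezium names change topics <topic.prefix>.<schema>.<table>."""
    prefix = cfg["config"]["topic.prefix"]
    return [f"{prefix}.{t}" for t in cfg["config"]["table.include.list"].split(",")]


configs = [group_config(suffix, tables) for suffix, tables in GROUPS.items()]

# Guard against the failure mode in step 3: two connectors sharing one slot.
slots = [c["config"]["slot.name"] for c in configs]
assert len(slots) == len(set(slots)), "replication slots must be unique"
```

Each dict in configs is one POST /connectors body.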

Output.

| Connector | Tables | Tasks | Replication slot |
|---|---|---|---|
| pg-cdc-group-a | orders, order_items | 1 | dbz_slot_a |
| pg-cdc-group-b | customers, addresses | 1 | dbz_slot_b |
| pg-cdc-group-c | inventory, sku_prices | 1 | dbz_slot_c |
| pg-cdc-group-d | events_clickstream | 1 | dbz_slot_d |
| Cluster total | 7 tables | 4 tasks | 4 slots |

Rule of thumb. When CDC throughput is the bottleneck, scale by partitioning tables across connectors — not by raising tasks.max. Group hot tables alone; group cold tables together; reserve one connector per genuinely huge table.

Worked example — restarting a failed task without touching the connector

Detailed explanation. Tasks can transition to FAILED on transient errors (network blip, downstream throttling) or on harder faults such as schema-history corruption. The REST API lets you restart a single task without restarting the connector, which preserves the other tasks' progress and avoids a cluster-wide rebalance.

Question. A sink connector has four tasks; one (task 2) is FAILED with a "connection reset" error. Restart only that task without disturbing tasks 0, 1, and 3.

Input.

| Task | State |
|---|---|
| 0 | RUNNING |
| 1 | RUNNING |
| 2 | FAILED |
| 3 | RUNNING |

Code.

# 1) Inspect status to confirm task 2 is FAILED
curl http://connect:8083/connectors/s3-orders-sink/status

# 2) Restart only task 2 — other tasks continue uninterrupted
curl -X POST http://connect:8083/connectors/s3-orders-sink/tasks/2/restart

# 3) Re-check status; expect task 2 → RUNNING within a few seconds
curl http://connect:8083/connectors/s3-orders-sink/status

# 4) For a totally wedged connector, restart everything (including connector instance)
curl -X POST "http://connect:8083/connectors/s3-orders-sink/restart?includeTasks=true"

Step-by-step explanation.

  1. GET /status shows each task's current state plus the worker hosting it. The JSON includes the failure stack trace for any FAILED task — read it before restarting.
  2. POST /tasks/:id/restart is a targeted restart. The framework resets only the specified task on its current worker; tasks 0, 1, and 3 are not touched.
  3. The restarted task resumes from its last committed offset (sink) or from the source offset stored in connect-offsets (source). No data is replayed beyond the unflushed window.
  4. The ?includeTasks=true variant is the heavy hammer — restarts the connector instance plus every task. Use only after the cluster log confirms a connector-level corruption (rare).

Output.

| Step | Effect |
|---|---|
| 1: inspect | confirms task 2 FAILED; stack trace logged |
| 2: restart task 2 | task 2 → RUNNING within seconds |
| 3: verify | all four tasks RUNNING |
| 4: full restart | pause and resume of every task in this connector only; other connectors are untouched |

Rule of thumb. Default to the single-task restart. The full connector restart is reserved for "every task is wedged and I have already read the cluster log." A tasks/:id/restart rarely costs more than a minute of throughput; a full connector restart can cost minutes plus the rebalance window.
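The rule of thumb translates directly into a small decision function. This sketch assumes the status body shape from GET /connectors/:name/status and returns the restart calls to make; the onlyFailed=true flag on the full restart (added alongside includeTasks in Kafka 3.0) limits that call to the tasks that actually failed.

```python
from typing import List


def restart_plan(name: str, status: dict) -> List[str]:
    """Restart only failed tasks; escalate to a connector restart only if everything failed."""
    tasks = status.get("tasks", [])
    failed = [t["id"] for t in tasks if t["state"] == "FAILED"]
    connector_failed = status["connector"]["state"] == "FAILED"
    if not failed and not connector_failed:
        return []  # healthy: nothing to restart
    if connector_failed or len(failed) == len(tasks):
        # Heavy hammer: the connector instance plus its failed tasks.
        return [f"POST /connectors/{name}/restart?includeTasks=true&onlyFailed=true"]
    # Default: targeted restarts that leave healthy tasks untouched.
    return [f"POST /connectors/{name}/tasks/{task_id}/restart" for task_id in failed]
```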

Kafka interview question on Connect's runtime model

A senior interviewer might frame this as: "You're paged at 3 AM. A Connect worker died. Walk me through what happens to the running connectors, where the state lives, and how the cluster recovers — without losing data."

Solution Using the worker-task-connector-internal-topics model

1. The dead worker stops sending heartbeats to the group coordinator.
2. After session.timeout.ms (default 10s), the coordinator marks the
   worker dead and triggers a rebalance.
3. Connect 2.3+ uses *cooperative rebalance*: only the dead worker's
   tasks are affected. Other tasks across the cluster keep running.
4. The group leader (one of the surviving workers) waits up to
   scheduled.rebalance.max.delay.ms (default 5 minutes) for the worker
   to return, then reassigns its tasks to surviving workers based on
   the latest config in `connect-configs`.
5. Each reassigned task resumes:
     • Source tasks: read their offset from the `connect-offsets` topic
       and continue from there.
     • Sink tasks: rejoin the consumer group; the Kafka broker hands
       them their last-committed offset.
6. Status updates flow through `connect-statuses`; the REST API on any
   surviving worker shows the new task → worker assignment.
7. End user impact: the dead worker's slice pauses until reassignment
   (seconds if scheduled.rebalance.max.delay.ms is lowered, up to five
   minutes by default); zero data loss; no duplicates beyond the
   at-least-once re-read window.

The key invariant: every piece of durable state (config, source offsets,
status) lives in a Kafka topic — not on the worker's local disk. That is
why HA is automatic.

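Step-by-step trace (times are illustrative and assume scheduled.rebalance.max.delay.ms has been lowered to a few seconds; with the 5-minute default, the reassignment rows happen minutes later).

| Time | Event | Worker A | Worker B | Worker C |
|---|---|---|---|---|
| t=0 | normal | tasks 0, 1 | tasks 2, 3 | tasks 4, 5 |
| t=10s | A dies | offline | tasks 2, 3 | tasks 4, 5 |
| t=20s | coordinator marks A dead (session.timeout.ms) | offline | tasks 2, 3 keep running | tasks 4, 5 keep running |
| t=23s | reassign | offline | tasks 1, 2, 3 | tasks 0, 4, 5 |
| t=24s | moved tasks resume from last committed offsets | offline | task 1 resumes; 2, 3 never stopped | task 0 resumes; 4, 5 never stopped |
| t=25s | steady state | offline | 3 tasks | 3 tasks |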

After the rebalance, the cluster has redistributed all six tasks across the two surviving workers. Zero data is lost; processing latency briefly spikes during the rebalance window then returns to normal.
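To make the reassignment step concrete, here is a minimal Python model of it, assuming a simple "least-loaded survivor" placement rule. The names `reassign_lost_tasks` and `resume_offset` are hypothetical, the scheduled delay is not modelled, and the real Connect leader uses its own incremental cooperative assignor, so exact placement can differ from the table above. What the model does capture is the invariant: survivors keep their own tasks, and reassigned tasks resume from offsets stored in Kafka rather than on the dead worker.

```python
from typing import Dict, List, Tuple

Assignment = Dict[str, List[int]]


def reassign_lost_tasks(assignments: Assignment, dead: str) -> Assignment:
    """Hand the dead worker's tasks to the least-loaded survivors.

    Tasks already running on surviving workers are never moved, which is
    the property incremental cooperative rebalancing preserves.
    """
    survivors = {w: list(tasks) for w, tasks in assignments.items() if w != dead}
    for task in sorted(assignments.get(dead, [])):
        # Fewest tasks wins; ties are broken by worker name for determinism.
        target = min(survivors, key=lambda w: (len(survivors[w]), w))
        survivors[target].append(task)
    return {w: sorted(tasks) for w, tasks in survivors.items()}


# Stand-in for the compacted connect-offsets topic:
# key = (connector name, source partition), value = last committed offset.
OffsetStore = Dict[Tuple[str, str], int]


def resume_offset(store: OffsetStore, connector: str, partition: str) -> int:
    """Return the position a reassigned task continues from (0 if none stored)."""
    return store.get((connector, partition), 0)


before = {"A": [0, 1], "B": [2, 3], "C": [4, 5]}
after = reassign_lost_tasks(before, dead="A")
print(after)  # {'B': [0, 2, 3], 'C': [1, 4, 5]}

offsets: OffsetStore = {("mysql-orders-cdc", "orders"): 4096}
print(resume_offset(offsets, "mysql-orders-cdc", "orders"))  # 4096
```

Anything the dead worker processed after offset 4096 but before its next offset commit is read again, which is the at-least-once re-read window in the output table below.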

Output:

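| Aspect | Outcome |
|---|---|
| Data loss | none |
| Duplicate window | at-least-once re-read of records processed after the last offset commit |
| Time to recover | seconds once the lost tasks are reassigned (cooperative rebalance); reassignment itself waits up to scheduled.rebalance.max.delay.ms (default 5 minutes) |
| Required operator action | none (just bring up a replacement worker) |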

Why this works — concept by concept:

  • Internal topics as durable state — connect-configs, connect-offsets, and connect-statuses survive every worker restart because they live in Kafka. The framework treats Kafka as the source of truth for cluster state.
  • Incremental cooperative rebalance (Connect 2.3+) — only the impacted tasks pause; the rest of the cluster keeps processing. This replaces the eager, stop-the-world protocol Connect used before 2.3.
  • Source offsets in connect-offsets — source connectors store their "last position" (e.g. a MySQL binlog file and position, or a Postgres WAL LSN) in this compacted topic, so any worker can resume reading from any source mid-stream.
  • Sink offsets in the consumer group — sink tasks are regular Kafka consumers; their offsets live in __consumer_offsets. Failover is the standard Kafka consumer-group recovery.
  • Cost — one rebalance per worker failure, plus up to scheduled.rebalance.max.delay.ms (default 5 minutes) before the lost tasks are reassigned; lower that setting if fast failover matters more than avoiding task churn. Operator cost: zero — just bring up a replacement worker. The framework absorbs the rest.



3. Source vs sink connectors — Debezium, JDBC, S3, Elasticsearch

kafka connect source reads from external systems and writes to Kafka; kafka connect sink reads from Kafka and writes to external systems — same framework, two contracts

The mental model in one line: a source connector implements SourceTask.poll() and returns a batch of SourceRecord objects that the framework writes to Kafka; a sink connector implements SinkTask.put(records) and consumes the records it receives — Kafka sits unchanged in the middle as the durable buffer. Once you can recite "source polls, sink puts," every connector category — JDBC, Debezium, S3, Elasticsearch, Snowflake — slots into one of the two contracts.

Visual diagram of Kafka Connect source vs sink lanes — left lane shows external sources (MySQL via Debezium CDC, Postgres via JDBC, files) flowing through source connectors into a central Kafka topic; right lane shows the same Kafka topic flowing through sink connectors out to S3 Parquet, JDBC Postgres warehouse, and Elasticsearch; on a light PipeCode card.

Source connector contract.

  • SourceConnector.taskConfigs(n) — given tasks.max = n, returns up to n task config maps. Each task gets a slice of the source (a table, a shard, a partition).
  • SourceTask.poll() — periodically called; returns a list of SourceRecord objects with (sourcePartition, sourceOffset, topic, key, value, schema). The framework writes them to Kafka.
  • SourceTask.commit() / commitRecord(record) — confirmation that a record has been successfully written to Kafka, so the source can advance its read pointer.
  • Offsets in connect-offsets — keyed by (connector_name, source_partition). Each task writes its own offsets; the framework reads them on startup.
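The poll-then-commit contract above can be modelled in a few lines of Python. This is a toy, not the Java API: `ToySourceTask`, `run_once`, and the in-memory `topic` list are hypothetical stand-ins for SourceTask, the worker loop, and a Kafka topic, and `source_offset` here simply counts rows consumed so far. The point is the ordering: a record's source offset is stored only after the record is written, so a crash in between replays the record instead of losing it. Real Connect flushes offsets periodically (offset.flush.interval.ms) rather than per record, so its replay window is larger.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class SourceRecord:
    source_partition: str  # e.g. a table name
    source_offset: int     # rows consumed from that partition so far
    topic: str
    value: dict


class ToySourceTask:
    """Reads rows of one 'table' (a Python list) starting at a given offset."""

    def __init__(self, table: str, rows: List[dict], start: int = 0) -> None:
        self.table, self.rows, self.pos = table, rows, start

    def poll(self, batch_size: int = 2) -> List[SourceRecord]:
        batch = self.rows[self.pos:self.pos + batch_size]
        records = [
            SourceRecord(self.table, self.pos + i + 1, f"pg.{self.table}", row)
            for i, row in enumerate(batch)
        ]
        self.pos += len(batch)
        return records


def run_once(task: ToySourceTask, topic: List[SourceRecord],
             offsets: Dict[Tuple[str, str], int], connector: str) -> None:
    """One worker iteration: poll, write to 'Kafka', then record the offset."""
    for record in task.poll():
        topic.append(record)  # write first ...
        offsets[(connector, record.source_partition)] = record.source_offset  # ... then commit


rows = [{"id": n} for n in range(1, 6)]
topic: List[SourceRecord] = []
offsets: Dict[Tuple[str, str], int] = {}
run_once(ToySourceTask("customers", rows), topic, offsets, "pg-customers-source")

# Simulate a restart on another worker: the new task resumes from the stored offset.
restarted = ToySourceTask("customers", rows,
                          start=offsets[("pg-customers-source", "customers")])
run_once(restarted, topic, offsets, "pg-customers-source")
print([r.value["id"] for r in topic])  # [1, 2, 3, 4]
```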

Sink connector contract.

  • SinkConnector.taskConfigs(n) — same shape as source: returns up to n task config maps. All tasks join the connector's consumer group, which assigns each task a subset of the topic partitions.
  • SinkTask.put(records) — called with a batch of records to write to the external system. May buffer, batch, retry, then write atomically.
  • SinkTask.flush(offsets) — called before offset commit; the task must ensure all buffered records are durably written.
  • Offsets via the consumer group — sink tasks are Kafka consumers; their offsets live in __consumer_offsets. Offset commit is gated on flush() succeeding.
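The sink side can be sketched the same way. Again this is a hypothetical toy (`ToySinkTask`, `deliver`), assuming a dict as the external system and Kafka's convention that the committed offset is the next offset to read: put() only buffers, flush() makes the buffer durable, and the committed offset advances only after flush() returns. A crash between put() and flush() therefore means the batch is redelivered, which is the at-least-once behaviour described above.

```python
from typing import Dict, List, Tuple

Record = Tuple[int, int, dict]  # (partition, offset, value with an "id" key)


class ToySinkTask:
    def __init__(self) -> None:
        self.buffer: List[Record] = []
        self.external: Dict[int, dict] = {}  # stand-in for the target table

    def put(self, records: List[Record]) -> None:
        self.buffer.extend(records)  # buffer only; nothing is durable yet

    def flush(self) -> None:
        for _, _, value in self.buffer:
            self.external[value["id"]] = value  # keyed write, safe to repeat
        self.buffer.clear()


def deliver(task: ToySinkTask, log: List[Record], committed: int,
            crash_before_flush: bool = False) -> int:
    """Hand every record at or after `committed` to the task; return the new committed offset."""
    batch = [r for r in log if r[1] >= committed]
    task.put(batch)
    if crash_before_flush:
        return committed  # offset not advanced: the batch will be redelivered
    task.flush()
    return batch[-1][1] + 1 if batch else committed


log = [(0, 0, {"id": 7, "amount": 100}), (0, 1, {"id": 7, "amount": 120})]
crashed = ToySinkTask()
committed = deliver(crashed, log, committed=0, crash_before_flush=True)
# The worker dies; a fresh task on another worker receives the same batch again.
fresh = ToySinkTask()
committed = deliver(fresh, log, committed)
print(committed, fresh.external)  # 2 {7: {'id': 7, 'amount': 120}}
```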

The five source-connector modes (JDBC).

  • bulk — re-reads the whole table on every poll. Useful for small reference tables.
  • incrementing — emits rows where the named column has a strictly greater value than the last poll. Requires a monotonically increasing PK.
  • timestamp — emits rows where the named timestamp column is strictly greater than the last poll. Misses updates that don't change the timestamp.
  • timestamp+incrementing — both columns; handles updates and inserts. The recommended JDBC source mode.
  • query — emit results of a custom SELECT. Maximum flexibility, minimum framework support.

Debezium — log-based CDC in one paragraph.

Debezium connectors read the database transaction log (MySQL binlog, Postgres WAL, MongoDB oplog, SQL Server transaction log, Oracle redo log) and emit every insert, update, and delete as a structured event. Unlike JDBC source, Debezium captures every change including deletes, captures updates with both before and after images (Postgres needs REPLICA IDENTITY FULL for a complete before image), and never misses an intermediate change between polls, because it consumes the same log the database uses for replication. It can still lag behind the database if the connector cannot keep up with the write rate.

The four common sink connectors.

  • S3 sink — writes objects to S3 in a chosen format (Parquet, Avro, JSON) with time-based or field-based partitioning. The data-lake default.
  • JDBC sink — writes records to a relational table. Modes: insert (append-only), upsert (insert-or-update by PK), or update (update existing rows only).
  • Elasticsearch sink — indexes documents into an ES index, using the record key as the document id by default. Supports near-real-time search over Kafka topics.
  • Snowflake / BigQuery sinks — load data into the warehouse via the vendor-native bulk API; Connect handles batching, transient errors, and schema mapping.

Common interview probes on source / sink.

  • "Why pick Debezium over JDBC source for CDC?" — Debezium captures deletes and gives correct ordering; JDBC source can only see what its query returns and misses deletes entirely.
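  • "What is topic.prefix on a Debezium connector?" — the logical server name that prefixes every emitted topic (typically <topic.prefix>.<database or schema>.<table>). It also namespaces the connector's stored offsets and schema history, so pick a stable name and never change it on a running connector.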
  • "How does the JDBC sink handle a primary-key collision?" — depends on insert.mode: insert raises a duplicate-key error; upsert translates to UPDATE.
  • "What is the unit of parallelism for a JDBC source?" — table. Each task owns one or more tables; a single table cannot be split.

Worked example — Debezium MySQL source emits before/after on UPDATE

Detailed explanation. Unlike a JDBC source that periodically polls and emits only the current row, Debezium reads the binlog and emits a structured event for every change with both the before and after images of the row. Downstream consumers can compute deltas, audit trails, or compensating writes from the same event.

Question. Given a MySQL orders table where row id=7 is updated from amount=100 to amount=120, show the Debezium event envelope. Identify the op, before, after, and source fields.

Input — before.

| id | customer_id | amount | status |
|---|---|---|---|
| 7 | 42 | 100 | placed |

Input — after UPDATE orders SET amount = 120, status = 'paid' WHERE id = 7.

| id | customer_id | amount | status |
|---|---|---|---|
| 7 | 42 | 120 | paid |

Code.

{
  "schema": { "...": "Avro schema, fetched by id from Schema Registry" },
  "payload": {
    "before": {
      "id": 7,
      "customer_id": 42,
      "amount": 100,
      "status": "placed"
    },
    "after": {
      "id": 7,
      "customer_id": 42,
      "amount": 120,
      "status": "paid"
    },
    "source": {
      "version": "2.5.0.Final",
      "connector": "mysql",
      "name": "shop",
      "ts_ms": 1717977600000,
      "snapshot": "false",
      "db": "shop",
      "table": "orders",
      "server_id": 1,
      "file": "binlog.000123",
      "pos": 4096,
      "row": 0
    },
    "op": "u",
    "ts_ms": 1717977600125
  }
}

Step-by-step explanation.

  1. The op field encodes the operation: c (create / insert), u (update), d (delete), r (read / snapshot). Downstream consumers branch on this single character.
  2. before is populated on u and d events; after is populated on c, r, and u. Deletes (op = d) carry the deleted row in before and a null after.
  3. source.file and source.pos are the MySQL binlog coordinates; Debezium stores them in connect-offsets so a task restart resumes from the exact same byte position.
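  4. With the AvroConverter the schema text is not shipped with each record: the value bytes start with a 5-byte prefix (one magic byte 0x0 followed by the 4-byte schema id), and consumers fetch the schema from Schema Registry by that id. The JSON above is a readable rendering, not the wire format.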
  5. A downstream sink can either keep the full envelope (audit pipelines) or apply an SMT to flatten to just after (warehouse loaders).
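A downstream consumer that keeps its own copy of the table only needs to branch on op. The sketch below uses hypothetical helpers (`apply_change`, `changed_fields`) that work on already-deserialized payload dicts; it is not a Debezium API. It upserts the after image for c, r, and u, deletes by the before image's key for d, and shows how the same update event yields a per-column delta for an audit job.

```python
from typing import Dict, Optional


def apply_change(mirror: Dict[int, dict], payload: dict, key: str = "id") -> None:
    """Apply one Debezium change-event payload to an in-memory copy of the table."""
    op = payload["op"]
    before: Optional[dict] = payload.get("before")
    after: Optional[dict] = payload.get("after")
    if op in ("c", "r", "u"):
        mirror[after[key]] = dict(after)   # insert, snapshot read, or update
    elif op == "d":
        mirror.pop(before[key], None)      # delete: only `before` is populated
    else:
        raise ValueError(f"op {op!r} is out of scope for this sketch")


def changed_fields(payload: dict) -> Dict[str, tuple]:
    """Columns whose value differs between the before and after images."""
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {k: (before.get(k), after.get(k))
            for k in set(before) | set(after)
            if before.get(k) != after.get(k)}


placed = {"id": 7, "customer_id": 42, "amount": 100, "status": "placed"}
paid = {"id": 7, "customer_id": 42, "amount": 120, "status": "paid"}
update = {"op": "u", "before": placed, "after": paid}

mirror: Dict[int, dict] = {}
apply_change(mirror, {"op": "c", "before": None, "after": placed})
apply_change(mirror, update)
print(mirror[7]["status"], changed_fields(update)["amount"])  # paid (100, 120)
```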

Output.

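| Field | Value | Meaning |
|---|---|---|
| op | u | UPDATE |
| before.amount | 100 | pre-update value |
| after.amount | 120 | post-update value |
| source.file | binlog.000123 | binlog file |
| source.pos | 4096 | binlog offset |
| ts_ms | 1717977600125 | when Debezium processed the event (source.ts_ms is when the change happened in the database) |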

Rule of thumb. Always keep the Debezium envelope intact at the source-topic layer (audit + replay), and use an SMT or downstream consumer to flatten when writing into the warehouse. Discarding before at the source forfeits the ability to compute compensating writes later.

Worked example — JDBC source timestamp+incrementing for updates

Detailed explanation. Pure incrementing mode misses updates because the PK doesn't change. Pure timestamp mode misses inserts whose timestamp ties with the last poll. The timestamp+incrementing mode combines both columns and handles inserts, updates, and tie-breaking correctly.

Question. Configure the JDBC source on a Postgres customers table where updated_at reflects the last edit and id is the PK. Show why neither column alone is enough.

Input.

| id | name | updated_at |
|---|---|---|
| 1 | Alice | 2026-06-12 09:00:00 |
| 2 | Bob | 2026-06-12 09:00:00 |
| 3 | Cara | 2026-06-12 09:00:05 |

Code.

{
  "name": "pg-customers-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://pg:5432/shop",
    "table.whitelist": "customers",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "topic.prefix": "pg.",
    "poll.interval.ms": "3000",
    "tasks.max": "1"
  }
}

Step-by-step explanation.

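  1. The query the connector issues is roughly SELECT * FROM customers WHERE updated_at > ? OR (updated_at = ? AND id > ?) ORDER BY updated_at, id. The tie-breaker on id ensures no row with the same timestamp is missed. The real query also caps updated_at at the current database time minus timestamp.delay.interval.ms, which gives in-flight transactions time to commit before their rows are read.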
  2. The two saved values last_updated_at and last_id are persisted in connect-offsets. Restart resumes from the exact same position.
  3. Updates (which bump updated_at) re-emit the row to Kafka. Downstream consumers see the new state but cannot distinguish "INSERT" from "UPDATE" from JDBC alone — they only see the current row.
  4. Limitation: deletes are invisible. Soft deletes (is_deleted = true) can be captured because they bump updated_at; hard deletes vanish entirely.
  5. If the source needs delete capture or accurate UPDATE semantics, switch to Debezium.
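The tie-breaking predicate from step 1 is easy to check offline. This Python sketch applies it to the three-row customers table; `poll` and the tuple offset are hypothetical stand-ins for the connector's internal query and its stored (last_updated_at, last_id) pair, and the sketch ignores the upper time bound the real connector also applies.

```python
from datetime import datetime
from typing import List, Tuple

Row = Tuple[int, str, datetime]  # (id, name, updated_at)
Offset = Tuple[datetime, int]    # (last updated_at, last id)


def poll(table: List[Row], last: Offset) -> Tuple[List[Row], Offset]:
    """Return rows newer than `last` in (updated_at, id) order, plus the new offset."""
    last_ts, last_id = last
    rows = sorted(
        (r for r in table
         if r[2] > last_ts or (r[2] == last_ts and r[0] > last_id)),
        key=lambda r: (r[2], r[0]),
    )
    new_offset = (rows[-1][2], rows[-1][0]) if rows else last
    return rows, new_offset


t = datetime.fromisoformat
customers: List[Row] = [
    (1, "Alice", t("2026-06-12 09:00:00")),
    (2, "Bob", t("2026-06-12 09:00:00")),
    (3, "Cara", t("2026-06-12 09:00:05")),
]
offset: Offset = (datetime.min, 0)

batch1, offset = poll(customers, offset)  # ids 1, 2, 3
batch2, offset = poll(customers, offset)  # nothing new
customers[0] = (1, "Alice", t("2026-06-12 09:00:08"))  # Alice is edited
batch3, offset = poll(customers, offset)  # id 1 again, with the new timestamp
```

An incrementing-only filter (id > 3) would return nothing on the third poll, which is exactly the missed update described above.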

Output.

| Poll iteration | Query parameters | Rows emitted |
|---|---|---|
| 1 (initial) | t = epoch, id = 0 | id=1, id=2, id=3 |
| 2 (no changes) | t = 09:00:05, id = 3 | (none) |
| 3 (Alice edited) | t = 09:00:05, id = 3 | id=1 (now t=09:00:08) |

Rule of thumb. Use timestamp+incrementing for "good enough" CDC on a clean source. Use Debezium when deletes matter, when ordering must match the transactional commit order, or when the source is too hot to poll at a useful frequency.

Worked example — JDBC sink upsert by primary key

Detailed explanation. The JDBC sink supports two main write modes: insert (plain INSERT, where duplicates raise PK errors) and upsert (INSERT ... ON CONFLICT ... DO UPDATE on Postgres, INSERT ... ON DUPLICATE KEY UPDATE on MySQL, MERGE on dialects such as Oracle and SQL Server). The connector's default insert.mode is insert, but upsert is the standard choice for sinks that consume Debezium output, because every UPDATE in the source must be reflected as an UPDATE in the destination.

Question. Configure a JDBC sink that loads the cdc.shop.orders topic (Debezium-flattened, key = order_id) into a Postgres orders_mirror(id PK, customer_id, amount, status) table with upsert semantics.

Input — topic record (key=7).

| field | value |
|---|---|
| id | 7 |
| customer_id | 42 |
| amount | 120 |
| status | paid |

Code.

{
  "name": "pg-orders-mirror-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "cdc.shop.orders",
    "connection.url": "jdbc:postgresql://warehouse:5432/dwh",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "id",
    "table.name.format": "orders_mirror",
    "tasks.max": "3",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Step-by-step explanation.

  1. insert.mode = upsert translates to INSERT ... ON CONFLICT (id) DO UPDATE SET customer_id = EXCLUDED.customer_id, amount = EXCLUDED.amount, status = EXCLUDED.status.
  2. pk.mode = record_value + pk.fields = id tells the sink to take the id field from the record value (not from the Kafka key) as the upsert key. Use record_key if the Kafka key holds the PK.
  3. auto.create = true + auto.evolve = true lets the sink create the destination table on first record and add new columns when the schema evolves. Convenient for dev; usually disabled in regulated environments.
  4. tasks.max = 3 distributes the topic's partitions across three sink tasks. Each task holds its own database connection and batches its writes.
  5. The sink commits Kafka offsets after the JDBC batch commits, so a worker crash mid-batch causes at-least-once redelivery — upsert mode makes that safe (re-applying the same UPDATE is idempotent on a row level).

Output.

| Source event | Destination SQL | Effect |
|---|---|---|
| insert id=7 amount=100 | INSERT ... id=7, amount=100 | new row |
| update id=7 amount=120 | INSERT ... ON CONFLICT DO UPDATE → amount=120 | row updated |
| delete id=7 | nothing by default; needs a tombstone plus delete.enabled = true, which requires pk.mode = record_key | row remains unless deletion is configured |

Rule of thumb. For CDC mirrors, pair the Debezium source with a JDBC sink in upsert mode, keep DELETEs flowing as Kafka tombstones (null value) by running Debezium's ExtractNewRecordState with drop.tombstones = false, and set delete.enabled = true with pk.mode = record_key on the sink so each tombstone becomes a DELETE.
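To see why upsert plus tombstones gives an idempotent mirror, here is a Python sketch against SQLite (stdlib `sqlite3`, version 3.24 or later) standing in for Postgres; both accept the same INSERT ... ON CONFLICT (id) DO UPDATE SET col = excluded.col shape. `apply_record` is a hypothetical helper, not the connector's code: a record with a value is upserted, and a record whose value is None (a tombstone) becomes a DELETE by key, which is what delete.enabled = true with pk.mode = record_key asks the sink to do.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders_mirror (id INTEGER PRIMARY KEY, customer_id INTEGER,"
    " amount INTEGER, status TEXT)"
)

UPSERT = (
    "INSERT INTO orders_mirror (id, customer_id, amount, status) "
    "VALUES (:id, :customer_id, :amount, :status) "
    "ON CONFLICT (id) DO UPDATE SET customer_id = excluded.customer_id, "
    "amount = excluded.amount, status = excluded.status"
)


def apply_record(key: int, value: Optional[dict]) -> None:
    """Upsert a row, or delete it when the record is a tombstone (value is None)."""
    with conn:  # one transaction per record; commits on success
        if value is None:
            conn.execute("DELETE FROM orders_mirror WHERE id = ?", (key,))
        else:
            conn.execute(UPSERT, value)


def rows() -> list:
    return conn.execute(
        "SELECT id, customer_id, amount, status FROM orders_mirror").fetchall()


update = {"id": 7, "customer_id": 42, "amount": 120, "status": "paid"}
apply_record(7, {"id": 7, "customer_id": 42, "amount": 100, "status": "placed"})
apply_record(7, update)
apply_record(7, update)  # redelivered after a crash: same end state
print(rows())  # [(7, 42, 120, 'paid')]
```

Replaying the same upsert leaves the table unchanged, which is why at-least-once redelivery is harmless in this mode.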

Kafka interview question on choosing source connectors

A senior interviewer often opens with: "Your team needs to replicate a busy MySQL orders table into a Snowflake warehouse with near-real-time freshness and full DELETE support. Walk me through which Connect source and sink you would pick and why."

Solution Using Debezium MySQL source + Snowflake sink (or S3 + Snowpipe)

[
  {
    "name": "mysql-orders-cdc",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "<password>",
      "database.server.id": "184054",
      "topic.prefix": "shop",
      "database.include.list": "shop",
      "table.include.list": "shop.orders",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
      "schema.history.internal.kafka.topic": "schema-history.shop",
      "tasks.max": "1",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false"
    }
  },
  {
    "name": "snowflake-orders-sink",
    "config": {
      "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "topics": "shop.shop.orders",
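      "snowflake.url.name": "abc.snowflakecomputing.com",
      "snowflake.user.name": "<user>",
      "snowflake.private.key": "<private-key>",
      "snowflake.role.name": "<role>",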
      "snowflake.database.name": "RAW",
      "snowflake.schema.name": "SHOP",
      "buffer.count.records": "10000",
      "buffer.flush.time": "60",
      "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
      "tasks.max": "4"
    }
  }
]

Step-by-step trace.

| Stage | Tool | Why this tool |
|---|---|---|
| Capture | Debezium MySQL | Log-based CDC — catches every INSERT/UPDATE/DELETE in commit order, never misses changes between polls |
| Reshape | ExtractNewRecordState SMT | Flatten Debezium envelope to just after, keep tombstones for DELETE propagation |
| Buffer | Kafka topic | Durable, partitioned, replayable |
| Land | Snowflake sink (Snowpipe Streaming) | Vendor-native streaming ingest API — lower latency than file-based COPY, exactly-once into Snowflake |
| Schedule | Connect framework | No producer / consumer code; HA via distributed mode |

After the choice, the team enables Schema Registry + Avro on both connectors so schema changes (adding an amount_currency column, for instance) propagate through the registry's BACKWARD compatibility check before reaching the sink.
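The BACKWARD rule boils down to "a consumer on the new schema must be able to read data written with the old one". The Python below is a deliberately simplified check over flat record schemas, assuming each field is a dict with name, type, and an optional default; `backward_compatible` is a hypothetical helper, not Schema Registry's algorithm, which applies the full Avro resolution rules (type promotions, unions, aliases, nested records). The "USD" default is an illustrative value.

```python
from typing import Dict, List


def backward_compatible(old_fields: List[dict], new_fields: List[dict]) -> List[str]:
    """Return reasons why a reader on `new_fields` cannot read `old_fields` data.

    An empty list means the change passes this simplified BACKWARD check.
    """
    old: Dict[str, dict] = {f["name"]: f for f in old_fields}
    problems = []
    for field in new_fields:
        prior = old.get(field["name"])
        if prior is None and "default" not in field:
            problems.append(f"new field {field['name']!r} has no default")
        elif prior is not None and prior["type"] != field["type"]:
            problems.append(f"field {field['name']!r} changed type")
    return problems  # fields dropped by the new schema are simply ignored on read


v1 = [{"name": "id", "type": "int"}, {"name": "amount", "type": "int"}]
v2_ok = v1 + [{"name": "amount_currency", "type": "string", "default": "USD"}]
v2_bad = v1 + [{"name": "amount_currency", "type": "string"}]
print(backward_compatible(v1, v2_ok))   # []
print(backward_compatible(v1, v2_bad))  # ["new field 'amount_currency' has no default"]
```

So adding amount_currency passes the BACKWARD check only when the new field carries a default.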

Output:

| Requirement | Decision |
|---|---|
| Real-time freshness | Debezium (binlog) + Snowpipe Streaming → seconds, not minutes |
| Full DELETE support | Debezium emits op=d with tombstone; SMT keeps the tombstone for downstream |
| No custom code | one source config + one sink config — both JSON |
| Schema evolution | Avro + Schema Registry + BACKWARD compat |
| Operations | Connect distributed mode handles HA, restart, scaling |

Why this works — concept by concept:

  • Debezium for log-based CDC — reads the MySQL binlog directly so every commit is captured in order, including DELETEs. JDBC source cannot do this because it cannot see deleted rows.
  • ExtractNewRecordState SMT — Debezium's stock transformer for flattening the before/after envelope into "just the new row." Keeps the wire format simple for downstream sinks.
  • Snowpipe Streaming sink — vendor-native streaming API. Lower latency and exactly-once into Snowflake when configured with the right idempotent ingest mode.
  • Schema Registry + Avro — schema id on the wire; compatibility enforcement at registration time prevents producer changes from breaking the consumer.
  • Cost — two connector configs, one Connect cluster, one Schema Registry. The framework absorbs HA and offset management; engineering time goes to data modelling, not pipeline plumbing.
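The "schema id on the wire" point can be made concrete. Confluent's serializers prefix each payload with one magic byte (0) and a 4-byte big-endian schema id; the Python below packs and unpacks that header with the stdlib `struct` module. The helper names `frame` and `unframe` are hypothetical, the payload bytes are an opaque placeholder rather than real Avro, and Protobuf messages carry an extra message-index section after the id that this sketch ignores.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prepend the 5-byte header: magic byte, then the schema id (big-endian)."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema id, payload bytes)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte {magic}")
    return schema_id, message[5:]


msg = frame(42, b"<avro-encoded row>")
print(msg[:5].hex())  # 000000002a
print(unframe(msg))   # (42, b'<avro-encoded row>')
```

A consumer reads the id, fetches (and caches) schema 42 from the registry, then decodes the remaining bytes with it.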



4. SMTs — Single Message Transforms

kafka connect smt is the in-flight reshape lane — per-record, no state, runs at the connector boundary; chain transforms with the transforms= config

The mental model in one line: a Single Message Transform is a Java class that takes one record and returns one record (or null), runs inside the worker between the connector and Kafka (for source) or between Kafka and the connector (for sink), and has zero access to multi-record state — anything stateful escapes to Kafka Streams or ksqlDB. Once you say "SMT = one record in, one record out, no state," the entire kafka connect smt interview surface collapses to "pick the right transform and chain them."

Visual diagram of a Kafka Connect SMT pipeline — a horizontal conveyor moving a record from left to right through four single-message transform stations (ExtractField, Cast, RegexRouter, MaskField); each station shows the input field shape changing into the output; a small chip describing where SMTs run (per record, no state, before serialization); on a light PipeCode card.

Where SMTs run.

  • Source connectors — after the source task produces a SourceRecord, before the framework writes it to Kafka. The transform sees the record exactly as the source emitted it.
  • Sink connectors — after the framework hands the consumed record to the sink task, before put(records) writes to the external system.
  • Per record, in process — every transform runs in the worker JVM, in the same thread that produces or consumes. Cost is microseconds per record per transform.
  • No multi-record state — SMTs see one record at a time. There is no buffer, no window, no join. For those, use Kafka Streams (DSL) or ksqlDB (SQL).

The high-frequency SMTs.

  • ExtractField — replace the whole key or value with one of its fields. Variants ExtractField$Key and ExtractField$Value. With field = after on a Debezium value it keeps only the after image, a simple way to flatten the envelope.
  • Cast — change a field's type. Spec language: field1:int32, field2:string. Common fix for "the source has a string that should be a number" mismatch.
  • RegexRouter — rewrite the destination topic name with a regex + replacement. Useful for renaming mysql.shop.orders to just orders.
  • MaskField — replace a field's value with a type-appropriate empty value (empty string, 0, false) or with a configured replacement. Stock PII redaction at the connector boundary.
  • ReplaceField — drop fields (exclude) or rename them (renames).
  • InsertField — add static / metadata fields (timestamp, connector name, source partition).
  • TimestampConverter — convert a date/time field between epoch millis (unix), a formatted string (a SimpleDateFormat pattern, e.g. ISO 8601), and Connect's Date / Time / Timestamp types.
  • Filter / Predicate — keep or drop records based on a predicate (HasHeaderKey, RecordIsTombstone, TopicNameMatches). Avoid heavy filtering here — push it upstream where possible.

Chaining transforms.

  • The transforms property is a comma-separated list of aliases. Order matters — the chain runs left to right.
  • Each alias has its own config block: transforms.<alias>.type = ... plus transforms.<alias>.<field> = ....
  • A transforms.<alias>.predicate = ... makes the transform conditional. transforms.<alias>.negate = true inverts the predicate.
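The chaining rules above amount to function composition with an optional guard. Here is a Python model, assuming records are dicts with `topic` and `value` keys; `Step`, `run_chain`, and the example transforms are hypothetical names, not Connect classes. Steps run in transforms= order, a predicate gates its step, negate flips the predicate, and a step returning None drops the record, as Filter does.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

Record = dict
Transform = Callable[[Record], Optional[Record]]
Predicate = Callable[[Record], bool]


@dataclass
class Step:
    alias: str
    transform: Transform
    predicate: Optional[Predicate] = None
    negate: bool = False


def run_chain(record: Record, steps: List[Step]) -> Optional[Record]:
    for step in steps:  # left to right, like the transforms= list
        if step.predicate is not None and step.predicate(record) == step.negate:
            continue  # guard says skip: the record passes through untouched
        record = step.transform(record)
        if record is None:
            return None  # dropped; later steps never see it
    return record


def is_tombstone(record: Record) -> bool:
    return record["value"] is None


def drop(record: Record) -> Optional[Record]:
    return None  # what Filter does when its predicate matches


def add_cdc_prefix(record: Record) -> Optional[Record]:
    return {**record, "topic": "cdc." + record["topic"]}


chain = [Step("drop_tombstones", drop, is_tombstone),
         Step("prefix_topic", add_cdc_prefix)]
print(run_chain({"topic": "orders", "value": {"id": 7}}, chain))
# {'topic': 'cdc.orders', 'value': {'id': 7}}
print(run_chain({"topic": "orders", "value": None}, chain))  # None
```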

Common interview probes on SMTs.

  • "What is the cost of an SMT?" — microseconds per record per transform; cheap unless you put a regex in a hot path.
  • "Can an SMT join two records?" — no. SMTs are strictly stateless and per-record.
  • "Where would you put PII masking?" — at the source-connector boundary with MaskField. Centralises the redaction policy and ensures every downstream consumer sees the masked value.
  • "How do you route different rows to different topics?" — RegexRouter or a custom Transformation<R> that returns a record with a different topic().
  • "Can SMTs run on the sink side too?" — yes. Sink SMTs reshape records before the sink's put call.

Worked example — flatten Debezium envelope with ExtractNewRecordState

Detailed explanation. Debezium emits the full before/after/source/op/ts_ms envelope. Most warehouse loaders only want the after image (or before for deletes). Debezium ships a stock SMT — ExtractNewRecordState (formerly UnwrapFromEnvelope) — that flattens the envelope to just the row.

Question. Configure a Debezium MySQL connector that emits flattened records (just the row + a __op metadata field and a __source_ts_ms timestamp), with tombstones preserved for DELETE events.

Input (source envelope).

{
  "before": { "id": 7, "amount": 100 },
  "after":  { "id": 7, "amount": 120 },
  "source": { "ts_ms": 1717977600000, "table": "orders" },
  "op": "u",
  "ts_ms": 1717977600125
}

Code.

{
  "name": "mysql-shop-cdc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "topic.prefix": "shop",
    "table.include.list": "shop.orders",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,source.ts_ms",
    "transforms.unwrap.add.headers": "source.table",
    "tasks.max": "1"
  }
}

Step-by-step explanation.

  1. ExtractNewRecordState is Debezium's transform — it extracts the after field for INSERT / UPDATE and the before field for DELETE.
  2. drop.tombstones = false keeps the Kafka tombstone (null value) for DELETE events. The JDBC sink can then translate the tombstone to a DELETE.
  3. delete.handling.mode = rewrite produces a non-null record with __deleted = true for delete events. Useful when the downstream loader doesn't honour tombstones.
  4. add.fields = op,source.ts_ms re-attaches Debezium metadata as __op and __source_ts_ms columns on the flattened record. The double-underscore prefix is the convention.
  5. add.headers = source.table writes the table name to the Kafka record header, so a downstream router can dispatch by table without parsing the value.

Output (Kafka record).

{
  "id": 7,
  "amount": 120,
  "__op": "u",
  "__source_ts_ms": 1717977600000,
  "__deleted": "false"
}

Rule of thumb. Use ExtractNewRecordState on every Debezium pipeline whose downstream is a warehouse loader. To keep the raw envelope for audit and replay as well, apply the SMT on the sink connectors instead of the source, so the Kafka topic still holds the full change history; storing the envelope costs little compared to recovering lost change history.

Worked example — chain ExtractField, Cast, MaskField, RegexRouter

Detailed explanation. A realistic Connect pipeline chains four or five SMTs in a single config: one to flatten the envelope, one to coerce a numeric type, one to redact PII, one to rewrite the topic name. The transforms= property lists the aliases in execution order.

Question. Build the SMT chain for a source connector that flattens the envelope (unwrap), casts amount to int32 (cast_amount), masks email (mask_email), and renames topics mysql.shop.X to orders.X (route).

Input (after-unwrap record; amount arrives as a string from the binlog).

{
  "id": 7,
  "email": "a@x.com",
  "amount": "42"
}

Code.

{
  "transforms": "unwrap,cast_amount,mask_email,route",

  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",

  "transforms.cast_amount.type": "org.apache.kafka.connect.transforms.Cast$Value",
  "transforms.cast_amount.spec": "amount:int32",

  "transforms.mask_email.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask_email.fields": "email",
  "transforms.mask_email.replacement": "****",

  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "mysql\\.shop\\.(.+)",
  "transforms.route.replacement": "orders.$1"
}

Step-by-step explanation.

  1. The framework processes the chain left to right. unwrap first: Debezium envelope → flat record.
  2. cast_amount reads the amount string and writes back an int32. The schema on the wire is updated so downstream consumers see the correct type.
  3. mask_email replaces the email field's value with "****". The field is still present (so downstream schemas don't break) but the data is redacted.
  4. route rewrites the destination topic name. A record whose source-derived topic was mysql.shop.orders is now published to orders.orders. (You'd usually configure the regex more carefully to avoid the duplicate.)
  5. Each transform sees the output of the previous one. If cast_amount failed (non-numeric value), the task would fail under the default errors.tolerance = none; with errors.tolerance = all the record is skipped (and logged if errors.log.enable = true). A dead-letter queue topic is only available to sink connectors.
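The cast, mask, and route steps can be replayed in Python to check the final record and topic before deploying. `cast_amount`, `mask_email`, `route`, and `run` are hypothetical stand-ins for Cast$Value, MaskField$Value, and RegexRouter, and `re.fullmatch` is used because RegexRouter only rewrites topic names its regex matches in full. Python writes the group reference as `\1` where the connector config writes `$1`.

```python
import re
from typing import Tuple


def cast_amount(value: dict) -> dict:
    return {**value, "amount": int(value["amount"])}  # raises ValueError on "abc"


def mask_email(value: dict) -> dict:
    return {**value, "email": "****"}


def route(topic: str, regex: str, replacement: str) -> str:
    """RegexRouter-style rename: only topics the regex matches in full change."""
    match = re.fullmatch(regex, topic)
    return match.expand(replacement) if match else topic


def run(topic: str, value: dict, replacement: str) -> Tuple[str, dict]:
    value = mask_email(cast_amount(value))  # unwrap is assumed already applied
    return route(topic, r"mysql\.shop\.(.+)", replacement), value


topic, value = run("mysql.shop.orders",
                   {"id": 7, "email": "a@x.com", "amount": "42"}, r"orders.\1")
print(topic, value)  # orders.orders {'id': 7, 'email': '****', 'amount': 42}
```

Using just `\1` (`$1` in the connector config) as the replacement publishes to orders instead, which avoids the doubled name.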

Output (final record, published to topic orders.orders instead of mysql.shop.orders).

{
  "id": 7,
  "email": "****",
  "amount": 42
}

Rule of thumb. Keep SMT chains short (≤ 5 transforms) and well-named. The alias names appear in worker logs and metrics — descriptive aliases (mask_email, cast_amount) pay back the day a transform fails in production.

Worked example — predicate-gated SMT for conditional transforms

Detailed explanation. A predicate lets you run a transform only when a condition is true. Pair Filter with RecordIsTombstone to drop tombstones on a topic where they would corrupt downstream loaders; pair MaskField with TopicNameMatches to mask PII only on tables that hold it.

Question. Apply MaskField to redact the ssn column only when the record's topic name matches users.* — leave records on other topics untouched.

Input.

| topic | id | ssn |
|---|---|---|
| users.profiles | 7 | 123-45-6789 |
| events.clicks | 999 | (none — column does not exist) |

Code.

{
  "transforms": "mask_ssn",
  "transforms.mask_ssn.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask_ssn.fields": "ssn",
  "transforms.mask_ssn.replacement": "[REDACTED]",
  "transforms.mask_ssn.predicate": "is_users_topic",

  "predicates": "is_users_topic",
  "predicates.is_users_topic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.is_users_topic.pattern": "users\\..*"
}

Step-by-step explanation.

  1. The predicates top-level list declares predicate aliases — independent named filters.
  2. predicates.<alias>.type selects the predicate class (TopicNameMatches, RecordIsTombstone, HasHeaderKey).
  3. On any transform, transforms.<alias>.predicate = <predicate-alias> makes that transform run only when the predicate is true. Add transforms.<alias>.negate = true to invert.
  4. For our case, mask_ssn only runs when the record's topic name matches users\..*. Records on events.clicks flow through unchanged.
  5. Predicates do not drop records; they just gate the transform. To drop records, combine a predicate with the Filter transform (org.apache.kafka.connect.transforms.Filter), which drops every record for which the predicate is true.
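TopicNameMatches tests the whole topic name against its pattern, so users\..* must cover the entire name rather than just a substring. A quick Python check with `re.fullmatch`, which has the same whole-string semantics for a pattern this simple, shows which topics would get the mask; `should_mask` and `mask_ssn` are hypothetical helpers.

```python
import re

PATTERN = r"users\..*"  # the regex from predicates.is_users_topic.pattern, unescaped


def should_mask(topic: str) -> bool:
    return re.fullmatch(PATTERN, topic) is not None


def mask_ssn(topic: str, value: dict) -> dict:
    if should_mask(topic):
        return {**value, "ssn": "[REDACTED]"}
    return value  # predicate false: the record passes through unchanged


for topic in ["users.profiles", "events.clicks", "archive.users.profiles"]:
    print(topic, should_mask(topic))
# users.profiles True
# events.clicks False
# archive.users.profiles False
```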

Output.

| topic | id | ssn (after) |
|---|---|---|
| users.profiles | 7 | [REDACTED] |
| events.clicks | 999 | (unchanged — column missing or NULL, not redacted) |

Rule of thumb. Use predicates for "apply transform X only when condition Y." When the goal is to drop records, pair the predicate with Filter. Predicates also keep the SMT chain testable — you can reason about each branch independently.

Kafka interview question on the SMT contract

A senior interviewer often asks: "What can an SMT do, what can it not do, and what would you reach for instead in the second case?"

Solution Using the per-record, no-state, boundary-only framing

What an SMT CAN do (stock or custom):
  • Reshape a single record:
      – flatten (ExtractField, Debezium ExtractNewRecordState).
      – cast types (Cast).
      – rename / drop fields (ReplaceField).
      – mask values (MaskField).
      – insert metadata (InsertField).
  • Rewrite the destination topic name (RegexRouter, custom Transformation).
  • Filter records out (Filter + Predicate).

What an SMT CAN'T do:
  • Join two records — no multi-record state. Use Kafka Streams or
    ksqlDB.
  • Window or aggregate (sums, counts) — no state. Use Streams /
    Flink / ksqlDB.
  • Look up an external system per record cheaply — possible but
    expensive and synchronous; reach for an enrichment step in Streams
    instead.
  • Reorder records — runs strictly in arrival order.

When I need state, I escape upward:
  • Kafka Streams: code-defined DSL with state stores, joins, windows.
  • ksqlDB: SQL over the same engine — same capabilities, less code.
  • Flink: when the state or windowing needs go beyond Streams.

Boundary rule:
  • Source SMT: between source.poll() and the write to Kafka.
  • Sink SMT: between the consume from Kafka and sink.put().
  • Both run in the worker process — zero hop.

Step-by-step trace.

| Want | Use |
|---|---|
| Flatten Debezium envelope | SMT (ExtractNewRecordState) |
| Cast a string column to int | SMT (Cast) |
| Mask an email | SMT (MaskField) |
| Drop tombstones | SMT (Filter + RecordIsTombstone) |
| Compute a 5-minute moving average | Streams / ksqlDB / Flink |
| Join orders to customers (multi-record) | Streams / ksqlDB |
| Look up city from zipcode (small ref) | enrichment in Streams or ksqlDB JOIN |
| Per-record audit log | a second sink connector on the same topic; an SMT returns one record and cannot also publish to a separate log topic |

After the framing, the candidate slots every concrete ask into one of two boxes — "SMT can do it" or "I'd promote it to Streams / ksqlDB / Flink" — and answers in seconds rather than guessing.

Output:

| Operation | Where it runs |
|---|---|
| Single-record reshape | SMT chain (transforms=) |
| Stateful or multi-record | Kafka Streams / ksqlDB / Flink |
| Cross-topic join | Streams (KStream-KTable) / ksqlDB JOIN / Flink |
| External lookup per record | Streams or ksqlDB enrichment, not SMT |

Why this works — concept by concept:

  • Per-record contract — SMTs receive one record and return one record (or null). Anything that needs more inputs needs a different engine.
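  • No state: SMTs do not own a state store. Each task builds its own transform instances and rebuilds them on restart or rebalance, so counters or windows kept inside a transform are neither shared across tasks nor durable.
  • Boundary-only execution: runs in the worker JVM on the task's own thread. No network hop and no extra serialisation: source SMTs run before the converter serialises the record, sink SMTs run after it deserialises.
  • Order in transforms=: the chain executes left to right, and each transform sees the previous one's output. Misordering produces silent reshape bugs: placing MaskField before ExtractNewRecordState means the mask only sees the envelope's top-level fields (before, after, source, op), so an email nested inside after passes through unmasked.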
  • Cost — microseconds per record per transform. The SMT chain is the cheapest place to enforce typing, redaction, and routing. State and multi-record work belong upstream, in Streams or ksqlDB.
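A minimal Python sketch of the ordering point, assuming records are plain dicts. `unwrap` and `mask_email` are hypothetical stand-ins that only mimic how ExtractNewRecordState and MaskField treat top-level fields; they are not the real Java Transformation classes.

```python
from typing import Callable, List, Optional

Record = dict
Transform = Callable[[Record], Optional[Record]]


def unwrap(record: Record) -> Optional[Record]:
    """Stand-in for ExtractNewRecordState: keep only the 'after' image (None drops a delete)."""
    after = record.get("after")
    return dict(after) if after is not None else None


def mask_email(record: Record) -> Record:
    """Stand-in for MaskField: it only inspects top-level fields, like the real SMT."""
    masked = dict(record)
    if "email" in masked:
        masked["email"] = ""
    return masked


def apply_chain(record: Record, chain: List[Transform]) -> Optional[Record]:
    """Run transforms left to right, as transforms= does; a None result drops the record."""
    for transform in chain:
        record = transform(record)
        if record is None:
            return None
    return record


envelope = {"op": "c", "before": None, "after": {"id": 1, "email": "a@example.com"}}

right_order = apply_chain(envelope, [unwrap, mask_email])  # email is masked
wrong_order = apply_chain(envelope, [mask_email, unwrap])  # mask only saw op/before/after
print(right_order, wrong_order)
```

Swapping the two list entries is the whole bug: the per-record contract stays intact, only the field the mask can see changes.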



5. Schema Registry + idempotent and exactly-once delivery

schema registry decouples producers and consumers via versioned schemas; kafka connect idempotent producers, transactional sources, and idempotent sinks promote at-least-once to exactly-once

The mental model in one line: Schema Registry is the central authority for the wire format (Avro, Protobuf, or JSON Schema) and enforces a compatibility ladder (BACKWARD / FORWARD / FULL / NONE) at registration time; Connect's default delivery is at-least-once, and you opt in to exactly-once by enabling transactional source support on the workers, reading with read_committed, and writing to destinations that are idempotent or transactional. Once you can recite "registry owns the schema, idempotence stops retry duplicates, transactions tie offset commits to writes," the whole kafka connect idempotent + Schema Registry interview surface resolves.

Visual diagram of Schema Registry plus idempotent / exactly-once delivery — left panel shows three serialization format columns (Avro, Protobuf, JSON Schema) flowing into a central Schema Registry box; a compatibility ladder showing BACKWARD / FORWARD / FULL / NONE; right panel shows producer idempotence (enable.idempotence=true, acks=all) and sink exactly-once (transactional consumer + commit); on a light PipeCode card.

Schema Registry — what it is.

  • A REST service that stores schemas keyed by (subject, version). Each schema also gets a globally unique id.
  • The wire format prefixes every value with a magic byte plus the 4-byte schema id. Consumers fetch the schema by id on first encounter and cache it.
  • Subject naming strategies: TopicNameStrategy (subject = <topic>-key / <topic>-value), RecordNameStrategy, TopicRecordNameStrategy. Default is TopicNameStrategy.
  • Three serialisation formats supported — Avro (most common), Protobuf, JSON Schema.
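The framing can be sketched in a few lines of Python with the standard struct module. This models only the 5-byte header (magic byte 0, then the schema id as a big-endian 4-byte integer); the payload bytes below are a placeholder rather than real Avro encoding, and the extra message-index bytes the Protobuf serializer writes after the id are ignored.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0
HEADER = struct.Struct(">bI")  # 1-byte magic + 4-byte big-endian unsigned schema id


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prefix an already-serialised payload with the wire-format header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, payload)."""
    if len(message) < HEADER.size:
        raise ValueError("message shorter than the 5-byte header")
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte {magic}")
    return schema_id, message[HEADER.size:]


framed = frame(102, b"<avro-encoded bytes>")
print(framed[:5].hex())  # 0000000066
print(unframe(framed))
```

A consumer would look up the returned id in a local cache and only call the registry on a miss, which is why the id (not the schema text) travels with every record.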

Compatibility levels — the ladder.

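| Level | Guarantee | Upgrade first | What you can change |
| --- | --- | --- | --- |
| BACKWARD (default) | consumers on the new schema can read data written with the previous schema | consumers | delete fields; add fields only with defaults |
| FORWARD | consumers on the previous schema can read data written with the new schema | producers | add fields; delete only fields that have defaults |
| FULL | both of the above | either, in any order | add or delete only fields that have defaults |
| NONE | nothing enforced | n/a | anything; use at your own risk |

Each level except NONE also has a _TRANSITIVE variant (e.g. BACKWARD_TRANSITIVE) that checks the new schema against every registered version, not only the latest.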

Schema evolution rules of thumb.

  • Always add fields with a default value. Otherwise BACKWARD breaks: a consumer on the new schema reading an old record finds neither a value nor a default for the field.
  • Never rename or retype a field. Avro only allows a few one-way type promotions (e.g. int → long, float → double), so most retypes fail. A rename is treated as remove + add: the added name breaks BACKWARD unless it has a default, and the removed name breaks FORWARD unless it had one.
  • Mark old fields as optional (Protobuf) / give a default (Avro) before removing them, and run a deprecation window where producers stop writing them.
  • Use FULL compatibility for shared schemas between teams that release independently. Use BACKWARD when consumers upgrade first (and may need to re-read old data); use FORWARD when producers upgrade first and consumers lag behind.

Connect's default delivery semantics.

  • Default = at-least-once. Source: offsets are committed after the records are durably in Kafka, so a worker crash before commit re-reads from the last committed offset → possible duplicates. Sink: similar — flush before offset commit.
  • At-most-once is not available out of the box; you would have to intentionally commit offsets before the write completes (rarely useful).
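  • Exactly-once is opt-in and needs more than one setting: transactional writes on the source side (workers running with exactly.once.source.support = enabled, Kafka 3.3+), read_committed consumers downstream, and an idempotent or transactional destination for sinks. The idempotent producer on its own only removes duplicates caused by producer retries; it does not prevent the replay after a crash described above. All of this builds on Kafka's idempotence and transaction protocol (introduced in 0.11).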
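A toy model of the source-side ordering shows where the duplicates come from. The `offsets` dict stands in for the connect-offsets topic and `topic` for a Kafka partition; both names, and `run_source_batch` itself, are hypothetical.

```python
from typing import Dict, List


def run_source_batch(rows: List[str], topic: List[str], offsets: Dict[str, int],
                     batch_size: int = 2, crash_before_commit: bool = False) -> None:
    """One poll cycle: write the batch to Kafka first, commit the source offset second."""
    start = offsets.get("position", 0)        # resume from the last committed offset
    batch = rows[start:start + batch_size]
    topic.extend(batch)                       # records are now durably in Kafka
    if crash_before_commit:
        return                                # worker dies before the offset commit
    offsets["position"] = start + len(batch)  # offset commit


rows = ["r0", "r1", "r2", "r3"]
topic: List[str] = []
offsets: Dict[str, int] = {}

run_source_batch(rows, topic, offsets, crash_before_commit=True)
run_source_batch(rows, topic, offsets)  # restart re-reads from the committed offset
run_source_batch(rows, topic, offsets)
print(topic)  # ['r0', 'r1', 'r0', 'r1', 'r2', 'r3']
```

Nothing is lost, but r0 and r1 land twice: that is the at-least-once contract in miniature.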

The idempotent producer — what it actually does.

  • enable.idempotence = true — Kafka 3.0+ default. Assigns each producer a producer id; each (producer id, partition, sequence) triple is unique. Retries of the same record are deduped on the broker.
  • acks = all — every record waits for replication to all in-sync replicas before being acknowledged. Required for idempotence.
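  • max.in.flight.requests.per.connection ≤ 5: required for idempotence; within that limit the broker uses sequence numbers to keep ordering intact across retries. 5 is the default.
  • retries: defaults to 2147483647 (Integer.MAX_VALUE) since Kafka 2.1, with delivery.timeout.ms as the real upper bound on how long a send keeps retrying; idempotence makes those retries safe.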
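These constraints are easy to lint before a config reaches the cluster. The sketch below is a hypothetical helper, not part of Kafka; it assumes Kafka 3.x client defaults for any key you leave out, which may not match what your worker actually passes to its producers.

```python
from typing import Dict, List

PREFIX = "producer.override."


def idempotence_problems(connector_config: Dict[str, str]) -> List[str]:
    """List reasons the producer.override.* settings cannot support idempotence."""
    cfg = {k[len(PREFIX):]: v for k, v in connector_config.items() if k.startswith(PREFIX)}
    problems = []
    if cfg.get("enable.idempotence", "true").lower() != "true":
        problems.append("enable.idempotence must be true")
    if cfg.get("acks", "all").lower() not in ("all", "-1"):
        problems.append("acks must be all (or -1)")
    if int(cfg.get("max.in.flight.requests.per.connection", "5")) > 5:
        problems.append("max.in.flight.requests.per.connection must be <= 5")
    if int(cfg.get("retries", "2147483647")) < 1:
        problems.append("retries must be greater than 0")
    return problems


good = {
    "producer.override.enable.idempotence": "true",
    "producer.override.acks": "all",
    "producer.override.max.in.flight.requests.per.connection": "5",
}
bad = {"producer.override.acks": "1",
       "producer.override.max.in.flight.requests.per.connection": "10"}
print(idempotence_problems(good))  # []
print(idempotence_problems(bad))
```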

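Exactly-once: source side and sink side.

  • Source side: set exactly.once.source.support = enabled on every worker (Kafka 3.3+, KIP-618). Source records and their offsets are then written in the same Kafka transaction, so a crash mid-batch rolls back both. A connector can additionally set exactly.once.support = required to fail at creation time if it cannot honour the guarantee.
  • Sink side: the framework has no transactional sink mode, so exactly-once into the destination depends on the destination being idempotent (key-based upserts such as a JDBC upsert on the primary key, or S3 objects named deterministically by topic, partition, and start offset) or on the connector tracking its committed offsets inside the destination (Snowflake's Snowpipe Streaming keeps an offset token per channel).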
  • isolation.level = read_committed on the consumer side of the sink — ensures the sink only reads records that have been transactionally committed by the source, not records still in flight inside an open transaction.
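A toy model of what the two isolation levels return from one partition. Each log entry is tagged with its transaction, and `txn_state` records how each transaction ended; real brokers do this with control markers and the last stable offset (LSO), which the `break` below imitates. All names are illustrative.

```python
from typing import Dict, List, Tuple

LogEntry = Tuple[str, str]  # (transaction id, value)


def fetch(log: List[LogEntry], txn_state: Dict[str, str],
          isolation_level: str = "read_uncommitted") -> List[str]:
    """Return the values a consumer with the given isolation level would see."""
    if isolation_level not in ("read_uncommitted", "read_committed"):
        raise ValueError(f"unknown isolation level {isolation_level}")
    if isolation_level == "read_uncommitted":
        return [value for _, value in log]  # everything, including aborted and open work
    visible = []
    for txn, value in log:
        state = txn_state[txn]
        if state == "open":
            break                           # cannot read past the last stable offset
        if state == "committed":
            visible.append(value)           # aborted records are skipped
    return visible


log = [("T1", "a"), ("T2", "b"), ("T1", "c"), ("T3", "d")]
txn_state = {"T1": "committed", "T2": "aborted", "T3": "open"}
print(fetch(log, txn_state))                    # ['a', 'b', 'c', 'd']
print(fetch(log, txn_state, "read_committed"))  # ['a', 'c']
```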

Common interview probes.

  • "What does Schema Registry actually store?" — schemas keyed by (subject, version), with a globally unique id per schema; the wire format references the id, not the schema text.
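  • "What is BACKWARD compatibility?": the new schema can read data written with the old schema, so consumers upgrade first. (The reverse, old consumers reading new data, is FORWARD.) It is Schema Registry's default level.
  • "How do you remove a field safely?": under BACKWARD, deleting a field always passes, because a new-schema reader simply ignores the extra field in old data. If you also need FORWARD or FULL (old consumers still reading new data), the field must have a default in the old schema so those consumers can fill it in.
  • "Does Kafka Connect support exactly-once?": yes for source connectors as of Kafka 3.3 (KIP-618), with exactly.once.source.support = enabled on the workers and, optionally, exactly.once.support = required on the connector. Sink-side exactly-once depends on the destination's idempotence or transactional support.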
  • "What's the cost of idempotent producer?" — small. Some sequence-tracking metadata per producer; acks = all may slightly raise latency.

Worked example — register an Avro schema with BACKWARD compatibility

Detailed explanation. The first time a producer publishes a record, the Avro converter checks Schema Registry: does the schema already exist for this subject? If yes, get the id and write the wire prefix. If no, register the schema (which triggers a compatibility check against the previous version).

Question. Register the v1 schema for the orders-value subject. Then attempt to register v2 (adds an optional currency field with default "USD") and v3 (renames amount to amt). Show which succeed and which fail under BACKWARD compatibility.

Input — v1.

{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "amount", "type": "double" }
  ]
}

Input — v2 (adds optional currency).

{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "amount", "type": "double" },
    { "name": "currency", "type": "string", "default": "USD" }
  ]
}

Input — v3 (renames amount → amt).

{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "amt", "type": "double" },
    { "name": "currency", "type": "string", "default": "USD" }
  ]
}

Code.

# Set BACKWARD compatibility (default — but explicit is good)
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     --data '{"compatibility":"BACKWARD"}' \
     http://schema-registry:8081/config/orders-value

# v1 — first registration succeeds
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     --data '{"schema":"<v1 schema JSON, escaped>"}' \
     http://schema-registry:8081/subjects/orders-value/versions

# v2 — succeeds (optional field with default is BACKWARD-compatible)
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     --data '{"schema":"<v2 schema JSON, escaped>"}' \
     http://schema-registry:8081/subjects/orders-value/versions

# v3 — FAILS (rename = drop + add of a required field)
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     --data '{"schema":"<v3 schema JSON, escaped>"}' \
     http://schema-registry:8081/subjects/orders-value/versions
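The `<v1 schema JSON, escaped>` placeholders hide the fiddly part: the request body is JSON whose schema field is itself a JSON string. A small Python helper (hypothetical, standard library only) can produce that body instead of escaping by hand.

```python
import json

v2 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "amount", "type": "double"},
        {"name": "currency", "type": "string", "default": "USD"},
    ],
}


def registration_body(schema: dict) -> str:
    """Encode the schema as a JSON string, then wrap it in the request JSON."""
    return json.dumps({"schema": json.dumps(schema)})


body = registration_body(v2)
print(body)
```

Writing `body` to a file lets curl send it with `--data @v2.json`, which avoids shell-quoting the escaped quotes.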

Step-by-step explanation.

  1. v1 is the first version — no compatibility check, it is simply stored as version 1 with a new id.
  2. v2 adds an optional field with a default. BACKWARD says "the new schema can read old data" — when a new consumer reads an old record, it uses the default for the missing currency. Compatible. Registered as version 2.
  3. v3 renames amount to amt. Avro sees this as "v3 has a required amt field with no default; v2 has no amt at all." When a v3 consumer reads a v2 record, it cannot find amt and there is no default — read fails. BACKWARD breaks → registration rejected with HTTP 409.
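  4. To make v3 pass, either keep the name amount, or add amt as a new field with a default, have producers write both fields while downstream consumers migrate, and only then retire amount. Avro field aliases (declaring amt with "aliases": ["amount"]) are a third route, but confirm that your Schema Registry version's compatibility checker honours aliases before relying on it.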
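The reader/writer reasoning in steps 2 and 3 can be sketched as a simplified checker. This is not Schema Registry's implementation: it handles flat records only, compares types by exact equality (ignoring Avro promotions such as int to long), and ignores aliases and unions.

```python
from typing import Dict


def reader_can_read(reader: Dict, writer: Dict) -> bool:
    """Simplified Avro record resolution: each reader field needs a writer value or a default."""
    writer_types = {f["name"]: f["type"] for f in writer["fields"]}
    for field in reader["fields"]:
        if field["name"] in writer_types:
            if writer_types[field["name"]] != field["type"]:
                return False  # retyped field
        elif "default" not in field:
            return False      # absent from the writer's data and nothing to fall back on
    return True               # extra writer fields are simply ignored


def backward_ok(new: Dict, old: Dict) -> bool:
    return reader_can_read(reader=new, writer=old)  # new consumer, old data


def forward_ok(new: Dict, old: Dict) -> bool:
    return reader_can_read(reader=old, writer=new)  # old consumer, new data


def order(*fields: Dict) -> Dict:
    return {"type": "record", "name": "Order", "fields": list(fields)}


ID = {"name": "id", "type": "long"}
AMOUNT = {"name": "amount", "type": "double"}
AMT = {"name": "amt", "type": "double"}
CURRENCY = {"name": "currency", "type": "string", "default": "USD"}

v1, v2, v3 = order(ID, AMOUNT), order(ID, AMOUNT, CURRENCY), order(ID, AMT, CURRENCY)
print(backward_ok(v2, v1))  # True: currency falls back to "USD"
print(backward_ok(v3, v2))  # False: amt has no value and no default
```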

Output.

| Version | BACKWARD check | Result |
| --- | --- | --- |
| v1 | n/a (first) | registered, id = 101 |
| v2 (add optional with default) | pass | registered, id = 102 |
| v3 (rename without alias) | fail | HTTP 409, registration rejected |

Rule of thumb. Run the BACKWARD compatibility check in CI on every schema change before any producer is updated. The Schema Registry REST API has a POST /compatibility/subjects/<subject>/versions/latest endpoint that returns just "is this change compatible?" — perfect for a pre-commit hook.
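A CI gate around that endpoint might look like the following Python sketch, using only the standard library. The function names are hypothetical; the request is built separately from sending it so the URL and body can be checked without a live registry.

```python
import json
import urllib.request


def compatibility_request(registry_url: str, subject: str,
                          schema: dict) -> urllib.request.Request:
    """Build a POST against /compatibility/subjects/<subject>/versions/latest."""
    url = f"{registry_url.rstrip('/')}/compatibility/subjects/{subject}/versions/latest"
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


def is_compatible(response_body: bytes) -> bool:
    """Read the registry's verdict, e.g. b'{"is_compatible": true}'."""
    return bool(json.loads(response_body)["is_compatible"])


def gate(registry_url: str, subject: str, schema: dict) -> bool:
    """Send the check; urlopen raises HTTPError on 4xx/5xx, which should also fail CI."""
    with urllib.request.urlopen(compatibility_request(registry_url, subject, schema)) as resp:
        return is_compatible(resp.read())
```

In a pipeline step, exiting non-zero when `gate(...)` returns False (or raises) blocks the merge before any producer ships the schema.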

Worked example — enable the idempotent producer on a source connector

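Detailed explanation. Connect's source producer is a regular Kafka producer, so the idempotent settings are the same as for any producer. You set them per connector under the producer.override.* prefix, which the worker only accepts if its connector.client.config.override.policy allows it (the default policy is All since Kafka 3.0).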

Question. Configure a source connector with the idempotent producer enabled and acks = all. Show the resulting producer behaviour after a worker network blip.

Input.

| Setting | Value |
| --- | --- |
| producer.override.enable.idempotence | true |
| producer.override.acks | all |
| producer.override.max.in.flight.requests.per.connection | 5 |
| producer.override.retries | 2147483647 |

Code.

{
  "name": "mysql-shop-cdc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "topic.prefix": "shop",
    "tasks.max": "1",

    "producer.override.enable.idempotence": "true",
    "producer.override.acks": "all",
    "producer.override.max.in.flight.requests.per.connection": "5",
    "producer.override.retries": "2147483647",
    "producer.override.delivery.timeout.ms": "120000"
  }
}

Step-by-step explanation.

  1. enable.idempotence = true makes the producer assign a producer id (PID) on first send. Every record gets a (PID, partition, sequence) triple.
  2. The broker maintains the last sequence per (PID, partition). Duplicate sequences (from retries after a network blip) are silently discarded — the record is only appended once.
  3. acks = all ensures the broker waits for replication to every in-sync replica before acknowledging. Required for idempotence to give a meaningful "no duplicates" guarantee.
  4. max.in.flight.requests.per.connection = 5 plus idempotence keeps records ordered even with retries (the broker uses the sequence to detect out-of-order or duplicate records).
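  5. After a transient broker reconnection, the source connector retries the in-flight batch. The broker dedupes by sequence. Net effect: no retry duplicates on the Kafka topic. Idempotence does not cover a task restart, though: the restarted producer gets a new producer id and re-sends everything after the last committed source offset, which only exactly-once source support (transactions) removes.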
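The broker-side bookkeeping in steps 1 to 5 can be modelled with a toy partition that remembers the last sequence per producer id. This is a simplification (real brokers track sequences per batch and keep metadata for only the last few batches per producer), and `PartitionLog` is a hypothetical name.

```python
from typing import Dict, List


class PartitionLog:
    """Toy partition that deduplicates by (producer id, sequence) when idempotence is on."""

    def __init__(self) -> None:
        self.records: List[str] = []
        self.last_seq: Dict[int, int] = {}  # producer id -> last appended sequence

    def append(self, pid: int, seq: int, value: str, idempotent: bool = True) -> str:
        if idempotent:
            expected = self.last_seq.get(pid, -1) + 1
            if seq < expected:
                return "duplicate"          # already appended: acknowledge, do not re-append
            if seq > expected:
                raise ValueError("out-of-order sequence")  # a gap means lost or reordered data
            self.last_seq[pid] = seq
        self.records.append(value)
        return "appended"


def send_with_lost_ack(idempotent: bool) -> List[str]:
    log = PartitionLog()
    log.append(pid=7, seq=0, value="A", idempotent=idempotent)  # ack lost in the blip
    log.append(pid=7, seq=0, value="A", idempotent=idempotent)  # producer retries A
    log.append(pid=7, seq=1, value="B", idempotent=idempotent)
    return log.records


print(send_with_lost_ack(idempotent=False))  # ['A', 'A', 'B']
print(send_with_lost_ack(idempotent=True))   # ['A', 'B']
```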

Output.

| Event | Without idempotence | With idempotence |
| --- | --- | --- |
| Record A sent, ack lost | A reappears on retry → duplicate | A discarded on retry, no duplicate |
| Records A, B, C reordered by retry | possible | impossible (sequence enforces order) |
| Connection lost mid-batch | partial duplicates | clean replay, no duplicates |

Rule of thumb. Always enable idempotence on the source-side producer for any Connect source. Kafka 3.0+ clients enable it by default, but the effective value depends on the client version running in the worker and on any worker-level producer.* settings, so override it explicitly in every connector config.

Worked example — sink-side exactly-once with read_committed

Detailed explanation. A transactional source writes records inside Kafka transactions. By default (isolation.level = read_uncommitted), consumers also see records from transactions that are still open and may later abort. Setting isolation.level = read_committed makes the sink consumer skip those, so the sink never writes records that the source later rolled back.

Question. Configure a JDBC sink so that it sees only committed transactional records from a transactional source, and so that the sink's UPSERTs are themselves idempotent.

Input.

| Setting | Value |
| --- | --- |
| consumer.override.isolation.level | read_committed |
| insert.mode | upsert |
| pk.mode | record_key |
| max.retries | 10 |
| retry.backoff.ms | 5000 |

Code.

{
  "name": "pg-orders-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "shop.shop.orders",
    "connection.url": "jdbc:postgresql://warehouse:5432/dwh",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",

    "consumer.override.isolation.level": "read_committed",
    "consumer.override.enable.auto.commit": "false",

    "max.retries": "10",
    "retry.backoff.ms": "5000",
    "tasks.max": "3"
  }
}

Step-by-step explanation.

  1. The sink subscribes to the source's topic. Without isolation.level = read_committed, the consumer would also see records from transactions that are still open; if such a transaction aborts, the sink has already written its records.
  2. With read_committed, the consumer only fetches records whose containing transaction has committed. Aborted transactions never reach the sink.
  3. The sink's insert.mode = upsert makes the destination idempotent at the row level: re-applying the same INSERT ... ON CONFLICT DO UPDATE with matching values leaves the row unchanged.
  4. Connect manages sink offsets itself and only commits offsets for records that put() has already written; it disables auto-commit for sink consumers by default, and the enable.auto.commit = false override just makes that explicit. A crash between the JDBC commit and the Kafka offset commit therefore causes at-least-once redelivery on restart, and the upsert mode makes that safe.
  5. The combination — read_committed + idempotent destination — gives effective exactly-once into the warehouse, even though Connect's sink contract is technically at-least-once.
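The crash-between-commits case in step 4 is easy to replay in a toy model: a dict keyed by id plays the warehouse table under upsert, a list plays a plain insert-mode table, and a dict stands in for the committed consumer offsets. All names are hypothetical.

```python
from typing import Dict, List, Tuple

Batch = List[Tuple[int, dict]]  # (Kafka offset, row)


def sink_batch(upsert_table: Dict[int, dict], insert_table: List[dict],
               offsets: Dict[str, int], batch: Batch,
               crash_before_offset_commit: bool = False) -> None:
    """Write one batch to both toy tables, then commit the Kafka offset."""
    for _, row in batch:
        upsert_table[row["id"]] = row  # upsert: INSERT ... ON CONFLICT (id) DO UPDATE
        insert_table.append(row)       # plain insert: no key, so a replay adds rows
    if crash_before_offset_commit:
        return                         # JDBC commit happened, Kafka offset commit did not
    offsets["orders-0"] = batch[-1][0] + 1  # next offset to consume on partition 0


batch = [(0, {"id": 1, "amount": 10.0}), (1, {"id": 2, "amount": 5.0})]
upsert_table: Dict[int, dict] = {}
insert_table: List[dict] = []
offsets: Dict[str, int] = {}

sink_batch(upsert_table, insert_table, offsets, batch, crash_before_offset_commit=True)
sink_batch(upsert_table, insert_table, offsets, batch)  # redelivered after the restart
print(len(upsert_table), len(insert_table), offsets)  # 2 4 {'orders-0': 2}
```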

Output.

| Scenario | At-least-once sink (default insert mode, read_uncommitted) | read_committed + upsert |
| --- | --- | --- |
| Source rolls back transaction T1 | T1 records already in warehouse | T1 records never reach warehouse |
| Sink crashes after JDBC commit, before Kafka offset commit | T2 records inserted again → duplicate rows | T2 records re-applied as upserts → rows unchanged |
| Net effect | duplicate rows plus rows from aborted transactions | every row applied exactly once |

Rule of thumb. For any sink consuming a transactional source, set isolation.level = read_committed and make the destination idempotent (upsert, MERGE, or vendor-native dedup). These two settings together close the door on aborted-transaction leakage and crash-mid-write duplicates.

Kafka interview question on schema evolution + exactly-once

A senior interviewer often combines: "You ship a Debezium → Kafka → Snowflake pipeline. A team wants to add an optional currency field to the orders schema. You also need exactly-once delivery so finance doesn't reconcile duplicate rows. Walk me through the design."

Solution Using BACKWARD-compatible schema + idempotent producer + Snowpipe Streaming sink

[
  {
    "name": "mysql-shop-cdc",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "topic.prefix": "shop",
      "table.include.list": "shop.orders",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "value.converter.auto.register.schemas": "false",
      "value.converter.use.latest.version": "true",

      "producer.override.enable.idempotence": "true",
      "producer.override.acks": "all",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
    }
  },
  {
    "name": "snowflake-orders-sink",
    "config": {
      "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "topics": "shop.shop.orders",
      "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
      "consumer.override.isolation.level": "read_committed",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
  }
]

Step-by-step trace.

| Stage | What runs | Why |
| --- | --- | --- |
| 1. Schema change | Register Avro v2 with optional currency, default "USD" | BACKWARD-compatible; because the field has a default, old consumers can also still read new records |
| 2. Producer | Debezium emits v2 records | auto.register.schemas = false forces explicit registration via CI |
| 3. Wire | Idempotent producer with acks = all | No duplicates on retry |
| 4. Transaction | Source writes within a Kafka transaction (workers run with exactly.once.source.support = enabled) | Aborted transactions are filtered downstream |
| 5. Consumer | Snowpipe Streaming sink with read_committed | Skips uncommitted records |
| 6. Destination | Snowpipe Streaming with idempotent client | Exactly-once into Snowflake |

After the design, the team adds a CI hook that POSTs every Avro schema change to Schema Registry's compatibility endpoint and fails the PR if BACKWARD is broken. No producer can ship a schema that breaks the consumer.

Output:

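| Concern | Answer |
| --- | --- |
| Schema change accepted | yes, an optional field with a default is BACKWARD-safe |
| Old consumer reads new record | yes, it ignores the currency field it does not know |
| New consumer reads old record | yes, it uses the default "USD" for currency |
| Producer retries duplicate records | no, the idempotent producer dedupes by sequence |
| Aborted transaction leaks to sink | no, read_committed filters them |
| Crash mid-write to Snowflake | safe, the sink resumes from the offset Snowflake last committed for the channel |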

Why this works — concept by concept:

  • BACKWARD compatibility: the contract that consumers on a new schema can read data written with the previous one. The Registry rejects breaking changes at registration time, so a breaking schema is stopped before any producer ships it.
  • Optional field with default: the canonical "safe additive change." Old consumers ignore the field they do not know; new consumers get the default when reading old records and the actual value when present.
  • Idempotent producer: enable.idempotence = true plus acks = all give "no duplicates on retry" by tagging each batch with a (producer id, partition, sequence) triple. Kafka 3.0+ default.
  • Transactional source + read_committed sink: together they ensure the sink only writes records that the source actually committed. Aborted transactions disappear cleanly.
  • Snowpipe Streaming sink: the Snowflake connector stores the last committed Kafka offset per channel (an offset token) and resumes from it after a restart, which is what lets it offer exactly-once into Snowflake.
  • Cost: idempotence is essentially free. A transactional source adds a small per-batch overhead. Schema Registry adds one lookup per new schema id on each client (cached afterwards). The trade is overwhelmingly worth it for any pipeline that finance, billing, or compliance touches.



Cheat sheet — Kafka Connect recipes

  • Bootstrap a distributed worker. connect-distributed.sh config/connect-distributed.properties on each node with the same group.id. Internal topics (connect-configs, connect-offsets, connect-statuses) auto-create on first start.
  • Register a JDBC source connector. POST /connectors with connector.class = io.confluent.connect.jdbc.JdbcSourceConnector + mode = timestamp+incrementing + timestamp.column.name = updated_at + incrementing.column.name = id + topic.prefix = "pg." (the trailing dot is part of the prefix).
  • Register a Debezium MySQL source. connector.class = io.debezium.connector.mysql.MySqlConnector + topic.prefix = shop (database.server.name in Debezium 1.x) + table.include.list = shop.orders + schema.history.internal.kafka.bootstrap.servers = kafka:9092. Single task per database.
  • Register an S3 sink. connector.class = io.confluent.connect.s3.S3SinkConnector + format.class = ParquetFormat + partitioner.class = TimeBasedPartitioner + flush.size = 1000 + rotate.interval.ms = 300000; the time-based partitioner also needs path.format, partition.duration.ms, locale, and timezone.
  • Register a JDBC sink (upsert). connector.class = io.confluent.connect.jdbc.JdbcSinkConnector + insert.mode = upsert + pk.mode = record_value + pk.fields = id + auto.create = false (in prod).
  • Chain SMTs. transforms = unwrap,cast,mask,route plus per-alias transforms.<alias>.type = ... and transforms.<alias>.<field> = .... Order is execution order.
  • Flatten a Debezium envelope. transforms.unwrap.type = io.debezium.transforms.ExtractNewRecordState + drop.tombstones = false + delete.handling.mode = rewrite + add.fields = op,source.ts_ms.
  • Configure Schema Registry compatibility. PUT /config/<subject> {"compatibility":"BACKWARD"}. Use BACKWARD for additive evolution, FULL for shared schemas between independent teams.
  • Enable idempotent producer on a source. producer.override.enable.idempotence = true + producer.override.acks = all + producer.override.max.in.flight.requests.per.connection = 5.
  • Enable read_committed on a sink. consumer.override.isolation.level = read_committed to skip aborted transactional records.
  • Monitor task status. GET /connectors/:name/status returns the connector and each task's state plus stack trace on failure. Wire into your alerting on state = FAILED.
  • Restart a single failed task. POST /connectors/:name/tasks/:id/restart. Far cheaper than restarting the whole connector.
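  • Restart the connector + every task. POST /connectors/:name/restart?includeTasks=true. Use sparingly; adding onlyFailed=true (Kafka 3.0+) restarts just the failed connector and task instances.
  • Dead-letter queue for bad records (sink connectors only). errors.tolerance = all + errors.deadletterqueue.topic.name = dlq.<connector> + errors.deadletterqueue.topic.replication.factor = 3. Lets the connector skip records that fail the converter or the SMT chain and park them in the DLQ topic; errors.deadletterqueue.context.headers.enable = true adds headers explaining each failure.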
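For the monitoring recipe, a small Python sketch that turns a status response into per-task restart calls. The sample body is abridged and hand-written to match the documented shape (connector state plus a tasks array with id, state, worker_id, and a trace on failure); `failed_task_ids` and `restart_paths` are hypothetical helpers.

```python
import json
from typing import List

SAMPLE_STATUS = """{
  "name": "pg-orders-sink",
  "connector": {"state": "RUNNING", "worker_id": "worker-1:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "worker-1:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "worker-2:8083", "trace": "(stack trace elided)"},
    {"id": 2, "state": "RUNNING", "worker_id": "worker-2:8083"}
  ],
  "type": "sink"
}"""


def failed_task_ids(status_body: str) -> List[int]:
    """Ids of tasks whose state is FAILED in a GET /connectors/<name>/status body."""
    status = json.loads(status_body)
    return [task["id"] for task in status.get("tasks", []) if task.get("state") == "FAILED"]


def restart_paths(status_body: str) -> List[str]:
    """POST paths that restart only the failed tasks, cheaper than a full connector restart."""
    name = json.loads(status_body)["name"]
    return [f"/connectors/{name}/tasks/{task_id}/restart"
            for task_id in failed_task_ids(status_body)]


print(restart_paths(SAMPLE_STATUS))  # ['/connectors/pg-orders-sink/tasks/1/restart']
```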

Frequently asked questions

What is Kafka Connect and why use it over a custom consumer?

Kafka Connect is a distributed framework for moving data between Kafka and other systems via configuration. Instead of writing producer or consumer code, you submit a JSON connector config to a REST API, and the framework spawns parallel tasks that handle offset tracking, retries, schema management, and rebalance on failure. The win is twofold: you delete entire categories of bespoke ingestion code (common source and sink connectors already exist and are reused across the industry), and you get distributed-mode HA for free. Reach for a custom consumer only when no connector exists and a plain consumer is genuinely cheaper to build and operate than a roughly 100-line custom Connector class. For "MySQL → S3," "Postgres → Snowflake," or "Kafka → Elasticsearch," Connect is the default answer.

What is the difference between source and sink connectors?

Source connectors read from an external system and write to Kafka; sink connectors read from Kafka and write to an external system. Both implement the same Connector interface and are deployed the same way, but they hit different parts of the framework: source tasks call poll() to emit SourceRecord batches that the worker writes to Kafka, while sink tasks receive SinkRecord batches via put(records) after the worker consumes them. Source offsets live in the connect-offsets topic; sink offsets live in the regular consumer-group offsets topic. Same framework, two contracts.

How does Debezium work and is it really log-based CDC?

Yes — Debezium reads the database's transaction log directly (MySQL binlog, Postgres WAL via logical replication slots, MongoDB oplog, SQL Server transaction log, Oracle redo log). Every committed change becomes a Kafka event with the before/after images plus metadata in a source envelope. Because it consumes the same log the database uses for replication, Debezium captures every change in commit order including DELETEs, never misses changes between polls, and survives connector restarts by resuming from the last persisted log position. JDBC source cannot do this — it can only see what its SELECT returns and is blind to deletes.

What are SMTs and when should I use them vs a stream processor?

SMTs (Single Message Transforms) are stateless per-record functions that run inside the Connect worker at the connector boundary: between poll() and the Kafka write (source) or between the consume and put(records) (sink). They reshape one record into one record, or drop it: flatten a Debezium envelope (ExtractNewRecordState), cast a type (Cast), redact PII (MaskField), reroute records to a renamed topic (RegexRouter), drop a field (ReplaceField). Use them for record-local reshape. Reach for Kafka Streams, ksqlDB, or Flink the moment the transform needs multi-record state, such as joins, windows, aggregations, or deduplication across keys. The Connect framework gives SMTs no state store, so any state kept inside one is per-task and lost on restart.

Does Kafka Connect support exactly-once delivery?

Yes for source connectors, as of Kafka 3.3 (KIP-618): set exactly.once.source.support = enabled on every worker, and the framework writes source records and their offsets in the same transaction; a connector can add exactly.once.support = required to fail fast if it cannot provide the guarantee. Sink-side exactly-once depends on the destination: if the destination is idempotent (key-based upsert, JDBC MERGE, Snowflake Snowpipe Streaming) or supports transactional writes, pair consumer.override.isolation.level = read_committed with a transactional source for effective end-to-end exactly-once. The default delivery semantic is at-least-once, which is practical for many pipelines, but always make it explicit which mode you ship.

What's the difference between standalone and distributed mode?

Standalone runs a single Connect process with connector configs passed as properties files on the command line and source offsets kept in a local file; it has no internal topics and no HA, although it still exposes the REST API. Distributed runs a pool of worker JVMs that share the three internal topics (connect-configs, connect-offsets, connect-statuses) and form a cluster via a shared group.id. Distributed accepts config changes via REST, rebalances tasks across workers when one dies, and persists every piece of state in Kafka so workers can come and go without losing the pipeline. Standalone is for laptops, integration tests, and quick demos; distributed is the only mode you ever ship to production. Treat the two as different products.

Practice on PipeCode

Pipecode.ai is Leetcode for Data Engineering — every Kafka Connect recipe above ships with hands-on practice rooms where you write the Debezium config, the SMT chain, and the idempotent-sink JSON against real graded inputs. PipeCode pairs every reading with 450+ DE-focused problems and a real-time scoring engine, so you never have to wonder whether your `kafka connect` design actually behaves the same on a single-worker laptop as it does on a multi-node production cluster.

Practice streaming pipelines now →
ETL pipeline drills →
