Kafka Connect looks like "just another Kafka client" to a junior engineer; senior engineers know it is a full distributed framework that replaces an entire category of bespoke producer-and-consumer code with a few JSON configs. The result is the most under-appreciated lever in any modern ingestion stack: a runtime that turns "write a custom MySQL → S3 pipeline in 800 lines of Java" into "POST a 30-line connector config and watch the cluster ship change data within seconds."
This guide is the deep dive you wanted the first time a Kafka Connect tutorial glossed over Debezium, sink semantics, or schema evolution and left you to fight production alone. It walks through the cluster anatomy (workers, tasks, connectors, REST API), the source vs sink connector contract, Debezium log-based CDC, the SMT chain (ExtractField, Cast, RegexRouter, MaskField, ReplaceField), the Schema Registry compatibility ladder for Avro / Protobuf / JSON Schema, and the idempotent producer plus exactly-once delivery semantics. Each section pairs a teaching block with a Solution-Tail interview answer: code, a step-by-step trace, an output table, then a concept-by-concept breakdown of why it works.
On this page
- Why Kafka Connect exists — declarative ingestion
- Connect cluster anatomy — workers, tasks, REST API
- Source vs sink connectors — Debezium, JDBC, S3, Elasticsearch
- SMTs — Single Message Transforms
- Schema Registry + idempotent and exactly-once delivery
- Cheat sheet — Kafka Connect recipes
- Frequently asked questions
- Practice on PipeCode
1. Why Kafka Connect exists — declarative ingestion
Kafka Connect replaces the "two custom apps per pipeline" tax with a config-driven framework that handles parallelism, offsets, and restarts for you
The one-sentence invariant: Kafka Connect is a distributed framework for moving data between Kafka and other systems by configuration, not by code — every common source (databases, files, message queues) and sink (warehouses, blob stores, search indexes) has a reusable connector that you instantiate with JSON instead of writing a producer or consumer from scratch. Once you internalise that "Connect is what stands between every external system and Kafka," every line of bespoke ingestion code becomes a candidate for deletion.
The three places hand-rolled ingestion bites.
- Producers. A custom producer needs partition strategy, key serialisation, retry policy, batching, and back-pressure handling. Forget any one of them and you ship duplicates, hot partitions, or 4 AM pages.
- Consumers. A custom consumer needs offset management, rebalance handling, idempotent writes, schema parsing, and dead-letter queues. Skip any one and the next deploy drops or replays messages.
- Operations. Both apps need their own deployment, monitoring, scaling, and on-call rotation — for every pipeline.
Connect collapses all of that into a single runtime. In one sentence:
A Kafka Connect cluster is a pool of JVM workers that run connectors; each connector is a JSON config that names a connector class (e.g. io.debezium.connector.mysql.MySqlConnector) plus its options (host, topic, credentials, transforms); the worker shards the connector into tasks and distributes them across the cluster — and you never write a producer or consumer line again.
What interviewers listen for.
- Do you say "Connect is declarative ingestion, not a Kafka client library" when asked what it is? (Senior signal.)
- Do you reach for a connector first when the question is "how do I move data from X to Kafka?" (Required answer.)
- Do you mention distributed mode plus the REST API as the production stance? (Senior signal.)
- Do you recognise that Connect is the same framework for both directions (source and sink)? (Required answer.)
The 2026 reality.
- Debezium is still the default log-based CDC source for MySQL, Postgres, MongoDB, SQL Server, and Oracle.
- Confluent maintains a large catalogue of certified connectors (S3, JDBC, Snowflake, BigQuery, Elasticsearch); independent maintainers ship many more.
- Schema Registry is the de-facto wire-format hub for Avro / Protobuf / JSON Schema and is integrated end-to-end with Connect.
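- The Kafka producer has enabled idempotence by default since 3.0, and exactly-once support for source connectors (KIP-618) arrived in Kafka 3.3; sinks get effectively-once results when the connector writes idempotently or transactionally to the destination.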
Worked example — the JDBC source hello-world in one config
Detailed explanation. Before going deep on Debezium, the cheapest demonstration of Connect's "declarative ingestion" value proposition is a JDBC source pulling a Postgres orders table into Kafka with incrementing-PK polling. The whole pipeline is a single REST POST.
Question. Given a Postgres database with an orders(id BIGSERIAL, customer_id INT, amount NUMERIC, created_at TIMESTAMPTZ) table, write the JSON connector config that ships every new row to a Kafka topic named pg.orders with five parallel tasks. Show that no producer code is needed.
Input.
| connector config field | value |
|---|---|
| name | pg-orders-source |
| connector.class | io.confluent.connect.jdbc.JdbcSourceConnector |
| connection.url | jdbc:postgresql://pg:5432/shop |
| table.whitelist | orders |
| mode | incrementing |
| incrementing.column.name | id |
| topic.prefix | pg. |
| tasks.max | 5 |
Code.
{
"name": "pg-orders-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://pg:5432/shop",
"connection.user": "kafka_reader",
"connection.password": "${file:/secrets/pg.properties:pwd}",
"table.whitelist": "orders",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "pg.",
"tasks.max": "5",
"poll.interval.ms": "5000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
# Submit the connector via the Connect REST API
curl -X POST -H "Content-Type: application/json" \
--data @pg-orders-source.json \
http://connect:8083/connectors
Step-by-step explanation.
- The config names the connector (pg-orders-source) and points the framework at the JdbcSourceConnector class shipped in the JDBC plugin. No custom Java is loaded.
- mode = incrementing plus incrementing.column.name = id tells the connector to remember the last id it saw and only emit rows with a strictly greater id on the next poll. The "last id" is persisted in the connect-offsets topic.
- topic.prefix = pg. produces a topic name pg.orders per source table. The connector creates the topic on first publish (if auto.create.topics.enable is on) or you pre-create it with explicit partition count.
- tasks.max = 5 is the requested parallelism. The framework asks the connector to split its work; JDBC source can split by table, so a single-table connector caps at one task. Multi-table connectors fan out up to tasks.max.
- Submitting the config to POST /connectors is the entire deployment. The Connect worker validates the config, persists it in connect-configs, and assigns tasks to workers.
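The incrementing-mode bookkeeping described in the steps above can be sketched in a few lines of Python. This is only an illustration of the idea, not the connector's implementation: SQLite stands in for Postgres, and `poll_new_rows` is a hypothetical helper name.

```python
import sqlite3

def poll_new_rows(conn, last_id):
    """Return rows whose id is strictly greater than last_id, plus the new offset."""
    rows = conn.execute(
        "SELECT id, customer_id, amount FROM orders WHERE id > ? ORDER BY id",
        (last_id,),
    ).fetchall()
    # The highest id seen becomes the offset for the next poll.
    new_offset = rows[-1][0] if rows else last_id
    return rows, new_offset

# In-memory SQLite table standing in for the Postgres orders table (assumption).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER, amount REAL)"
)
conn.executemany(
    "INSERT INTO orders (customer_id, amount) VALUES (?, ?)",
    [(7, 19.99), (8, 5.00)],
)

first_batch, offset = poll_new_rows(conn, last_id=0)

# A new row arrives between polls; only that row is returned next time.
conn.execute("INSERT INTO orders (customer_id, amount) VALUES (?, ?)", (9, 42.50))
second_batch, offset = poll_new_rows(conn, offset)
```

Because only ids above the stored offset are selected, this style of polling picks up inserts but not updates or deletes of existing rows.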
Output.
| Step | Producer code written | Consumer code written | Tasks running |
|---|---|---|---|
| 0: submit config | 0 lines | 0 lines | 0 |
| 1: connector starts | 0 lines | 0 lines | 1 (single table) |
| 2: rows arrive | 0 lines | 0 lines | 1 |
| 3: downstream consumer reads pg.orders | 0 lines | usual consumer code | 1 |
Rule of thumb. Every time a teammate proposes "let's write a small producer to push X into Kafka," check whether a connector exists first. Connect's worst-case is a 100-line custom connector class; its best case is a 30-line JSON file that ships the same day.
Worked example — sink hello-world: S3 Parquet from a Kafka topic
Detailed explanation. The mirror image of the source hello-world is the sink: take a Kafka topic and ship every record into S3 as date-partitioned Parquet files. Like the source, the whole pipeline is one connector config — no consumer code.
Question. Given a Kafka topic pg.orders (Avro-encoded), write the S3 sink connector that lands records as Parquet under s3://lake/raw/pg.orders/yyyy=2026/mm=06/dd=12/..., flushed every 1,000 records or 5 minutes.
Input.
| sink config field | value |
|---|---|
| topics | pg.orders |
| s3.bucket.name | lake |
| s3.region | us-east-1 |
| format.class | io.confluent.connect.s3.format.parquet.ParquetFormat |
| partitioner.class | TimeBasedPartitioner |
| path.format | 'yyyy'=YYYY/'mm'=MM/'dd'=dd |
| flush.size | 1000 |
| rotate.interval.ms | 300000 |
Code.
{
"name": "s3-orders-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "pg.orders",
"s3.bucket.name": "lake",
"s3.region": "us-east-1",
"topics.dir": "raw",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"flush.size": "1000",
"rotate.interval.ms": "300000",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'yyyy'=YYYY/'mm'=MM/'dd'=dd",
"partition.duration.ms": "86400000",
"locale": "en-US",
"timezone": "UTC",
"timestamp.extractor": "Record",
"tasks.max": "4",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Step-by-step explanation.
- topics = pg.orders tells the sink connector which Kafka topic(s) to subscribe to. Multiple topics can be listed comma-separated.
- format.class selects the file format on the S3 side. Parquet is the analytics-friendly default; Avro and JSON are also supported.
- partitioner.class = TimeBasedPartitioner + path.format produces a Hive-style date-partitioned key prefix. Athena, Spark, and Snowflake external tables can all map this layout to partitions for pruning.
- flush.size = 1000 and rotate.interval.ms = 300000 define the dual flush triggers: whichever fires first closes the current file and uploads it to S3.
- tasks.max = 4 spreads the topic's four partitions across four tasks, which the cluster places on its workers. Each task owns a subset of partitions and writes its own files, so files are always partition-local and there is no cross-task contention.
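To make the partitioner step concrete, here is a rough Python sketch of how a record timestamp could map to the Hive-style prefix. It assumes UTC and the usual topics.dir/topic/partition layout; `partition_path` is a hypothetical helper, and the real connector applies its own configured locale and timezone.

```python
from datetime import datetime, timezone

def partition_path(topics_dir, topic, record_ts_ms):
    """Build a Hive-style date prefix from a record timestamp in milliseconds."""
    ts = datetime.fromtimestamp(record_ts_ms / 1000, tz=timezone.utc)
    return f"{topics_dir}/{topic}/yyyy={ts:%Y}/mm={ts:%m}/dd={ts:%d}/"

# A record stamped 2026-06-12 09:12 UTC (illustrative value).
ts_ms = int(datetime(2026, 6, 12, 9, 12, tzinfo=timezone.utc).timestamp() * 1000)
path = partition_path("raw", "pg.orders", ts_ms)
```

Every record from the same UTC day resolves to the same prefix, which is what lets query engines prune by date.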
Output.
| Trigger | Files produced | Average size | Partition path |
|---|---|---|---|
| flush.size hit at 09:12 | 1 | ~6 MB | yyyy=2026/mm=06/dd=12/ |
| rotate timer at 09:17 | 1 | ~1.4 MB | yyyy=2026/mm=06/dd=12/ |
| flush.size hit at 09:23 | 1 | ~6 MB | yyyy=2026/mm=06/dd=12/ |
| End of day | 96 files | ~5 MB avg | yyyy=2026/mm=06/dd=12/ |
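The two triggers in the table can be modelled with a small simulation. This is a simplified sketch under stated assumptions: rotation is measured from the first record in the open file, and `split_into_files` is a hypothetical name rather than anything in the connector.

```python
def split_into_files(record_timestamps_ms, flush_size, rotate_interval_ms):
    """Group record timestamps into files closed by a size or a time trigger."""
    files, current = [], []
    for ts in record_timestamps_ms:
        # Time trigger: this record falls outside the open file's window.
        if current and ts - current[0] >= rotate_interval_ms:
            files.append(current)
            current = []
        current.append(ts)
        # Size trigger: the open file reached flush_size records.
        if len(current) >= flush_size:
            files.append(current)
            current = []
    return files, current

closed, still_open = split_into_files(
    [0, 1_000, 2_000, 3_000, 400_000, 401_000],
    flush_size=3,
    rotate_interval_ms=300_000,
)
```

Here the first file closes on the size trigger and the second on the time trigger, while the last two records stay in an open file until one of the triggers fires again.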
Rule of thumb. Sink connectors are usually quieter than sources because they only need to fight the external system's write semantics. The S3 sink writes whole files; the JDBC sink batches inserts; the Elasticsearch sink upserts by document id. Pick the sink whose config maps cleanly to the destination's write contract.
Worked example — standalone mode vs distributed mode
Detailed explanation. Connect runs in two modes: standalone (one process, connector configs loaded from local properties files, offsets kept in a local file, no cluster) and distributed (a pool of workers sharing config in a Kafka internal topic, controlled via REST). Standalone is the laptop / one-machine integration-test mode; distributed is the mode to run in production.
Question. Given a production CDC pipeline that needs to survive worker crashes, scale tasks horizontally, and accept config changes via API, which Connect mode is correct — standalone or distributed — and why?
Input.
| Requirement | Standalone | Distributed |
|---|---|---|
| Survives worker crash | no | yes (tasks rebalance) |
| Scale tasks across machines | no | yes |
| Config via REST API | yes, but changes are lost on restart | yes (persisted in connect-configs) |
| Config in a file | yes | no — in connect-configs topic |
| HA | no | yes |
Code.
# Standalone — single process, config from a file
connect-standalone.sh \
config/connect-standalone.properties \
config/jdbc-source.properties
# Distributed — N workers, same group.id, same internal topics
connect-distributed.sh config/connect-distributed.properties
# (run on each node; workers find each other via group.id)
Step-by-step explanation.
- Standalone reads connector configs from properties files at startup. The worker still serves the REST API, but connectors created or changed through it are held in memory only, so a durable change means editing the file and restarting the process.
- Distributed mode uses the group.id to form a worker pool: every worker with the same group.id joins the same Connect cluster and shares the three internal topics (connect-configs, connect-offsets, connect-statuses).
- In distributed mode, the REST API on any worker accepts POST /connectors and PUT /connectors/:name/config; the change propagates through connect-configs so every worker sees it.
- When a worker crashes in distributed mode, the cluster rebalances and surviving workers pick up the failed worker's tasks. Standalone has no failover; the pipeline stops.
- Tasks in standalone are bounded by one process; distributed scales tasks horizontally across machines until tasks.max is reached.
Output.
| Mode | When to use |
|---|---|
| Standalone | Dev laptop, integration tests, single-process demos |
| Distributed | Every production pipeline — always |
Rule of thumb. Treat standalone mode as "the CLI tool for learning Connect" and keep it out of production. A standalone worker has no peers, so the first host failure stops the pipeline and nothing takes over its tasks.
Kafka interview question on Connect's design intent
A senior interviewer often opens with: "Walk me through what Kafka Connect is, why it exists, and how it differs from writing a producer and consumer pair for the same job. When would you not use it?" It blends design-intent reasoning with a practical "when do you reach for Connect" answer.
Solution Using Connect's declarative + distributed framing
Kafka Connect is a distributed framework for moving data between Kafka and
other systems via configuration — JSON connector configs sent to a REST
API, not custom producer / consumer Java code.
It exists because every team that hand-rolled ingestion ended up rewriting
the same five concerns: partition strategy, offset management, retries,
schema handling, and operational scaling. Connect captures all five inside
the framework, leaves the "what to do per record" to a reusable Connector
class, and exposes the "how to instantiate" surface as JSON.
I use Connect when:
• The source or sink has an off-the-shelf connector (Debezium, JDBC, S3,
Elasticsearch, Snowflake, BigQuery, …).
• The pipeline is logically "table → topic" or "topic → store" without
multi-record state.
• Operations (HA, rebalance, restart, monitoring) matter as much as the
code that processes the records.
I do NOT use Connect when:
• The transform is stateful or multi-record — that's Kafka Streams or
ksqlDB territory, not SMT territory.
• The source emits a fundamentally non-Kafka-like stream (e.g. WebSockets
where each event needs custom auth per message).
• The team only has one tiny one-shot ingestion and wants to avoid running
a Connect cluster — a small custom producer might be lighter.
Step-by-step trace.
| Decision step | Standalone Java producer | Kafka Connect |
|---|---|---|
| 1. Wire up partition strategy | ~50 lines | 0 lines (config) |
| 2. Offset tracking | ~120 lines + DB | 0 lines (connect-offsets topic) |
| 3. Idempotent writes | ~80 lines | 0 lines (built-in producer) |
| 4. Schema serialisation | ~60 lines | 0 lines (Schema Registry converter) |
| 5. Operational HA | custom orchestration | distributed mode (built-in) |
| Total LOC | ~310+ ops glue | ~30 JSON + zero ops glue |
After the comparison, the architect chooses Connect for every "table → topic" or "topic → store" pipeline and keeps custom Java only for the two or three pipelines where the source / sink has no usable connector and the volume justifies the maintenance cost.
Output:
| Pipeline type | Recommended stack |
|---|---|
| MySQL CDC → Kafka → S3 Parquet | Connect (Debezium source + S3 sink) |
| Postgres bulk → Kafka | Connect (JDBC source, bulk mode) |
| Topic → Snowflake | Connect (Snowflake sink) |
| Stateful join across 3 topics | Kafka Streams / Flink, not Connect |
| One-shot CSV → Kafka, ad-hoc | small custom producer |
Why this works — concept by concept:
- Declarative ingestion: Connect captures the cross-cutting concerns (offsets, retries, parallelism, schema) inside the framework and asks the operator only "what to move and how to authenticate." Configuration replaces code.
- Distributed workers: multiple JVMs in the same group.id form a cluster. Failover is automatic, scaling is horizontal, and the REST API is uniform across all workers.
- Reusable connectors: every connector class implements the same Connector/Task interface; once written, it serves every team forever. The ecosystem amortises engineering cost across the industry.
- SMTs at the boundary: Single Message Transforms handle "reshape this record" without leaving Connect. Multi-record state escapes upward to Streams / Flink, by design.
- Cost: Connect adds one JVM cluster to your stack. The savings: dozens of hand-rolled ingestion apps that would otherwise need their own deployment, monitoring, and on-call rotation. The break-even is usually two non-trivial pipelines.
2. Connect cluster anatomy — workers, tasks, REST API
Kafka Connect has four layers: Worker (JVM), Connector (logical), Task (parallel), and REST API (control). Three internal topics keep state durable
The mental model in one line: a Connect cluster is a pool of Worker JVMs that share three Kafka internal topics; each Worker hosts Connector instances; each Connector spawns up to tasks.max Tasks that do the actual record-by-record work; the REST API is the only control plane. Once you can recite "Workers run, Connectors own, Tasks parallelise, REST controls," the entire operational surface of Connect resolves into the right mental boxes.
The four layers in one table.
| Layer | What it is | Lifetime | Lives where |
|---|---|---|---|
| Worker | A JVM process running the Connect runtime | machine lifetime | each node in distributed mode |
| Connector | A logical instance of a connector class | until deleted via REST | persisted in connect-configs |
| Task | A parallel unit of work for a Connector | until Connector deleted or rebalanced | runs inside a Worker |
| REST API | Control plane (CRUD over connectors) | worker lifetime | port 8083 on every Worker |
Three internal topics — the durable state.
- connect-configs: compacted topic that stores every connector config. Workers replay it on startup so they all agree on the cluster's connectors.
- connect-offsets: compacted topic that stores source-connector offsets (the "where did I leave off" pointer). Sink-connector offsets live in the regular consumer-group offsets topic.
- connect-statuses: compacted topic with the current state (RUNNING, FAILED, PAUSED) of every task. The REST API reads it on GET /status.
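The reason a compacted topic works as durable state is that replaying it from the start yields the latest value per key. A minimal sketch of that replay follows; the key names and `replay_compacted` are illustrative assumptions, not the exact records Connect writes.

```python
def replay_compacted(log):
    """Fold a keyed log into current state: last write wins, None deletes."""
    state = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)  # tombstone removes the key
        else:
            state[key] = value
    return state

# Illustrative history of a config topic (keys and values are made up).
config_log = [
    ("connector-pg-orders-source", {"tasks.max": "5"}),
    ("connector-s3-orders-sink", {"tasks.max": "4"}),
    ("connector-pg-orders-source", {"tasks.max": "1"}),  # config update
    ("connector-s3-orders-sink", None),                   # connector deleted
]
state = replay_compacted(config_log)
```

A worker that starts late and replays the same log arrives at the same state, which is how every worker ends up agreeing on the set of connectors.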
Tasks vs Workers — the parallelism story.
- Connector specifies tasks.max: the framework asks "how many tasks can split this work?" The connector answers with a list of task configs, one per slice of partitions / shards.
- The Worker pool decides assignment: tasks are distributed across workers via a rebalance protocol. Each worker runs a slice.
- Connect 2.3+ cooperative rebalance: only impacted tasks pause during a rebalance, rather than the old stop-the-world behaviour.
- One task per partition (rule of thumb): exceeding partition_count on a single-topic sink wastes tasks; matching partition_count saturates parallelism.
REST API — the only control plane you should script against.
- POST /connectors submits a new connector config.
- GET /connectors/:name/status returns the current state of the connector and each task.
- PUT /connectors/:name/config updates the config (causes a restart).
- POST /connectors/:name/restart restarts a failed connector.
- POST /connectors/:name/tasks/:id/restart restarts a single failed task.
- DELETE /connectors/:name deletes a connector (tasks stop; offsets remain in their topic).
Common interview probes on the cluster model.
- "What happens when a worker dies?" The cluster rebalances; surviving workers pick up the dead worker's tasks. The internal topics keep config and offsets durable.
- "Can two workers run the same task?" No. Each task is owned by exactly one worker at a time. Rebalance moves ownership.
- "Where do source-connector offsets live?" In the connect-offsets topic, keyed by (connector-name, source-partition). Sink offsets live in the standard consumer group offsets.
- "What's the difference between tasks.max and partitions?" tasks.max is the upper bound you give the framework; partitions (of the source / sink topic) is the natural bound. Effective tasks = min(tasks.max, splittable-units).
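The "effective tasks" rule from the last probe is easy to check with a toy calculation. The round-robin placement below is only an illustrative stand-in for Connect's real assignor, and both function names are hypothetical.

```python
def effective_tasks(tasks_max, splittable_units):
    """Tasks actually created: capped by both the config and the work available."""
    return min(tasks_max, splittable_units)

def assign_round_robin(task_ids, workers):
    """Spread task ids across workers in turn (simplified placement model)."""
    assignment = {w: [] for w in workers}
    for i, task in enumerate(task_ids):
        assignment[workers[i % len(workers)]].append(task)
    return assignment

# Single-table JDBC source with tasks.max = 5: one table, so one task.
jdbc_tasks = effective_tasks(tasks_max=5, splittable_units=1)

# Sink on a 4-partition topic with tasks.max = 8: four useful tasks.
sink_tasks = effective_tasks(tasks_max=8, splittable_units=4)
assignment = assign_round_robin(list(range(sink_tasks)), ["worker-a", "worker-b"])
```

Raising tasks.max past the number of splittable units changes nothing; adding workers only changes where the existing tasks run.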
Worked example — submitting a connector via REST
Detailed explanation. Every production connector starts life as a JSON file submitted to the REST API. The framework validates it synchronously and returns the persisted config; failure modes (bad class name, missing required field) come back as 4xx with a JSON error body.
Question. Submit a Debezium MySQL connector named mysql-shop-cdc with tasks.max = 1 (the Debezium MySQL connector always runs a single task) and verify it is RUNNING.
Input.
| field | value |
|---|---|
| name | mysql-shop-cdc |
| connector.class | io.debezium.connector.mysql.MySqlConnector |
| database.hostname | mysql |
| database.port | 3306 |
| database.user | debezium |
| topic.prefix | shop |
| table.include.list | shop.orders, shop.line_items |
| tasks.max | 1 |
Code.
# 1) Submit the connector (database.server.id is an example value; it must be
#    unique among the MySQL server's replication clients)
curl -X POST -H "Content-Type: application/json" \
--data '{
"name": "mysql-shop-cdc",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "${file:/secrets/dbz.properties:pwd}",
"database.server.id": "184054",
"topic.prefix": "shop",
"database.include.list": "shop",
"table.include.list": "shop.orders,shop.line_items",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-history.shop",
"tasks.max": "1"
}
}' \
http://connect:8083/connectors
# 2) Check status
curl http://connect:8083/connectors/mysql-shop-cdc/status
# 3) List all connectors in the cluster
curl http://connect:8083/connectors
Step-by-step explanation.
- POST /connectors validates the JSON against the connector class's config definition. Missing required fields (e.g. forgetting database.hostname) return 400 with the offending field name.
- The framework persists the config to connect-configs. Every worker replays that topic on startup, so the connector survives the cluster being fully restarted.
- The cluster assigns one task to one worker (Debezium is single-task per database). The worker starts the task, which opens a binlog reader and begins emitting events.
- GET /connectors/:name/status returns connector: { state: RUNNING } plus tasks: [ { id, state, worker_id } ]. If a task is FAILED, the JSON includes a stack trace.
- GET /connectors returns every connector in the cluster as a JSON array. Useful for inventory scripts and dashboards.
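A dashboard or inventory script usually reduces the status document to a yes/no health flag. The sketch below works on an already-parsed status body shaped like the one described above; `summarize_status` and the sample values (including the worker address) are illustrative assumptions.

```python
def summarize_status(status):
    """Reduce a connector status document to a small health summary."""
    failed = [t["id"] for t in status["tasks"] if t["state"] == "FAILED"]
    healthy = status["connector"]["state"] == "RUNNING" and not failed
    return {"name": status["name"], "healthy": healthy, "failed_tasks": failed}

# Sample status body, as it might look after json.loads (values are made up).
sample_status = {
    "name": "mysql-shop-cdc",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.5:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.0.0.5:8083"}],
}
summary = summarize_status(sample_status)
```

Checking the tasks as well as the connector matters because a connector can report RUNNING while one of its tasks is FAILED.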
Output.
| Step | Response | Meaning |
|---|---|---|
| 1: POST | 201 Created + config echo | persisted in connect-configs |
| 2: GET status | { connector: { state: RUNNING }, tasks: [{ id: 0, state: RUNNING }] } | one task running on a worker |
| 3: list | ["mysql-shop-cdc", "s3-orders-sink", "pg-orders-source"] | three connectors in the cluster |
Rule of thumb. Store every connector config in version control as a .json file alongside the application code that depends on it. CI/CD then PUTs the latest config to the cluster on merge — the same pattern you would use for Kubernetes manifests.
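The CI step in that rule of thumb might look roughly like the following sketch. It only builds the HTTP request from a versioned file's contents and does not send it; `build_put_request`, the inline JSON, and the base URL are assumptions for illustration.

```python
import json
import urllib.request

def build_put_request(base_url, connector_file_body):
    """Turn a {"name": ..., "config": {...}} file into a PUT to /connectors/<name>/config."""
    doc = json.loads(connector_file_body)
    name, config = doc["name"], doc["config"]
    # The config endpoint takes the bare config map, not the name/config wrapper.
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )

# Contents of a versioned connector file (trimmed for the example).
file_body = json.dumps({
    "name": "pg-orders-source",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "5",
    },
})
request = build_put_request("http://connect:8083", file_body)
```

Passing the returned object to urllib.request.urlopen would send it. Using PUT rather than POST keeps the step repeatable, since the same call works whether or not the connector already exists.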
Worked example — scaling tasks for a multi-table Debezium connector
Detailed explanation. Debezium is single-task per database — a constraint that surprises engineers used to scaling consumers by partition count. The way to parallelise CDC is not "increase tasks.max"; it is "split tables across multiple Debezium connectors" or use the signal-based parallelism helpers shipped in newer Debezium versions.
Question. A Postgres database has 12 large tables and CDC throughput is saturating a single Debezium task. Show how to fan out to four parallel connectors, one per table cluster, and how tasks.max and topic.prefix are configured.
Input.
| table group | tables |
|---|---|
| Group A | orders, order_items |
| Group B | customers, addresses |
| Group C | inventory, sku_prices |
| Group D | events_clickstream (high-volume) |
Code.
[
{
"name": "pg-cdc-group-a",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "pg",
"database.dbname": "shop",
"plugin.name": "pgoutput",
"table.include.list": "public.orders,public.order_items",
"topic.prefix": "cdc.a",
"slot.name": "dbz_slot_a",
"publication.name": "dbz_pub_a",
"tasks.max": "1"
}
},
{
"name": "pg-cdc-group-b",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "pg",
"database.dbname": "shop",
"plugin.name": "pgoutput",
"table.include.list": "public.customers,public.addresses",
"topic.prefix": "cdc.b",
"slot.name": "dbz_slot_b",
"publication.name": "dbz_pub_b",
"tasks.max": "1"
}
}
]
Step-by-step explanation.
- tasks.max stays at 1 per connector: Debezium reads the WAL serially and cannot fan out a single replication slot across multiple tasks.
- The fan-out happens at the connector layer: four separate Debezium connectors share the cluster but each owns a distinct table set.
- Each connector needs its own slot.name and publication.name (Postgres) or database.server.id (MySQL); otherwise two connectors would compete for the same replication state and one would fail.
- The topic.prefix keeps each group's topics distinct (cdc.a.public.orders, cdc.b.public.customers). Downstream sinks subscribe to the regex.
- CPU and Postgres write-ahead-log slots are now the binding constraints. Each Debezium connector keeps one replication slot open and one consumer thread per task.
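Hand-writing four near-identical configs invites copy-paste collisions on slot names, so one option is to generate them. The sketch below is an assumption-heavy illustration: `debezium_group_config`, the hostname, and the database name are placeholders, and credentials are left out.

```python
def debezium_group_config(group, tables, hostname="pg", dbname="shop"):
    """Build one Debezium Postgres connector config for a table group."""
    return {
        "name": f"pg-cdc-group-{group}",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": hostname,
            "database.dbname": dbname,
            "table.include.list": ",".join(f"public.{t}" for t in tables),
            # Per-group names keep replication state and topics separate.
            "topic.prefix": f"cdc.{group}",
            "slot.name": f"dbz_slot_{group}",
            "publication.name": f"dbz_pub_{group}",
            "tasks.max": "1",
        },
    }

groups = {
    "a": ["orders", "order_items"],
    "b": ["customers", "addresses"],
    "c": ["inventory", "sku_prices"],
    "d": ["events_clickstream"],
}
configs = [debezium_group_config(g, tables) for g, tables in groups.items()]
slot_names = [c["config"]["slot.name"] for c in configs]
```

Deriving the slot, publication, and prefix from the same group key makes it hard for two connectors to end up sharing a replication slot by accident.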
Output.
| Connector | Tables | Tasks | Replication slot |
|---|---|---|---|
| pg-cdc-group-a | orders, order_items | 1 | dbz_slot_a |
| pg-cdc-group-b | customers, addresses | 1 | dbz_slot_b |
| pg-cdc-group-c | inventory, sku_prices | 1 | dbz_slot_c |
| pg-cdc-group-d | events_clickstream | 1 | dbz_slot_d |
| Cluster total | 12 tables | 4 tasks | 4 slots |
Rule of thumb. When CDC throughput is the bottleneck, scale by partitioning tables across connectors — not by raising tasks.max. Group hot tables alone; group cold tables together; reserve one connector per genuinely huge table.
Worked example — restarting a failed task without touching the connector
Detailed explanation. Tasks can transition to FAILED on transient errors (network blip, schema-history corruption, downstream throttle). The REST API lets you restart a single task without restarting the connector — which preserves the other tasks' progress and avoids a cluster-wide rebalance.
Question. A sink connector has four tasks; one (task 2) is FAILED with a "connection reset" error. Restart only that task without disturbing tasks 0, 1, and 3.
Input.
| task | state |
|---|---|
| 0 | RUNNING |
| 1 | RUNNING |
| 2 | FAILED |
| 3 | RUNNING |
Code.
# 1) Inspect status to confirm task 2 is FAILED
curl http://connect:8083/connectors/s3-orders-sink/status
# 2) Restart only task 2 — other tasks continue uninterrupted
curl -X POST http://connect:8083/connectors/s3-orders-sink/tasks/2/restart
# 3) Re-check status; expect task 2 → RUNNING within a few seconds
curl http://connect:8083/connectors/s3-orders-sink/status
# 4) For a totally wedged connector, restart everything (including connector instance)
curl -X POST "http://connect:8083/connectors/s3-orders-sink/restart?includeTasks=true"
Step-by-step explanation.
- GET /status shows each task's current state plus the worker hosting it. The JSON includes the failure stack trace for any FAILED task; read it before restarting.
- POST /tasks/:id/restart is a targeted restart. The framework resets only the specified task on its current worker; tasks 0, 1, and 3 are not touched.
- The restarted task resumes from its last committed offset (sink) or from the source offset stored in connect-offsets (source). No data is replayed beyond the unflushed window.
- The ?includeTasks=true variant is the heavy hammer: it restarts the connector instance plus every task. Use only after the cluster log confirms a connector-level corruption (rare).
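An on-call script can derive the targeted restart calls directly from the status document, so healthy tasks are never touched. This is a sketch under the assumption that the status body has already been fetched and parsed; `restart_paths` is a hypothetical helper.

```python
def restart_paths(connector, status):
    """Return the REST paths to POST in order to restart only FAILED tasks."""
    return [
        f"/connectors/{connector}/tasks/{task['id']}/restart"
        for task in status["tasks"]
        if task["state"] == "FAILED"
    ]

# Status matching the input table above: task 2 is the only failure.
status = {
    "name": "s3-orders-sink",
    "connector": {"state": "RUNNING"},
    "tasks": [
        {"id": 0, "state": "RUNNING"},
        {"id": 1, "state": "RUNNING"},
        {"id": 2, "state": "FAILED", "trace": "connection reset"},
        {"id": 3, "state": "RUNNING"},
    ],
}
paths = restart_paths("s3-orders-sink", status)
```

An empty list means there is nothing to restart, which makes the script safe to run on a schedule.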
Output.
| Step | Effect |
|---|---|
| 1: inspect | confirms task 2 FAILED, stack trace logged |
| 2: restart task 2 | task 2 → RUNNING within seconds |
| 3: verify | all four tasks RUNNING |
| 4: full restart | connector instance and all of its tasks restart; other connectors are untouched |
Rule of thumb. Default to the single-task restart. The full connector restart is reserved for "every task is wedged and I have already read the cluster log." A tasks/:id/restart rarely costs more than a minute of throughput; a full connector restart can cost minutes plus the rebalance window.
Kafka interview question on Connect's runtime model
A senior interviewer might frame this as: "You're paged at 3 AM. A Connect worker died. Walk me through what happens to the running connectors, where the state lives, and how the cluster recovers — without losing data."
Solution Using the worker-task-connector-internal-topics model
1. The dead worker stops sending heartbeats to the group coordinator.
2. After session.timeout.ms (default 10s), the coordinator marks the
worker dead and triggers a rebalance.
3. Connect 2.3+ uses *cooperative rebalance*: only the dead worker's
tasks pause. Other tasks across the cluster keep running.
4. The leader reassigns the dead worker's tasks to surviving workers
based on the latest config in `connect-configs`. By default it first
waits up to scheduled.rebalance.max.delay.ms (5 minutes) in case the
worker comes back; lower that setting for faster failover.
5. Each reassigned task resumes:
• Source tasks: read their offset from `connect-offsets` topic
and continue from there.
• Sink tasks: rejoin the consumer group; the Kafka broker hands
them their last-committed offset.
6. Status updates flow through `connect-statuses`; the REST API on any
surviving worker shows the new task → worker assignment.
7. End user impact: paused processing on the dead worker's slice until
its tasks are reassigned; zero data loss; no duplicates beyond the
at-least-once re-read window.
The key invariant: every piece of durable state (config, source offsets,
status) lives in a Kafka topic, not on the worker's local disk. That is
why HA is automatic.
Step-by-step trace (assuming scheduled.rebalance.max.delay.ms is set to 0 so reassignment is immediate).
| Time | Event | Worker A | Worker B | Worker C |
|---|---|---|---|---|
| t=0 | normal | tasks 0, 1 | tasks 2, 3 | tasks 4, 5 |
| t=10s | A dies | offline | tasks 2, 3 | tasks 4, 5 |
| t=20s | session timeout expires; coordinator marks A dead | offline | tasks 2, 3 | tasks 4, 5 |
| t=21s | rebalance starts | offline | tasks 2, 3 keep running | tasks 4, 5 keep running |
| t=23s | reassign | offline | tasks 1, 2, 3 | tasks 0, 4, 5 |
| t=24s | tasks 0 and 1 resume from their stored offsets | offline | task 1 reads from its last committed offset | task 0 reads from its last committed offset |
| t=25s | steady state | offline | 3 tasks | 3 tasks |
After the rebalance, the cluster has redistributed all six tasks across the two surviving workers. Zero data is lost; processing latency briefly spikes during the rebalance window then returns to normal.
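The cooperative idea (survivors keep what they have and only orphaned tasks move) can be shown with a toy reassignment. The least-loaded placement here is a simplifying assumption, so the exact worker each orphan lands on may differ from the trace; `reassign_orphans` is a hypothetical name.

```python
def reassign_orphans(assignment, dead_worker):
    """Move only the dead worker's tasks, each to the least-loaded survivor."""
    survivors = {w: list(tasks) for w, tasks in assignment.items() if w != dead_worker}
    for task in assignment[dead_worker]:
        # Ties on load are broken by worker name to keep the result deterministic.
        target = min(survivors, key=lambda w: (len(survivors[w]), w))
        survivors[target].append(task)
    return survivors

before = {"A": [0, 1], "B": [2, 3], "C": [4, 5]}
after = reassign_orphans(before, dead_worker="A")
```

Tasks 2 to 5 never leave their workers, which is the difference from a stop-the-world rebalance where every task would be revoked and handed out again.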
Output:
| Aspect | Outcome |
|---|---|
| Data loss | none |
| Duplicate window | at-least-once re-read of the unflushed buffer |
| Time to recover | seconds once reassignment starts; by default reassignment waits up to scheduled.rebalance.max.delay.ms (5 minutes) |
| Required operator action | none (just bring up a replacement worker) |
Why this works — concept by concept:
- Internal topics as durable state: `connect-configs`, `connect-offsets`, and `connect-statuses` survive every worker restart because they live in Kafka. The framework treats Kafka as the source of truth for cluster state.
- Cooperative rebalance (Connect 2.3+): only the impacted tasks pause; the rest of the cluster keeps processing. This is the upgrade from the stop-the-world (eager) protocol that Connect used before 2.3.
- Source offsets in `connect-offsets`: source connectors store their "last position" (e.g. a MySQL binlog file and position, or a Postgres LSN) in this compacted topic, so any worker can resume any source task mid-stream.
- Sink offsets in the consumer group: sink tasks are regular Kafka consumers; their offsets live in `__consumer_offsets`. Failover is the standard Kafka consumer-group recovery.
- Cost: one rebalance window per worker failure, typically under 30 seconds once the rebalance fires. Operator cost: zero, just bring up a replacement worker. The framework absorbs the rest.
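The failover reasoning above can be sketched as a toy model. This is not Connect's assignor: `reassign`, the worker names, and the `connect_offsets` dict (standing in for the `connect-offsets` topic) are illustrative assumptions. The only point is that tasks move while their saved positions stay in shared storage.

```python
def reassign(assignments, dead_worker):
    """Move the dead worker's tasks to the least-loaded survivors.

    Tasks already running on a surviving worker stay where they are,
    which mirrors the idea behind cooperative rebalancing.
    """
    survivors = {w: list(t) for w, t in assignments.items() if w != dead_worker}
    if not survivors:
        raise RuntimeError("no surviving workers")
    for task in sorted(assignments.get(dead_worker, [])):
        # Pick the survivor with the fewest tasks (ties broken by name).
        target = min(survivors, key=lambda w: (len(survivors[w]), w))
        survivors[target].append(task)
    return survivors


# Hypothetical stand-in for the connect-offsets topic: position per task.
connect_offsets = {task: 1000 + task for task in range(6)}

before = {"A": [0, 1], "B": [2, 3], "C": [4, 5]}
after = reassign(before, dead_worker="A")

# Every task, moved or not, resumes from shared state, not from local disk.
resume_points = {t: connect_offsets[t] for tasks in after.values() for t in tasks}
print(after)
```

Which survivor receives which orphaned task depends on the tie-break rule, so the exact split may differ from the trace table; the invariant is that all six tasks end up assigned and none loses its offset.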
3. Source vs sink connectors — Debezium, JDBC, S3, Elasticsearch
kafka connect source reads from external systems and writes to Kafka; kafka connect sink reads from Kafka and writes to external systems — same framework, two contracts
The mental model in one line: a source connector implements SourceTask.poll() and returns a batch of SourceRecord objects that the framework writes to Kafka; a sink connector implements SinkTask.put(records) and consumes the records it receives — Kafka sits unchanged in the middle as the durable buffer. Once you can recite "source polls, sink puts," every connector category — JDBC, Debezium, S3, Elasticsearch, Snowflake — slots into one of the two contracts.
Source connector contract.
- `SourceConnector.taskConfigs(n)`: given `tasks.max = n`, returns up to `n` task config maps. Each task gets a slice of the source (a table, a shard, a partition).
- `SourceTask.poll()`: periodically called; returns a list of `SourceRecord` objects with `(sourcePartition, sourceOffset, topic, key, value, schema)`. The framework writes them to Kafka.
- `SourceTask.commit()` / `commitRecord(record)`: confirmation that a record has been successfully written to Kafka, so the source can advance its read pointer.
- Offsets in `connect-offsets`: keyed by `(connector_name, source_partition)`. Each task writes its own offsets; the framework reads them on startup.
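A minimal sketch of the source contract, assuming a toy in-memory "table" instead of a real database. `ListSourceTask`, `run_once`, and the offset-store dict are hypothetical names for illustration; the real interfaces are the Java `SourceTask` and the `connect-offsets` topic.

```python
class ListSourceTask:
    """Toy source task that reads rows from an in-memory list."""

    def __init__(self, rows, start_offset=0, batch_size=2):
        self.rows = rows
        self.position = start_offset      # number of rows already emitted
        self.batch_size = batch_size

    def poll(self):
        """Return the next batch as (source_offset, value) pairs."""
        batch = self.rows[self.position:self.position + self.batch_size]
        records = [(self.position + i + 1, row) for i, row in enumerate(batch)]
        self.position += len(batch)
        return records


def run_once(task, topic, offset_store, key):
    """Framework side: write each record, then remember its source offset."""
    records = task.poll()
    for source_offset, value in records:
        topic.append(value)                 # stands in for the Kafka write
        offset_store[key] = source_offset   # stands in for connect-offsets
    return len(records)


rows = ["r1", "r2", "r3", "r4", "r5"]
topic, offsets = [], {}
key = ("demo-connector", "table=orders")

task = ListSourceTask(rows)
run_once(task, topic, offsets, key)         # emits r1 and r2

# Simulated restart: a fresh task resumes from the stored offset.
task = ListSourceTask(rows, start_offset=offsets[key])
while run_once(task, topic, offsets, key):
    pass
```

Because the offset is stored only after the write, a crash between the two steps would replay a record on restart, which is the at-least-once behaviour described for the default configuration.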
Sink connector contract.
- `SinkConnector.taskConfigs(n)`: same shape as source: returns up to `n` task config maps. Tasks subscribe to subsets of the topic partitions.
- `SinkTask.put(records)`: called with a batch of records to write to the external system. May buffer, batch, retry, then write atomically.
- `SinkTask.flush(offsets)`: called before offset commit; the task must ensure all buffered records are durably written.
- Offsets via the consumer group: sink tasks are Kafka consumers; their offsets live in `__consumer_offsets`. Offset commit is gated on `flush()` succeeding.
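The "commit only after flush" rule can be illustrated with a small sketch. `BufferingSinkTask`, `deliver`, and the `fail` flag are assumptions made for the demo (the real `SinkTask.flush` takes an offsets map and has no such flag); the list `destination` stands in for the external system.

```python
class BufferingSinkTask:
    """Toy sink task: buffers in put(), writes everything in flush()."""

    def __init__(self, destination):
        self.destination = destination
        self.buffer = []

    def put(self, records):
        self.buffer.extend(records)

    def flush(self, fail=False):
        if fail:
            raise IOError("destination unavailable")
        self.destination.extend(value for _, value in self.buffer)
        self.buffer.clear()


def deliver(task, records, committed, fail=False):
    """Framework side: advance the consumer offset only if flush() succeeds."""
    task.put(records)
    try:
        task.flush(fail=fail)
    except IOError:
        task.buffer.clear()      # the batch will be redelivered instead
        return committed         # offset unchanged
    return records[-1][0] + 1    # next offset to read


topic = [(0, "a"), (1, "b"), (2, "c")]   # (offset, value) pairs
destination, committed = [], 0
task = BufferingSinkTask(destination)

committed = deliver(task, topic[committed:committed + 2], committed, fail=True)
after_failure = committed                 # still 0: nothing was committed
committed = deliver(task, topic[committed:committed + 2], committed)
committed = deliver(task, topic[committed:], committed)
```

The failed batch is simply read again from the unchanged offset, so nothing is lost; duplicates become possible only when a write succeeds but the commit does not.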
The four JDBC source modes, plus custom queries.
- `bulk`: re-reads the whole table on every poll. Useful for small reference tables.
- `incrementing`: emits rows where the named column has a strictly greater value than the last one saved. Requires a strictly increasing column (typically an auto-increment PK) and never sees updates.
- `timestamp`: emits rows where the named timestamp column is strictly greater than the last one saved. Misses updates that don't change the timestamp.
- `timestamp+incrementing`: both columns; handles updates and inserts. The recommended JDBC source mode.
- `query`: not a `mode` value but a separate setting that replaces the table list with a custom SELECT and is combined with one of the modes above. Maximum flexibility, minimum framework support.
Debezium — log-based CDC in one paragraph.
Debezium connectors read the database transaction log (MySQL binlog, Postgres WAL, MongoDB change streams, SQL Server transaction log, Oracle redo log) and emit every insert, update, and delete as a structured event. Unlike the JDBC source, Debezium captures every change including deletes, captures updates with both before and after images, and cannot miss a change between polls because it consumes the same log the database uses for replication. It can still lag under load, so the log must be retained long enough for the connector to catch up.
The four common sink connectors.
- S3 sink: writes objects to S3 in a chosen format (Parquet, Avro, JSON) with time-based or field-based partitioning. The data-lake default.
- JDBC sink: writes records to a relational table. Modes: `insert` (append-only), `upsert` (insert-or-update by PK), or `update` (update existing rows only).
- Elasticsearch sink: indexes documents into an ES index, using the record key as the document id by default. Supports near-real-time search over Kafka topics.
- Snowflake / BigQuery sinks: load data into the warehouse via the vendor-native bulk API; Connect handles batching, transient errors, and schema mapping.
Common interview probes on source / sink.
- "Why pick Debezium over JDBC source for CDC?" Debezium captures deletes and gives correct ordering; JDBC source can only see what its query returns and misses deletes entirely.
- "What is `topic.prefix` on a Debezium connector?" The logical server name prepended to every emitted topic name (for MySQL, `<topic.prefix>.<database>.<table>`); choose something stable that does not need to change when tables are renamed.
- "How does the JDBC sink handle a primary-key collision?" Depends on `insert.mode`: `insert` raises a duplicate-key error; `upsert` translates to UPDATE.
- "What is the unit of parallelism for a JDBC source?" Table. Each task owns one or more tables; a single table cannot be split.
Worked example — Debezium MySQL source emits before/after on UPDATE
Detailed explanation. Unlike a JDBC source that periodically polls and emits only the current row, Debezium reads the binlog and emits a structured event for every change with both the before and after images of the row. Downstream consumers can compute deltas, audit trails, or compensating writes from the same event.
Question. Given a MySQL orders table where row id=7 is updated from amount=100 to amount=120, show the Debezium event envelope. Identify the op, before, after, and source fields.
Input — before.
| id | customer_id | amount | status |
|---|---|---|---|
| 7 | 42 | 100 | placed |
Input — after UPDATE orders SET amount = 120, status = 'paid' WHERE id = 7.
| id | customer_id | amount | status |
|---|---|---|---|
| 7 | 42 | 120 | paid |
Code.
{
"schema": { "...": "Avro schema, fetched by id from Schema Registry" },
"payload": {
"before": {
"id": 7,
"customer_id": 42,
"amount": 100,
"status": "placed"
},
"after": {
"id": 7,
"customer_id": 42,
"amount": 120,
"status": "paid"
},
"source": {
"version": "2.5.0.Final",
"connector": "mysql",
"name": "shop",
"ts_ms": 1717977600000,
"snapshot": "false",
"db": "shop",
"table": "orders",
"server_id": 1,
"file": "binlog.000123",
"pos": 4096,
"row": 0
},
"op": "u",
"ts_ms": 1717977600125
}
}
Step-by-step explanation.
- The `op` field encodes the operation: `c` (create / insert), `u` (update), `d` (delete), `r` (read / snapshot). Downstream consumers branch on this single character.
- `before` is populated on `u` and `d` events; `after` is populated on `c`, `u`, and `r`. Pure deletes (`op = d`) carry the deleted row in `before` and a NULL `after`.
- `source.file` and `source.pos` are the MySQL binlog coordinates; Debezium stores them in `connect-offsets` so a task restart resumes from the exact same byte position.
- With the Avro converter, the value bytes on the wire start with a 5-byte prefix (1 magic byte + 4-byte schema id) followed by the data only; the schema text stays in Schema Registry.
- A downstream sink can either keep the full envelope (audit pipelines) or apply an SMT to flatten to just `after` (warehouse loaders).
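To make the "branch on `op`" point concrete, here is a hedged sketch of a downstream consumer that keeps an in-memory mirror keyed by primary key. `apply_change` and `changed_fields` are hypothetical helpers, and the events are plain dicts shaped like the envelope above rather than deserialised Kafka records.

```python
def apply_change(mirror, event):
    """Apply one Debezium-style change event to a dict keyed by primary key."""
    payload = event["payload"]
    op = payload["op"]
    if op in ("c", "r", "u"):
        row = payload["after"]          # new state of the row
        mirror[row["id"]] = row
    elif op == "d":
        mirror.pop(payload["before"]["id"], None)   # row image lives in before
    else:
        raise ValueError(f"unknown op: {op!r}")
    return mirror


def changed_fields(event):
    """Return {column: (old, new)} for an update event."""
    before, after = event["payload"]["before"], event["payload"]["after"]
    return {k: (before.get(k), v) for k, v in after.items() if before.get(k) != v}


placed = {"id": 7, "customer_id": 42, "amount": 100, "status": "placed"}
paid = {"id": 7, "customer_id": 42, "amount": 120, "status": "paid"}

create = {"payload": {"op": "c", "before": None, "after": placed}}
update = {"payload": {"op": "u", "before": placed, "after": paid}}
delete = {"payload": {"op": "d", "before": paid, "after": None}}

mirror = {}
apply_change(mirror, create)
apply_change(mirror, update)
state_after_update = dict(mirror)
delta = changed_fields(update)
apply_change(mirror, delete)
```

The delta computation is only possible because the update event carries both images, which is the argument for keeping the full envelope on the source topic.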
Output.
| Field | Value | Meaning |
|---|---|---|
| op | u | UPDATE |
| before.amount | 100 | pre-update value |
| after.amount | 120 | post-update value |
| source.file | binlog.000123 | binlog file |
| source.pos | 4096 | binlog offset |
| ts_ms | 1717977600125 | event time at Debezium |
Rule of thumb. Always keep the Debezium envelope intact at the source-topic layer (audit + replay), and use an SMT or downstream consumer to flatten when writing into the warehouse. Discarding before at the source forfeits the ability to compute compensating writes later.
Worked example — JDBC source timestamp+incrementing for updates
Detailed explanation. Pure incrementing mode misses updates because the PK doesn't change. Pure timestamp mode misses inserts whose timestamp ties with the last poll. The timestamp+incrementing mode combines both columns and handles inserts, updates, and tie-breaking correctly.
Question. Configure the JDBC source on a Postgres customers table where updated_at reflects the last edit and id is the PK. Show why neither column alone is enough.
Input.
| id | name | updated_at |
|---|---|---|
| 1 | Alice | 2026-06-12 09:00:00 |
| 2 | Bob | 2026-06-12 09:00:00 |
| 3 | Cara | 2026-06-12 09:00:05 |
Code.
{
"name": "pg-customers-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://pg:5432/shop",
"table.whitelist": "customers",
"mode": "timestamp+incrementing",
"timestamp.column.name": "updated_at",
"incrementing.column.name": "id",
"topic.prefix": "pg.",
"poll.interval.ms": "3000",
"tasks.max": "1"
}
}
Step-by-step explanation.
- The query the connector emits is `SELECT * FROM customers WHERE updated_at > ? OR (updated_at = ? AND id > ?) ORDER BY updated_at, id`; the tie-breaker on `id` ensures no row with the same timestamp is missed.
- The two saved values `last_updated_at` and `last_id` are persisted in `connect-offsets`. Restart resumes from the exact same position.
- Updates (which bump `updated_at`) re-emit the row to Kafka. Downstream consumers see the new state but cannot distinguish "INSERT" from "UPDATE" from JDBC alone: they only see the current row.
- Limitation: deletes are invisible. Soft deletes (`is_deleted = true`) can be captured because they bump `updated_at`; hard deletes vanish entirely.
- If the source needs delete capture or accurate UPDATE semantics, switch to Debezium.
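The polling logic above can be reproduced against an in-memory SQLite table. This is a sketch under stated assumptions: SQLite stands in for Postgres, the `state` dict stands in for `connect-offsets`, the `poll` helper and the renamed value `'Alicia'` are illustrative, and the real connector adds details (such as an upper time bound) that are omitted here.

```python
import sqlite3

QUERY = """
SELECT id, name, updated_at FROM customers
WHERE updated_at > ? OR (updated_at = ? AND id > ?)
ORDER BY updated_at, id
"""


def poll(conn, state):
    """Run one poll and advance the saved (timestamp, id) position."""
    ts, last_id = state["ts"], state["id"]
    rows = conn.execute(QUERY, (ts, ts, last_id)).fetchall()
    if rows:
        state["id"], state["ts"] = rows[-1][0], rows[-1][2]
    return [row[0] for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)"
)
conn.executemany("INSERT INTO customers VALUES (?, ?, ?)", [
    (1, "Alice", "2026-06-12 09:00:00"),
    (2, "Bob", "2026-06-12 09:00:00"),
    (3, "Cara", "2026-06-12 09:00:05"),
])

state = {"ts": "1970-01-01 00:00:00", "id": 0}   # stand-in for connect-offsets
first = poll(conn, state)      # initial load
second = poll(conn, state)     # nothing changed
conn.execute(
    "UPDATE customers SET name = 'Alicia', "
    "updated_at = '2026-06-12 09:00:08' WHERE id = 1"
)
third = poll(conn, state)      # the edited row is re-emitted
conn.execute("DELETE FROM customers WHERE id = 2")
fourth = poll(conn, state)     # hard delete: nothing to emit
```

The fourth poll returning nothing is the delete blind spot in miniature: the row is gone, so no query over the table can report it.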
Output.
| Poll iteration | Query parameters | Rows emitted |
|---|---|---|
| 1 (initial) | t = epoch, id = 0 | id=1, id=2, id=3 |
| 2 (no changes) | t = 09:00:05, id = 3 | (none) |
| 3 (Alice edited) | t = 09:00:05, id = 3 | id=1 (now t=09:00:08) |
Rule of thumb. Use timestamp+incrementing for "good enough" CDC on a clean source. Use Debezium when deletes matter, when ordering must match the transactional commit order, or when the source is too hot to poll at a useful frequency.
Worked example — JDBC sink upsert by primary key
Detailed explanation. The JDBC sink's two main write modes are insert (plain INSERT; duplicates raise PK errors) and upsert (INSERT ... ON CONFLICT ... DO UPDATE on Postgres; MERGE or the dialect's equivalent elsewhere). The connector's own default is insert, but upsert is the usual choice for CDC sinks that consume Debezium output, because every UPDATE in the source must be reflected as an UPDATE in the destination.
Question. Configure a JDBC sink that loads the cdc.shop.orders topic (Debezium-flattened, key = order_id) into a Postgres orders_mirror(id PK, customer_id, amount, status) table with upsert semantics.
Input — topic record (key=7).
| field | value |
|---|---|
| id | 7 |
| customer_id | 42 |
| amount | 120 |
| status | paid |
Code.
{
"name": "pg-orders-mirror-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "cdc.shop.orders",
"connection.url": "jdbc:postgresql://warehouse:5432/dwh",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "id",
"table.name.format": "orders_mirror",
"tasks.max": "3",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Step-by-step explanation.
- `insert.mode = upsert` translates to `INSERT ... ON CONFLICT (id) DO UPDATE SET customer_id = EXCLUDED.customer_id, amount = EXCLUDED.amount, status = EXCLUDED.status`.
- `pk.mode = record_value` + `pk.fields = id` tells the sink to take the `id` field from the record value (not from the Kafka key) as the upsert key. Use `record_key` if the Kafka key holds the PK.
- `auto.create = true` + `auto.evolve = true` lets the sink create the destination table on first record and add new columns when the schema evolves. Convenient for dev; usually disabled in regulated environments.
- `tasks.max = 3` distributes the topic's partitions across three sink tasks. Each task opens its own database connection and batches its writes.
- The sink commits Kafka offsets after the JDBC batch commits, so a worker crash mid-batch causes at-least-once redelivery; upsert mode makes that safe (re-applying the same UPDATE is idempotent on a row level).
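Why upsert makes redelivery safe can be checked with a small experiment. This is a sketch, assuming SQLite (3.24 or newer, for `ON CONFLICT ... DO UPDATE`) as a stand-in for Postgres; `write_batch` is a hypothetical helper, not the sink's code.

```python
import sqlite3

UPSERT = """
INSERT INTO orders_mirror (id, customer_id, amount, status)
VALUES (:id, :customer_id, :amount, :status)
ON CONFLICT (id) DO UPDATE SET
    customer_id = excluded.customer_id,
    amount = excluded.amount,
    status = excluded.status
"""


def write_batch(conn, records):
    """Write one batch inside a single transaction."""
    with conn:
        conn.executemany(UPSERT, records)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders_mirror ("
    "id INTEGER PRIMARY KEY, customer_id INTEGER, amount INTEGER, status TEXT)"
)

batch = [
    {"id": 7, "customer_id": 42, "amount": 100, "status": "placed"},
    {"id": 7, "customer_id": 42, "amount": 120, "status": "paid"},
]
write_batch(conn, batch)
write_batch(conn, batch)   # simulated redelivery after a crash before commit

rows = conn.execute(
    "SELECT id, customer_id, amount, status FROM orders_mirror"
).fetchall()
```

Replaying the batch leaves a single row in its final state. With plain `INSERT`, the second record of the very first batch would already have failed on the primary key.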
Output.
| Source event | Destination SQL | Effect |
|---|---|---|
| insert id=7 amount=100 | INSERT ... id=7, amount=100 | new row |
| update id=7 amount=120 | INSERT ... ON CONFLICT DO UPDATE → amount=120 | row updated |
| delete id=7 | (none by default; a tombstone becomes a DELETE only with `delete.enabled = true` and `pk.mode = record_key`) | row remains unless deletion is configured |
Rule of thumb. For CDC mirrors, pair a Debezium source with a JDBC sink in upsert mode, keep tombstones flowing out of Debezium's ExtractNewRecordState (`drop.tombstones = false`), and set `delete.enabled = true` with `pk.mode = record_key` on the sink so DELETEs arriving as Kafka tombstones (null value) reach the sink's delete path.
Kafka interview question on choosing source connectors
A senior interviewer often opens with: "Your team needs to replicate a busy MySQL orders table into a Snowflake warehouse with near-real-time freshness and full DELETE support. Walk me through which Connect source and sink you would pick and why."
Solution Using Debezium MySQL source + Snowflake sink (or S3 + Snowpipe)
[
{
"name": "mysql-orders-cdc",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.user": "debezium",
"topic.prefix": "shop",
"database.include.list": "shop",
"table.include.list": "shop.orders",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-history.shop",
"tasks.max": "1",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
}
},
{
"name": "snowflake-orders-sink",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "shop.shop.orders",
"snowflake.url.name": "abc.snowflakecomputing.com",
"snowflake.database.name": "RAW",
"snowflake.schema.name": "SHOP",
"buffer.count.records": "10000",
"buffer.flush.time": "60",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"tasks.max": "4"
}
}
]
Step-by-step trace.
| Stage | Tool | Why this tool |
|---|---|---|
| Capture | Debezium MySQL | Log-based CDC — catches every INSERT/UPDATE/DELETE in commit order, never misses changes between polls |
| Reshape | ExtractNewRecordState SMT | Flatten Debezium envelope to just after, keep tombstones for DELETE propagation |
| Buffer | Kafka topic | Durable, partitioned, replayable |
| Land | Snowflake sink (Snowpipe Streaming) | Vendor-native streaming ingest API — lower latency than file-based COPY, exactly-once into Snowflake |
| Schedule | Connect framework | No producer / consumer code; HA via distributed mode |
After the choice, the team enables Schema Registry + Avro on both connectors so schema changes (adding an amount_currency column, for instance) propagate through the registry's BACKWARD compatibility check before reaching the sink.
Output:
| Requirement | Decision |
|---|---|
| Real-time freshness | Debezium (binlog) + Snowpipe Streaming → seconds, not minutes |
| Full DELETE support | Debezium emits op=d with tombstone; SMT keeps the tombstone for downstream |
| No custom code | one source config + one sink config — both JSON |
| Schema evolution | Avro + Schema Registry + BACKWARD compat |
| Operations | Connect distributed mode handles HA, restart, scaling |
Why this works — concept by concept:
- Debezium for log-based CDC: reads the MySQL binlog directly so every commit is captured in order, including DELETEs. JDBC source cannot do this because it cannot see deleted rows.
- ExtractNewRecordState SMT: Debezium's stock transformer for flattening the `before`/`after` envelope into "just the new row." Keeps the wire format simple for downstream sinks.
- Snowpipe Streaming sink: vendor-native streaming API. Lower latency than file-based loading, and exactly-once into Snowflake because the connector tracks the last committed Kafka offset per channel and skips anything already ingested.
- Schema Registry + Avro: schema id on the wire; compatibility enforcement at registration time prevents producer changes from breaking the consumer.
- Cost: two connector configs, one Connect cluster, one Schema Registry. The framework absorbs HA and offset management; engineering time goes to data modelling, not pipeline plumbing.
4. SMTs — Single Message Transforms
kafka connect smt is the in-flight reshape lane — per-record, no state, runs at the connector boundary; chain transforms with the transforms= config
The mental model in one line: a Single Message Transform is a Java class that takes one record and returns one record (or null), runs inside the worker between the connector and Kafka (for source) or between Kafka and the connector (for sink), and has zero access to multi-record state — anything stateful escapes to Kafka Streams or ksqlDB. Once you say "SMT = one record in, one record out, no state," the entire kafka connect smt interview surface collapses to "pick the right transform and chain them."
Where SMTs run.
- Source connectors: after the source task produces a `SourceRecord`, before the framework converts it and writes it to Kafka. The first transform in the chain sees the record exactly as the source emitted it.
- Sink connectors: after the framework consumes and deserialises the record, before the sink task's `put(records)` receives it.
- Per record, in process: every transform runs in the worker JVM, in the same thread that produces or consumes. Cost is microseconds per record per transform.
- No multi-record state: SMTs see one record at a time. There is no buffer, no window, no join. For those, use Kafka Streams (DSL) or ksqlDB (SQL).
The high-frequency SMTs.
- ExtractField: pull a nested field to the top level. Variants `ExtractField$Key` and `ExtractField$Value`. Useful for flattening Debezium envelopes.
- Cast: change a field's type. Spec language: `field1:int32,field2:string`. Common fix for the "source has a string that should be a number" mismatch.
- RegexRouter: rewrite the destination topic name with a regex + replacement. Useful for renaming `mysql.shop.orders` to just `orders`.
- MaskField: replace a field's value with a type-appropriate blank (empty string, numeric 0) or a configured replacement. Stock PII redaction at the connector boundary.
- ReplaceField: drop fields (`exclude`) or rename them (`renames`).
- InsertField: add static / metadata fields (timestamp, connector name, source partition).
- TimestampConverter: change a date/time field between epoch millis, formatted strings, and the Connect `Date` / `Time` / `Timestamp` types.
- Filter / Predicate: keep or drop records based on a predicate (`HasHeaderKey`, `RecordIsTombstone`, `TopicNameMatches`). Avoid heavy filtering here; push it upstream where possible.
Chaining transforms.
- The `transforms` property is a comma-separated list of aliases. Order matters: the chain runs left to right.
- Each alias has its own config block: `transforms.<alias>.type = ...` plus `transforms.<alias>.<field> = ...`.
- A `transforms.<alias>.predicate = ...` makes the transform conditional. `transforms.<alias>.negate = true` inverts the predicate.
Common interview probes on SMTs.
- "What is the cost of an SMT?" Microseconds per record per transform; cheap unless you put a regex in a hot path.
- "Can an SMT join two records?" No. SMTs are strictly stateless and per-record.
- "Where would you put PII masking?" At the source-connector boundary with `MaskField`. Centralises the redaction policy and ensures every downstream consumer sees the masked value.
- "How do you route different rows to different topics?" `RegexRouter` or a custom `Transformation<R>` that returns a record with a different `topic()`.
- "Can SMTs run on the sink side too?" Yes. Sink SMTs reshape records before the sink's `put` call.
Worked example — flatten Debezium envelope with ExtractNewRecordState
Detailed explanation. Debezium emits the full before/after/source/op/ts_ms envelope. Most warehouse loaders only want the after image (or before for deletes). Debezium ships a stock SMT — ExtractNewRecordState (formerly UnwrapFromEnvelope) — that flattens the envelope to just the row.
Question. Configure a Debezium MySQL connector that emits flattened records (just the row + a __op metadata field and a __source_ts_ms timestamp), with tombstones preserved for DELETE events.
Input (source envelope).
{
"before": { "id": 7, "amount": 100 },
"after": { "id": 7, "amount": 120 },
"source": { "ts_ms": 1717977600000, "table": "orders" },
"op": "u",
"ts_ms": 1717977600125
}
Code.
{
"name": "mysql-shop-cdc",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"topic.prefix": "shop",
"table.include.list": "shop.orders",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,source.ts_ms",
"transforms.unwrap.add.headers": "source.table",
"tasks.max": "1"
}
}
Step-by-step explanation.
- `ExtractNewRecordState` is Debezium's transform: it extracts the `after` field for INSERT / UPDATE and the `before` field for DELETE.
- `drop.tombstones = false` keeps the Kafka tombstone (null value) for DELETE events. A JDBC sink configured with `delete.enabled = true` and `pk.mode = record_key` can then translate the tombstone to a DELETE.
- `delete.handling.mode = rewrite` produces a non-null record with `__deleted = true` for delete events (and `__deleted = false` on every other record). Useful when the downstream loader doesn't honour tombstones.
- `add.fields = op,source.ts_ms` re-attaches Debezium metadata as `__op` and `__source_ts_ms` columns on the flattened record. The double-underscore prefix is the convention.
- `add.headers = source.table` writes the table name to a Kafka record header, so a downstream router can dispatch by table without parsing the value.
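The flattening rules above can be approximated in a few lines. This is a hedged sketch of the behaviour as described here, not Debezium's implementation: the `unwrap` function and its parameters are hypothetical, envelopes are plain dicts, and header handling is left out.

```python
def unwrap(envelope, add_fields=("op", "source.ts_ms"), rewrite_deletes=True):
    """Flatten a Debezium-style envelope into a single row dict."""
    op = envelope["op"]
    if op == "d":
        if not rewrite_deletes:
            return None                     # stands in for dropping the event
        row = dict(envelope["before"])      # deletes carry the row in before
    else:
        row = dict(envelope["after"])
    if rewrite_deletes:
        row["__deleted"] = "true" if op == "d" else "false"
    for path in add_fields:
        value = envelope
        for part in path.split("."):        # walk nested paths like source.ts_ms
            value = value[part]
        row["__" + path.replace(".", "_")] = value
    return row


update = {
    "before": {"id": 7, "amount": 100},
    "after": {"id": 7, "amount": 120},
    "source": {"ts_ms": 1717977600000, "table": "orders"},
    "op": "u",
    "ts_ms": 1717977600125,
}
delete = {**update, "after": None, "op": "d"}

flat_update = unwrap(update)
flat_delete = unwrap(delete)
dropped = unwrap(delete, rewrite_deletes=False)
```

The rewritten delete keeps the last known column values plus the `__deleted` marker, which is what lets a loader that ignores tombstones still perform a soft delete.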
Output (Kafka record).
{
"id": 7,
"amount": 120,
"__deleted": "false",
"__op": "u",
"__source_ts_ms": 1717977600000
}
Rule of thumb. Use ExtractNewRecordState on every Debezium pipeline whose downstream is a warehouse loader. To keep the un-flattened topic for audit and replay as well, apply the SMT on the sink connector rather than on the source, so the source topic retains the full envelope; the extra storage is negligible compared to recovering lost change history.
Worked example — chain ExtractField, Cast, MaskField, RegexRouter
Detailed explanation. A realistic Connect pipeline chains four or five SMTs in a single config: one to flatten the envelope, one to coerce a numeric type, one to redact PII, one to rewrite the topic name. The transforms= property lists the aliases in execution order.
Question. Build the SMT chain for a source connector that flattens the envelope (unwrap), casts amount to int32 (cast_amount), masks email (mask_email), and renames topics mysql.shop.X to orders.X (route).
Input (after-unwrap record).
{
"id": 7,
"email": "a@x.com",
"amount": "42" // string from the binlog
}
Code.
{
"transforms": "unwrap,cast_amount,mask_email,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.cast_amount.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.cast_amount.spec": "amount:int32",
"transforms.mask_email.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask_email.fields": "email",
"transforms.mask_email.replacement": "****",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "mysql\\.shop\\.(.+)",
"transforms.route.replacement": "orders.$1"
}
Step-by-step explanation.
- The framework processes the chain left to right. `unwrap` first: Debezium envelope → flat record.
- `cast_amount` reads the `amount` string and writes back an int32. The record's schema is updated so downstream consumers see the correct type.
- `mask_email` replaces the `email` field's value with `"****"`. The field is still present (so downstream schemas don't break) but the data is redacted.
- `route` rewrites the destination topic name. A record whose source-derived topic was `mysql.shop.orders` is now published to `orders.orders`. (You'd usually configure the regex more carefully to avoid the duplicate.)
- Each transform sees the output of the previous one. If `cast_amount` failed (non-numeric value), the task would either fail or, with `errors.tolerance = all`, log and skip the record. The dead-letter queue is a sink-connector feature, so it is not available to this source-side chain.
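The chain semantics (left to right, each transform sees the previous output) can be modelled with plain functions. This is an illustrative sketch only: the four functions mimic the configured transforms on `(topic, value)` pairs and are not the Connect classes themselves.

```python
import re


def unwrap(topic, value):
    """Keep only the after image of the envelope."""
    return topic, dict(value["after"])


def cast_amount(topic, value):
    return topic, {**value, "amount": int(value["amount"])}


def mask_email(topic, value):
    return topic, {**value, "email": "****"}


def route(topic, value):
    """Rename mysql.shop.X to orders.X; leave other topics alone."""
    match = re.fullmatch(r"mysql\.shop\.(.+)", topic)
    return (f"orders.{match.group(1)}" if match else topic), value


def apply_chain(topic, value, transforms):
    """Run transforms in order; a None result drops the record."""
    record = (topic, value)
    for transform in transforms:
        record = transform(*record)
        if record is None:
            return None
    return record


envelope = {"after": {"id": 7, "email": "a@x.com", "amount": "42"}, "op": "c"}

good = apply_chain("mysql.shop.orders", envelope,
                   [unwrap, cast_amount, mask_email, route])
# Masking before unwrapping touches the envelope, not the row inside it.
bad = apply_chain("mysql.shop.orders", envelope,
                  [mask_email, unwrap, cast_amount, route])
```

The misordered chain runs without any error and still leaks the address, which is why chain order deserves a test rather than a visual check.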
Output (final record).
{
"id": 7,
"email": "****",
"amount": 42
}
// topic: orders.orders (was: mysql.shop.orders)
Rule of thumb. Keep SMT chains short (≤ 5 transforms) and well-named. The alias names appear in worker logs and metrics — descriptive aliases (mask_email, cast_amount) pay back the day a transform fails in production.
Worked example — predicate-gated SMT for conditional transforms
Detailed explanation. A predicate lets you run a transform only when a condition is true. Pair `Filter` with `RecordIsTombstone` to drop tombstones on a topic where they would corrupt downstream loaders; pair `MaskField` with `TopicNameMatches` to mask PII only on tables that hold it.
Question. Apply MaskField to redact the ssn column only when the record's topic name matches users.* — leave records on other topics untouched.
Input.
| topic | id | ssn |
|---|---|---|
| users.profiles | 7 | 123-45-6789 |
| events.clicks | 999 | (none — column does not exist) |
Code.
{
"transforms": "mask_ssn",
"transforms.mask_ssn.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask_ssn.fields": "ssn",
"transforms.mask_ssn.replacement": "[REDACTED]",
"transforms.mask_ssn.predicate": "is_users_topic",
"predicates": "is_users_topic",
"predicates.is_users_topic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.is_users_topic.pattern": "users\\..*"
}
Step-by-step explanation.
- The `predicates` top-level list declares predicate aliases: independent named conditions.
- `predicates.<alias>.type` selects the predicate class (`TopicNameMatches`, `RecordIsTombstone`, `HasHeaderKey`).
- On any transform, `transforms.<alias>.predicate = <predicate-alias>` makes that transform run only when the predicate is true. Add `transforms.<alias>.negate = true` to invert.
- For our case, `mask_ssn` only runs when the record's topic name matches `users\..*`. Records on `events.clicks` flow through unchanged.
- Predicates do not drop records; they just gate the transform. To drop records, combine a predicate with the `Filter` transform (`org.apache.kafka.connect.transforms.Filter`, which has no `$Key` / `$Value` variants and drops every record it is applied to).
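Predicate gating is easy to model as a wrapper around a transform. The sketch below is an assumption-laden illustration: `gated`, `topic_name_matches`, `record_is_tombstone`, `mask_ssn`, and `drop` are hypothetical Python stand-ins for the Connect predicate and transform classes, and the `billing.accounts` record is invented to show a non-matching topic that does carry an `ssn` field.

```python
import re


def topic_name_matches(pattern):
    """Predicate factory: true when the whole topic name matches the regex."""
    compiled = re.compile(pattern)
    return lambda topic, value: compiled.fullmatch(topic) is not None


def record_is_tombstone(topic, value):
    return value is None


def gated(transform, predicate, negate=False):
    """Run the transform only when the (optionally negated) predicate holds."""
    def wrapper(topic, value):
        if predicate(topic, value) != negate:
            return transform(topic, value)
        return topic, value            # pass the record through untouched
    return wrapper


def mask_ssn(topic, value):
    if value is None or "ssn" not in value:
        return topic, value
    return topic, {**value, "ssn": "[REDACTED]"}


def drop(topic, value):
    """Stand-in for the Filter transform: drops whatever it receives."""
    return None


mask_users = gated(mask_ssn, topic_name_matches(r"users\..*"))
drop_tombstones = gated(drop, record_is_tombstone)

masked = mask_users("users.profiles", {"id": 7, "ssn": "123-45-6789"})
untouched = mask_users("billing.accounts", {"id": 1, "ssn": "000-00-0000"})
dropped = drop_tombstones("users.profiles", None)
kept = drop_tombstones("users.profiles", {"id": 7})
```

Keeping the condition separate from the action is what makes each branch testable on its own, as the four calls at the end show.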
Output.
| topic | id | ssn (after) |
|---|---|---|
| users.profiles | 7 | [REDACTED] |
| events.clicks | 999 | (unchanged — column missing or NULL, not redacted) |
Rule of thumb. Use predicates for "apply transform X only when condition Y." When the goal is to drop records, pair the predicate with `Filter`. Predicates also keep the SMT chain testable: you can reason about each branch independently.
Kafka interview question on the SMT contract
A senior interviewer often asks: "What can an SMT do, what can it not do, and what would you reach for instead in the second case?"
Solution Using the per-record, no-state, boundary-only framing
What an SMT CAN do (stock or custom):
• Reshape a single record:
– flatten (ExtractField, Debezium ExtractNewRecordState).
– cast types (Cast).
– rename / drop fields (ReplaceField).
– mask values (MaskField).
– insert metadata (InsertField).
• Rewrite the destination topic name (RegexRouter, custom Transformation).
• Filter records out (Filter + Predicate).
What an SMT CAN'T do:
• Join two records — no multi-record state. Use Kafka Streams or
ksqlDB.
• Window or aggregate (sums, counts) — no state. Use Streams /
Flink / ksqlDB.
• Look up an external system per record cheaply — possible but
expensive and synchronous; reach for an enrichment step in Streams
instead.
• Reorder records — runs strictly in arrival order.
When I need state, I escape upward:
• Kafka Streams: code-defined DSL with state stores, joins, windows.
• ksqlDB: SQL over the same engine — same capabilities, less code.
• Flink: when the state or windowing needs go beyond Streams.
Boundary rule:
• Source SMT: between source.poll() and the write to Kafka.
• Sink SMT: between the consume from Kafka and sink.put().
• Both run in the worker process — zero hop.
Step-by-step trace.
| Want | Use |
|---|---|
| Flatten Debezium envelope | SMT (ExtractNewRecordState) |
| Cast a string column to int | SMT (Cast) |
| Mask an email | SMT (MaskField) |
| Drop tombstones | SMT (Filter + RecordIsTombstone) |
| Compute a 5-minute moving average | Streams / ksqlDB / Flink |
| Join orders to customers (multi-record) | Streams / ksqlDB |
| Look up city from zipcode (small ref) | enrichment in Streams or ksqlDB JOIN |
| Per-record audit log | sink SMT writes to a log topic via a custom Transformation |
After the framing, the candidate slots every concrete ask into one of two boxes — "SMT can do it" or "I'd promote it to Streams / ksqlDB / Flink" — and answers in seconds rather than guessing.
Output:
| Operation | Where it runs |
|---|---|
| Single-record reshape | SMT chain (transforms=) |
| Stateful or multi-record | Kafka Streams / ksqlDB / Flink |
| Cross-topic join | Streams (KStream-KTable) / ksqlDB JOIN / Flink |
| External lookup per record | Streams or ksqlDB enrichment, not SMT |
Why this works — concept by concept:
- Per-record contract: SMTs receive one record and return one record (or null). Anything that needs more inputs needs a different engine.
- No state: SMTs do not own a state store. The framework offers no supported way to keep counters or windows inside a transform, and anything held in memory is lost on restart or rebalance.
- Boundary-only execution: runs in the worker JVM, same thread as the connector. Zero network hop, zero serialisation cost beyond the chain.
- Order in `transforms=`: the chain executes left to right. Each transform sees the previous one's output. Misordering produces silent reshape bugs (e.g. masking before unwrapping the envelope means `MaskField` never finds the top-level field it is supposed to mask).
- Cost: microseconds per record per transform. The SMT chain is the cheapest place to enforce typing, redaction, and routing. State and multi-record work belong in Streams or ksqlDB.
5. Schema Registry + idempotent and exactly-once delivery
schema registry decouples producers and consumers via versioned schemas; kafka connect idempotent plus transactional sinks promote at-least-once to exactly-once
The mental model in one line: Schema Registry is the central authority for the wire format — Avro, Protobuf, or JSON Schema — and enforces a compatibility ladder (BACKWARD / FORWARD / FULL / NONE) at registration time; Connect's default delivery is at-least-once, and you opt in to exactly-once by enabling the idempotent producer on source connectors and transactional commits on sinks that support them. Once you can recite "registry owns the schema, idempotent stops duplicates, transactional ties commits to writes," the whole kafka connect idempotent + Schema Registry interview surface resolves.
Schema Registry — what it is.
- A REST service that stores schemas keyed by `(subject, version)`. Each schema also gets a globally unique `id`.
- The wire format prefixes every value with a magic byte plus the 4-byte schema id. Consumers fetch the schema by id on first encounter and cache it.
- Subject naming strategies: `TopicNameStrategy` (subject = `<topic>-key` / `<topic>-value`), `RecordNameStrategy`, `TopicRecordNameStrategy`. Default is `TopicNameStrategy`.
- Three serialisation formats supported: Avro (most common), Protobuf, JSON Schema.
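The 5-byte prefix is simple enough to build and parse by hand. The sketch below assumes the commonly documented layout (magic byte `0`, then a big-endian 4-byte schema id); `frame` and `unframe` are hypothetical helper names, and the payload is treated as opaque bytes rather than real Avro.

```python
import struct

MAGIC_BYTE = 0
HEADER = struct.Struct(">bI")   # 1 magic byte + 4-byte big-endian schema id


def frame(schema_id, payload):
    """Prefix an already-serialised payload with the schema-id header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + payload


def unframe(message):
    """Split a framed message into (schema_id, payload)."""
    if len(message) < HEADER.size:
        raise ValueError("message shorter than the 5-byte header")
    magic, schema_id = HEADER.unpack(message[:HEADER.size])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER.size:]


message = frame(42, b"\x0e\x54")        # payload bytes are placeholders
schema_id, payload = unframe(message)
```

A consumer would use `schema_id` to look up the writer schema (once, then cache it) before decoding `payload`.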
Compatibility levels — the ladder.
| Level | Upgrade first | Guarantee | What you can change |
|---|---|---|---|
| BACKWARD (default) | consumers | consumers on the new schema can read data written with the previous schema | delete fields; add optional fields (with defaults) |
| FORWARD | producers | consumers on the previous schema can read data written with the new schema | add fields; delete optional fields (with defaults) |
| FULL | either | both directions hold | add or delete optional fields only |
| NONE | (not enforced) | (none enforced) | anything, at your own risk |
Schema evolution rules of thumb.
- Always add fields with a default value. Otherwise BACKWARD breaks: a consumer on the new schema reading an old record finds no value for the new field and has no default to fall back on.
- Never rename or retype a field in place. Retyping breaks at least one direction; renaming without an alias is treated as add + remove, which is two breaking changes.
- Mark old fields as `optional` (Protobuf) / give a default (Avro) before removing them, and run a deprecation window where producers stop writing them.
- Use FULL compatibility for shared schemas between teams that release independently. Use BACKWARD when consumers are upgraded before producers or need to replay older data.
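The ladder and the rules of thumb reduce to one question: can a reader on schema X decode data written with schema Y? The sketch below is a deliberately simplified model of that check, assuming flat records and exact type matches; it ignores Avro type promotion, aliases, unions, and nested records, and `can_read` / `is_compatible` are hypothetical names, not Schema Registry APIs.

```python
def can_read(reader: dict, writer: dict) -> bool:
    """True if a consumer on `reader` can decode records written with `writer`."""
    written = {f["name"]: f for f in writer["fields"]}
    for field in reader["fields"]:
        source = written.get(field["name"])
        if source is None:
            if "default" not in field:   # nothing written and nothing to fall back on
                return False
        elif source["type"] != field["type"]:
            return False                 # simplified: no type promotion
    return True


def is_compatible(level: str, new: dict, old: dict) -> bool:
    backward = can_read(new, old)   # new schema reads old data
    forward = can_read(old, new)    # old schema reads new data
    return {"BACKWARD": backward, "FORWARD": forward,
            "FULL": backward and forward, "NONE": True}[level]


def record(*fields):
    return {"type": "record", "name": "Order", "fields": list(fields)}


ID = {"name": "id", "type": "long"}
AMOUNT = {"name": "amount", "type": "double"}
V1 = record(ID, AMOUNT)
V2 = record(ID, AMOUNT, {"name": "currency", "type": "string", "default": "USD"})
V3 = record(ID, {"name": "amt", "type": "double"},
            {"name": "currency", "type": "string", "default": "USD"})
REQUIRED_ADD = record(ID, AMOUNT, {"name": "currency", "type": "string"})

print(is_compatible("FULL", V2, V1))                # True: optional field with default
print(is_compatible("BACKWARD", V3, V2))            # False: rename = remove + required add
print(is_compatible("BACKWARD", REQUIRED_ADD, V1))  # False: new field has no default
print(is_compatible("FORWARD", REQUIRED_ADD, V1))   # True: old readers ignore the extra field
```

The real registry applies the format's full resolution rules, so treat this only as a way to reason about which direction a change breaks.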
Connect's default delivery semantics.
- Default = at-least-once. Source: offsets are committed after the records are durably in Kafka, so a worker crash before commit re-reads from the last committed offset → possible duplicates. Sink: similar — flush before offset commit.
- At-most-once is not available out of the box; you would have to intentionally commit offsets before the write completes (rarely useful).
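- Exactly-once is opt-in and has two halves: transactional writes on the source side (built on the idempotent producer) and, on the sink side, `read_committed` consumption paired with an idempotent or transactional destination. Both rely on Kafka's idempotence and transaction protocol (introduced in 0.11).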
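The write-then-commit ordering behind at-least-once can be simulated without Kafka. This is a toy model, assuming an in-memory list as the topic and another list as the destination; `run_sink` and its `crash_after_write_at` parameter are hypothetical names introduced only for the illustration.

```python
def run_sink(log, committed_offset, destination, crash_after_write_at=None):
    """Write each record first, then commit its offset.

    Returns the committed offset. If `crash_after_write_at` is set, the worker
    'dies' after writing that offset but before committing it.
    """
    for offset in range(committed_offset, len(log)):
        destination.append(log[offset])      # 1. write to the destination
        if offset == crash_after_write_at:
            return committed_offset          # crash: this offset is never committed
        committed_offset = offset + 1        # 2. commit the offset
    return committed_offset


log = ["A", "B", "C"]
destination = []

committed = run_sink(log, 0, destination, crash_after_write_at=1)
print(committed, destination)    # 1 ['A', 'B']

committed = run_sink(log, committed, destination)   # restart from the last commit
print(committed, destination)    # 3 ['A', 'B', 'B', 'C']
```

Record `B` lands twice because it was written before the crash but its offset was not yet committed. Swapping the two steps would give at-most-once instead: the offset would be committed and the write lost.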
The idempotent producer — what it actually does.
- `enable.idempotence = true`: Kafka 3.0+ default. Assigns each producer a producer id; each `(producer id, partition, sequence)` triple is unique. Retries of the same record are deduped on the broker.
- `acks = all`: every record waits for replication to all in-sync replicas before being acknowledged. Required for idempotence.
- `max.in.flight.requests.per.connection ≤ 5`: required to preserve ordering with retries enabled. Kafka 3.0+ default.
- `retries`: set high (`MAX_INT`) so transient failures retry indefinitely; idempotence makes that safe.
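The broker-side dedupe can be pictured as a per-partition table of the last sequence seen for each producer id. The class below is a simplified sketch under that assumption (a real broker tracks batches rather than single records and keeps more than one recent entry per producer); `Partition` and its return strings are hypothetical.

```python
class Partition:
    """Toy model of one partition that dedupes by (producer id, sequence)."""

    def __init__(self):
        self.log = []
        self.last_seq = {}   # producer id -> last appended sequence number

    def append(self, pid: int, seq: int, value: str) -> str:
        last = self.last_seq.get(pid, -1)
        if seq <= last:
            return "duplicate"       # a retry of something already appended
        if seq != last + 1:
            return "out_of_order"    # a gap: an earlier record is still missing
        self.log.append(value)
        self.last_seq[pid] = seq
        return "appended"


partition = Partition()
print(partition.append(pid=7, seq=0, value="A"))   # appended
print(partition.append(pid=7, seq=0, value="A"))   # duplicate (ack was lost, producer retried)
print(partition.append(pid=7, seq=2, value="C"))   # out_of_order (seq 1 not seen yet)
print(partition.append(pid=7, seq=1, value="B"))   # appended
print(partition.log)                               # ['A', 'B']
```

The dedupe is scoped to one producer id, so it covers retries inside a producer session and nothing else.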
Exactly-once on the source and sink side.
- Source side (Kafka 3.3+): set `exactly.once.source.support = enabled` on every worker, then `exactly.once.support = required` in the source connector config. Source records are written transactionally, and the offset commit is part of the same transaction. A crash mid-batch rolls back both.
- Sink connectors can achieve exactly-once into the destination only if the destination is idempotent (key-keyed upserts) or supports transactional or offset-tracked writes (Snowflake's Snowpipe Streaming, JDBC with deduplication keys, S3 with deterministic, offset-based object names).
- `isolation.level = read_committed` on the consumer side of the sink ensures the sink only reads records that have been transactionally committed by the source, not records still in flight inside an open transaction.
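To see what the isolation level changes, the following toy model treats a partition as a list of data records plus commit and abort markers. It is a sketch of the visibility rule only, assuming one marker per transaction; the tuple layout and the `read` function are hypothetical.

```python
def read(log, isolation_level):
    """Return the record values a consumer would see under the given isolation level."""
    if isolation_level == "read_uncommitted":
        return [entry[2] for entry in log if entry[0] == "data"]

    # read_committed: look up how each transaction ended
    outcome = {entry[1]: entry[0] for entry in log if entry[0] in ("commit", "abort")}
    visible = []
    for entry in log:
        if entry[0] != "data":
            continue
        txn = entry[1]
        if txn not in outcome:
            break                      # open transaction: nothing past this point is visible yet
        if outcome[txn] == "commit":
            visible.append(entry[2])   # aborted records are skipped
    return visible


log = [
    ("data", "T1", "order-1"),
    ("data", "T1", "order-2"),
    ("abort", "T1"),
    ("data", "T2", "order-3"),
    ("commit", "T2"),
    ("data", "T3", "order-4"),         # T3 is still open
]

print(read(log, "read_uncommitted"))   # ['order-1', 'order-2', 'order-3', 'order-4']
print(read(log, "read_committed"))     # ['order-3']
```

The `break` models the fact that a `read_committed` consumer waits at the first record of an open transaction, which is also why long-running transactions add latency on the sink.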
Common interview probes.
- "What does Schema Registry actually store?" Schemas keyed by `(subject, version)`, with a globally unique `id` per schema; the wire format references the id, not the schema text.
- "What is BACKWARD compatibility?" A consumer using the new schema can read data written with the previous schema, so consumers upgrade first. (An old consumer reading new data is FORWARD.) BACKWARD is the Schema Registry default.
- "How do you remove a field safely?" Under BACKWARD the removal itself passes, because the new reader simply ignores the field. To keep consumers that are still on the old schema working, the field must already be optional or have a default, which is the FORWARD / FULL requirement.
- "Does Kafka Connect support exactly-once?" Yes for source-side as of 3.3, with `exactly.once.source.support = enabled` on the workers and `exactly.once.support = required` on the connector; sink-side exactly-once depends on the destination's idempotence or transactional support.
- "What's the cost of idempotent producer?" Small. Some sequence-tracking metadata per producer; `acks = all` may slightly raise latency.
Worked example — register an Avro schema with BACKWARD compatibility
Detailed explanation. The first time a producer publishes a record, the Avro converter checks Schema Registry: does the schema already exist for this subject? If yes, get the id and write the wire prefix. If no, register the schema (which triggers a compatibility check against the previous version).
Question. Register the v1 schema for the orders-value subject. Then attempt to register v2 (adds an optional currency field with default "USD") and v3 (renames amount to amt). Show which succeed and which fail under BACKWARD compatibility.
Input — v1.
{
"type": "record",
"name": "Order",
"fields": [
{ "name": "id", "type": "long" },
{ "name": "amount", "type": "double" }
]
}
Input — v2 (adds optional currency).
{
"type": "record",
"name": "Order",
"fields": [
{ "name": "id", "type": "long" },
{ "name": "amount", "type": "double" },
{ "name": "currency", "type": "string", "default": "USD" }
]
}
Input — v3 (renames amount → amt).
{
"type": "record",
"name": "Order",
"fields": [
{ "name": "id", "type": "long" },
{ "name": "amt", "type": "double" },
{ "name": "currency", "type": "string", "default": "USD" }
]
}
Code.
# Set BACKWARD compatibility (default — but explicit is good)
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility":"BACKWARD"}' \
http://schema-registry:8081/config/orders-value
# v1 — first registration succeeds
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema":"<v1 schema JSON, escaped>"}' \
http://schema-registry:8081/subjects/orders-value/versions
# v2 — succeeds (optional field with default is BACKWARD-compatible)
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema":"<v2 schema JSON, escaped>"}' \
http://schema-registry:8081/subjects/orders-value/versions
# v3 — FAILS (rename = drop + add of a required field)
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema":"<v3 schema JSON, escaped>"}' \
http://schema-registry:8081/subjects/orders-value/versions
Step-by-step explanation.
- v1 is the first version — no compatibility check, it is simply stored as version 1 with a new id.
- v2 adds an optional field with a default. BACKWARD says "the new schema can read old data": when a new consumer reads an old record, it uses the default for the missing `currency`. Compatible. Registered as version 2.
- v3 renames `amount` to `amt`. Avro sees this as "v3 has a required `amt` field with no default; v2 has no `amt` at all." When a v3 consumer reads a v2 record, it cannot find `amt` and there is no default, so the read fails. BACKWARD breaks and registration is rejected with HTTP 409.
- The fix for v3 is to declare `amt` with `amount` as an Avro alias so readers can still resolve old records, or to introduce `amt` with a default alongside `amount`, write both for a deprecation window, and migrate downstream consumers before retiring `amount`.
Output.
| Version | BACKWARD check | Result |
|---|---|---|
| v1 | n/a (first) | registered, id = 101 |
| v2 (add optional with default) | pass | registered, id = 102 |
| v3 (rename without alias) | fail | HTTP 409, registration rejected |
Rule of thumb. Run the BACKWARD compatibility check in CI on every schema change before any producer is updated. The Schema Registry REST API has a POST /compatibility/subjects/<subject>/versions/latest endpoint that returns just "is this change compatible?" — perfect for a pre-commit hook.
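A CI check along those lines could look like the sketch below. It assumes the registry host from the curl examples and that the endpoint answers with a JSON body containing an `is_compatible` boolean; `build_request`, `parse_verdict`, and `check` are hypothetical helper names, and only `check` touches the network.

```python
import json
import urllib.request

REGISTRY_URL = "http://schema-registry:8081"   # host taken from the curl examples


def build_request(subject: str, schema: dict) -> urllib.request.Request:
    """Build the POST for the compatibility endpoint; the schema travels as an escaped JSON string."""
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{REGISTRY_URL}/compatibility/subjects/{subject}/versions/latest",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


def parse_verdict(response_body: bytes) -> bool:
    """Read the verdict; anything other than an explicit true counts as incompatible."""
    return json.loads(response_body).get("is_compatible") is True


def check(subject: str, schema: dict) -> bool:
    """Ask the registry whether `schema` is compatible with the latest registered version."""
    with urllib.request.urlopen(build_request(subject, schema), timeout=10) as response:
        return parse_verdict(response.read())


V2 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "amount", "type": "double"},
        {"name": "currency", "type": "string", "default": "USD"},
    ],
}

request = build_request("orders-value", V2)
print(request.get_method(), request.full_url)
print(parse_verdict(b'{"is_compatible": true}'))   # True
```

In a pipeline, the job would call `check("orders-value", V2)` for each changed schema file and exit non-zero when it returns `False`. Building the body with a nested `json.dumps` also removes the need to hand-escape the schema as in the curl placeholders.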
Worked example — enable the idempotent producer on a source connector
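Detailed explanation. Connect's source producer is a regular Kafka producer. The idempotent settings are the same as for any producer; you set them in the connector config under the `producer.override.*` prefix. The worker must allow this through `connector.client.config.override.policy` (the default permits all overrides since Kafka 3.0; older workers default to `None` and reject them).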
Question. Configure a source connector with the idempotent producer enabled and acks = all. Show the resulting producer behaviour after a worker network blip.
Input.
| setting | value |
|---|---|
| producer.override.enable.idempotence | true |
| producer.override.acks | all |
| producer.override.max.in.flight.requests.per.connection | 5 |
| producer.override.retries | 2147483647 |
Code.
{
"name": "mysql-shop-cdc",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.server.name": "shop",
"tasks.max": "1",
"producer.override.enable.idempotence": "true",
"producer.override.acks": "all",
"producer.override.max.in.flight.requests.per.connection": "5",
"producer.override.retries": "2147483647",
"producer.override.delivery.timeout.ms": "120000"
}
}
Step-by-step explanation.
- `enable.idempotence = true` makes the producer obtain a producer id (PID) from the broker before its first send. Every record gets a `(PID, partition, sequence)` triple.
- The broker maintains the last sequence per `(PID, partition)`. Duplicate sequences (from retries after a network blip) are silently discarded, so the record is only appended once.
- `acks = all` ensures the broker waits for replication to every in-sync replica before acknowledging. Required for idempotence to give a meaningful "no duplicates" guarantee.
- `max.in.flight.requests.per.connection = 5` plus idempotence keeps records ordered even with retries (the broker uses the sequence to detect out-of-order or duplicate records).
- After a transient broker reconnection, the source connector retries the in-flight batch. The broker dedupes by sequence. Net effect: no duplicates on the Kafka topic.
Output.
| Event | Without idempotence | With idempotence |
|---|---|---|
| Record A sent, ack lost | A reappears on retry → duplicate | A discarded on retry, no duplicate |
| Records A, B, C reordered by retry | possible | impossible (sequence enforces order) |
| Connection lost mid-batch | partial duplicates | clean replay, no duplicates |
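Rule of thumb. Always enable idempotence on the source-side producer for any Connect source. As of Kafka 3.0 this is the client default, but workers running older client libraries still use the previous default (`false`), so override explicitly in every connector config. Idempotence only dedupes producer retries; records re-polled after a task restart still need source-side exactly-once.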
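One way to enforce the explicit-override rule is a small lint step that runs over connector configs before they are posted. The function below is a hypothetical sketch, not part of Connect; it only checks the four settings from the input table and treats a missing `enable.idempotence` override as a problem.

```python
PREFIX = "producer.override."


def idempotence_problems(connector_config: dict) -> list:
    """Return human-readable problems with the idempotence-related overrides (empty list = OK)."""
    problems = []

    if connector_config.get(PREFIX + "enable.idempotence") != "true":
        problems.append("enable.idempotence is not explicitly set to true")

    acks = connector_config.get(PREFIX + "acks", "all")
    if acks not in ("all", "-1"):
        problems.append(f"acks={acks} conflicts with idempotence (needs all)")

    in_flight = int(connector_config.get(PREFIX + "max.in.flight.requests.per.connection", "5"))
    if not 1 <= in_flight <= 5:
        problems.append(f"max.in.flight.requests.per.connection={in_flight} must be between 1 and 5")

    retries = int(connector_config.get(PREFIX + "retries", "2147483647"))
    if retries < 1:
        problems.append("retries must be greater than 0")

    return problems


good = {
    "producer.override.enable.idempotence": "true",
    "producer.override.acks": "all",
    "producer.override.max.in.flight.requests.per.connection": "5",
    "producer.override.retries": "2147483647",
}
bad = {
    "producer.override.enable.idempotence": "true",
    "producer.override.acks": "1",
    "producer.override.max.in.flight.requests.per.connection": "10",
}

print(idempotence_problems(good))        # []
print(len(idempotence_problems(bad)))    # 2
```

The same checks could run in the CI job that validates schemas, so a misconfigured connector is caught before it reaches the REST API.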
Worked example — sink-side exactly-once with read_committed
Detailed explanation. A transactional source writes records inside Kafka transactions. By default, consumers see in-flight transactional records (which may roll back). Setting isolation.level = read_committed makes the sink consumer skip uncommitted records, so the sink never writes records that the source later rolled back.
Question. Configure a JDBC sink so that it sees only committed transactional records from a transactional source, and so that the sink's UPSERTs are themselves idempotent.
Input.
| setting | value |
|---|---|
| consumer.override.isolation.level | read_committed |
| insert.mode | upsert |
| pk.mode | record_key |
| max.retries | 10 |
| retry.backoff.ms | 5000 |
Code.
{
"name": "pg-orders-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "shop.shop.orders",
"connection.url": "jdbc:postgresql://warehouse:5432/dwh",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "id",
"consumer.override.isolation.level": "read_committed",
"consumer.override.enable.auto.commit": "false",
"max.retries": "10",
"retry.backoff.ms": "5000",
"tasks.max": "3"
}
}
Step-by-step explanation.
- The sink subscribes to the source's topic. Without `isolation.level = read_committed`, the consumer would see records from transactions that are still open; if the source's transaction aborts, the sink would have already written them.
- With `read_committed`, the consumer only fetches records whose containing transaction has committed. Aborted transactions never reach the sink.
- The sink's `insert.mode = upsert` makes the destination idempotent at the row level: re-applying the same `INSERT ... ON CONFLICT DO UPDATE` is a no-op when the values match.
- Offset commit is gated on the JDBC batch commit (`enable.auto.commit = false`). A crash between the JDBC commit and the Kafka offset commit triggers at-least-once redelivery on restart; the upsert mode makes that safe.
- The combination of `read_committed` and an idempotent destination gives effective exactly-once into the warehouse, even though Connect's sink contract is technically at-least-once.
Output.
| Scenario | At-least-once sink (default) | Exactly-once with read_committed + upsert |
|---|---|---|
| Source rolls back transaction T1 | T1 records already in warehouse | T1 records never reach warehouse |
| Sink crashes after JDBC commit, before Kafka commit | re-applies T2 records → duplicate rows with plain inserts | re-applies T2 records → safe, the upsert overwrites identical rows |
| Net effect | possible row-level duplicates | every row effectively exactly once |
Rule of thumb. For any sink consuming a transactional source, set isolation.level = read_committed and make the destination idempotent (upsert, MERGE, or vendor-native dedup). These two settings together close the door on aborted-transaction leakage and crash-mid-write duplicates.
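The row-level idempotence of an upsert is easy to verify locally. The sketch below uses an in-memory SQLite database as a stand-in for the Postgres warehouse (an assumption made purely for a self-contained demo; it needs SQLite 3.24 or newer for `ON CONFLICT ... DO UPDATE`), and `put` is a hypothetical function that mimics one sink batch.

```python
import sqlite3

conn = sqlite3.connect(":memory:")   # stand-in for the warehouse
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL, currency TEXT)")

UPSERT = """
INSERT INTO orders (id, amount, currency) VALUES (?, ?, ?)
ON CONFLICT (id) DO UPDATE SET amount = excluded.amount, currency = excluded.currency
"""


def put(records):
    """Apply one batch inside a single transaction, like one sink flush."""
    with conn:                        # commits on success, rolls back on error
        conn.executemany(UPSERT, records)


batch = [(1, 19.99, "USD"), (2, 5.0, "EUR")]
put(batch)
put(batch)   # redelivery after a crash between the database commit and the offset commit

rows = conn.execute("SELECT id, amount, currency FROM orders ORDER BY id").fetchall()
print(rows)  # [(1, 19.99, 'USD'), (2, 5.0, 'EUR')]
```

Replaying the batch leaves two rows, not four. With a plain `INSERT` and no primary key, the second `put` would have doubled the table.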
Kafka interview question on schema evolution + exactly-once
A senior interviewer often combines: "You ship a Debezium → Kafka → Snowflake pipeline. A team wants to add an optional currency field to the orders schema. You also need exactly-once delivery so finance doesn't reconcile duplicate rows. Walk me through the design."
Solution Using BACKWARD-compatible schema + idempotent producer + Snowpipe Streaming sink
[
{
"name": "mysql-shop-cdc",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.server.name": "shop",
"table.include.list": "shop.orders",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.auto.register.schemas": "false",
"value.converter.use.latest.version": "true",
"producer.override.enable.idempotence": "true",
"producer.override.acks": "all",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
},
{
"name": "snowflake-orders-sink",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "shop.shop.orders",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"consumer.override.isolation.level": "read_committed",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
]
Step-by-step trace.
| Stage | What runs | Why |
|---|---|---|
| 1. Schema change | Register Avro v2 with optional `currency`, default `"USD"` | BACKWARD-compatible; consumers on either version keep reading |
| 2. Producer | Debezium emits v2 records | `auto.register.schemas = false` forces explicit registration via CI |
| 3. Wire | Idempotent producer with `acks = all` | No duplicates on retry |
| 4. Transaction | Source writes within a Kafka transaction (needs `exactly.once.support = required` on the connector and exactly-once source support enabled on the workers; not shown in the config above) | Aborted transactions are filtered downstream |
| 5. Consumer | Snowpipe Streaming sink with `read_committed` | Skips uncommitted records |
| 6. Destination | Snowpipe Streaming with idempotent client | Exactly-once into Snowflake |
After the design, the team adds a CI hook that POSTs every Avro schema change to Schema Registry's compatibility endpoint and fails the PR if BACKWARD is broken. No producer can ship a schema that breaks the consumer.
Output:
| Concern | Answer |
|---|---|
| Schema change accepted | yes; optional field with default is BACKWARD-safe |
| Old consumer reads new record | yes; it ignores the unknown `currency` field (a new consumer reading an old record uses the default) |
| Producer retries duplicate records | no; idempotent producer dedupes by sequence |
| Aborted transaction leaks to sink | no; `read_committed` filters them |
| Crash mid-write to Snowflake | safe; Snowpipe Streaming is idempotent |
Why this works — concept by concept:
- BACKWARD compatibility: the contract that a consumer on the new schema can still read data written with the old one. The Registry rejects breaking changes at registration time, so a producer can never ship a schema that breaks the downstream.
- Optional field with default: the canonical "safe additive change." Old consumers ignore the new field; new consumers fall back to the default when reading older records that lack it.
- Idempotent producer: `enable.idempotence = true` plus `acks = all` give "no duplicates on retry" via the `(producer id, partition, sequence)` triple. Kafka 3.0+ default.
- Transactional source + `read_committed` sink: together they ensure the sink only writes records that the source actually committed. Aborted transactions disappear cleanly.
- Snowpipe Streaming idempotent client: vendor-native API that promises exactly-once into Snowflake when used with the Connect sink's idempotent ingest mode.
- Cost: idempotence is essentially free. A transactional source adds a small per-batch overhead. Schema Registry adds one HTTP call per new schema per producer (cached afterwards). The trade is overwhelmingly worth it for any pipeline that finance, billing, or compliance touches.
Cheat sheet — Kafka Connect recipes
- Bootstrap a distributed worker. `connect-distributed.sh config/connect-distributed.properties` on each node with the same `group.id`. Internal topics (`connect-configs`, `connect-offsets`, `connect-statuses`) auto-create on first start.
- Register a JDBC source connector. `POST /connectors` with `connector.class = io.confluent.connect.jdbc.JdbcSourceConnector` + `mode = timestamp+incrementing` + `timestamp.column.name = updated_at` + `incrementing.column.name = id` + `topic.prefix = pg.`.
- Register a Debezium MySQL source. `connector.class = io.debezium.connector.mysql.MySqlConnector` + `database.server.name = shop` (renamed to `topic.prefix` from Debezium 2.0 on) + `table.include.list = shop.orders` + `schema.history.internal.kafka.bootstrap.servers = kafka:9092`. Single task per database.
- Register an S3 sink. `connector.class = io.confluent.connect.s3.S3SinkConnector` + `format.class = ParquetFormat` + `partitioner.class = TimeBasedPartitioner` + `flush.size = 1000` + `rotate.interval.ms = 300000`.
- Register a JDBC sink (upsert). `connector.class = io.confluent.connect.jdbc.JdbcSinkConnector` + `insert.mode = upsert` + `pk.mode = record_value` + `pk.fields = id` + `auto.create = false` (in prod).
- Chain SMTs. `transforms = unwrap,cast,mask,route` plus per-alias `transforms.<alias>.type = ...` and `transforms.<alias>.<field> = ...`. Order is execution order.
- Flatten a Debezium envelope. `transforms.unwrap.type = io.debezium.transforms.ExtractNewRecordState` + `drop.tombstones = false` + `delete.handling.mode = rewrite` + `add.fields = op,source.ts_ms`.
- Configure Schema Registry compatibility. `PUT /config/<subject> {"compatibility":"BACKWARD"}`. Use BACKWARD for additive evolution, FULL for shared schemas between independent teams.
- Enable idempotent producer on a source. `producer.override.enable.idempotence = true` + `producer.override.acks = all` + `producer.override.max.in.flight.requests.per.connection = 5`.
- Enable read_committed on a sink. `consumer.override.isolation.level = read_committed` to skip aborted transactional records.
- Monitor task status. `GET /connectors/:name/status` returns the connector and each task's state plus stack trace on failure. Wire into your alerting on `state = FAILED`.
- Restart a single failed task. `POST /connectors/:name/tasks/:id/restart`. Far cheaper than restarting the whole connector.
- Restart the connector + every task. `POST /connectors/:name/restart?includeTasks=true`. Use only for cluster-level corruption.
- Dead-letter queue for bad records (sink connectors only). `errors.tolerance = all` + `errors.deadletterqueue.topic.name = dlq.<connector>` + `errors.deadletterqueue.topic.replication.factor = 3`. Lets the connector skip records that fail SMTs or the converter.
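The monitoring and single-task restart recipes combine naturally: read the status document, pick out the failed tasks, and derive the restart paths. The sketch below works on an already-fetched status payload, so it runs without a cluster; the sample document, worker ids, and the `failed_task_restarts` helper are illustrative assumptions.

```python
import json


def failed_task_restarts(connector: str, status: dict) -> list:
    """Return the restart path for every task whose state is FAILED."""
    return [
        f"/connectors/{connector}/tasks/{task['id']}/restart"
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


# Illustrative status payload in the shape returned by GET /connectors/:name/status
sample_status = json.loads("""
{
  "name": "pg-orders-sink",
  "connector": {"state": "RUNNING", "worker_id": "connect-1:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "connect-1:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "connect-2:8083", "trace": "<stack trace>"},
    {"id": 2, "state": "RUNNING", "worker_id": "connect-2:8083"}
  ]
}
""")

print(failed_task_restarts("pg-orders-sink", sample_status))
# ['/connectors/pg-orders-sink/tasks/1/restart']
```

An alerting job would POST to each returned path (or page a human) instead of restarting the whole connector.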
Frequently asked questions
What is Kafka Connect and why use it over a custom consumer?
Kafka Connect is a distributed framework for moving data between Kafka and other systems via configuration. Instead of writing producer or consumer code, you submit a JSON connector config to a REST API, and the framework spawns parallel tasks that handle offset tracking, retries, schema management, and rebalance on failure. The win is twofold: you delete entire categories of bespoke ingestion code (source and sink connectors are published once and reused across the industry), and you get distributed-mode HA for free. Reach for a custom consumer only when no connector exists and writing even a 100-line custom Connector class would cost more than the one-off consumer it replaces. For "MySQL → S3," "Postgres → Snowflake," or "Kafka → Elasticsearch," Connect is the default answer.
What is the difference between source and sink connectors?
Source connectors read from an external system and write to Kafka; sink connectors read from Kafka and write to an external system. Both implement the same Connector interface and are deployed the same way, but they hit different parts of the framework: source tasks call poll() to emit SourceRecord batches that the worker writes to Kafka, while sink tasks receive SinkRecord batches via put(records) after the worker consumes them. Source offsets live in the connect-offsets topic; sink offsets live in the regular consumer-group offsets topic. Same framework, two contracts.
How does Debezium work and is it really log-based CDC?
Yes — Debezium reads the database's transaction log directly (MySQL binlog, Postgres WAL via logical replication slots, MongoDB oplog, SQL Server transaction log, Oracle redo log). Every committed change becomes a Kafka event with the before/after images plus metadata in a source envelope. Because it consumes the same log the database uses for replication, Debezium captures every change in commit order including DELETEs, never misses changes between polls, and survives connector restarts by resuming from the last persisted log position. JDBC source cannot do this — it can only see what its SELECT returns and is blind to deletes.
What are SMTs and when should I use them vs a stream processor?
SMTs (Single Message Transforms) are stateless per-record functions that run inside the Connect worker at the connector boundary — between poll() and the Kafka write (source) or between the consume and put(records) (sink). They reshape one record into one record: flatten a Debezium envelope (ExtractNewRecordState), cast a type (Cast), redact PII (MaskField), rename a topic (RegexRouter), drop a field (ReplaceField). Use them for record-local reshape. Reach for Kafka Streams, ksqlDB, or Flink the moment the transform needs multi-record state — joins, windows, aggregations, deduplication across keys. The Connect framework intentionally rejects designs that try to keep state inside an SMT.
Does Kafka Connect support exactly-once delivery?
Yes. Source-side exactly-once is supported as of Kafka 3.3: set `exactly.once.source.support = enabled` at the worker level and `exactly.once.support = required` on the source connector, and the framework writes source records transactionally and includes the offset commit in the same transaction. Sink-side exactly-once depends on the destination: if the destination is idempotent (key-based upsert, JDBC MERGE, Snowflake Snowpipe Streaming) or supports transactional writes, you can pair `consumer.override.isolation.level = read_committed` with a transactional source for effective end-to-end exactly-once. The default delivery semantic is at-least-once, which is practical for many pipelines, but always make it explicit which mode you ship.
What's the difference between standalone and distributed mode?
Standalone runs a single Connect process from properties files and keeps offsets in a local file: no internal topics, no task rebalancing, no HA. Distributed runs a pool of worker JVMs that share the three internal topics (connect-configs, connect-offsets, connect-statuses) and form a cluster via a shared group.id. Distributed accepts config changes via REST, rebalances tasks across workers when one dies, and persists every piece of state in Kafka so workers can come and go without losing the pipeline. Standalone is for laptops, integration tests, and quick demos; distributed is the only mode you ever ship to production. Treat the two as different products.
Practice on PipeCode
- Drill the streaming practice library → for Kafka, Debezium, and Connect-style ingestion problems.
- Rehearse on ETL pipeline problems → when the interviewer wants source → buffer → sink reasoning.
- Sharpen streaming with Python drills → for producer / consumer code questions.
- Stack the streaming medium difficulty problems → once the basics feel automatic.
- Layer the ETL medium difficulty library → for the multi-stage pipeline-design questions.
- For the broader interview surface, read top data engineering interview questions →.
- Stack the prerequisites with the only 5 skills you need to become a data engineer →.
- Sharpen the system-design axis with the ETL system design for data engineering interviews course →.
- For Spark-side processing of Connect-fed topics, work through Apache Spark internals for data engineering interviews →.
Pipecode.ai is Leetcode for Data Engineering — every Kafka Connect recipe above ships with hands-on practice rooms where you write the Debezium config, the SMT chain, and the idempotent-sink JSON against real graded inputs. PipeCode pairs every reading with 450+ DE-focused problems and a real-time scoring engine, so you never have to wonder whether your `kafka connect` design actually behaves the same on a single-worker laptop as it does on a multi-node production cluster.




