Recently, while using SeaTunnel CDC to synchronize real-time data from Oracle, MySQL, and SQL Server to other relational databases, I spent time reading and modifying the source code of SeaTunnel and Debezium. Through this process, I gained an initial understanding of how the SeaTunnel CDC Source is implemented.
While everything was still fresh, I decided to organize some of the questions I encountered and address common confusions. I will try to explain things in a more approachable way. This is purely my own understanding; if anything is incorrect, I welcome corrections.
The main topics covered in this article are:
- The stages of CDC: snapshot, backfill, and incremental
- How startup.mode = timestamp is implemented internally
- How SeaTunnel’s Checkpoint mechanism interacts with CDC tasks
- Why Checkpoints keep timing out
1. The Stages of CDC: Snapshot, Backfill, and Incremental
The overall CDC data reading process can be divided into three phases:
Snapshot (full) → Backfill → Incremental
Snapshot Phase
As the name implies, the snapshot phase captures a snapshot of the current state of the database and reads all existing data. In SeaTunnel’s current implementation, this is done through pure JDBC reads.
During snapshot reading, SeaTunnel records the current binlog position. For MySQL, it executes:
SHOW MASTER STATUS;
which returns results such as:
File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set
-------------+-----------+--------------+------------------+------------------
binlog.000011|1001373553 | | |
These values are stored as the low watermark.
Note that this operation is not performed only once.
To improve performance, SeaTunnel has designed its own split mechanism. You can refer to my other article for details. Assume the global parallelism is 10. SeaTunnel initializes 10 channels to execute tasks in parallel.
SeaTunnel first analyzes the number of tables, then splits each table based on the minimum and maximum values of the primary key. The default split size is 8096 rows.
For tables with large data volumes, this can result in more than 100 splits, which are randomly distributed across the 10 channels. At this stage, no data is actually read; SeaTunnel only prepares the SQL queries with WHERE conditions and stores them.
After all tables are split, each split is executed in parallel.
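To make the split planning concrete, here is a minimal sketch of cutting a numeric primary-key range into fixed-size chunks. It is not SeaTunnel's actual splitter (which has to deal with non-numeric keys and uneven key distribution); the class name `SplitPlanSketch` and the method `planSplits` are hypothetical names I made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not SeaTunnel code: shows only the idea of
// turning a [min, max] primary-key range into WHERE-bounded queries.
class SplitPlanSketch {

    // Builds one query string per chunk; nothing is executed here.
    // Assumes splitSize > 0 and a dense numeric primary key.
    static List<String> planSplits(String table, String pk, long min, long max, long splitSize) {
        List<String> queries = new ArrayList<>();
        for (long start = min; start <= max; start += splitSize) {
            long end = Math.min(start + splitSize, max + 1); // exclusive upper bound
            queries.add("SELECT * FROM " + table
                    + " WHERE " + pk + " >= " + start
                    + " AND " + pk + " < " + end);
        }
        return queries;
    }

    public static void main(String[] args) {
        // 25,000 keys with a split size of 10,000 produce three queries.
        planSplits("user_orders", "order_id", 1, 25_000, 10_000)
                .forEach(System.out::println);
    }
}
```

The point of the sketch is that planning is cheap: it only produces query strings, and the actual reads happen later when the splits are handed to the channels.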
When each split (for example, SELECT * FROM user_orders WHERE order_id >= 1 AND order_id < 10001) begins execution:
- SeaTunnel records the current binlog position as the low watermark for that split.
- After the split finishes reading data, SeaTunnel executes SHOW MASTER STATUS again.
- The returned position is recorded as the high watermark for that split.
Once one split finishes, the next split begins execution.
The corresponding code is shown below:
// MySqlSnapshotSplitReadTask.doExecute()
protected SnapshotResult doExecute(...) {
    // ① Record low watermark
    BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
    dispatcher.dispatchWatermarkEvent(..., lowWatermark, WatermarkKind.LOW);
    // ② Read snapshot data
    createDataEvents(ctx, snapshotSplit.getTableId());
    // ③ Record high watermark
    BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
    dispatcher.dispatchWatermarkEvent(..., highWatermark, WatermarkKind.HIGH);
}
Notes:
It is recommended to configure a larger split size, such as 100,000 rows. Practical experience shows that more splits do not necessarily lead to better performance.
Backfill Phase
The backfill phase has two modes, controlled by the exactly_once parameter.
- exactly_once = false (default)
If exactly_once is disabled, SeaTunnel waits until all snapshot splits are completed. It then compares the watermarks of all splits and selects the minimum watermark.
From that point onward, SeaTunnel switches from JDBC reads to CDC log consumption:
- MySQL: binlog
- Oracle: redo log
- SQL Server: CDC log
Log entries are parsed, and corresponding INSERT, UPDATE, or DELETE events are generated.
Each emitted record carries its own position or SCN offset. For each incoming record, SeaTunnel compares its offset with the high watermark. Once the offset exceeds the high watermark, the system transitions into the pure incremental phase.
- exactly_once = true
When exactly_once is enabled:
- Snapshot data for each split is not written immediately but cached in memory.
- SeaTunnel starts a bounded log reading task:
  - Start offset: the split’s low watermark
  - End offset: the split’s high watermark
All change events within this range are parsed and cached.
Snapshot data and log data are then merged in memory. Records are compared by primary key. For example, if a row is inserted during snapshot and later updated during log reading, only the updated version is retained.
This guarantees exactly-once semantics, but it is very memory-intensive.
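The in-memory merge described above can be sketched roughly as follows. This is a simplified illustration under my own assumptions: rows are plain strings keyed by a numeric primary key, and the types `BackfillMergeSketch`, `Op`, and `Change` are hypothetical, not SeaTunnel classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical types for illustration only; SeaTunnel's real classes differ.
class BackfillMergeSketch {

    enum Op { INSERT, UPDATE, DELETE }

    // One change event read from the log between the low and high watermark.
    static final class Change {
        final Op op;
        final long key;
        final String row;

        Change(Op op, long key, String row) {
            this.op = op;
            this.key = key;
            this.row = row;
        }
    }

    // Replays the bounded log changes on top of the cached snapshot rows,
    // so each primary key ends up with only its latest version.
    static Map<Long, String> merge(Map<Long, String> snapshot, List<Change> changes) {
        Map<Long, String> merged = new LinkedHashMap<>(snapshot);
        for (Change c : changes) {
            if (c.op == Op.DELETE) {
                merged.remove(c.key);
            } else {
                merged.put(c.key, c.row); // insert, or overwrite with the newer version
            }
        }
        return merged;
    }
}
```

Both the snapshot rows and the change list for a split live in memory until the merge is done, which is where the memory pressure comes from.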
Incremental Phase
The incremental phase consists of pure log consumption.
If exactly_once is enabled, SeaTunnel starts a new unbounded stream beginning from the high watermark. If it is disabled, incremental reading continues directly from the backfill phase.
Conceptually, backfill reads from the low watermark, while incremental reading starts from the high watermark. The only difference lies in the starting offset.
Summary
1. Two Execution Modes
With exactly_once enabled (Exactly-Once)
- Snapshot: reads full historical data at the low watermark
- Backfill: fills changes between the low and high watermarks
- Incremental: consumes the unbounded stream after the high watermark
Costs:
- More state to maintain
- High memory pressure, especially with many tables and splits
Note:
The mechanism of LogMiner
LogMiner is an internal Oracle process running inside the production database instance.
Each LogMiner session requires:
- ~1 CPU core for parsing Redo Logs
- ~500MB–1GB memory for caching and parsing
- Continuous reading of Redo Log files (I/O operations)
Without exactly_once (At-Least-Once semantics)
- Snapshot: still reads the full historical data
- Incremental: consumes the binlog directly from the low watermark (no separate backfill)
Differences:
- No separate "backfill" step
- Snapshot and incremental run in the same stream, but are not interleaved:
  - All snapshot splits complete first
  - Then the task switches to incremental consumption (from the low watermark)
Backfill and incremental are chained in the same stream, so they can be thought of as "backfill + incremental in one".
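As a rough illustration of "select the minimum watermark", the sketch below picks the earliest binlog position among the ones recorded by the splits. The types are hypothetical, and I am assuming binlog file names end in a numeric suffix such as binlog.000011.

```java
import java.util.List;

// Hypothetical types for illustration; SeaTunnel's real offset classes differ.
class MinWatermarkSketch {

    static final class BinlogPos {
        final String file;   // e.g. "binlog.000011"
        final long position; // byte offset inside that file

        BinlogPos(String file, long position) {
            this.file = file;
            this.position = position;
        }

        // Numeric suffix of the file name, so 000011 sorts after 000009.
        long fileIndex() {
            return Long.parseLong(file.substring(file.lastIndexOf('.') + 1));
        }
    }

    // Orders positions by file first, then by offset within the file.
    static int compare(BinlogPos a, BinlogPos b) {
        int byFile = Long.compare(a.fileIndex(), b.fileIndex());
        return byFile != 0 ? byFile : Long.compare(a.position, b.position);
    }

    // The log stream has to start at the earliest watermark of any split,
    // otherwise changes made while the earliest split was reading are lost.
    static BinlogPos startingOffset(List<BinlogPos> watermarks) {
        BinlogPos min = watermarks.get(0);
        for (BinlogPos w : watermarks) {
            if (compare(w, min) < 0) {
                min = w;
            }
        }
        return min;
    }
}
```

Starting from the minimum is also why duplicates are possible in this mode: later splits may already contain changes that the log stream replays again.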
2. Will duplicates occur if exactly_once is not enabled?
Yes, under certain conditions:
- The source reads each table in splits, using parallel SELECT statements:
  - In whole-database or multi-table scenarios with low parallelism, many split SELECTs are queued and delayed.
- If new data is inserted while splits are queued:
  - The snapshot SELECT of some splits may already see the new data
  - The subsequent incremental binlog read will return it again
Result: the same row is written twice to the Sink (typical At-Least-Once behavior)
3. How to minimize duplicate writes without exactly_once?
- Solution: enable upsert on the Sink (primary key idempotent)
  - JDBC Sink with enable_upsert → uses statements like MERGE INTO / REPLACE INTO
  - Use upsert for both snapshot and incremental phases:
    - Repeated primary keys overwrite previous values, final table contains only one row per key
    - Semantically: transport is At-Least-Once, downstream result approximates logical Exactly-Once
- Cost:
  - Snapshot phase also uses upsert → slower than plain INSERT
  - If forcing exactly_once + in-memory filtering:
    - Many split blocks require tracking large amounts of offsets/primary keys in memory → high memory pressure
    - For Oracle (LogMiner-based source):
      - Each block starts independent LogMiner/streaming session → high intrusion on production DB, increased latency
4. Practical Recommendations
- Maximize performance, accept "at-least-once" + idempotent:
  - Disable exactly_once, enable Sink upsert, deduplicate by primary key
- Few tables, manageable data, strict Exactly-Once requirement:
  - Consider enabling exactly_once, but evaluate memory and source DB pressure (especially Oracle LogMiner scenarios)
2. How is CDC startup.mode timestamp implemented internally?
The timestamp mode specifies a point in time from which to start syncing data. Each database’s CDC mechanism is different, so the way a timestamp is mapped to a log position also differs.
1. MySQL – Binary Search on Binlog Files
MySQL principle:
- User specifies millisecond timestamp (e.g., 1734494400000)
- SeaTunnel executes SHOW BINARY LOGS to get all binlog files
- Performs binary search on binlog files, reading the timestamp of the first record in each file
- Finds the first binlog file where timestamp >= specified time
- Returns the filename and position 0, reads binlog from that position
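The search step can be sketched like this. It follows the description above literally and is not copied from the SeaTunnel source: the class and method names are hypothetical, the `firstEventTs` callback stands in for "read the timestamp of the first record in this file", and falling back to the last file when every file starts before the target is my own assumption.

```java
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical sketch of the binary search over binlog files.
class BinlogSearchSketch {

    // files must be in creation order (as returned by SHOW BINARY LOGS).
    // Returns the first file whose first-event timestamp is >= targetMs,
    // or the last file if every file starts earlier than targetMs (assumption).
    static String findStartFile(List<String> files,
                                ToLongFunction<String> firstEventTs,
                                long targetMs) {
        if (files.isEmpty()) {
            throw new IllegalArgumentException("no binlog files available");
        }
        int lo = 0;
        int hi = files.size() - 1;
        int answer = files.size() - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (firstEventTs.applyAsLong(files.get(mid)) >= targetMs) {
                answer = mid;  // candidate found, keep looking to the left
                hi = mid - 1;
            } else {
                lo = mid + 1;  // this file starts too early, look to the right
            }
        }
        return files.get(answer);
    }
}
```

Because only the first event of each probed file is read, the search touches about log2(N) files instead of scanning all N.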
2. Oracle – TIMESTAMP_TO_SCN Function
Oracle principle:
- User specifies millisecond timestamp (e.g., 1763058616003)
- SeaTunnel converts it to java.sql.Timestamp, formatted as YYYY-MM-DD HH24:MI:SS.FF3
- Executes SQL: SELECT TIMESTAMP_TO_SCN(TO_TIMESTAMP('2024-12-18 09:00:00.003', 'YYYY-MM-DD HH24:MI:SS.FF3')) FROM DUAL
- Oracle built-in function TIMESTAMP_TO_SCN returns corresponding SCN (System Change Number)
- Returns RedoLogOffset containing that SCN, reads redo log from that SCN
- SCN can also be converted back to timestamp: SELECT current_scn FROM v$database; and SELECT SCN_TO_TIMESTAMP('240158979') FROM DUAL;
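The conversion from a millisecond timestamp to the SQL shown above might look roughly like this. It is only a sketch: the class name is hypothetical, the time zone is passed in explicitly because I am not sure which zone the connector applies, and real code would normally bind the value as a parameter instead of concatenating it into the statement.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper showing how epoch millis map to the Oracle query text.
class OracleScnQuerySketch {

    // Java pattern matching Oracle's 'YYYY-MM-DD HH24:MI:SS.FF3'.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    static String buildQuery(long epochMillis, ZoneId zone) {
        String ts = FMT.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
        return "SELECT TIMESTAMP_TO_SCN(TO_TIMESTAMP('" + ts
                + "', 'YYYY-MM-DD HH24:MI:SS.FF3')) FROM DUAL";
    }

    public static void main(String[] args) {
        System.out.println(buildQuery(1734494400003L, ZoneId.of("UTC")));
    }
}
```

The zone matters: the same millisecond value formats to different wall-clock strings in different zones, so a mismatch between the job and the database shifts the starting SCN.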
Additionally, since Oracle reads redo logs directly, troubleshooting is difficult. The following SQL simulates Debezium starting a LogMiner session, useful for problem diagnosis:
-- Clean previous LogMiner session
BEGIN
    DBMS_LOGMNR.END_LOGMNR;
EXCEPTION
    WHEN OTHERS THEN NULL;
END;
SELECT * FROM V$LOGFILE;
-- Add current online log files
DECLARE
    v_first BOOLEAN := TRUE;
BEGIN
    FOR rec IN (SELECT MEMBER FROM V$LOGFILE WHERE TYPE='ONLINE' AND ROWNUM <= 3) LOOP
        IF v_first THEN
            DBMS_LOGMNR.ADD_LOGFILE(rec.MEMBER, DBMS_LOGMNR.NEW);
            DBMS_OUTPUT.PUT_LINE('Added log file: ' || rec.MEMBER);
            v_first := FALSE;
        ELSE
            DBMS_LOGMNR.ADD_LOGFILE(rec.MEMBER, DBMS_LOGMNR.ADDFILE);
            DBMS_OUTPUT.PUT_LINE('Added log file: ' || rec.MEMBER);
        END IF;
    END LOOP;
END;
-- Start LogMiner session
BEGIN
    DBMS_LOGMNR.START_LOGMNR(
        OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG +
                   DBMS_LOGMNR.COMMITTED_DATA_ONLY
    );
    DBMS_OUTPUT.PUT_LINE('LogMiner started successfully');
END;
-- Query parsed content
SELECT
    SCN,
    OPERATION,
    OPERATION_CODE,
    TABLE_NAME,
    TO_CHAR(TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS') AS TIMESTAMP,
    CSF,
    INFO,
    SUBSTR(SQL_REDO, 1, 200) AS SQL_REDO_PREVIEW
FROM V$LOGMNR_CONTENTS
WHERE TABLE_NAME = 'XML_DEBUG_TEST'
  AND SEG_OWNER = USER
ORDER BY SCN, SEQUENCE#;
-- Clean LogMiner session
BEGIN
    DBMS_LOGMNR.END_LOGMNR;
EXCEPTION
    WHEN OTHERS THEN NULL;
END;
3. PostgreSQL – Timestamp Not Supported
PostgreSQL principle:
- Does not support timestamp mode
- Uses LSN (Log Sequence Number) as offset
- LSN is a 64-bit number representing WAL (Write-Ahead Log) position
- No direct function to convert timestamp to LSN
- Users can only use INITIAL, EARLIEST, LATEST modes
4. SQL Server – sys.fn_cdc_map_time_to_lsn Function
SQL Server principle:
- User specifies millisecond timestamp (e.g., 1734494400000)
- SeaTunnel converts to java.sql.Timestamp
- Executes SQL: SELECT sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', ?) AS lsn
- SQL Server built-in function returns the smallest LSN >= specified time
- Returns LsnOffset containing byte array for that LSN, reads CDC log from that LSN
3. How SeaTunnel Checkpoint Mechanism Interacts with CDC Tasks
Checkpoint enables resume-from-failure. How does it work, and what should be noted?
First, understand how checkpoints (CK) are implemented: SeaTunnel triggers a checkpoint asynchronously at a fixed interval. On the source side this is handled in SourceFlowLifeCycle.triggerBarrier():
// SourceFlowLifeCycle.triggerBarrier()
public void triggerBarrier(Barrier barrier) throws Exception {
    log.debug("source trigger barrier [{}]", barrier);
    // Key: acquire checkpoint lock to ensure state consistency
    synchronized (collector.getCheckpointLock()) {
        // Step 1: check if prepare to close
        if (barrier.prepareClose(this.currentTaskLocation)) {
            this.prepareClose = true;
        }
        // Step 2: snapshot state
        if (barrier.snapshot()) {
            List<byte[]> states = serializeStates(splitSerializer, reader.snapshotState(barrier.getId()));
            runningTask.addState(barrier, ActionStateKey.of(sourceAction), states);
        }
        // Step 3: acknowledge barrier
        runningTask.ack(barrier);
        // Step 4: Key! send barrier as Record downstream
        collector.sendRecordToNext(new Record<>(barrier));
    }
}
The checkpoint is carried by a barrier record: a special marker that travels with the data along the flow source → transform → sink. Each stage handles the Barrier as follows:
- Source stops reading and stores state in CK
- Transform passes Barrier without processing
- Sink flushes buffered batch data upon receiving Barrier
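To illustrate the sink side of this flow, here is a toy model of a batching sink that flushes when a barrier arrives. All names are hypothetical and the "write" is just a list append; it is meant to show the ordering guarantee, not SeaTunnel's actual sink API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not SeaTunnel code: a barrier travels in-band with the records.
class BarrierFlowSketch {

    // Marker object standing in for a checkpoint barrier.
    static final class Barrier {
        final long id;

        Barrier(long id) {
            this.id = id;
        }
    }

    // A sink that batches writes and must flush before acknowledging a barrier.
    static final class BufferingSink {
        final List<String> buffer = new ArrayList<>();
        final List<String> written = new ArrayList<>();
        final List<Long> ackedBarriers = new ArrayList<>();

        void onItem(Object item) {
            if (item instanceof Barrier) {
                written.addAll(buffer); // flush everything received before the barrier
                buffer.clear();
                ackedBarriers.add(((Barrier) item).id);
            } else {
                buffer.add((String) item); // ordinary record: just buffer it
            }
        }
    }
}
```

Since the barrier sits behind the records in the same stream, the sink cannot acknowledge it until everything ahead of it has been written. A slow flush therefore delays the whole checkpoint.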
States Saved in Different Phases
Snapshot Phase
Saved content:
public class SnapshotSplit {
    private final Object[] splitStart;   // [1000]
    private final Object[] splitEnd;     // [2000]
    private final Offset lowWatermark;   // binlog.000011:1234
    private final Offset highWatermark;  // binlog.000011:5678
}
Restore logic (key code):
// IncrementalSourceReader.addSplits()
for (SourceSplitBase split : splits) {
    if (split.isSnapshotSplit()) {
        SnapshotSplit snapshotSplit = split.asSnapshotSplit();
        if (snapshotSplit.isSnapshotReadFinished()) {
            finishedUnackedSplits.put(splitId, snapshotSplit); // completed, skip
        } else {
            unfinishedSplits.add(split); // not completed, read again
        }
    }
}
Incremental Phase
Saved content:
public class IncrementalSplit {
    private final Offset startupOffset; // current Binlog position
    private final List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos; // backfill state
    private final Map<TableId, byte[]> historyTableChanges; // Debezium history
}
Restore logic:
// IncrementalSourceReader.initializedState()
if (split.isIncrementalSplit()) {
    IncrementalSplit incrementalSplit = split.asIncrementalSplit();
    // restore table schema
    debeziumDeserializationSchema.restoreCheckpointProducedType(
        incrementalSplit.getCheckpointTables()
    );
    // continue consuming from startupOffset
    return new IncrementalSplitState(incrementalSplit);
}
Checkpoint State Comparison
| Phase | Saved Content | Restore Method | Duplicate Risk |
|---|---|---|---|
| Snapshot | Split range + Watermarks | Re-execute unfinished Splits | Yes (Sink must be idempotent; repeated Select may query different snapshot points) |
| Incremental | Binlog Offset + Table Schema | Continue from Offset | No |
Thus, it is recommended to avoid restoring or pausing tasks during full snapshot and backfill phases, as many unknown issues may arise.
4. Checkpoint Timeout
In practice, a checkpoint (CK) can still time out even when the timeout is set to 10–20 minutes. Why?
Looking at how CK and CDC tasks interact, long CK timeouts are usually caused by insufficient write performance or misconfiguration on the Sink. The Source only has to save a small amount of metadata when a CK is triggered, which is fast; the Sink must finish all pending writes before the CK Barrier can pass.
Checkpoint Timeout Mechanism Analysis
Checkpoint ensures Exactly-Once semantics; CK Barrier must propagate from source to sink, with all operators saving state.
- Source speed: only records read position and metadata (Split, Offset), usually milliseconds → fast CK trigger
- Sink blocking: must complete all pre-Barrier writes (e.g., 10,000 records)
- Timeout occurs: if Sink is slow → Barrier cannot pass within timeout (e.g., 10–20 min) → CK Timeout
Conclusion: prolonged timeout almost always means Sink cannot handle backlog in time.
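A quick back-of-the-envelope check makes this concrete. The helper below is a hypothetical sketch that assumes a constant sink write rate; real throughput fluctuates, so treat the result as a rough estimate only.

```java
// Hypothetical estimate: can the sink drain its backlog before the CK times out?
class CheckpointBudgetSketch {

    // Seconds the sink needs to write the rows queued ahead of the barrier.
    static double drainSeconds(long pendingRows, double sinkRowsPerSecond) {
        return pendingRows / sinkRowsPerSecond;
    }

    // True if draining the backlog takes longer than the checkpoint timeout.
    static boolean wouldTimeOut(long pendingRows, double sinkRowsPerSecond, long timeoutMillis) {
        return drainSeconds(pendingRows, sinkRowsPerSecond) * 1000.0 > timeoutMillis;
    }

    public static void main(String[] args) {
        // 3,000,000 pending rows at 4,000 rows/s need 750 s, more than a 600 s timeout.
        System.out.println(wouldTimeOut(3_000_000, 4_000, 600_000));
    }
}
```

The two levers are visible in the formula: either raise the sink's rows per second, or keep the backlog ahead of the barrier small by throttling the source.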
Solutions
1. Optimize Sink (most common)
MySQL
# JDBC URL add batch rewrite parameter
jdbc:mysql://host:port/db?rewriteBatchedStatements=true&cachePrepStmts=true
Doris/StarRocks
# Use stream load mode with tuning parameters
sink {
  Doris {
    sink.enable-2pc = true
    sink.buffer-size = 1048576
    sink.buffer-count = 3
  }
}
PostgreSQL
sink {
  Jdbc {
    # Use COPY mode instead of INSERT
    use_copy_statement = true
  }
}
2. Source-side throttling
env {
  job.mode = STREAMING
  # Limit read speed to give Sink time
  read_limit.rows_per_second = 4000
  read_limit.bytes_per_second = 7000000
  # Increase checkpoint timeout
  checkpoint.interval = 30000
  checkpoint.timeout = 600000
}
Closing Remarks
CDC technology is indeed complex. It touches many aspects of distributed systems (parallelism control, state management, fault tolerance, exactly-once semantics) and requires a deep understanding of databases. Building on Debezium, SeaTunnel has added numerous engineering optimizations and fixed multiple bugs, and its architecture is friendly to newcomers. Whether you start by fixing documentation or by debugging code directly, it is relatively easy to get started.
We warmly welcome contributors to join the community!
This article aims to help you better understand SeaTunnel CDC’s internal mechanisms, reduce pitfalls in production, and improve tuning. Any questions or discovered errors are welcome for discussion and correction.
Finally, may all your CDC tasks run stably without interruption, and may your checkpoints never time out again!











