Data Ingestion 101: Building Robust Pipelines with CDC, Batch, and APIs 🛠️
Data ingestion is the "first gateway" of data engineering. The stability and efficiency of your ingestion layer directly determine the quality of all downstream processing and analytics.
In this guide, based on the open-source data_engineering_book, we’ll explore how to handle different data sources, choose the right ingestion patterns, and implement a real-time CDC pipeline.
1. Understanding Your Data Sources
We categorize data sources into two main dimensions: Form and Latency.
By Form
- Structured: Databases (MySQL, PostgreSQL), CSVs, or ERP exports with fixed schemas.
- Semi-Structured: JSON/XML logs, Kafka messages, and NoSQL (MongoDB). These require schema inference or flattening.
- Unstructured: PDFs, images, and audio/video files.
By Latency
- Batch (Offline): Daily/weekly reports or full database backups. High latency, but high data integrity.
- Streaming (Real-time): User clickstreams, payment logs, and DB change logs. Requires millisecond-level processing.
2. Core Ingestion Strategies
Based on the Data Engineering Book, there are three primary patterns:
A. CDC (Change Data Capture)
The gold standard for database synchronization. It captures row-level changes (Insert/Update/Delete) from database logs (like MySQL Binlog) without impacting the production application.
- Top Tool: Flink CDC (supports full + incremental sync).
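To make "row-level changes" concrete: CDC tools that speak the Debezium format tag each event with an operation code. The mapping below is a minimal sketch based on Debezium's standard envelope codes ("c", "u", "d", "r"); the class name is made up for illustration.

```java
// Sketch: decoding Debezium-style operation codes into row-level change types.
public class CdcOpDecoder {
    public static String decode(String op) {
        switch (op) {
            case "c": return "INSERT";  // row created
            case "u": return "UPDATE";  // row updated
            case "d": return "DELETE";  // row deleted
            case "r": return "READ";    // row emitted during the initial snapshot
            default:  return "UNKNOWN";
        }
    }
}
```

The "r" (read) code is what makes full + incremental sync possible: the snapshot phase emits reads, then the binlog phase emits inserts/updates/deletes.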
B. Batch Ingestion
Standardized scheduled pulls for offline scenarios.
- Tools: DataX, Apache Sqoop, or even Python/Pandas for smaller datasets.
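A common batch pattern these tools implement is watermark-based incremental extraction: store the maximum value of an update-timestamp column from the last run, then pull only rows newer than it. A minimal sketch of the query-building step (table and column names are hypothetical, and a real implementation would use bound parameters instead of string concatenation):

```java
// Sketch: building the SQL for one incremental batch run.
public class IncrementalBatchQuery {
    // 'lastWatermark' is the max watermark-column value seen in the previous run.
    public static String build(String table, String watermarkColumn, String lastWatermark) {
        return "SELECT * FROM " + table
             + " WHERE " + watermarkColumn + " > '" + lastWatermark + "'"
             + " ORDER BY " + watermarkColumn;
    }
}
```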
C. API Pulling
The go-to method for 3rd-party SaaS (Stripe, Shopify, TikTok Ads).
- Key Challenges: Handling OAuth2, pagination logic, and exponential backoff for rate limiting.
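For the rate-limiting piece, the usual recipe is exponential backoff: double the wait after each failed attempt, up to a cap. A minimal sketch (class name and parameter values are illustrative):

```java
// Sketch: exponential backoff delay for API retry attempt N (0-based).
public class RetryBackoff {
    public static long backoffMillis(int attempt, long baseMillis, long capMillis) {
        // base * 2^attempt, with the shift clamped to avoid long overflow.
        long delay = baseMillis << Math.min(attempt, 20);
        return Math.min(delay, capMillis);
    }
}
```

Production clients usually add random jitter on top of this so that many clients hitting the same rate limit don't all retry in lockstep.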
3. Hands-on: Real-time MySQL to Kafka Pipeline
Let's implement a real-time sync using Flink CDC. This setup captures every change in a MySQL table and streams it to Kafka as a JSON message.
The Code (Java/Flink)
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MySql2KafkaCDC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // Critical for preventing data loss

        // 1. Configure MySQL CDC Source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("production_db")
                .tableList("production_db.orders")
                .username("cdc_user")
                .password("cdc_password")
                .deserializer(new JsonDebeziumDeserializationSchema()) // Convert change events to JSON
                .build();

        // 2. Configure Kafka Sink
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("db_changes_orders")
                        .setValueSerializationSchema(new SimpleStringSchema()) // UTF-8 string payloads
                        .build())
                .build();

        // 3. Run the Pipeline
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-Source")
                .sinkTo(kafkaSink);
        env.execute("MySQL to Kafka Real-time Sync");
    }
}
4. Common Pitfalls (And How to Avoid Them)
🚨 Data Loss
- Scenario: A Flink job restarts but doesn't have Checkpointing enabled, losing the current offset.
- Fix: Always enable persistent Checkpointing (S3/HDFS) and implement Idempotent Writes at the sink.
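Why idempotent writes matter: after a checkpoint restore, Flink may replay events it already delivered, so the sink must tolerate duplicates. The essence is a keyed upsert, where writing the same record twice leaves the target unchanged. A minimal in-memory sketch (a real sink would be an upsert into Kafka compacted topics, a database, etc.):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: an idempotent sink — replaying the same (key, value) is a no-op.
public class IdempotentSink {
    private final Map<String, String> store = new HashMap<>();

    public void upsert(String key, String value) {
        store.put(key, value); // last write wins; duplicates don't accumulate
    }

    public int size() { return store.size(); }
    public String get(String key) { return store.get(key); }
}
```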
🐢 Data Lag
- Scenario: Binlog accumulation or insufficient Kafka partitions.
- Fix: Increase Flink parallelism and split synchronization for giant tables into separate jobs.
🧩 Schema Drift
- Scenario: Upstream DB changes a column from INT to STRING, breaking your pipeline.
- Fix: Use Schema Validation tools (like Great Expectations) at the ingestion layer to catch mismatches early.
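The core of any schema validation check is the same regardless of tooling: compare the incoming column types against the expected schema and flag mismatches before they reach downstream jobs. A minimal sketch (class and column names are made up):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch: detect columns whose incoming type drifted from the expected schema.
public class SchemaCheck {
    public static List<String> driftedColumns(Map<String, String> expected,
                                              Map<String, String> actual) {
        List<String> drifted = new ArrayList<>();
        for (Map.Entry<String, String> e : expected.entrySet()) {
            String actualType = actual.get(e.getKey());
            // Flag only columns present in both schemas with differing types.
            if (actualType != null && !actualType.equals(e.getValue())) {
                drifted.add(e.getKey());
            }
        }
        return drifted;
    }
}
```

Running this at the ingestion layer turns a silent downstream failure into an explicit, alertable event.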
Conclusion
Ingestion is the first line of defense for your data system. Small leaks here become floods downstream.
For the full Docker-compose environment (MySQL + Kafka + Flink) and complete source code, head over to our repository:
👉 GitHub: datascale-ai/data_engineering_book
What's your preferred tool for data ingestion? Are you a Flink CDC fan or do you prefer Airbyte/Meltano? Let's discuss below! 👇
