DEV Community

Rizwan Saleem

Designing a Scalable Event-Driven Data Lake with Real-Time Ingestion and Exactly-Once Semantics

In this tutorial, you’ll learn how to design a scalable data lake architecture that ingests streaming and batch data, provides real-time query capabilities, and guarantees exactly-once processing semantics. We’ll walk through the core components, data modeling choices, fault-tolerance strategies, and practical code examples you can adapt to real projects.

Overview and goals

  • Build a unified data lake that supports batch and streaming sources.
  • Achieve low-latency ingestion and near-real-time analytics.
  • Ensure exactly-once processing guarantees to avoid duplicate data.
  • Provide a clear separation of concerns: ingestion, processing, storage, catalog, and serving layers.
  • Include practical patterns for schema evolution, data quality, and observability.

Architecture overview

  • Ingestion layer
    • Streaming: Apache Kafka or equivalent message bus for real-time data streams.
    • Batch: Periodic file drops into object storage (e.g., S3-compatible storage, GCS, Azure Blob).
  • Processing layer
    • Stream processing: A fault-tolerant stream processor (e.g., Apache Flink, ksqlDB, or Spark Structured Streaming) to apply transformations, enrichments, and aggregation.
    • Batch processing: Spark jobs or similar to process large historical datasets and reprocess with updated logic.
  • Storage layer
    • Raw zone: Immutable, append-only data lake files (e.g., Parquet/ORC) in a partitioned structure.
    • Cleansed zone: Cleaned, conformed schema with standardized metadata.
    • Curated zone: Aggregated, feature-oriented datasets ready for analytics and ML.
  • Metadata and governance
    • Data catalog: Hive Metastore, AWS Glue Data Catalog, or another Iceberg-compatible catalog to track schemas, partitions, and data lineage.
    • Schema registry: a schema registry for Avro event schemas on the streaming side, plus Iceberg schema evolution features for tables.
  • Serving/query layer
    • Data virtualization or query engines (e.g., Presto/Trino, Apache Spark SQL) or optimized BI connectors.
    • Materialized views or incremental aggregates for fast dashboards.
  • Orchestration and reliability
    • Orchestrator: Airflow, Dagster, or Prefect to manage batch jobs, with proper dependency graphs.
    • Exactly-once guarantees: Idempotent sinks, transactional writes where supported (e.g., Iceberg tables with transactional writes), and careful offset management in streaming.
  • Observability
    • Metrics: Prometheus-compatible metrics from processors, ingestion lag, failure rates.
    • Traces: Distributed tracing for end-to-end visibility (e.g., OpenTelemetry with Jaeger/Tempo).
    • Logging: Centralized logging with structured logs, error dashboards, and alerting.

Key design decisions

1) Data formats and schemas

  • Use columnar formats (Parquet/ORC) for efficiency and compatibility with analytics engines.
  • Store schemas in a schema registry or in the catalog to enforce compatibility.
  • Favor a canonical, evolving schema with:
    • Partition keys (e.g., event_date) for partition pruning.
    • Optional, nullable fields for backward compatibility.
  • Implement schema evolution strategy:
    • Add-only non-breaking changes (new fields with defaults) are safe.
    • Removing or renaming fields should be avoided, or done only with a migration plan.
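
The add-only rule above can be checked mechanically before a new schema version is published. The following is a minimal sketch, assuming a simplified schema model in which each field has only a type name and an "optional" flag; `SchemaCompat`, `Field`, and `isAdditiveChange` are hypothetical names, not part of any schema registry API.

```java
import java.util.Map;

/** Hypothetical, simplified schema model: field name -> field spec. */
final class SchemaCompat {

    /** A field has a type name and may be optional (nullable or defaulted). */
    static final class Field {
        final String type;
        final boolean optional;

        Field(String type, boolean optional) {
            this.type = type;
            this.optional = optional;
        }
    }

    /**
     * Returns true when next only adds optional fields to current:
     * no field is removed or renamed, no type changes, no optional field
     * becomes required, and every new field is optional.
     */
    static boolean isAdditiveChange(Map<String, Field> current, Map<String, Field> next) {
        for (Map.Entry<String, Field> e : current.entrySet()) {
            Field updated = next.get(e.getKey());
            if (updated == null) return false;                            // removed or renamed
            if (!updated.type.equals(e.getValue().type)) return false;    // type changed
            if (e.getValue().optional && !updated.optional) return false; // tightened
        }
        for (Map.Entry<String, Field> e : next.entrySet()) {
            boolean isNew = !current.containsKey(e.getKey());
            if (isNew && !e.getValue().optional) return false;            // new required field
        }
        return true;
    }
}
```

A check like this could run in CI for every schema change, so that a breaking change is rejected before any producer starts emitting it.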

2) Exactly-once semantics

  • In streaming ingestion, use idempotent producers and transactional sinks where available.
  • In Flink/Spark, enable checkpointing; end-to-end exactly-once additionally requires a replayable source (e.g., Kafka offsets stored in the checkpoint) and a transactional or idempotent sink.
  • For file-based sinks:
    • Write to a staging area first, then commit atomically to the target dataset (e.g., through a transactional table or commit protocol like Apache Iceberg’s table snapshots).
  • For downstream systems:
    • Use deduplication keys and upsert semantics where appropriate.
    • Maintain a compact, durable deduplication store (e.g., a compacted key+timestamp store) to avoid reprocessing duplicates.
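
To make the deduplication-key idea concrete, here is a minimal in-memory sketch of an idempotent upsert. It assumes each event carries a unique event_id and an event_time in epoch milliseconds; `IdempotentSink` is a hypothetical stand-in for a durable keyed store, not a real connector.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for a keyed, upsert-capable store (hypothetical). */
final class IdempotentSink {
    // event_id -> latest event_time (epoch millis) applied for that id
    private final Map<String, Long> applied = new HashMap<>();
    private int writes = 0;

    /**
     * Applies the event unless the same event_id was already applied with an
     * equal or newer event_time. Returns true when a write actually happened.
     */
    synchronized boolean upsert(String eventId, long eventTimeMillis) {
        Long seen = applied.get(eventId);
        if (seen != null && seen >= eventTimeMillis) {
            return false; // duplicate or stale replay: safe to drop
        }
        applied.put(eventId, eventTimeMillis);
        writes++;
        return true;
    }

    /** Number of writes that were actually applied. */
    synchronized int writeCount() {
        return writes;
    }
}
```

Because replaying the same event is a no-op, a processor that restarts from an older offset can resend events without producing duplicates downstream.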

3) Data quality and lineage

  • Validate data against a strict schema and domain constraints during ingestion.
  • Include lineage metadata (source, timestamp, schema version) in each record or as table-level annotations.
  • Implement anomaly detection and alerting for data quality issues (e.g., missing fields, late data, schema drift).

4) Operational considerations

  • Partitioning strategy: time-based partitions (e.g., daily/hourly) with a consistent naming convention.
  • Backpressure handling: use buffering and backoff strategies in producers and processors.
  • Security and access control: least privilege with per-project scopes; encrypt data at rest and in transit.
  • Observability: keep dashboards for ingestion lag, processing latency, and error rates.
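
As an illustration of the backoff strategy mentioned above, the sketch below computes a capped exponential delay and wraps an operation in a retry loop. The class name `Backoff` and its parameters are assumptions for this example; production code would usually add random jitter, which is left out here to keep the behavior deterministic.

```java
import java.util.concurrent.Callable;

final class Backoff {

    /** Delay before retry number attempt (0-based): min(cap, base * 2^attempt). */
    static long delayMillis(int attempt, long baseMillis, long capMillis) {
        if (attempt < 0 || baseMillis <= 0 || capMillis < baseMillis) {
            throw new IllegalArgumentException("need attempt >= 0 and 0 < base <= cap");
        }
        // From 2^63 on the product exceeds any long, so the cap always applies.
        if (attempt >= 63) return capMillis;
        long factor = 1L << attempt;
        // Compare by division so base * factor cannot overflow.
        return factor > capMillis / baseMillis ? capMillis : baseMillis * factor;
    }

    /** Calls op until it succeeds or maxAttempts is exhausted, sleeping between tries. */
    static <T> T retry(Callable<T> op, int maxAttempts, long baseMillis, long capMillis)
            throws Exception {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e;
                if (attempt + 1 < maxAttempts) {
                    Thread.sleep(delayMillis(attempt, baseMillis, capMillis));
                }
            }
        }
        throw last; // every attempt failed: surface the last error
    }
}
```

With a base of 100 ms and a cap of 5 s, the delays grow as 100, 200, 400, 800 ms and so on until they reach the cap.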

Step-by-step design and implementation

Phase 1: Define data products and sources

  • Identify event domains: user activity, transactions, sensor measurements, etc.
  • For each domain, list:
    • Source type (streaming, batch)
    • Key identifiers (user_id, order_id)
    • Event time vs. processing time
    • Required fields and optional fields

Phase 2: Ingestion blueprint

  • Streaming path (for real-time): Kafka topics per domain
    • Producers publish events with stable keys (e.g., user_id or event_id)
    • Event payload: Avro or JSON with a defined schema, including a producer-assigned event_time (events can still arrive out of order, so do not assume it is monotonically increasing across the stream)
  • Batch path: data drops into object storage as partitioned files
    • Use a consistent folder structure: /domain/event_date=YYYY-MM-DD/...
  • Processing glue: a stream processor that consumes from Kafka and writes to the raw zone
    • Ensure exactly-once processing in the sink layer (e.g., write to Iceberg-managed Parquet files)
    • Enrich events (join reference data, lookups) in real time if needed
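
For the batch path, the folder convention can be derived from the event time in one place so that every writer agrees on it. This is a small sketch under two assumptions that the text does not state: partition days are computed in UTC, and domain names are restricted to lowercase letters, digits, and underscores. `PartitionPaths` is a hypothetical helper name.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

final class PartitionPaths {

    /** Builds "/<domain>/event_date=YYYY-MM-DD/" from the event time, using UTC days. */
    static String forEvent(String domain, Instant eventTime) {
        if (!domain.matches("[a-z0-9_]+")) {
            throw new IllegalArgumentException("unexpected domain name: " + domain);
        }
        LocalDate day = eventTime.atZone(ZoneOffset.UTC).toLocalDate();
        // LocalDate.toString() renders the ISO form yyyy-MM-dd
        return "/" + domain + "/event_date=" + day + "/";
    }
}
```

Partitioning on event time rather than arrival time keeps late files in the partition they logically belong to, which matters when batch jobs reprocess a single day.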

Code example: a minimal Flink streaming job with exactly-once semantics

  • Language: Java or Scala (below uses Java-like pseudocode for clarity)

  • Pseudocode highlights:

    • Create a Kafka source with exactly-once semantics
    • Deserialize Avro/JSON to a POJO
    • Enrich with a dimension table (via a broadcast state or external store)
    • Write to an Iceberg-backed Parquet sink with checkpointing and table-level transactions

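// Pseudocode outline. Event, EventDeserializationSchema, EventTimeAssigner, EnrichmentFunction,
// IcebergParquetSink, and dimensionsBroadcastState are placeholders you would define yourself.
import java.util.Properties;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Snapshot Kafka offsets and operator state together every minute
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

Properties kafkaProps = new Properties();
// set bootstrap.servers, group.id, etc.

// FlinkKafkaConsumer is deprecated in newer Flink releases in favor of KafkaSource
DataStream<Event> events = env
    .addSource(new FlinkKafkaConsumer<>("domain-events", new EventDeserializationSchema(), kafkaProps))
    .assignTimestampsAndWatermarks(new EventTimeAssigner()); // a WatermarkStrategy<Event>

DataStream<Event> enriched = events
    .keyBy(Event::getKey)
    .process(new EnrichmentFunction(dimensionsBroadcastState));

// The sink itself must commit files only when a checkpoint completes;
// that is what makes the output exactly-once.
enriched.addSink(new IcebergParquetSink("s3://lake/raw/domain-events"));

env.execute("Domain Event Ingestion - Exactly-Once");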

Notes:

  • EventDeserializationSchema handles Avro/JSON parsing with field validation.
  • EventTimeAssigner extracts event_time for watermarking and late data handling.
  • EnrichmentFunction uses a broadcast state for dimension lookups or external stores.
  • IcebergParquetSink writes to an Iceberg table in the raw zone with transactional commits.
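
To show what watermarking and late-data handling mean in practice, here is a framework-free sketch of a bounded out-of-orderness watermark. It is a simplification and not Flink's implementation: the watermark trails the highest event time seen by a fixed allowance, and an event is treated as late when it falls behind the watermark. `BoundedWatermark` is a hypothetical name.

```java
final class BoundedWatermark {
    private final long maxOutOfOrdernessMillis;
    private long maxEventTime = Long.MIN_VALUE; // nothing observed yet

    BoundedWatermark(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("allowance must be >= 0");
        }
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Current watermark: highest event time seen minus the allowed out-of-orderness. */
    long current() {
        return maxEventTime == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxEventTime - maxOutOfOrdernessMillis;
    }

    /** Observes an event; returns true if it is on time, false if it is behind the watermark. */
    boolean observe(long eventTimeMillis) {
        boolean onTime = eventTimeMillis >= current();
        maxEventTime = Math.max(maxEventTime, eventTimeMillis);
        return onTime;
    }
}
```

Events flagged as late can be routed to a side output or a correction job instead of being silently dropped; the allowance trades result latency against completeness.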

Phase 3: Storage layout and catalog

  • Raw zone: /lake/raw/domain/event_date=YYYY-MM-DD/part-*.parquet
  • Cleansed zone: /lake/cleansed/domain/event_date=YYYY-MM-DD/...
  • Curated zone: /lake/curated/domain/aggregates/event_date=...

  • Iceberg usage:

    • Create Iceberg tables for raw domain data with schema and partition specs
    • Enable table-level transactions to support atomic commits
    • Use schema evolution features to safely add new fields

Code snippet: defining an Iceberg table (conceptual Spark SQL DDL)

  • Example (conceptual; the payload struct fields are left unspecified):

CREATE TABLE lake.raw.domain_events (
  event_id   STRING,
  domain     STRING,
  user_id    STRING,
  event_time TIMESTAMP,
  payload    STRUCT<...>
)
USING iceberg
PARTITIONED BY (date(event_time));

Phase 4: Processing for batch and streaming

  • Real-time enrichment: join with reference data (e.g., user profile) using a fast cache or a small dimension store
  • Windowed aggregations: for dashboards, compute rolling metrics (e.g., 1h, 24h)
  • Batch reprocessing: periodically run Spark jobs to materialize cleaned/curated views and to reprocess past data when schema evolves
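
The windowed aggregation step can be illustrated without a stream processor. The sketch below uses fixed, non-overlapping (tumbling) windows as a simpler stand-in for the rolling metrics mentioned above, and counts events per window from their event times in epoch milliseconds. `TumblingCounts` is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

final class TumblingCounts {

    /** Start (epoch millis) of the fixed-size window that contains the timestamp. */
    static long windowStart(long eventTimeMillis, long windowMillis) {
        if (windowMillis <= 0) throw new IllegalArgumentException("window must be > 0");
        // floorDiv keeps windows aligned for negative timestamps as well
        return Math.floorDiv(eventTimeMillis, windowMillis) * windowMillis;
    }

    /** Counts events per window start, ordered by window. */
    static Map<Long, Integer> countPerWindow(long[] eventTimesMillis, long windowMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long t : eventTimesMillis) {
            counts.merge(windowStart(t, windowMillis), 1, Integer::sum);
        }
        return counts;
    }
}
```

A 24h rolling metric could then be derived by summing the 24 most recent hourly counts, which is cheaper than recomputing from raw events on every dashboard refresh.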

Phase 5: Serving layer and data access

  • Query engines: Presto/Trino or Spark SQL to run ad-hoc analytics
  • Materialized views: maintain summary tables for dashboards (e.g., total_events_by_domain per hour)
  • Data access controls: enforce per-user or per-project access policies at the catalog level
  • Data catalogs: register all tables with the data catalog, including schemas, partitions, and lineage metadata

Phase 6: Observability and reliability

  • Monitoring:
    • Ingest lag per topic
    • Throughput and CPU/memory usage of processors
    • Checkpoint success/failure rates
  • Tracing:
    • Propagate trace contexts through ingestion and processing
    • Use OpenTelemetry with a centralized backend
  • Alerting:
    • Notify on rising lag, failures, or schema drift
    • Create governance alerts for unusual data patterns

Phase 7: Schema evolution and data quality

  • Schema evolution:
    • Use additive schema changes (new fields with defaults)
    • When removing fields, consider a deprecation period and data migration plan
  • Data quality checks:
    • Enforce non-null constraints on critical fields
    • Validate event_time within acceptable windows
    • Track anomaly counts and trigger alerts on thresholds
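
These checks can be expressed as a small validator that runs on each record before it is written. The sketch assumes records are maps with event_id, user_id, and event_time (epoch milliseconds, as a Long) as the critical fields; the class name `QualityChecks`, the field list, and the time bounds are illustrative choices, not requirements from the design above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class QualityChecks {
    private static final String[] REQUIRED = {"event_id", "user_id", "event_time"};

    private final long maxPastMillis;
    private final long maxFutureMillis;
    private int anomalies = 0;

    QualityChecks(long maxPastMillis, long maxFutureMillis) {
        this.maxPastMillis = maxPastMillis;
        this.maxFutureMillis = maxFutureMillis;
    }

    /** Returns the violations for one record; an empty list means it passed. */
    List<String> validate(Map<String, Object> record, long nowMillis) {
        List<String> violations = new ArrayList<>();
        for (String field : REQUIRED) {
            if (record.get(field) == null) violations.add("missing " + field);
        }
        Object t = record.get("event_time");
        if (t instanceof Long) {
            long eventTime = (Long) t;
            if (eventTime < nowMillis - maxPastMillis) violations.add("event_time too old");
            if (eventTime > nowMillis + maxFutureMillis) violations.add("event_time in the future");
        } else if (t != null) {
            violations.add("event_time has wrong type");
        }
        if (!violations.isEmpty()) anomalies++; // one anomaly per failing record
        return violations;
    }

    /** True once more than maxAnomalies records have failed validation. */
    boolean thresholdExceeded(int maxAnomalies) {
        return anomalies > maxAnomalies;
    }
}
```

Records that fail can be routed to a quarantine location with their violation list attached, which keeps the raw zone clean while preserving evidence for debugging.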

Phase 8: Operational playbooks

  • New domain onboarding:
    • Define a standard event schema, topic naming, and partitioning scheme
    • Add tests for schema compatibility and ingestion end-to-end
  • Incident response:
    • Runbooks for ingestion failures, backpressure, and data drift
  • Rollback strategy:
    • Use immutable data in raw zone; rollbacks mean ignoring problematic partitions or applying a corrective job in the cleansed layer

Best practices and common pitfalls

  • Pitfall: Skewed keys (a few very frequent key values) causing hot partitions
    • Mitigate by salting or hashing keys and partitioning by time, not only by key
  • Pitfall: Late-arriving data breaking consumers
    • Use watermarking and late data handling; keep a grace period for late events
  • Pitfall: Schema drift breaking downstream joins
    • Maintain backward compatibility; avoid breaking changes; publish schema versions
  • Pitfall: Over-optimistic exactly-once guarantees
    • Ensure all sinks support idempotent writes and that checkpoints cover the critical flow, not just a subset
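
For the hot-partition pitfall, one common mitigation is key salting: a suffix derived from the event spreads a single busy key over several sub-keys. The sketch below is illustrative only; `KeySalting` is a hypothetical helper, and `String.hashCode()` stands in for whatever hash your message bus's partitioner actually uses.

```java
final class KeySalting {

    /** Spreads one hot key over `buckets` sub-keys, e.g. "user42#3". */
    static String saltedKey(String key, long eventSequence, int buckets) {
        if (buckets < 1) throw new IllegalArgumentException("buckets must be >= 1");
        // floorMod keeps the salt in [0, buckets) even for negative inputs
        long salt = Math.floorMod(eventSequence, (long) buckets);
        return key + "#" + salt;
    }

    /** Maps any key to a partition in [0, partitions) using a non-negative hash. */
    static int partitionFor(String key, int partitions) {
        if (partitions < 1) throw new IllegalArgumentException("partitions must be >= 1");
        return Math.floorMod(key.hashCode(), partitions);
    }
}
```

Salting gives up per-key ordering across sub-keys and requires a second aggregation step to merge the partial results, so it is worth applying only to keys that are known to be hot.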

Illustrative example: end-to-end data product

  • Domain: ecommerce transactions

Rizwan Saleem | https://rizwansaleem.co
