DEV Community

ESTHER NAISIMOI

Understanding Data in Data Engineering: A Comprehensive Guide

Table of Contents

  1. Introduction
  2. Types of Data
  3. Data Sources and Collection Methods
  4. Data Ingestion Tools and Strategies
  5. The Data Lifecycle
  6. Data Storage Systems
  7. Data Processing and Transformation
  8. Data Orchestration and Workflow Management
  9. Data Serving and Consumption Layers
  10. Data Governance and Security
  11. Data Archival and Retention
  12. Advanced Topics

Introduction

When people discuss "data," they rarely explain precisely what data is or measure its relevance. There are many underrated aspects of data that we might assume are merely "information" or "things that exist." In reality, data can be anything collected by hardware or software tools, and understanding its nuances is fundamental to effective data engineering.

Data isn't just passive information sitting in a database. It's a carefully cultivated, collected, processed, and distributed asset. The journey from raw collection to actionable insights involves numerous specialized systems, architectural decisions, and trade-offs.

This guide dives deep into:

  • What data actually is in different forms and contexts
  • Where data originates and how it's collected
  • How data flows through modern architectures
  • Who consumes data and why each needs different representations
  • Technical implementation details that matter in production

Types of Data

Data can be categorized into three fundamental types based on structure and organization:

1. Structured Data

Definition: Data organized in predefined schemas with clear relationships and constraints.

Characteristics:

  • Organized in rows and columns (tabular format)
  • Queryable with SQL
  • Enforced data types and constraints
  • Predictable schema evolution
  • Typically stored in relational databases

Storage Systems:

  • PostgreSQL
  • MySQL
  • Oracle Database
  • SQL Server
  • Cloud data warehouses (Snowflake, BigQuery, Redshift)

Examples:

Customer Table:
| customer_id | name  | email             | sign_up_date |
|-------------|-------|-------------------|--------------|
| 1           | Alice | alice@example.com | 2024-01-15   |
| 2           | Bob   | bob@example.com   | 2024-01-20   |

Advantages:

  • Easy to query and analyze
  • Strong consistency guarantees
  • Well-understood optimization techniques
  • Mature tooling ecosystem

Disadvantages:

  • Schema must be predefined
  • Difficult to handle unstructured content
  • Scaling writes can be challenging
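As a concrete illustration of enforced types and constraints, here is a minimal sketch that recreates the customer table above with Python's built-in `sqlite3` module. SQLite is only a portable stand-in for PostgreSQL or MySQL here, and it applies column types loosely unless a table is declared `STRICT`, so the checks below rely on `NOT NULL`, `UNIQUE`, and `CHECK` constraints.

```python
import sqlite3

# In-memory SQLite database standing in for a relational store such as PostgreSQL
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE customers (
        customer_id  INTEGER PRIMARY KEY,
        name         TEXT NOT NULL,
        email        TEXT NOT NULL UNIQUE,
        sign_up_date TEXT NOT NULL
            CHECK (sign_up_date GLOB '[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]')
    )
    """
)
conn.executemany(
    "INSERT INTO customers VALUES (?, ?, ?, ?)",
    [
        (1, "Alice", "alice@example.com", "2024-01-15"),
        (2, "Bob", "bob@example.com", "2024-01-20"),
    ],
)

# The schema rejects rows that break its constraints (here: a duplicate email)
try:
    conn.execute(
        "INSERT INTO customers VALUES (?, ?, ?, ?)",
        (3, "Mallory", "alice@example.com", "2024-02-01"),
    )
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

# Because the structure is known up front, analysis is a plain SQL query
signups_in_january = conn.execute(
    "SELECT COUNT(*) FROM customers WHERE sign_up_date LIKE '2024-01-%'"
).fetchone()[0]
print(duplicate_rejected, signups_in_january)
```

The same DDL ideas carry over to PostgreSQL, which additionally enforces the declared column types.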

2. Unstructured Data

Definition: Data without predefined schemas or organization, requiring specialized processing.

Characteristics:

  • No fixed format or schema
  • Requires specialized parsers and processors
  • Often large in volume
  • Stored in data lakes or object storage
  • Requires metadata for context

Types of Unstructured Data:

  • Text: Documents, emails, logs, PDFs
  • Media: Images, audio, video
  • Other: Binary files, archives, proprietary formats

Storage Systems:

  • Amazon S3 / S3-compatible storage (MinIO)
  • Google Cloud Storage (GCS)
  • Azure Blob Storage
  • Hadoop HDFS
  • Open table formats on object storage (Delta Lake, Apache Iceberg)

Processing Challenges:

  • Extracting meaningful information requires ML/NLP techniques
  • Storage costs can be significant
  • Retrieval and processing are slower than structured queries
  • Duplication and quality issues are common

Example Processing Pipeline:

Raw PDFs → PDF Parser → Text Extraction → NLP → Structured Data
                      ↓
            Feature Extraction
                      ↓
            ML Feature Store
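The parser step in that pipeline boils down to imposing structure on free text. Real PDF pipelines use dedicated extraction libraries and NLP models; the sketch below swaps in a plain regular expression over log lines as a much simpler stand-in. The log format and the `extract_records` name are hypothetical.

```python
import re

# A raw, unstructured log excerpt (hypothetical format)
raw_text = """
2024-01-15 10:30:45 ERROR payment-service Timeout calling gateway after 3000 ms
2024-01-15 10:31:02 INFO  checkout-service Order prod_999 placed
garbled line that matches nothing
"""

# The "parser": a pattern that names the fields we want to pull out of each line
LINE_PATTERN = re.compile(
    r"^(?P<date>\d{4}-\d{2}-\d{2}) (?P<time>\d{2}:\d{2}:\d{2}) "
    r"(?P<level>[A-Z]+)\s+(?P<service>[\w-]+) (?P<message>.+)$"
)


def extract_records(text: str) -> list[dict]:
    """Turn free-form lines into dict records, skipping lines that do not parse."""
    records = []
    for line in text.splitlines():
        match = LINE_PATTERN.match(line.strip())
        if match:
            records.append(match.groupdict())
    return records


records = extract_records(raw_text)
for record in records:
    print(record)
```

Lines that do not fit the pattern are silently skipped here; a production pipeline would route them to a dead-letter location for inspection instead.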

3. Semi-Structured Data

Definition: Self-describing data with some organizational structure but flexible schemas.

Characteristics:

  • Uses tags, keys, or hierarchies to describe content
  • Schema can evolve without migration
  • Queryable but with more flexibility than relational data
  • Self-documenting structure

File Formats:

  • JSON (JavaScript Object Notation)
  {
    "user_id": 123,
    "name": "Alice",
    "preferences": {
      "theme": "dark",
      "notifications": true
    },
    "tags": ["vip", "early_adopter"]
  }
  • XML (eXtensible Markup Language)
  <user>
    <id>123</id>
    <name>Alice</name>
    <preferences>
      <theme>dark</theme>
    </preferences>
  </user>
  • YAML (YAML Ain't Markup Language)
  user_id: 123
  name: Alice
  preferences:
    theme: dark
    notifications: true
  • CSV (Comma-Separated Values): flat and tabular, but untyped and without an enforced schema
  user_id,name,email,signup_date
  1,Alice,alice@example.com,2024-01-15
  2,Bob,bob@example.com,2024-01-20

Storage Systems:

  • MongoDB (document database)
  • CouchDB
  • Elasticsearch (search and analytics)
  • DynamoDB (NoSQL)
  • Apache HBase
  • Cassandra

Processing Advantages:

  • Schema flexibility allows rapid iteration
  • Self-describing structure makes parsing easier
  • Suitable for logs and event data
  • Supports nested structures naturally

Example: API responses are often semi-structured JSON

{
  "status": "success",
  "data": {
    "orders": [
      {"id": 1, "amount": 100, "status": "shipped"},
      {"id": 2, "amount": 200, "status": "pending"}
    ]
  }
}
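Loading semi-structured data into a warehouse usually means flattening nested objects and exploding arrays into rows. A minimal sketch for the API response above follows; `flatten` and the `response_status` column are illustrative names, and libraries such as pandas (`json_normalize`) provide a ready-made version of the same idea.

```python
import json

api_response = json.loads("""
{
  "status": "success",
  "data": {
    "orders": [
      {"id": 1, "amount": 100, "status": "shipped"},
      {"id": 2, "amount": 200, "status": "pending"}
    ]
  }
}
""")


def flatten(obj: dict, prefix: str = "") -> dict:
    """Flatten nested dicts into dotted column names, e.g. {"a": {"b": 1}} -> {"a.b": 1}."""
    flat = {}
    for key, value in obj.items():
        column = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{column}."))
        else:
            flat[column] = value  # lists and scalars are kept as-is
    return flat


# Explode the orders array into one row per order, carrying the top-level status along
rows = [
    {"response_status": api_response["status"], **flatten(order)}
    for order in api_response["data"]["orders"]
]
print(rows)
```

Note the name clash this avoids: the response-level `status` and each order's `status` end up in separate columns.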

Data Sources and Collection Methods

Comprehensive Data Source Matrix

| Data Source Category | Specific Source | Data Type | Collection Method | Collection Tools | Latency | Volume |
|---|---|---|---|---|---|---|
| Web & App Events | Website clicks | Semi-structured (JSON) | Event tracking SDKs | Segment, Amplitude, Mixpanel | Real-time | High |
| | Page visits | Semi-structured | Webhooks, logs | Google Analytics, Mixpanel | Real-time | High |
| | Form submissions | Structured | API POST requests | Custom scripts, Zapier | Real-time | Medium |
| | Purchases | Structured | Payment gateway webhooks | Stripe, PayPal webhooks | Real-time | Medium |
| Mobile Applications | App opens | Semi-structured | Mobile SDKs | Firebase, Mixpanel | Real-time | High |
| | Screen views | Semi-structured | Event streaming | Firebase, Segment | Real-time | High |
| | In-app purchases | Structured | App SDKs, APIs | App Store Connect, Google Play | Real-time | Medium |
| | User sessions | Semi-structured | Client-side tracking | Firebase Analytics | Real-time | High |
| IoT & Sensors | Temperature readings | Structured | MQTT, HTTP | AWS IoT Core, Mosquitto | Real-time | Very High |
| | GPS coordinates | Structured | MQTT, CoAP | Edge gateways, Raspberry Pi | Real-time | Very High |
| | Device metrics | Structured | Cloud ingestion | Azure IoT Hub, Confluent Cloud | Real-time | Very High |
| Databases | Transaction logs | Structured | CDC (Change Data Capture) | Debezium, Qreplication | Near real-time | High |
| | Customer records | Structured | Database replication | Sqoop, Kafka Connect | Batch/Real-time | Medium |
| | Product inventory | Structured | SQL queries, APIs | Custom scripts, Airbyte | Batch/Real-time | Medium |
| Application Logs | Server logs | Unstructured | Log forwarding agents | Fluentd, Logstash, Filebeat | Real-time | Very High |
| | Application logs | Unstructured | Logging libraries | ELK Stack, Splunk | Real-time | Very High |
| | Access logs | Semi-structured | Log aggregation | Papertrail, Loggly | Real-time | Very High |
| Event Streams | Real-time clicks | Semi-structured | Message brokers | Apache Kafka, Pulsar | Real-time | Very High |
| | Transactions | Structured | Event streams | Kafka, Kinesis | Real-time | High |
| | Notifications | Semi-structured | Message queues | RabbitMQ, SQS | Real-time | Medium |
| Third-Party APIs | Weather data | Semi-structured | HTTP polling | OpenWeather, custom scripts | Batch | Low |
| | Stock prices | Structured | REST/WebSocket APIs | Alpha Vantage, Finnhub | Real-time | Medium |
| | Social media | Semi-structured | API calls | Twitter API, Facebook SDK | Batch/Real-time | High |
| SaaS Platforms | Salesforce records | Structured | SaaS connectors | Fivetran, Stitch | Batch | Medium |
| | HubSpot contacts | Structured | API, connectors | Airbyte, Zapier | Batch | Medium |
| | Stripe payments | Structured | Webhooks, APIs | Stripe CLI, custom scripts | Real-time | Medium |

Real-World Data Collection Scenarios

Scenario 1: E-Commerce Website Click Tracking

Problem: Track user behavior (clicks, page visits, conversions) for analytics and personalization.

Architecture:

User Browser (Website)
        ↓
   JavaScript SDK
   (Segment/Analytics.js)
        ↓
   Event Queue
   (in-memory buffer)
        ↓
   HTTP POST (batched)
        ↓
Segment API / Custom Endpoint
        ↓
   Message Broker
   (Apache Kafka)
        ↓
   ┌─────────────────┬──────────────┐
   ↓                 ↓              ↓
Real-time       Stream Processor  Data Lake
Dashboard       (Spark Streaming)  (S3)
(low latency)   (aggregations)

Technical Details:

  1. Event Capture (Client-Side)

    • SDK intercepts user actions
    • Batches events (flushes at 100 events or every 5 seconds, whichever comes first)
    • Sends HTTP POST with gzip compression
    • Retries with exponential backoff on failure
  2. Event Schema (JSON)

   {
     "event_id": "uuid",
     "user_id": "12345",
     "session_id": "sess_abc123",
     "event_type": "click",
     "timestamp": "2024-01-15T10:30:45Z",
     "properties": {
       "button_text": "Add to Cart",
       "product_id": "prod_999",
       "price": 29.99
     },
     "context": {
       "ip": "192.168.1.1",
       "user_agent": "Mozilla/5.0...",
       "locale": "en_US"
     }
   }
  3. Processing Pipeline

    • Kafka topic: web_events (partitioned by user_id)
    • Spark Streaming job aggregates per 1-minute windows
    • Output: metrics, materialized views for dashboards
  4. Challenges

    • Duplicates: the unique event_id lets consumers deduplicate, so processing is idempotent
    • Out-of-order events: window-based aggregation tolerates some skew
    • Late arrivals: a 15-minute grace period still accepts events that arrive after their window has passed
    • User privacy: PII (email, phone) is never sent; events carry an opaque user_id instead
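The client-side behaviour in step 1 (size- or time-based batching, gzip, retries with exponential backoff) can be sketched as follows. This is not Segment's SDK: `EventBatcher` and the injected `send` callable are hypothetical, and unlike a real SDK it only checks the 5-second age limit when a new event arrives rather than on a background timer.

```python
import gzip
import json
import random
import time
from typing import Callable, Optional


class EventBatcher:
    """Client-side event buffer: flush at max_events or max_age_s, retry with backoff.

    `send` is a hypothetical transport (for example an HTTPS POST to a collection
    endpoint) that receives a gzip-compressed JSON body and raises on failure.
    """

    def __init__(self, send: Callable[[bytes], None], max_events: int = 100,
                 max_age_s: float = 5.0, max_retries: int = 5, base_delay_s: float = 0.5):
        self.send = send
        self.max_events = max_events
        self.max_age_s = max_age_s
        self.max_retries = max_retries
        self.base_delay_s = base_delay_s
        self.buffer: list[dict] = []
        self.first_event_at: Optional[float] = None  # monotonic time of oldest buffered event

    def track(self, event: dict) -> None:
        if not self.buffer:
            self.first_event_at = time.monotonic()
        self.buffer.append(event)
        too_many = len(self.buffer) >= self.max_events
        too_old = time.monotonic() - self.first_event_at >= self.max_age_s
        if too_many or too_old:  # whichever limit is reached first
            self.flush()

    def flush(self) -> bool:
        """Send the buffer; return True on success, False if every retry failed."""
        if not self.buffer:
            return True
        body = gzip.compress(json.dumps({"batch": self.buffer}).encode("utf-8"))
        for attempt in range(self.max_retries):
            try:
                self.send(body)
                self.buffer = []
                return True
            except Exception:
                if attempt < self.max_retries - 1:
                    # Exponential backoff with jitter: ~0.5 s, 1 s, 2 s, 4 s, ...
                    time.sleep(self.base_delay_s * 2 ** attempt * random.uniform(0.5, 1.5))
        return False  # events stay buffered and are retried on the next flush
```

Injecting `send` keeps the batching logic testable without a network; the jitter spreads retries from many browsers so they do not hit the endpoint in lockstep.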

Tools Used:

  • Client-side: Segment JavaScript SDK
  • Transport: HTTPS with gzip
  • Queue: Apache Kafka (3 brokers, replication factor 3)
  • Processing: Apache Spark (PySpark)
  • Storage: Amazon S3 + Delta Lake
  • Serving: Looker dashboards
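The deduplication and lateness rules from the Challenges list can be sketched in plain Python. This is a single-process illustration, not Spark: the class name, the per-product click count, and the simplified rule "drop an event more than 15 minutes behind the newest event seen" are assumptions. In Spark Structured Streaming the corresponding knobs would be `withWatermark` plus `dropDuplicates` on `event_id`.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Optional

WINDOW = timedelta(minutes=1)  # tumbling window size
GRACE = timedelta(minutes=15)  # how late an event may arrive and still count
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


class ClickWindowAggregator:
    """Count clicks per (1-minute window, product_id), dropping duplicates and very late events."""

    def __init__(self):
        self.seen_ids: set[str] = set()  # unbounded here; real jobs expire this state
        self.counts: dict = defaultdict(int)
        self.watermark: Optional[datetime] = None  # highest event time seen so far
        self.dropped_late = 0

    def process(self, event: dict) -> None:
        if event["event_id"] in self.seen_ids:
            return  # duplicate delivery (e.g. an SDK retry): ignore it
        ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
        if self.watermark is not None and ts < self.watermark - GRACE:
            self.dropped_late += 1  # beyond the grace period: treat its window as final
            return
        self.seen_ids.add(event["event_id"])
        self.watermark = ts if self.watermark is None else max(self.watermark, ts)
        # Align the event to the start of the tumbling window it falls into
        window_start = EPOCH + ((ts - EPOCH) // WINDOW) * WINDOW
        self.counts[(window_start, event["properties"]["product_id"])] += 1
```

Only events that pass the lateness check are recorded in `seen_ids`, so a dropped event does not block a later, legitimate retry of the same ID from being evaluated again.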

Scenario 2: IoT Sensor Data from Industrial Plant

Problem: Ingest temperature and pressure readings from 10,000 sensors across a factory every second.

Architecture:

Sensors (10k devices)
        ↓
   MQTT Protocol
   (publish/subscribe)
        ↓
MQTT Broker Cluster
(Mosquitto or HiveMQ)
        ↓
     Topic Tree:
   factory/line1/temp
   factory/line1/pressure
   factory/line2/temp
        ↓
Edge Gateway (stream processor)
        ↓
Message Queue (Kafka)
        ↓
  ┌──────────────┬─────────────────┬──────────┐
  ↓              ↓                 ↓          ↓
Real-time    Time-series    Cold Storage Anomaly
Alerts       Database       (Parquet)    Detection
(millisec)   (InfluxDB)     (S3 Archive)

Technical Details:

  1. MQTT Architecture

    • Protocol: MQTT 3.1.1 / 5.0
    • QoS levels: QoS 1 (at least once) for critical sensors
    • Retained messages: last reading persisted on broker
    • Topic hierarchy enables fanout subscriptions
  2. Sensor Message Format

   {
     "device_id": "sensor_001_line1",
     "device_type": "temperature",
     "timestamp": 1705322445000,
     "value": 72.5,
     "unit": "celsius",
     "location": {
       "facility": "factory_a",
       "line": 1,
       "zone": "assembly"
     },
     "quality": "good",
     "battery_voltage": 4.8
   }
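  3. Stream Processing (Kafka + Flink)
   # PyFlink-style sketch; kafka_source, compute_stats and alert_sink are placeholders
   import json

   from pyflink.common.time import Time
   from pyflink.datastream import StreamExecutionEnvironment
   from pyflink.datastream.window import TumblingEventTimeWindows

   env = StreamExecutionEnvironment.get_execution_environment()
   # Parse JSON payloads; event-time windows also require a WatermarkStrategy (omitted)
   input_stream = env.add_source(kafka_source).map(json.loads)

   # Tumbling window: aggregate each device's readings over 10 seconds
   windowed = (input_stream
     .key_by(lambda r: r["device_id"])
     .window(TumblingEventTimeWindows.of(Time.seconds(10)))
     .aggregate(compute_stats))  # AggregateFunction: min, max, avg, p95

   # Alert if the window's maximum temperature exceeds 90°C
   alerts = windowed.filter(lambda x: x["temp_max"] > 90)
   alerts.add_sink(alert_sink)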
  4. Time-Series Storage

    • Database: InfluxDB (or TimescaleDB)
    • Retention: 7 days hot, then compress
    • Compression: ~90% reduction via columnar encodings such as delta-of-delta for timestamps
    • Indexing: tag-based (device_id, facility, line)
  5. Challenges

    • Network failures: QoS 1 plus local buffering prevents data loss, at the cost of occasional duplicates (at-least-once delivery)
    • Clock skew: sensors without NTP drift, so use the broker's receive timestamp
    • Hot partitions: if one line sends 10x more data than the others, keying by (facility, line) puts all of it on one partition; keying by device_id spreads the load while keeping per-device order
    • Storage explosion: 10k sensors × 1 reading/sec ≈ 864M readings/day ≈ ~10 TB/year (compressed)
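The "topic hierarchy enables fanout subscriptions" point rests on MQTT's wildcard rules: `+` matches exactly one topic level and `#` matches a level and everything beneath it. Brokers such as Mosquitto or HiveMQ implement this matching internally; the sketch below re-implements it only for illustration, with hypothetical subscriber names, to show how one reading reaches several consumers.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a subscription filter.

    '+' matches exactly one level; '#' (valid only as the last level) matches
    the parent level and everything below it.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    # Filters starting with a wildcard never match topics starting with '$' (e.g. $SYS)
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(filter_levels):
        if level == "#":
            return i == len(filter_levels) - 1  # '#' must be the final level
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


# Fan-out: one published reading reaches every subscription whose filter matches
subscriptions = {
    "alerting": "factory/+/temp",          # temperature from every line
    "line1_dashboard": "factory/line1/#",  # everything from line 1
    "archiver": "#",                       # everything
}
recipients = [name for name, f in subscriptions.items()
              if topic_matches(f, "factory/line1/temp")]
print(recipients)
```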

Tools Used:

  • Devices: Arduino, Raspberry Pi with MQTT libraries
  • Broker: HiveMQ cluster (3 nodes, HA setup)
  • Edge Gateway: Custom Flink job
  • Queue: Apache Kafka (6 brokers, replication factor 3)
  • Time-series DB: InfluxDB
  • Alerting: PagerDuty via webhook
  • Archive: AWS Glacier
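The storage estimate from the Challenges list can be checked with a few lines of arithmetic. The inputs are the scenario's own numbers; the bytes-per-reading figure is derived from the ~10 TB/year (compressed) estimate rather than measured.

```python
SENSORS = 10_000
READINGS_PER_SECOND = 1
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400

readings_per_day = SENSORS * READINGS_PER_SECOND * SECONDS_PER_DAY
readings_per_year = readings_per_day * 365

# Bytes per stored reading implied by the ~10 TB/year (compressed) estimate
TB = 10**12
implied_bytes_per_reading = 10 * TB / readings_per_year

print(f"{readings_per_day:,} readings/day")                    # 864,000,000 readings/day
print(f"{readings_per_year:,} readings/year")                  # 315,360,000,000 readings/year
print(f"~{implied_bytes_per_reading:.0f} bytes per reading")   # ~32 bytes per reading
```

About 32 bytes per reading after compression is a useful sanity check: if the serialized reading plus index overhead compresses less well than that, the yearly total grows proportionally.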

Scenario 3: Database Change Capture (CDC)

Problem: Keep a data warehouse in sync with production PostgreSQL database changes in real-time.

Architecture:

Production PostgreSQL
(OLTP)
        ↓
  Transaction Logs
  (WAL - Write-Ahead Log)
        ↓
 Debezium Connector
 (PostgreSQL reader)
        ↓
  Kafka Topic
  (change events)
        ↓
  ┌─────────────────┐
  ↓                 ↓
Stream Processor   Sink Connector
(Flink SQL)        (Snowflake JDBC)
  ↓                 ↓
Aggregations     Data Warehouse
                 (Snowflake)
                    ↓
                 Materialized Views

Technical Details:

  1. Debezium Setup
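   # Connection settings (database.hostname, database.user, topic.prefix, ...) omitted
   connector.class: io.debezium.connector.postgresql.PostgresConnector
   plugin.name: pgoutput  # Logical decoding plugin built into PostgreSQL 10+
   slot.name: debezium_slot
   publication.name: dbz_publication
   table.include.list: public.customers,public.orders
   transforms: unwrap
   transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
   # Flatten to the "after" row but keep op and ts_ms (added as __op, __ts_ms)
   transforms.unwrap.add.fields: op,ts_ms
   # Keep deletes (flagged __deleted=true) instead of dropping them
   transforms.unwrap.delete.handling.mode: rewrite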
  2. Change Event Format (JSON)
   {
     "before": null,
     "after": {
       "customer_id": 123,
       "name": "Alice Johnson",
       "email": "alice@example.com",
       "created_at": 1705322445000
     },
     "source": {
       "version": "2.5.0",
       "connector": "postgresql",
       "name": "mydb",
       "ts_ms": 1705322445123,
       "txId": 500,
       "lsn": 23456789,
       "xmin": null,
       "table": "customers"
     },
     "op": "c",
     "ts_ms": 1705322445500
   }
  • op: c (create), u (update), d (delete), r (read, emitted during the initial snapshot)
  3. Kafka Topic Organization

    • Topic: cdc.production.customers
    • Partitioning: by customer_id (ensures order per customer)
    • Retention: 7 days (replay capability)
  4. Warehouse Sync (Snowflake)

   -- Merge statement executed per batch
   MERGE INTO staging.customers t
   USING (
     -- Keep only the latest change per key; duplicate keys would make the MERGE nondeterministic
     SELECT *
     FROM cdc_batch
     QUALIFY ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY ts_ms DESC) = 1
   ) s
   ON t.customer_id = s.customer_id
   WHEN MATCHED AND s.op = 'd' THEN DELETE
   WHEN MATCHED AND s.op IN ('c', 'u', 'r') THEN UPDATE
     SET name = s.name, email = s.email, updated_at = s.ts_ms
   WHEN NOT MATCHED AND s.op IN ('c', 'u', 'r') THEN
     INSERT (customer_id, name, email, updated_at)
     VALUES (s.customer_id, s.name, s.email, s.ts_ms);
  5. Advantages of CDC

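    • Low impact: changes are read from the WAL through a logical replication slot instead of querying tables (but a stalled slot makes PostgreSQL retain WAL, which can fill the disk)
    • Completeness: captures all changes (no missed updates)
    • Ordering: preserves transactional order within each partition
    • Efficiency: incremental after the initial snapshot (no repeated full table scans)
  6. Challenges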

    • Initial sync: snapshot mode reads table first (locks), then transitions to incremental
    • Large tables: snapshot can take hours; consider parallel readers
    • Transactions: large batch deletes create many events
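    • Schema changes: PostgreSQL logical decoding does not stream DDL, so the Debezium PostgreSQL connector picks up a new table schema with the next change event for that table; downstream tables must evolve to match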

Tools Used:

  • CDC Engine: Debezium 2.x
  • Message Queue: Apache Kafka
  • Sink: Snowflake JDBC connector
  • Orchestration: Kubernetes (Strimzi operator)
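Whatever the sink, applying change events amounts to replaying them in log order against a keyed table. The minimal in-memory sketch below uses the envelope fields shown earlier (`op`, `before`, `after`); `apply_change_events` is a hypothetical stand-in for the MERGE above, not part of Debezium.

```python
def apply_change_events(table: dict, events: list[dict], key: str = "customer_id") -> dict:
    """Apply Debezium-style envelopes to an in-memory table (primary key -> row).

    Events must already be in log order, e.g. as read from one Kafka partition.
    """
    for event in events:
        op = event["op"]
        if op in ("c", "r", "u"):
            # create / snapshot read / update: the "after" image is the new row state
            row = event["after"]
            table[row[key]] = row
        elif op == "d":
            # delete: with PostgreSQL's default REPLICA IDENTITY, "before" holds only the key
            table.pop(event["before"][key], None)
        else:
            raise ValueError(f"unsupported op {op!r}")
    return table


events = [
    {"op": "r", "before": None, "after": {"customer_id": 1, "name": "Alice"}},
    {"op": "c", "before": None, "after": {"customer_id": 2, "name": "Bob"}},
    {"op": "u", "before": {"customer_id": 1}, "after": {"customer_id": 1, "name": "Alice Johnson"}},
    {"op": "d", "before": {"customer_id": 2}, "after": None},
]
print(apply_change_events({}, events))
```

Replaying the same events twice yields the same table, which is why partitioning by primary key (one ordered stream per key) is enough for a correct replica.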

Categories of Data Collection Tools

A. Streaming/Real-Time Platforms

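Used for continuous, low-latency data ingestion (throughput and latency figures are rough orders of magnitude; real numbers depend on hardware and configuration):

| Tool | Protocol | Throughput | Latency | Storage | Ordering |
|---|---|---|---|---|---|
| Apache Kafka | Custom binary (TCP) | 100k+ msgs/sec | ~10ms | Replicated log on disk | Per partition |
| Apache Pulsar | Custom binary (TCP) | 100k+ msgs/sec | ~10ms | BookKeeper + tiered storage | Per partition |
| AWS Kinesis | HTTPS (HTTP/2 for enhanced fan-out) | 1,000 records/sec (1 MB/s) write per shard | 100-200ms | Managed | Per shard |
| Google Pub/Sub | gRPC / HTTPS | 100k+ msgs/sec | 100-500ms | Managed | Per ordering key (opt-in) |
| RabbitMQ | AMQP | ~50k msgs/sec | <10ms | In-memory/disk | Per queue |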

Kafka Example: High-volume clickstream

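from kafka import KafkaProducer  # kafka-python
import json
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',  # Wait for all in-sync replicas
    compression_type='snappy'  # requires the python-snappy package
)

# Example events; in practice these arrive from the click-tracking endpoint
user_events = [
    {'user_id': 12345, 'event_type': 'click', 'timestamp': time.time() * 1000},
]

# Publish events
for event in user_events:
    producer.send(
        'web_events',
        value=event,
        key=str(event['user_id']).encode('utf-8'),  # Same user -> same partition
        timestamp_ms=int(event['timestamp'])        # Event time in epoch milliseconds
    )

# send() is asynchronous; flush before exiting so buffered messages are delivered
producer.flush()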

B. ETL / ELT Tools

Move and transform data between systems:

| Tool | Type | Scheduling | Transformation | Cloud Support |
|---|---|---|---|---|
| Airbyte | ELT | Connectors | dbt | Yes (managed) |
| Fivetran | ELT | Connectors | dbt | Yes (managed) |
| Talend | ETL | Workflows | Custom jobs | Hybrid |
| Informatica | ETL | Designer | GUI-based | Hybrid |
| Apache Airflow | Orchestration | DAGs | dbt, custom code | Kubernetes-native |
| Prefect | Orchestration | DAGs | Python code | Cloud-native |

Airbyte Example: Sync Salesforce to Snowflake

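# connection.yaml (illustrative pseudo-config, not Airbyte's actual file format;
# Airbyte connections are normally set up through its UI, API, or Terraform provider)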
source_type: salesforce
source_config:
  client_id: ${SALESFORCE_CLIENT_ID}
  client_secret: ${SALESFORCE_CLIENT_SECRET}
  refresh_token: ${SALESFORCE_REFRESH_TOKEN}

destination_type: snowflake
destination_config:
  host: mycompany.us-east-1.snowflakecomputing.com
  database: analytics_db
  schema: salesforce_sync
  warehouse: compute_wh

sync_schedule: every_6_hours

C. Log Collection Tools

Specialized for logs, metrics, and traces:

| Tool | Input Protocol | Processing | Output |
|---|---|---|---|
| Fluentd | syslog, HTTP, file tailing | Filtering, parsing, buffering | Kafka, S3, MongoDB |
| Logstash | syslog, HTTP, file | Rich parsing via Grok | Elasticsearch, S3 |
| Filebeat | Log files, stdin | Lightweight, minimal parsing | Logstash, Kafka, Elasticsearch |
| Vector | 200+ sources | High throughput, low memory | 60+ destinations |

Fluentd Example: Parse application logs

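# fluent.conf (the kafka2 output comes from the fluent-plugin-kafka plugin)
<source>
  @type tail
  path /var/log/app/*.log
  # Position file: remembers how far each log file has been read
  pos_file /var/log/app.pos
  tag app.log
  <parse>
    @type json
  </parse>
</source>

<match app.log>
  @type kafka2
  brokers kafka-1:9092,kafka-2:9092
  # default_topic sets the topic; topic_key would read it from a record field instead
  default_topic app_logs
  compression_codec snappy
  <format>
    @type json
  </format>
</match>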

D. API Integration Tools

Pull data from SaaS platforms and APIs:

| Tool | Authentication | Scheduling | Error Handling |
|---|---|---|---|
| Postman | OAuth, API keys, mTLS | Manual + CLI | Retries, custom scripts |
| Zapier | OAuth flows | Frequency-based | Built-in retry logic |
| Make | Connectors | Real-time + scheduled | Error handler modules |

Custom API Ingestion (Python)

import json
import os
import time

import requests

GITHUB_TOKEN = os.environ['GITHUB_TOKEN']  # personal access token

def fetch_github_events(org: str, output_path: str):
    """Fetch GitHub org events via REST API."""
    url = f"https://api.github.com/orgs/{org}/events"
    params = {'per_page': 100}
    headers = {
        'Authorization': f'token {GITHUB_TOKEN}',
        'Accept': 'application/vnd.github+json'
    }
    all_events = []

    while url:
        response = requests.get(url, params=params, headers=headers, timeout=10)
        response.raise_for_status()
        all_events.extend(response.json())

        # Follow pagination via the Link header until there is no "next" page
        url = response.links.get('next', {}).get('url')
        params = None  # the "next" URL already carries the query string

        # Respect rate limits before requesting the next page
        remaining = int(response.headers.get('X-RateLimit-Remaining', 1))
        if url and remaining < 10:
            reset_time = int(response.headers.get('X-RateLimit-Reset', 0))
            wait_seconds = reset_time - time.time()
            if wait_seconds > 0:
                print(f"Rate limited; waiting {wait_seconds:.0f}s")
                time.sleep(wait_seconds + 1)

    # Store as JSONL (one record per line)
    with open(output_path, 'w') as f:
        for event in all_events:
            f.write(json.dumps(event) + '\n')

    print(f"Fetched {len(all_events)} events to {output_path}")

if __name__ == '__main__':
    fetch_github_events('microsoft', 'github_events.jsonl')

Key Principle: Matching Collection Methods to Data Sources

Different sources require fundamentally different collection approaches:

┌─────────────────────────────────────────────────────────────┐
│              Source Type → Collection Method                 │
├─────────────────────────────────────────────────────────────┤
│ Databases (OLTP)       → CDC (Debezium, Qreplication)       │
│ APIs (REST, GraphQL)   → Connectors, polling, webhooks      │
│ Logs (files, syslog)   → Log shippers (Fluentd, Filebeat)   │
│ Real-time clicks       → Event streaming (Kafka, Pulsar)    │
│ Sensors (IoT)          → MQTT, CoAP, HTTP collectors        │
│ SaaS platforms         → Native connectors (Fivetran)       │
│ Message queues         → Direct subscription (Kafka)        │
│ Analytics services     → Bulk export APIs (S3)              │
└─────────────────────────────────────────────────────────────┘

The combination of source + collection method + transport + storage + processing forms a data pipeline—the foundation of modern data systems.


Data Ingestion Tools and Strategies

Push vs. Pull Architectures

Push Architecture

Source systems actively send data to collection endpoints.

Advantages:

  • Real-time or near-real-time delivery
  • Source system knows delivery status
  • Suitable for high-velocity data
  • Webhook-based (event-driven)

Disadvantages:

  • Requires sender to implement push logic
  • Destination must be always available
  • Backpressure harder to implement
  • Source system load impacts reliability

Example: Payment gateway webhook

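import os

import stripe
from fastapi import FastAPI, HTTPException, Request

app = FastAPI()
webhook_secret = os.environ["STRIPE_WEBHOOK_SECRET"]  # endpoint signing secret
# kafka_producer: a KafkaProducer with a JSON value_serializer, created elsewhere

@app.post("/webhooks/stripe")
async def handle_stripe_webhook(request: Request):
    """Stripe pushes payment events to our endpoint."""
    payload = await request.body()  # raw bytes: the signature covers the unparsed body
    sig_header = request.headers.get("stripe-signature")

    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, webhook_secret
        )
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid payload")
    except stripe.error.SignatureVerificationError:
        raise HTTPException(status_code=400, detail="Invalid signature")

    # Push to message queue for processing, then acknowledge quickly
    kafka_producer.send('stripe_events', value=event)

    return {"status": "received"}  # FastAPI responds with 200 by default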
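`construct_event` rejects requests whose `Stripe-Signature` header does not match. Stripe documents the scheme for manual verification: the header carries a timestamp `t` and one or more `v1` signatures, each an HMAC-SHA256 of `"<t>.<raw body>"` keyed by the endpoint secret. The sketch below re-implements that check with the standard library purely to show what push receivers must verify; `verify_stripe_signature` and its 300-second tolerance are choices of this sketch, and production code should keep using the official library.

```python
import hashlib
import hmac
import time
from typing import Optional


def verify_stripe_signature(payload: bytes, sig_header: str, secret: str,
                            tolerance_s: int = 300, now: Optional[float] = None) -> bool:
    """Verify a header shaped like 't=<unix seconds>,v1=<hex digest>[,v1=...]'."""
    pairs = [item.split("=", 1) for item in sig_header.split(",") if "=" in item]
    timestamps = [value for key, value in pairs if key == "t"]
    signatures = [value for key, value in pairs if key == "v1"]
    if not timestamps or not signatures or not timestamps[0].isdigit():
        return False
    timestamp = timestamps[0]

    # Reject old deliveries so a captured request cannot be replayed later
    current = time.time() if now is None else now
    if abs(current - int(timestamp)) > tolerance_s:
        return False

    # Signed payload is "<timestamp>.<raw body>", HMAC-SHA256 keyed by the endpoint secret
    signed_payload = timestamp.encode("utf-8") + b"." + payload
    expected = hmac.new(secret.encode("utf-8"), signed_payload, hashlib.sha256).hexdigest()

    # Constant-time comparison against each v1 signature in the header
    return any(hmac.compare_digest(expected, sig) for sig in signatures)
```

Verifying against the raw bytes matters: re-serializing parsed JSON can change whitespace or key order and break the signature.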

Pull Architecture

Destination systems proactively fetch data from sources.

Advantages:

  • Destination controls pace and timing
  • Easy backpressure and error handling
  • Retries simple (idempotent reads)
  • Source system passive

Disadvantages:

  • Latency: data not delivered until next pull
  • Polling overhead (many requests return nothing new)
  • Hard to get all changes (need cursors/timestamps)
  • Source must support random access

Example: Polling GitHub API for new commits

import time
from datetime import datetime, timezone

import requests

class GitHubPoller:
    def __init__(self, repo: str, token: str):
        self.repo = repo
        self.session = requests.Session()
        self.session.headers['Authorization'] = f'token {token}'
        # Checkpoint: each poll asks only for commits after the previous poll
        self.last_poll = datetime.now(timezone.utc)
        self.seen_shas: set[str] = set()

    def get_new_commits(self) -> list[dict]:
        """Pull commits since the last checkpoint."""
        poll_started = datetime.now(timezone.utc)
        url = f"https://api.github.com/repos/{self.repo}/commits"
        params = {
            'since': self.last_poll.strftime('%Y-%m-%dT%H:%M:%SZ'),  # ISO 8601, UTC
            'per_page': 100,
        }

        response = self.session.get(url, params=params, timeout=10)
        response.raise_for_status()

        # 'since' filters on commit date, so drop commits already emitted
        commits = [c for c in response.json() if c['sha'] not in self.seen_shas]
        self.seen_shas.update(c['sha'] for c in commits)

        # Advance the checkpoint only after a successful request
        self.last_poll = poll_started
        return commits

    def poll_loop(self, interval_seconds: int = 300):
        """Poll every N seconds."""
        while True:
            for commit in self.get_new_commits():
                # Process or queue commit
                print(f"Found commit: {commit['sha']}")
            time.sleep(interval_seconds)
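Pulling "all changes" reliably needs a cursor. A common pattern, sketched here under assumptions, is a high-water mark of `(updated_at, id)`: fetch inclusively from the last timestamp, then skip anything at or before the cursor so already-processed rows are not emitted again. `fetch_since` is a hypothetical source call. The approach can still miss rows that become visible late with an older `updated_at`, or a same-timestamp row whose id sorts before the cursor, which is the "hard to get all changes" caveat above.

```python
from typing import Callable, Iterable, Optional

# A cursor is the (updated_at, id) of the last record already processed
Cursor = tuple[str, str]


def poll_incremental(
    fetch_since: Callable[[str], Iterable[dict]],
    cursor: Optional[Cursor],
) -> tuple[list[dict], Optional[Cursor]]:
    """Fetch records changed since the cursor; return them with the advanced cursor.

    `fetch_since(ts)` must return records whose ISO-8601 UTC `updated_at` is >= ts.
    Timestamps in one fixed format compare correctly as strings.
    """
    since = cursor[0] if cursor else "1970-01-01T00:00:00Z"
    rows = sorted(fetch_since(since), key=lambda r: (r["updated_at"], r["id"]))
    # Inclusive fetch + strict filter: rows sharing the cursor's timestamp are ordered by id
    new_rows = [r for r in rows if cursor is None or (r["updated_at"], r["id"]) > cursor]
    if new_rows:
        cursor = (new_rows[-1]["updated_at"], new_rows[-1]["id"])
    return new_rows, cursor
```

Persist the returned cursor only after the batch has been written downstream; that way a crash causes a re-read, not a gap.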

Batch vs. Streaming Ingestion

Batch Ingestion

Collect data in fixed intervals and load in bulk.

Use Cases:

  • Historical data imports
  • End-of-day reports
  • Large dataset migrations
  • Cost-sensitive (off-peak loading)

Example: Daily database export

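#!/bin/bash
set -euo pipefail

# Export PostgreSQL nightly. Custom format is compressed and restorable with pg_restore;
# parallel dumps (--jobs) are only supported with --format=directory.
DUMP_FILE=/data/exports/mydb_$(date +%Y%m%d).dump

# Password comes from ~/.pgpass or PGPASSWORD instead of the connection URL
pg_dump --format=custom \
        --compress=9 \
        --file="$DUMP_FILE" \
        postgresql://user@prod-db.local/mydb

# Upload to S3 (same file name even if the job runs past midnight)
aws s3 cp "$DUMP_FILE" s3://my-data-lake/databases/postgresql/daily/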

Advantages:

  • Simpler architecture (cron + SQL)
  • Cost-effective (offload during low-traffic hours)
  • Easier error recovery (entire batch succeeds/fails)
  • Well-suited for large migrations

Disadvantages:

  • Latency: data can be up to one batch interval old (a full day for nightly jobs)
  • Inefficient for small deltas
  • Storage spikes on load days
  • Harder to detect issues quickly

Streaming Ingestion

Continuously ingest small data units as they arrive.

Use Cases:

  • Real-time dashboards
  • Live fraud detection
  • Personalization engines
  • Time-sensitive analytics

Example: Kafka producer for application events

from kafka import KafkaProducer
from datetime import datetime, timezone
import json
import logging

class AppEventProducer:
    def __init__(self, brokers: list[str]):
        self.producer = KafkaProducer(
            bootstrap_servers=brokers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            acks='all',  # Ensure durability
            compression_type='snappy',
            retries=3,
            # With retries, >1 in-flight request can reorder messages; 1 keeps partition order
            max_in_flight_requests_per_connection=1,
        )
        self.logger = logging.getLogger(__name__)

    def send_event(self, event_type: str, **data):
        """Send event to Kafka asynchronously."""
        message = {
            'event_type': event_type,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'data': data,
        }

        try:
            # Async send; callbacks fire when the broker acknowledges or the send fails
            self.producer.send(
                f'app_events_{event_type}',
                value=message,
                key=str(data.get('user_id')).encode('utf-8'),
            ).add_callback(
                lambda meta: self.logger.info(
                    f"Event {event_type} sent to partition {meta.partition}"
                )
            ).add_errback(
                lambda exc: self.logger.error(f"Failed to send: {exc}")
            )
        except Exception as e:
            # send() can also raise immediately, e.g. if metadata or buffer space is unavailable
            self.logger.error(f"Error sending event: {e}")

    def flush(self):
        """Block until all pending messages are sent (waits up to 30 seconds)."""
        self.producer.flush(timeout=30)

# Usage
producer = AppEventProducer(['kafka-1:9092', 'kafka-2:9092'])
producer.send_event('user_signup', user_id=123, email='user@example.com')
producer.send_event('purchase', user_id=123, amount=99.99)
producer.flush()

Advantages:

  • Real-time availability
  • Incremental updates only
  • Easier to detect anomalies
  • Supports complex windowed aggregations

Disadvantages:

  • Complex infrastructure (brokers, coordinators)
  • Harder to replay/recover from errors
  • Requires handling out-of-order events
  • Higher operational overhead

The Data Lifecycle

Data follows a complete journey from creation to deletion. Understanding each phase is crucial for building resilient systems:

┌──────────────────────────────────────────────────────────────────┐
│                      Data Lifecycle Phases                         │
├──────────────────────────────────────────────────────────────────┤
│                                                                    │
│  1. Generation/Creation                                           │
│     ↓                                                             │
│  2. Collection/Ingestion                                          │
│     ↓                                                             │
│  3. Storage (Raw)                                                 │
│     ↓                                                             │
│  4. Processing/Transformation                                     │
│     ↓                                                             │
│  5. Orchestration & Workflow Management                           │
│     ↓                                                             │
│  6. Serving/Consumption                                           │
│     ↓                                                             │
│  7. Governance & Security                                         │
│     ↓                                                             │
│  8. Archival or Deletion                                          │
│                                                                    │
└──────────────────────────────────────────────────────────────────┘

Phase 1: Data Generation / Creation

Definition: Data originates from business processes and systems.

Sources of Generation:

  • User interactions (clicks, form inputs, searches)
  • System operations (logs, metrics, traces)
  • Business transactions (orders, payments, reservations)
  • External events (weather, market data)
  • Sensors and IoT devices

Key Considerations:

Data Quality at Source:

  • Validate data format at entry point
  • Enforce schema constraints
  • Assign unique identifiers (UUIDs preferred)
  • Include timestamps (in UTC, not local time)
  • Add metadata (source system, user context)

Example: E-commerce order generation

from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from uuid import uuid4

class OrderStatus(str, Enum):
    PENDING = "pending"
    CONFIRMED = "confirmed"
    SHIPPED = "shipped"
    DELIVERED = "delivered"

def utc_now() -> str:
    """Current UTC time as ISO 8601 with an explicit +00:00 offset."""
    return datetime.now(timezone.utc).isoformat()

@dataclass
class Order:
    # Unique identifier assigned at creation (UUID4)
    order_id: str = field(default_factory=lambda: str(uuid4()))
    user_id: Optional[str] = None
    items: list = field(default_factory=list)  # avoid a shared mutable default
    total_amount: float = 0.0
    status: OrderStatus = OrderStatus.PENDING
    created_at: str = field(default_factory=utc_now)
    updated_at: Optional[str] = None

    def __post_init__(self):
        # A new order has not been modified yet
        if self.updated_at is None:
            self.updated_at = self.created_at

    def to_dict(self):
        return {
            'order_id': self.order_id,
            'user_id': self.user_id,
            'items': self.items,
            'total_amount': round(self.total_amount, 2),
            'status': self.status.value,
            'created_at': self.created_at,
            'updated_at': self.updated_at,
        }
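The "validate data format at entry point" advice can be as simple as a function that checks an incoming payload before an order record is built. The sketch below is hypothetical (`validate_order_payload` and its rules are illustrative); in practice a schema library such as pydantic or a JSON Schema validator would usually do this job.

```python
from typing import Any

VALID_STATUSES = {"pending", "confirmed", "shipped", "delivered"}


def validate_order_payload(payload: dict[str, Any]) -> list[str]:
    """Return a list of problems with an incoming order payload (empty list = valid)."""
    errors = []
    user_id = payload.get("user_id")
    if not isinstance(user_id, str) or not user_id:
        errors.append("user_id must be a non-empty string")
    items = payload.get("items")
    if not isinstance(items, list) or not items:
        errors.append("items must be a non-empty list")
    amount = payload.get("total_amount")
    # bool is a subclass of int in Python, so exclude it explicitly
    if not isinstance(amount, (int, float)) or isinstance(amount, bool) or amount < 0:
        errors.append("total_amount must be a non-negative number")
    if payload.get("status", "pending") not in VALID_STATUSES:
        errors.append(f"status must be one of {sorted(VALID_STATUSES)}")
    return errors


print(validate_order_payload({"user_id": "u1", "items": [{"sku": "x"}], "total_amount": 29.99}))
```

Returning every problem at once, rather than raising on the first, gives the producing system a complete error report in one round trip.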

Phase 2: Data Collection / Ingestion

Definition: Data is gathered and moved into centralized storage or processing systems.

Ingestion Methods (Recap):

  1. Batch - Scheduled uploads
  2. Real-time streaming - Continuous flow
  3. API polling - On-demand requests
  4. Message queues - Event-driven push
  5. Database CDC - Transactional logs

Critical Design Patterns:

Exactly-Once Semantics:

  • Ensure no duplicates even if systems fail
  • Use idempotent operations (based on message IDs)
  • Implement deduplication in downstream systems
  • Example: Kafka idempotent producers and transactions, with consumer offsets committed in the same transaction as the output

Backpressure Handling:

  • Don't overwhelm destination systems
  • Use circuit breakers for downstream failures
  • Implement queues with bounded capacity
  • Slow consumption automatically slows production
import asyncio
from typing import AsyncIterator

class BackpressureQueue:
    def __init__(self, max_size: int = 1000):
        self.queue = asyncio.Queue(maxsize=max_size)

    async def produce(self, item):
        """Block if queue full (backpressure)."""
        try:
            self.queue.put_nowait(item)
        except asyncio.QueueFull:
            print("Queue full; applying backpressure")
            await self.queue.put(item)  # This blocks

    async def consume(self) -> AsyncIterator:
        """Consume items with automatic flow control."""
        while True:
            item = await self.queue.get()
            yield item
            self.queue.task_done()
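The backpressure list also mentions circuit breakers, which the queue example does not cover. A minimal sketch, assuming a consecutive-failure threshold and a fixed cool-down; `CircuitBreaker`, its parameters, and the closed/open/half-open state names are illustrative, not a specific library's API.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised instead of calling the downstream system while the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold  # consecutive failures before opening
        self.reset_timeout = reset_timeout          # seconds to wait before a trial call
        self.clock = clock                          # injectable for testing
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half-open"                      # let one trial call through
        return "open"

    def call(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "open":
            raise CircuitOpenError("downstream unavailable; failing fast")
        was_half_open = self.state == "half-open"
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if was_half_open or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()       # (re)open and restart the cool-down
            raise
        self.failures = 0                           # any success closes the circuit
        self.opened_at = None
        return result


# Usage with a fake clock so the example runs instantly
now = [0.0]
breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0, clock=lambda: now[0])


def failing_sink():
    raise ConnectionError("sink down")


for _ in range(2):
    try:
        breaker.call(failing_sink)
    except ConnectionError:
        pass

print(breaker.state)                 # open
now[0] = 11.0
print(breaker.state)                 # half-open
print(breaker.call(lambda: "ok"))    # ok
print(breaker.state)                 # closed
```

Wrapping writes to the destination in `breaker.call(...)` lets a producer fail fast while the sink is down instead of piling retries onto it; combined with the bounded queue, upstream work slows down until the trial call succeeds.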

Phase 3: Data Storage (Raw)

Definition: Raw, unprocessed data is persisted for future processing and compliance.

Storage Tier Strategy:

┌─────────────────────────────────────────────────────┐
│                  Storage Architecture                │
├─────────────────────────────────────────────────────┤
│                                                      │
│  Hot Storage (Fast Access)                          │
│  ├─ Local SSD / NVMe (sub-millisecond latency)     │
│  ├─ Cloud block storage (EBS, Persistent Disk)     │
│  └─ Cost: $0.10-0.20 / GB / month                  │
│                                                      │
│  Warm Storage (Moderate Access)                     │
│  ├─ Object storage (S3, GCS) w/ standard tier      │
│  ├─ Data lakes (Delta, Iceberg) on cloud storage   │
│  └─ Cost: $0.023 / GB / month                      │
│                                                      │
│  Cold Storage (Archive)                             │
│  ├─ S3 Glacier, Azure Archive                      │
│  ├─ On-premises tape                               │
│  └─ Cost: $0.004 / GB / month (retrieval in hours) │
│                                                      │
└─────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

Format and Compression:

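| Format   | Typical compression            | Use case                    | Read performance         |
|----------|--------------------------------|-----------------------------|--------------------------|
| Parquet  | Snappy (pyarrow/Spark default) | Analytical queries          | ✓ Columnar access        |
| ORC      | Zstd or ZLIB                   | Hive/Spark, columnar        | ✓ Excellent compression  |
| JSONL    | None/gzip                      | Logs, semi-structured data  | ✗ Slower (full scan)     |
| Avro     | Deflate                        | Streaming, schema evolution | ~ Row-based              |
| CSV      | gzip                           | Legacy systems              | ✗ Text parsing overhead  |
| Protobuf | None                           | Microservices, bandwidth    | ~ Compact binary         |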

Example: Write partitioned Parquet dataset

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

def write_data_lake(events: list[dict], output_path: str) -> None:
    """Write events as a Parquet dataset partitioned by date and event type."""

    # Convert to DataFrame and derive the partition key from the event timestamp
    df = pd.DataFrame(events)
    df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)
    df['event_date'] = df['timestamp'].dt.strftime('%Y-%m-%d')

    table = pa.Table.from_pandas(df, preserve_index=False)

    # write_table() writes one file; write_to_dataset() creates Hive-style
    # partition directories. Extra keyword arguments go to the Parquet writer.
    pq.write_to_dataset(
        table,
        root_path=output_path,
        partition_cols=['event_date', 'event_type'],
        compression='snappy',
        coerce_timestamps='ms',
        allow_truncated_timestamps=True,
    )

    # Result (file names are generated by pyarrow):
    # s3://my-data-lake/events/
    #   event_date=2024-01-15/
    #     event_type=click/
    #       <generated-name>.parquet
    #     event_type=purchase/
    #       <generated-name>.parquet
    #   event_date=2024-01-16/
    #     ...

# Writing to s3:// requires pyarrow's S3 support and valid AWS credentials
write_data_lake(
    [
        {'timestamp': '2024-01-15T10:30:00Z', 'event_type': 'click', 'user_id': 1},
        {'timestamp': '2024-01-15T11:00:00Z', 'event_type': 'purchase', 'user_id': 2},
    ],
    's3://my-data-lake/events/'
)
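The `key=value` directory names are what make partition pruning possible: a query engine can skip whole directories by parsing paths instead of opening files. A simplified sketch of that idea; `parse_partitions`, `prune`, and the sample paths are hypothetical (engines such as Spark, Trino, or pyarrow's dataset API do this internally).

```python
from typing import Callable


def parse_partitions(path: str) -> dict[str, str]:
    """Extract Hive-style key=value partition values from a file path."""
    return dict(seg.split("=", 1) for seg in path.split("/") if "=" in seg)


def prune(paths: list[str], predicate: Callable[[dict[str, str]], bool]) -> list[str]:
    """Keep only files whose partition values satisfy the predicate."""
    return [p for p in paths if predicate(parse_partitions(p))]


files = [
    "s3://data-lake/events/event_date=2024-01-15/event_type=click/part-0.parquet",
    "s3://data-lake/events/event_date=2024-01-15/event_type=purchase/part-0.parquet",
    "s3://data-lake/events/event_date=2024-01-16/event_type=click/part-0.parquet",
]

# Equivalent of: WHERE event_date = '2024-01-15' AND event_type = 'purchase'
selected = prune(files, lambda p: p["event_date"] == "2024-01-15" and p["event_type"] == "purchase")
print(selected)  # only the 2024-01-15 purchase file needs to be read
```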

Phase 4: Data Processing / Transformation

Definition: Raw data is cleaned, transformed, validated, and enriched to make it useful.

Common Transformations:

| Transformation  | Purpose                    | Example                                    |
|-----------------|----------------------------|--------------------------------------------|
| Deduplication   | Remove duplicate records   | df.drop_duplicates(subset=['event_id'])    |
| Filtering       | Remove irrelevant/bad data | df[df['amount'] > 0]                       |
| Type casting    | Normalize data types       | Convert strings to datetime                |
| Standardization | Consistent formatting      | Lowercase emails, phone numbers in E.164   |
| Enrichment      | Add context/reference data | Join user geolocation                      |
| Aggregation     | Summarize data             | Sum sales per day, average price           |
| Pivoting        | Reshape for analytics      | Turn rows into columns (one per month)     |
| Null handling   | Handle missing values      | Fill NULLs, drop empty rows                |
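Several of these transformations can be illustrated without any framework. The sketch below applies deduplication, null handling, type casting, filtering, and standardization to a list of dicts; the `clean` function and the record fields (`event_id`, `email`, `amount`, `ts`) are assumed for illustration.

```python
from datetime import datetime
from typing import Any


def clean(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    seen: set[str] = set()
    out = []
    for r in records:
        if r["event_id"] in seen:                           # deduplication (keep first)
            continue
        seen.add(r["event_id"])
        amount = r.get("amount")
        amount = 0.0 if amount is None else float(amount)   # null handling + type casting
        if amount < 0:                                      # filtering bad data
            continue
        out.append({
            "event_id": r["event_id"],
            "email": r["email"].strip().lower(),            # standardization
            "amount": amount,
            "ts": datetime.fromisoformat(r["ts"]),          # string -> datetime
        })
    return out


raw = [
    {"event_id": "e1", "email": " Alice@Example.COM ", "amount": "19.99", "ts": "2024-01-15T10:30:00"},
    {"event_id": "e1", "email": "alice@example.com", "amount": "19.99", "ts": "2024-01-15T10:30:00"},
    {"event_id": "e2", "email": "bob@example.com", "amount": None, "ts": "2024-01-15T11:00:00"},
    {"event_id": "e3", "email": "carol@example.com", "amount": "-5", "ts": "2024-01-15T12:00:00"},
]
cleaned = clean(raw)
print([(r["event_id"], r["email"], r["amount"]) for r in cleaned])
# [('e1', 'alice@example.com', 19.99), ('e2', 'bob@example.com', 0.0)]
```

The order of steps matters: filling nulls before the negative-amount filter keeps `e2`, whereas filtering first would have to decide what a missing amount means.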

Processing Styles:

Batch Processing

Process large volumes at scheduled intervals.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("DataProcessing") \
    .getOrCreate()

# Read raw data from S3
raw_events = spark.read.parquet("s3://data-lake/raw/events/event_date=2024-01-15/")

# Clean and transform
cleaned = (
    raw_events
    # Fill NULL amounts *before* filtering: NULL >= 0 is not true, so the
    # filter would otherwise drop those rows and fillna would never apply
    .fillna({"amount": 0})
    .filter(F.col("event_type").isin(["click", "purchase", "view"]))
    .filter(F.col("amount") >= 0)
    # Convert UTC to local time once, then derive the date parts from it
    .withColumn("local_ts", F.from_utc_timestamp(F.col("timestamp"), "US/Pacific"))
    .withColumn("event_date", F.to_date("local_ts"))
    .withColumn("day_of_week", F.dayofweek("local_ts"))
    .withColumn("hour", F.hour("local_ts"))
)

# Aggregations (PySpark DataFrames have no .rename(); alias inside agg instead)
daily_summary = (
    cleaned
    .groupBy("event_date", "event_type")
    .agg(
        F.sum("amount").alias("total_amount"),
        F.count("user_id").alias("event_count"),
    )
)

# Write to warehouse
daily_summary.write \
    .mode("overwrite") \
    .parquet("s3://data-warehouse/processed/daily_events/event_date=2024-01-15/")

Stream Processing

Process data as it arrives, maintaining state.

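from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, struct, to_json, window, sum as spark_sum
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# Requires the spark-sql-kafka connector package on the classpath
spark = SparkSession.builder \
    .appName("StreamingAnalysis") \
    .getOrCreate()

# Expected JSON payload of each Kafka message (adjust to your event format)
event_schema = StructType([
    StructField("event_type", StringType()),
    StructField("amount", DoubleType()),
])

# Read from Kafka
events = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user_events") \
    .option("startingOffsets", "latest") \
    .load()

# Parse the message value (bytes) as JSON
parsed = events.select(
    from_json(col("value").cast("string"), event_schema).alias("json"),
    col("timestamp"),
)

# 5-minute tumbling window aggregation; the watermark bounds how long
# Spark keeps window state while waiting for late events
windowed_agg = parsed \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("json.event_type").alias("event_type"),
    ) \
    .agg(spark_sum("json.amount").alias("total_amount"))

# The Kafka sink expects a "value" column, so serialize each result row to JSON
query = windowed_agg \
    .select(to_json(struct("window", "event_type", "total_amount")).alias("value")) \
    .writeStream \
    .outputMode("update") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "event_aggregates") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start()

query.awaitTermination()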
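Under the hood, a tumbling window assigns each event to exactly one fixed-size, non-overlapping bucket: `window_start = floor(ts / size) * size`. A framework-free sketch, assuming epoch-second timestamps and event fields similar to the Spark example; `tumbling_window_sums` is a hypothetical helper and ignores late data and state expiry, which the watermark handles in Spark.

```python
from collections import defaultdict


def tumbling_window_sums(events: list[dict], size_seconds: int = 300) -> dict[tuple[int, str], float]:
    """Sum `amount` per (window_start, event_type) over fixed, non-overlapping windows."""
    totals: dict[tuple[int, str], float] = defaultdict(float)
    for e in events:
        window_start = (e["ts"] // size_seconds) * size_seconds
        totals[(window_start, e["event_type"])] += e["amount"]
    return dict(totals)


events = [
    {"ts": 0,   "event_type": "purchase", "amount": 10.0},
    {"ts": 299, "event_type": "purchase", "amount": 5.0},   # same 5-minute window as ts=0
    {"ts": 300, "event_type": "purchase", "amount": 7.0},   # first second of the next window
    {"ts": 310, "event_type": "click",    "amount": 0.0},
]
print(tumbling_window_sums(events))
# {(0, 'purchase'): 15.0, (300, 'purchase'): 7.0, (300, 'click'): 0.0}
```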

Phase 5: Data Orchestration & Workflow Management

Definition: Pipelines are scheduled, monitored, and automated to run reliably.

Orchestration Concepts:

DAG (Directed Acyclic Graph):
Each pipeline is a graph of tasks with dependencies.

┌─────────────────────────────────────────────┐
│             Daily ETL Pipeline              │
├─────────────────────────────────────────────┤
│                                             │
│        ┌─────────┐                          │
│        │ Extract │ (Fetch raw data)         │
│        └────┬────┘                          │
│             │                               │
│        ┌────▼──────┐                        │
│        │ Transform │ (Clean & enrich)       │
│        └────┬───┬──┘                        │
│             │   └───────────┐               │
│        ┌────▼──────┐  ┌─────▼────┐          │
│        │ Load      │  │ Validate │          │
│        │ Warehouse │  │ Quality  │          │
│        └────┬──────┘  └─────┬────┘          │
│             │               │               │
│        ┌────▼───────────────▼──┐            │
│        │  Run QA Checks        │            │
│        │  (Row counts, nulls)  │            │
│        └────┬──────────────────┘            │
│             │                               │
│        ┌────▼──────────────┐                │
│        │  Alert (success   │                │
│        │   or failure)     │                │
│        └───────────────────┘                │
│                                             │
└─────────────────────────────────────────────┘
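Orchestrators decide execution order by topologically sorting the task graph: a task runs only once all of its upstream tasks have finished, and tasks that become ready together can run in parallel. Python's standard-library `graphlib.TopologicalSorter` (3.9+) can illustrate this for the pipeline in the diagram; the task names mirror the boxes above and are not tied to any orchestrator.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (its upstream tasks)
pipeline = {
    "extract": set(),
    "transform": {"extract"},
    "load_warehouse": {"transform"},
    "validate_quality": {"transform"},
    "run_qa_checks": {"load_warehouse", "validate_quality"},
    "alert": {"run_qa_checks"},
}

sorter = TopologicalSorter(pipeline)
sorter.prepare()                         # raises CycleError if the graph is not acyclic
stages = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())   # tasks whose dependencies are all done
    stages.append(ready)                 # these could run in parallel
    sorter.done(*ready)

for i, stage in enumerate(stages, 1):
    print(f"stage {i}: {stage}")
# stage 1: ['extract']
# stage 2: ['transform']
# stage 3: ['load_warehouse', 'validate_quality']
# stage 4: ['run_qa_checks']
# stage 5: ['alert']
```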

Example: Apache Airflow DAG

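from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

def notify_failure(context):
    """Called by Airflow when a task fails after exhausting its retries."""
    ti = context['task_instance']
    print(f"Task {ti.task_id} in DAG {ti.dag_id} failed")  # e.g. send to Slack/PagerDuty here

default_args = {
    'owner': 'data-eng',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': notify_failure,
}

dag = DAG(
    'daily_data_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline',
    start_date=datetime(2024, 1, 1),  # fixed start date (days_ago() is deprecated)
    schedule='0 2 * * *',  # 2 AM daily; Airflow versions before 2.4 use schedule_interval
    catchup=False,
)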

def extract_data(**context):
    """Extract raw data from source."""
    print("Extracting data...")
    # Query source system, write to /tmp/raw_data.parquet

def transform_data(**context):
    """Transform and clean data."""
    print("Transforming data...")
    # Read raw, apply transformations, write to /tmp/transformed_data.parquet

def load_data(**context):
    """Load into warehouse."""
    print("Loading to warehouse...")
    # Insert into Snowflake

def run_quality_checks(**context):
    """Validate data quality."""
    print("Running quality checks...")
    # Count rows, check nulls, etc.

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load_data,
    dag=dag,
)

quality_task = PythonOperator(
    task_id='quality',
    python_callable=run_quality_checks,
    dag=dag,
)

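# Runs only if all upstream tasks succeeded (default trigger_rule="all_success");
# failures can be reported through on_failure_callback. SLACK_WEBHOOK_URL is
# assumed to be an environment variable holding a Slack incoming-webhook URL.
alert_task = BashOperator(
    task_id='alert',
    bash_command=(
        'curl -sf -X POST -H "Content-type: application/json" '
        '--data \'{"text": "daily_data_pipeline succeeded"}\' "$SLACK_WEBHOOK_URL"'
    ),
    dag=dag,
)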

# Define dependencies
extract_task >> transform_task >> load_task >> quality_task >> alert_task

Orchestration Tools Comparison:

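| Tool           | Paradigm                 | Language                  | Scalability                         | UI                       |
|----------------|--------------------------|---------------------------|-------------------------------------|--------------------------|
| Apache Airflow | DAG-based                | Python                    | Kubernetes                          | Rich web UI              |
| Prefect        | Flow-based               | Python                    | Cloud-native                        | Modern UI                |
| Dagster        | Asset-oriented           | Python                    | Kubernetes                          | Opinionated UI           |
| dbt            | SQL transformation       | SQL + Jinja (YAML config) | Lightweight (compute runs in the warehouse) | Docs site / dbt Cloud UI |
| Nextflow       | Scientific workflow      | Groovy-based DSL          | HPC                                 | Minimal UI               |
| Apache Beam    | Unified batch/stream API | Python/Java               | Distributed runners                 | N/A (library)            |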

Phase 6: Data Serving / Consumption

Definition: Processed data is made available to end users and systems.

Serving Layers:

A. Data Warehouses (OLAP)

Centralized repository for analytical queries.

Characteristics:

  • Columnar storage (compressed)
  • SQL queryable
  • MPP (Massively Parallel Processing)
  • Historical data (5-10 years typical)
  • Query latency: 1-100 seconds

Leading Solutions:

  • Snowflake: Cloud-native, instant elasticity
  • Google BigQuery: Serverless, integrated with Google Cloud ecosystem
  • Amazon Redshift: Fully managed on AWS (provisioned or serverless), PostgreSQL-derived SQL dialect
  • Databricks (Lakehouse): Combines data lake + warehouse

Example: BigQuery query

-- Analyze sales trends
SELECT
    DATE(created_at) as sale_date,
    product_category,
    SUM(amount) as daily_revenue,
    COUNT(DISTINCT customer_id) as unique_customers,
    AVG(amount) as avg_transaction_value
FROM `project.dataset.orders`
WHERE DATE(created_at) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY) AND CURRENT_DATE()
GROUP BY 1, 2
ORDER BY 1 DESC, 3 DESC

B. Data Marts

Department-specific subsets of the warehouse, each with metric definitions tailored to that team.

Finance Data Mart:

CREATE TABLE finance.daily_revenue AS
SELECT
    DATE(transaction_date) as revenue_date,
    SUM(amount) as total_revenue,
    COUNT(*) as transaction_count,
    SUM(amount) / COUNT(*) as avg_transaction,
    SUM(CASE WHEN refunded THEN amount ELSE 0 END) as refunded_amount
FROM warehouse.transactions
WHERE transaction_type = 'SALE'
GROUP BY 1;

Marketing Data Mart:

CREATE TABLE marketing.daily_funnel AS
SELECT
    DATE(created_at) as funnel_date,
    campaign_id,
    COUNT(DISTINCT CASE WHEN event_type = 'visit' THEN user_id END) as visits,
    COUNT(DISTINCT CASE WHEN event_type = 'click' THEN user_id END) as clicks,
    COUNT(DISTINCT CASE WHEN event_type = 'signup' THEN user_id END) as signups,
    ROUND(100.0 * COUNT(DISTINCT CASE WHEN event_type = 'signup' THEN user_id END)
          / NULLIF(COUNT(DISTINCT CASE WHEN event_type = 'visit' THEN user_id END), 0), 2) as conversion_rate
FROM warehouse.marketing_events
GROUP BY 1, 2;
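To see what a mart build produces, the finance query can be run against an in-memory SQLite database with a few sample rows. SQLite stands in for the warehouse purely for illustration: the rows are made up, `refunded` is stored as 0/1, and the `finance.` schema prefix is dropped because SQLite has no schemas.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE transactions (
        transaction_date TEXT, amount REAL, refunded INTEGER, transaction_type TEXT
    );
    INSERT INTO transactions VALUES
        ('2024-01-15 09:00:00', 100.0, 0, 'SALE'),
        ('2024-01-15 14:30:00',  50.0, 1, 'SALE'),
        ('2024-01-15 16:00:00',  20.0, 0, 'TRANSFER'),
        ('2024-01-16 10:00:00',  80.0, 0, 'SALE');

    -- Same shape as the finance mart definition
    CREATE TABLE daily_revenue AS
    SELECT
        DATE(transaction_date) AS revenue_date,
        SUM(amount) AS total_revenue,
        COUNT(*) AS transaction_count,
        SUM(amount) / COUNT(*) AS avg_transaction,
        SUM(CASE WHEN refunded THEN amount ELSE 0 END) AS refunded_amount
    FROM transactions
    WHERE transaction_type = 'SALE'
    GROUP BY 1;
""")

rows = conn.execute("SELECT * FROM daily_revenue ORDER BY revenue_date").fetchall()
for row in rows:
    print(row)
# ('2024-01-15', 150.0, 2, 75.0, 50.0)
# ('2024-01-16', 80.0, 1, 80.0, 0)
```

The TRANSFER row never reaches the mart, which is the point of a mart: it encodes one department's definition of "revenue" once, so every report built on it agrees.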

C. Feature Stores

Engineered features for machine learning models.

Purpose:

  • Consistent features between training and serving
  • Real-time feature computation
  • Feature versioning and lineage
  • Reduced model training time

Tools:

  • Feast (open-source, pluggable offline and online stores)
  • Tecton (enterprise, real-time + batch)
  • Hopsworks (open-source + managed)

Example: Feature store definition (Feast)

from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int32

# Define entities (join_keys names the column used to look up features)
user = Entity(name="user", join_keys=["user_id"])

# Define feature sources
transactions = FileSource(
    path="s3://my-data/transactions.parquet",
    timestamp_field="timestamp",
)

# Define features (recent Feast versions use Field + schema instead of Feature)
user_features = FeatureView(
    name="user_transaction_features",
    entities=[user],
    ttl=timedelta(days=30),
    schema=[
        Field(name="total_spent", dtype=Float32),
        Field(name="transaction_count", dtype=Int32),
        Field(name="avg_transaction_value", dtype=Float32),
        Field(name="days_since_last_purchase", dtype=Int32),
    ],
    online=True,  # Serve in real-time
    source=transactions,
    tags={"version": "v1", "owner": "ml-platform"},
)

# At inference time (after `feast apply` and materializing to the online store):
# from feast import FeatureStore
# store = FeatureStore(repo_path=".")
# features = store.get_online_features(
#     features=[
#         "user_transaction_features:total_spent",
#         "user_transaction_features:transaction_count",
#     ],
#     entity_rows=[{"user_id": 123}],
# ).to_dict()
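The goal of consistent features between training and serving largely comes down to point-in-time correctness: when building training data, each label may only see feature values that were known at that moment, otherwise future information leaks into the model. Feature stores perform this join for you; the sketch below shows the idea in plain Python with a hypothetical `point_in_time_lookup` helper and made-up snapshots, honoring a 30-day TTL like the feature view above.

```python
from bisect import bisect_right
from datetime import datetime, timedelta
from typing import Optional

# Feature snapshots per user, sorted by the time they were computed (made-up values)
feature_history = {
    123: [
        (datetime(2024, 1, 1), {"total_spent": 100.0}),
        (datetime(2024, 1, 10), {"total_spent": 250.0}),
        (datetime(2024, 2, 20), {"total_spent": 400.0}),
    ],
}


def point_in_time_lookup(user_id: int, as_of: datetime,
                         ttl: timedelta = timedelta(days=30)) -> Optional[dict]:
    """Return the latest snapshot at or before `as_of`, or None if missing or expired."""
    history = feature_history.get(user_id, [])
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, as_of) - 1   # last snapshot with ts <= as_of
    if idx < 0:
        return None
    ts, values = history[idx]
    return values if as_of - ts <= ttl else None


print(point_in_time_lookup(123, datetime(2024, 1, 15)))  # {'total_spent': 250.0}
print(point_in_time_lookup(123, datetime(2024, 2, 15)))  # None (newest prior value is older than 30 days)
```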

D. BI / Dashboard Tools

Visualization and reporting for non-technical users.

Leading Solutions:

  • Tableau: Rich visualizations, enterprise focus
  • Power BI: Microsoft ecosystem integration
  • Looker: Semantic layer, LookML language
  • Apache Superset: Open-source, self-hosted

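Example: LookML dashboard (LookML dashboards use a YAML-based syntax)

- dashboard: sales_performance
  title: "Sales Performance Dashboard"
  layout: newspaper
  elements:
  # model, explore, and field names below are illustrative placeholders
  - name: daily_revenue
    title: "Daily Revenue"
    type: looker_column
    model: ecommerce
    explore: orders
    fields: [orders.created_date, orders.total_revenue]
  - name: customer_acquisition
    title: "New Customers"
    type: looker_line
    model: ecommerce
    explore: customers
    fields: [customers.created_date, customers.count]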

E. Operational Serving

Real-time data for applications and user-facing systems.

Latency Requirements:

  • < 100ms for recommendation engines
  • < 500ms for search results
  • < 1s for personalization

Storage Systems:

  • Redis (in-memory cache, sub-millisecond)
  • Memcached (distributed cache)
  • Elasticsearch (search index, ~10ms queries)
  • DynamoDB (NoSQL, single-digit ms)

Example: Redis feature cache for recommendations

import redis
import json
from datetime import datetime, timedelta

redis_client = redis.Redis(host='redis-cluster', port=6379, db=0)

def get_user_recommendations(user_id: int, refresh_if_stale: bool = True) -> list:
    """Get recommendations with cache fallback."""
    cache_key = f"recommendations:{user_id}"

    # Try cache first
    cached = redis_client.get(cache_key)
    if cached:
        data = json.loads(cached)
        if not refresh_if_stale or is_fresh(data['cached_at']):
            return data['recommendations']

    # Compute fresh recommendations (compute_recommendations is not shown here;
    # assume it calls the recommendation model)
    recommendations = compute_recommendations(user_id)

    # Cache with 1-hour TTL
    redis_client.setex(
        cache_key,
        timedelta(hours=1),
        json.dumps({
            'recommendations': recommendations,
            'cached_at': datetime.utcnow().isoformat(),
        })
    )

    return recommendations

def is_fresh(cached_at_str: str, max_age_hours: int = 1) -> bool:
    cached_at = datetime.fromisoformat(cached_at_str)
    age = (datetime.utcnow() - cached_at).total_seconds() / 3600
    return age < max_age_hours

Phase 7: Data Governance & Security

Definition: Ensure data is secure, accurate, compliant, and auditable.

Core Governance Pillars:

A. Data Lineage

Track data flow and transformations.

┌────────────────────────────────────────────┐
│        Data Lineage Example                │
├────────────────────────────────────────────┤
│                                            │
│ PostgreSQL (production)                    │
│ ├─ table: customers                        │
│ ├─ table: orders                           │
│ │                                          │
│ Kafka (streaming)                          │
│ ├─ topic: cdc.customers                    │
│ ├─ topic: cdc.orders                       │
│ │                                          │
│ Spark Job (transform)                      │
│ ├─ Input: Kafka + S3 lookup                │
│ ├─ Logic: Enrich, deduplicate              │
│ ├─ Output: S3 parquet                      │
│ │                                          │
│ Snowflake (warehouse)                      │
│ ├─ table: raw.orders                       │
│ ├─ table: analytics.daily_revenue          │
│ │   (derived from raw.orders)              │
│ │                                          │
│ Tableau (BI)                               │
│ └─ Dashboard: Sales Performance            │
│    └─ Uses: analytics.daily_revenue        │
│                                            │
└────────────────────────────────────────────┘

Tools:

  • Apache Atlas (open-source, Hadoop ecosystem)
  • Collibra (enterprise, workflow-based)
  • Datafold (automated, CI/CD-integrated)
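Once lineage is captured as a graph, impact analysis ("what breaks if this table changes?") becomes a graph traversal. A minimal sketch using the assets from the lineage diagram; the edge list and asset names are hand-written for illustration, whereas lineage tools collect these edges automatically.

```python
from collections import deque

# Edges point from an upstream asset to the assets derived from it
lineage = {
    "postgres.customers": ["kafka.cdc.customers"],
    "postgres.orders": ["kafka.cdc.orders"],
    "kafka.cdc.customers": ["spark.enrich_job"],
    "kafka.cdc.orders": ["spark.enrich_job"],
    "spark.enrich_job": ["s3.enriched_parquet"],
    "s3.enriched_parquet": ["snowflake.raw.orders"],
    "snowflake.raw.orders": ["snowflake.analytics.daily_revenue"],
    "snowflake.analytics.daily_revenue": ["tableau.sales_performance"],
}


def downstream(asset: str) -> list[str]:
    """Breadth-first search for every asset that depends on `asset`."""
    seen: set[str] = set()
    order: list[str] = []
    queue = deque([asset])
    while queue:
        for child in lineage.get(queue.popleft(), []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


print(downstream("postgres.orders"))
# ['kafka.cdc.orders', 'spark.enrich_job', 's3.enriched_parquet',
#  'snowflake.raw.orders', 'snowflake.analytics.daily_revenue', 'tableau.sales_performance']
```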

B. Access Control

Restrict who can see what data.

Role-Based Access Control (RBAC):

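-- Snowflake example: Role-based grants
CREATE ROLE analyst_finance;
CREATE ROLE analyst_marketing;

GRANT USAGE ON DATABASE analytics TO ROLE analyst_finance;
GRANT USAGE ON SCHEMA analytics.finance TO ROLE analyst_finance;
GRANT SELECT ON ALL TABLES IN SCHEMA analytics.finance TO ROLE analyst_finance;
GRANT SELECT ON FUTURE TABLES IN SCHEMA analytics.finance TO ROLE analyst_finance;

-- Snowflake has no column-level GRANT; expose only non-sensitive columns
-- through a secure view (schema names here are illustrative)
CREATE SECURE VIEW analytics.marketing.orders_limited AS
    SELECT id, amount, date FROM analytics.sales.orders;
-- Columns like customer_email and payment_method are not exposed

GRANT USAGE ON DATABASE analytics TO ROLE analyst_marketing;
GRANT USAGE ON SCHEMA analytics.marketing TO ROLE analyst_marketing;
GRANT SELECT ON VIEW analytics.marketing.orders_limited TO ROLE analyst_marketing;

-- Assign role to user (identifiers containing "@" must be double-quoted)
GRANT ROLE analyst_finance TO USER "alice@company.com";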

Attribute-Based Access Control (ABAC):

  • Dynamic column-level masking
  • Row filtering based on attributes
  • Example: Marketing analysts see only their region's data
-- Snowflake: Dynamic masking (create the policy first, then attach it)
CREATE MASKING POLICY email_mask AS
    (col_value VARCHAR) RETURNS VARCHAR ->
    CASE
        WHEN CURRENT_ROLE() IN ('ANALYST', 'ADMIN') THEN col_value
        ELSE '***@***.***'
    END;

ALTER TABLE analytics.customers
MODIFY COLUMN email
    SET MASKING POLICY email_mask;
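The warehouse enforces these policies for you, but the underlying logic is simple to express. The sketch below applies an attribute-based row filter (analysts see only their own region) and the role-based email mask to query results; the `User` attributes, role names, and row fields are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class User:
    role: str
    region: str


def mask_email(email: str, role: str) -> str:
    # Mirrors the masking policy: privileged roles see clear text
    return email if role in ("ANALYST", "ADMIN") else "***@***.***"


def apply_policies(rows: list[dict], user: User) -> list[dict]:
    # Row filter: admins see everything, everyone else only their region
    visible = [r for r in rows if user.role == "ADMIN" or r["region"] == user.region]
    # Column mask: rewrite the sensitive column per the caller's role
    return [{**r, "email": mask_email(r["email"], user.role)} for r in visible]


rows = [
    {"id": 1, "region": "US", "email": "ann@example.com"},
    {"id": 2, "region": "EU", "email": "ben@example.com"},
]
print(apply_policies(rows, User(role="MARKETING", region="EU")))
# [{'id': 2, 'region': 'EU', 'email': '***@***.***'}]
```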

C. Data Encryption

Protect data at rest and in transit.

Encryption Strategies:

| Layer           | Method            | Key management           | Use case         |
|-----------------|-------------------|--------------------------|------------------|
| In transit      | TLS 1.3           | Certificate authority    | Network traffic  |
| At rest         | AES-256           | Cloud KMS or HSM         | Database storage |
| Column-level    | AES-256           | Separate key per column  | PII fields       |
| Database native | Provider-specific | AWS KMS, Azure Key Vault | Cloud warehouses |

Example: Encrypting sensitive columns in Python

from cryptography.fernet import Fernet
import base64
import hashlib
import os

class ColumnEncryptor:
    def __init__(self, master_key: str, salt: bytes):
        # Derive a 32-byte key from master_key with PBKDF2 and a random,
        # persisted salt (in production, fetch data keys from a KMS such as AWS KMS)
        raw_key = hashlib.pbkdf2_hmac('sha256', master_key.encode(), salt, 100000)
        self.cipher_suite = Fernet(base64.urlsafe_b64encode(raw_key))
        # Note: Fernet uses AES-128-CBC with HMAC-SHA256, not AES-256

    def encrypt(self, plaintext: str) -> str:
        # Fernet tokens are already URL-safe base64 text; no extra encoding needed
        return self.cipher_suite.encrypt(plaintext.encode()).decode()

    def decrypt(self, ciphertext: str) -> str:
        return self.cipher_suite.decrypt(ciphertext.encode()).decode()

# Usage
salt = os.urandom(16)  # generate once and store it; decryption needs the same salt
encryptor = ColumnEncryptor('my-master-key', salt)
encrypted_email = encryptor.encrypt('user@example.com')
# Store encrypted_email in database
# Decrypt on demand for authorized users

D. Data Quality Monitoring

Detect anomalies and quality issues.

Quality Dimensions:

  • Completeness: No unexpected nulls
  • Accuracy: Data matches reality
  • Consistency: Values align across systems
  • Timeliness: Data fresh enough
  • Uniqueness: No unintended duplicates

Example: dbt tests for data quality

# models/schema.yml (model properties live here, not in dbt_project.yml)
version: 2

models:
  - name: orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null

      - name: amount
        tests:
          - not_null
          # Requires the dbt_utils package; rejects amounts <= 0
          - dbt_utils.accepted_range:
              min_value: 0
              inclusive: false

      - name: customer_id
        tests:
          - relationships:
              to: ref('customers')
              field: customer_id

A custom check can also be written as a singular test, a SQL file in the tests/ directory that fails if it returns any rows:

-- tests/assert_positive_amount.sql
SELECT *
FROM {{ ref('orders') }}
WHERE amount <= 0
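Outside dbt, the same checks reduce to a few assertions over a batch of rows. A plain-Python sketch covering completeness, uniqueness, validity, and referential integrity; the `check_orders` name and the list-of-messages report format are assumptions.

```python
from collections import Counter


def check_orders(orders: list[dict], customer_ids: set[int]) -> list[str]:
    """Return human-readable failures; an empty list means the batch passed."""
    failures = []
    ids = [o.get("order_id") for o in orders]
    if any(i is None for i in ids):
        failures.append("order_id: null values")                       # not_null
    dupes = sorted(i for i, n in Counter(ids).items() if i is not None and n > 1)
    if dupes:
        failures.append(f"order_id: duplicates {dupes}")                # unique
    if any(o.get("amount") is None or o["amount"] <= 0 for o in orders):
        failures.append("amount: null or non-positive values")          # validity
    orphans = sorted({o["customer_id"] for o in orders} - customer_ids)
    if orphans:
        failures.append(f"customer_id: unknown {orphans}")              # relationships
    return failures


orders = [
    {"order_id": 1, "amount": 20.0, "customer_id": 10},
    {"order_id": 1, "amount": 15.0, "customer_id": 11},
    {"order_id": 2, "amount": 0.0, "customer_id": 99},
]
print(check_orders(orders, customer_ids={10, 11}))
# ['order_id: duplicates [1]', 'amount: null or non-positive values', 'customer_id: unknown [99]']
```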

E. Auditing and Compliance

Track access, changes, and regulatory requirements.

Audit Log Example:

{
  "timestamp": "2024-01-15T10:30:00Z",
  "user": "alice@company.com",
  "action": "SELECT",
  "database": "analytics",
  "table": "customers",
  "columns": ["id", "name", "email"],
  "rows_returned": 1000,
  "query_id": "q_abc123",
  "client_ip": "192.168.1.100",
  "query": "SELECT id, name, email FROM customers WHERE region = 'US' LIMIT 1000"
}

Compliance Standards:

  • GDPR (EU): Right to erasure ("right to be forgotten"), data portability
  • HIPAA (US healthcare): Safeguards such as encryption, audit trails, access controls
  • SOC 2 (service organizations, e.g. SaaS providers): Audited security controls, monitoring
  • CCPA (California): Consumer privacy rights

Phase 8: Data Archival and Retention

Definition: Old or unused data is preserved cost-effectively or deleted per policy.

Archival Strategy:

Time Passes →

Current Data (hot)
├─ Location: Fast SSD/NVMe
├─ Cost: $0.20/GB/month
├─ Retention: Current + 30 days
└─ Use: Daily dashboards, real-time analytics

30-90 Days Old (warm)
├─ Location: Cloud standard storage (S3)
├─ Cost: $0.023/GB/month
├─ Retention: 90 days
└─ Use: Monthly reports, compliance checks

90+ Days Old (cold)
├─ Location: Glacier, Archive tier
├─ Cost: $0.004/GB/month
├─ Retrieval: 4-12 hours (acceptable for rare access)
└─ Use: Legal holds, annual audits

7+ Years Old (deleted)
├─ Reason: Retention period expired (e.g., 7-year financial record rules); GDPR also requires deleting personal data once it is no longer needed
├─ Method: Secure deletion (NIST SP 800-88)
└─ Verification: Deletion logs, hash verification

Lifecycle Policies (AWS S3 Example):

{
  "Rules": [
    {
      "ID": "archive-old-data",
      "Status": "Enabled",
      "Filter": {
        "Prefix": "analytics/"
      },
      "Transitions": [
        {
          "Days": 90,
          "StorageClass": "STANDARD_IA"
        },
        {
          "Days": 180,
          "StorageClass": "GLACIER"
        },
        {
          "Days": 365,
          "StorageClass": "DEEP_ARCHIVE"
        }
      ],
      "Expiration": {
        "Days": 2555
      }
    }
  ]
}
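The lifecycle rule can be read as a function of object age. The sketch below mirrors its thresholds (90, 180, 365, and 2555 days) to predict where an object under `analytics/` ends up, assuming it was uploaded to S3 Standard; S3 evaluates the rules itself (transitions are applied asynchronously), so `storage_class_for` is only a hypothetical planning aid.

```python
from typing import Optional

# (minimum age in days, storage class), checked from oldest to youngest;
# mirrors the Transitions in the lifecycle rule
TRANSITIONS = [(365, "DEEP_ARCHIVE"), (180, "GLACIER"), (90, "STANDARD_IA")]
EXPIRATION_DAYS = 2555  # 7 * 365


def storage_class_for(age_days: int) -> Optional[str]:
    """Expected storage class for an object of the given age, or None once expired."""
    if age_days >= EXPIRATION_DAYS:
        return None
    for min_age, storage_class in TRANSITIONS:
        if age_days >= min_age:
            return storage_class
    return "STANDARD"  # assumed initial storage class


for age in (10, 100, 200, 400, 3000):
    print(age, storage_class_for(age))
# 10 STANDARD
# 100 STANDARD_IA
# 200 GLACIER
# 400 DEEP_ARCHIVE
# 3000 None
```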

Retention Policy Example:

Operational Data
├─ Logs: 90 days
├─ Transactions: 7 years (financial regulatory)
└─ User profiles: Until account deletion + 1 year

Personal Data (GDPR Subject)
├─ Active customer: Retained during relationship
├─ Inactive: 1 year after last activity
└─ Deletion request: Within one month (GDPR response deadline)

Compliance Data
├─ Audit logs: 5 years
├─ Access records: 3 years
└─ Deletion certificates: Permanent

Data Storage Systems

Relational Databases (OLTP)

Purpose: Transactional systems (production databases).

Characteristics:

  • ACID guarantees
  • Normalized schemas
  • Row-oriented
  • Write-optimized
  • Low latency (< 100ms)

Examples: PostgreSQL, MySQL, Oracle, SQL Server

Data Warehouses (OLAP)

Purpose: Analytical queries across historical data.

Characteristics:

  • Column-oriented storage
  • Denormalized schemas (star/snowflake)
  • Read-optimized
  • Distributed query execution
  • Higher latency (1-100s)

Key Innovations:

  • Columnar Storage: Similar values are stored together, so data often compresses several times better than in row-based layouts
  • Partitioning: Prune irrelevant partitions
  • Query Vectorization: Process batches of values in CPU cache
  • MPP: Parallelize queries across nodes
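Why columnar layouts compress so well: values of one column sit next to each other, and low-cardinality columns form long runs. A toy sketch contrasting row and column layouts and run-length encoding one column; real formats such as Parquet combine dictionary encoding, RLE/bit-packing, and general-purpose codecs, so this shows only the core idea.

```python
from itertools import groupby

rows = [
    ("2024-01-15", "US", 10.0),
    ("2024-01-15", "US", 12.5),
    ("2024-01-15", "US", 7.0),
    ("2024-01-15", "EU", 3.0),
    ("2024-01-16", "EU", 8.0),
]

# Column layout: one list per column instead of one tuple per row
columns = {name: [r[i] for r in rows] for i, name in enumerate(("date", "region", "amount"))}


def run_length_encode(values: list) -> list[tuple]:
    """Collapse consecutive repeats into (value, count) pairs."""
    return [(v, len(list(group))) for v, group in groupby(values)]


print(run_length_encode(columns["date"]))    # [('2024-01-15', 4), ('2024-01-16', 1)]
print(run_length_encode(columns["region"]))  # [('US', 3), ('EU', 2)]

# A query like SUM(amount) only needs to scan one column
print(sum(columns["amount"]))                # 40.5
```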

Snowflake Architecture:

┌──────────────────────────────────────────┐
│    Snowflake Data Warehouse              │
├──────────────────────────────────────────┤
│                                          │
│  Cloud Services Layer                    │
│  ├─ Metadata & ACID transactions         │
│  ├─ Query optimization                   │
│  └─ Time Travel                          │
│                                          │
│  Compute Layer (Virtual Warehouses)      │
│  ├─ Independent MPP clusters             │
│  ├─ Query execution                      │
│  └─ Elastic scaling & auto-suspend       │
│                                          │
│  Storage Layer (AWS, Azure, or GCP)      │
│  ├─ Compressed columnar micro-partitions │
│  ├─ Separation of compute & storage      │
│  └─ Cross-region replication             │
│                                          │
└──────────────────────────────────────────┘

Data Lakes

Purpose: Store all data (raw + processed) cost-effectively.

Characteristics:

  • Flexible schema
  • Supports structured + unstructured
  • Object storage-based (S3, GCS)
  • Cheaper than warehouses
  • Requires more processing

Modern Lakehouse (Delta Lake, Apache Iceberg):

  • ACID transactions on data lake
  • Schema enforcement
  • Time travel
  • DML operations (UPDATE, DELETE)

Example: Delta Lake

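from delta.tables import DeltaTable
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Requires the delta-spark package; these settings enable Delta's SQL support
spark = SparkSession.builder \
    .appName("DeltaExample") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Create Delta table
spark.read.parquet("s3://data-lake/raw/") \
    .write \
    .mode("overwrite") \
    .format("delta") \
    .option("mergeSchema", "true") \
    .save("s3://data-lake/orders_delta/")

# ACID update (values in `set` must be Column expressions or SQL expression strings)
dt = DeltaTable.forPath(spark, "s3://data-lake/orders_delta/")
dt.update(
    condition=F.col("status") == "pending",
    set={"status": F.lit("processed"), "updated_at": F.current_timestamp()}
)

# Time travel: read the table as it was at a given timestamp
old_data = spark.read \
    .format("delta") \
    .option("timestampAsOf", "2024-01-15 10:00:00") \
    .load("s3://data-lake/orders_delta/")

# ...or as of a specific version number
v5_data = spark.read \
    .format("delta") \
    .option("versionAsOf", 5) \
    .load("s3://data-lake/orders_delta/")

# View transaction history (one row per commit)
dt.history().select("version", "timestamp", "operation").show()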

Data Processing and Transformation

Processing Paradigms

Batch Processing (Spark)

Process entire datasets at intervals.

Advantages:

  • Efficient for large volumes
  • Cost-effective (off-peak execution)
  • Deterministic, reproducible

Disadvantages:

  • Latency: results arrive only after each run (typically minutes to hours)
  • Inefficient for small changes
  • Harder to handle late arrivals

Example: Spark word count

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("WordCount").getOrCreate()

# Read text files
text_df = spark.read.text("s3://bucket/documents/*.txt")

# Tokenize and count
word_counts = text_df \
    .select(F.explode(F.split(F.col("value"), " ")).alias("word")) \
    .filter(F.col("word") != "") \
    .groupby("word") \
    .count() \
    .sort(F.desc("count"))

# Write results
word_counts.write.mode("overwrite").parquet("s3://bucket/output/")

Stream Processing (Kafka Streams, Flink)

Process data continuously as it arrives.

Advantages:

  • Real-time results (seconds)
  • Handles infinite streams
  • Stateful aggregations

Disadvantages:

  • Complex semantics (exactly-once, ordering)
  • Higher operational overhead
  • Debugging harder

Example: Windowed aggregation with a plain Kafka consumer (kafka-python, not the Kafka Streams library)

from kafka import KafkaConsumer, KafkaProducer  # kafka-python package
import json
from collections import defaultdict

class WindowedAggregator:
    def __init__(self, window_seconds: int = 300):
        self.window_seconds = window_seconds
        self.windows = defaultdict(lambda: {"count": 0, "sum": 0})
        self.consumer = KafkaConsumer(
            'input_topic',
            bootstrap_servers=['kafka:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id='aggregator_group',
        )
        self.producer = KafkaProducer(
            bootstrap_servers=['kafka:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        )

    def get_window_key(self, timestamp: int) -> int:
        """Assign message to window (timestamp in epoch seconds)."""
        return (timestamp // self.window_seconds) * self.window_seconds

    def process(self):
        """Consume and aggregate messages."""
        for message in self.consumer:
            data = message.value
            window_key = self.get_window_key(data['timestamp'])

            # Update window
            self.windows[window_key]['count'] += 1
            self.windows[window_key]['sum'] += data['value']

            # Emit every window that has fully ended, using this event's
            # timestamp as the clock. No lateness handling: an event for an
            # already-emitted window opens a new, partial window.
            closed = [k for k in self.windows
                      if k + self.window_seconds <= data['timestamp']]
            for key in sorted(closed):
                window_data = self.windows.pop(key)
                self.producer.send('output_topic', {
                    'window_start': key,
                    'count': window_data['count'],
                    'avg': window_data['sum'] / window_data['count'],
                })

aggregator = WindowedAggregator()
aggregator.process()

Data Serving and Consumption Layers (Detailed)

Why Different Teams Need Different Data

The same raw data serves different purposes when viewed through different lenses:

Data Analysts

What they need:

  • Aggregated metrics (sums, averages, percentiles)
  • Historical trends (day-over-day, year-over-year)
  • SQL-friendly schemas
  • Business context (names, labels, hierarchies)

Example query:

-- PostgreSQL/Snowflake-style date functions
SELECT
    DATE_TRUNC('month', order_date) as month,
    SUM(order_amount) as monthly_revenue,
    COUNT(DISTINCT customer_id) as unique_customers,
    ROUND(SUM(order_amount) / COUNT(DISTINCT customer_id), 2) as avg_customer_value,
    100.0 * SUM(CASE WHEN is_returning_customer THEN order_amount ELSE 0 END)
        / SUM(order_amount) as pct_from_returning
FROM analytics.orders
WHERE order_date >= DATE_TRUNC('month', CURRENT_DATE) - INTERVAL '12 months'
GROUP BY 1
ORDER BY 1 DESC;

ML Engineers / Data Scientists

What they need:

  • Numerical features (ready for model input)
  • Consistent feature definitions across train/serve
  • Low-latency access (real-time inference)
  • Feature versioning

Example features:

# Customer churn prediction model features
features = {
    "user_id": 12345,
    "days_since_signup": 450,
    "total_purchases": 8,
    "total_spent": 2499.50,
    "avg_order_value": 312.44,
    "purchase_frequency_30d": 0.5,
    "days_since_last_purchase": 45,
    "churn_risk_score": 0.72,  # Derived feature
    "segment": "high_value",
}

Product & Growth Teams

What they need:

  • Behavioral metrics (funnel steps, retention)
  • Experiment results (A/B test impacts)
  • Real-time dashboards (live user counts)
  • User segments

Example metrics:

# Growth funnel metrics (daily)
metrics = {
    "date": "2024-01-15",
    "sign_ups": 450,
    "activated_users": 180,  # Completed onboarding
    "activation_rate": 0.40,
    "day_7_retention": 0.65,
    "day_30_retention": 0.40,
    "paying_customers": 35,
    "ltv": 4500,  # Lifetime value
}
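The ratios above are computed over a signup cohort. A minimal sketch with hypothetical in-memory inputs (signup dates, the set of users who completed onboarding, and each user's active dates); it defines day-N retention as "active exactly N days after signup", which is only one of several definitions teams use, so treat it as an assumption.

```python
from datetime import date, timedelta
from typing import Dict, Set


def funnel_metrics(
    signups: Dict[str, date],
    activated: Set[str],
    active_days: Dict[str, Set[date]],
) -> Dict[str, float]:
    """Cohort funnel ratios for the users in `signups` (hypothetical inputs)."""
    cohort = len(signups)

    def ratio(count: int) -> float:
        return count / cohort if cohort else 0.0

    def retention(n: int) -> float:
        # Retained = active exactly n days after that user's signup date.
        kept = sum(
            1
            for user, signed_up in signups.items()
            if signed_up + timedelta(days=n) in active_days.get(user, set())
        )
        return ratio(kept)

    activated_in_cohort = len(activated & set(signups))
    return {
        "sign_ups": cohort,
        "activated_users": activated_in_cohort,
        "activation_rate": ratio(activated_in_cohort),
        "day_7_retention": retention(7),
        "day_30_retention": retention(30),
    }


start = date(2024, 1, 1)
signups = {user: start for user in ["a", "b", "c", "d", "e"]}
active_days = {
    "a": {start + timedelta(days=7), start + timedelta(days=30)},
    "b": {start + timedelta(days=7)},
    "c": {start + timedelta(days=7)},
}
print(funnel_metrics(signups, activated={"a", "b"}, active_days=active_days))
```

Whatever tooling computes these numbers, the definitions themselves are what must stay consistent across dashboards.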

Executives / Leadership

What they need:

  • High-level KPIs (revenue, growth, margins)
  • Trends and alerts
  • Comparisons (vs. last year, vs. plan)
  • Simple visualizations

Example dashboard:

┌──────────────────────────────────────┐
│    Executive Sales Dashboard         │
├──────────────────────────────────────┤
│                                      │
│  Revenue (MTD)                       │
│  $4.2M  ↑ 12% vs. last month         │
│                                      │
│  Units Sold                          │
│  12,500  ↑ 8% vs. last month         │
│                                      │
│  Avg. Contract Value                 │
│  $12,400  ↓ 2% vs. last month        │
│                                      │
│  Sales Pipeline                      │
│  $18.5M  ↑ 15% vs. last quarter      │
│                                      │
│  Win Rate                            │
│  38%  (target: 40%)                  │
│                                      │
└──────────────────────────────────────┘
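Each tile pairs a current value with a comparison against a previous period or a target. The formatting logic can be sketched as follows; `kpi_delta` and `vs_target` are hypothetical helpers, and the arrow and whole-percent rounding conventions are assumptions chosen to match the mockup above.

```python
def kpi_delta(current: float, previous: float, label: str) -> str:
    """Format a period-over-period change, e.g. for the 'vs. last month' tiles."""
    if previous == 0:
        return f"n/a {label}"  # no baseline to compare against
    change = (current - previous) / previous * 100
    arrow = "↑" if change > 0 else "↓" if change < 0 else "→"
    return f"{arrow} {abs(change):.0f}% {label}"


def vs_target(actual_pct: float, target_pct: float) -> str:
    """Format an actual-vs-plan comparison, e.g. for the 'Win Rate' tile."""
    return f"{actual_pct:.0f}%  (target: {target_pct:.0f}%)"


print(kpi_delta(4_200_000, 3_750_000, "vs. last month"))
print(kpi_delta(12_400, 12_650, "vs. last month"))
print(vs_target(38, 40))
```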

Advanced Topics

Real-Time Analytics Architectures

Lambda Architecture

Combines a batch layer (for accuracy) with a stream, or speed, layer (for timeliness).

         Data Source
         ↙         ↘
  Batch Layer     Speed Layer
       ↓               ↓
  Batch View      Real-time View
         ↘         ↙
        Serving Layer
              ↓
          End User

Advantages:

  • Accurate, complete batch views plus low-latency stream views
  • Fault tolerant: batch views can be recomputed from the raw master dataset

Disadvantages:

  • Operational complexity
  • Duplicate logic (batch + stream)
  • Two views to maintain
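The division of work between the layers can be sketched in plain Python. This is a toy, single-process illustration under stated assumptions: an in-memory, append-only list stands in for the master dataset, events carry an integer timestamp `ts`, and `batch_view`, `SpeedLayer`, and `serve` are hypothetical names. The batch view covers events before the last batch run's cutoff, the speed layer covers events after it, and the serving layer merges both at query time.

```python
from collections import Counter
from typing import Dict, List

Event = Dict[str, object]


def batch_view(master: List[Event], cutoff: int) -> Counter:
    """Batch layer: recompute page-view counts from scratch for events with ts < cutoff."""
    return Counter(e["page"] for e in master if e["ts"] < cutoff)


class SpeedLayer:
    """Speed layer: incrementally count events newer than the last batch run."""

    def __init__(self, cutoff: int) -> None:
        self.cutoff = cutoff
        self.counts: Counter = Counter()

    def on_event(self, event: Event) -> None:
        if event["ts"] >= self.cutoff:
            self.counts[event["page"]] += 1


def serve(page: str, batch: Counter, speed: SpeedLayer) -> int:
    """Serving layer: merge the batch view with the real-time view at query time."""
    return batch[page] + speed.counts[page]


cutoff = 3  # assume the last batch run covered events with ts < 3
master: List[Event] = []
speed = SpeedLayer(cutoff)
for ts, page in [(1, "/home"), (2, "/home"), (3, "/cart"), (4, "/home")]:
    event = {"ts": ts, "page": page}
    master.append(event)   # every event is appended to the master dataset
    speed.on_event(event)  # and is also streamed to the speed layer
batch = batch_view(master, cutoff)
print(serve("/home", batch, speed))  # 2 from the batch view + 1 from the speed layer
```

When the next batch run advances the cutoff, the speed layer's counts for the newly covered range are discarded. Keeping both code paths producing identical results is the "duplicate logic" cost listed above.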

Kappa Architecture

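A single stream processing pipeline handles both real-time and historical data; reprocessing means replaying the retained event log through it.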

Data Source
    ↓
Stream Processing
    ↓
Serving Layer
    ↓
End User

Advantages:

  • Simpler (one code path)
  • Easier replay (reprocessing old data)

Disadvantages:

  • Requires long event retention
  • Stream processing complexity
  • Harder to handle late arrivals
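Reprocessing in Kappa means running new logic over the same retained log instead of maintaining a separate batch job. A toy sketch, assuming an in-memory list as the durable event log (a long-retention Kafka topic would play this role in practice) and hypothetical handler names: the deployed logic and its replacement both go through the same `run` function, and the new version rebuilds its state by replaying from offset 0.

```python
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]
State = Dict[str, float]


def run(log: List[Event], handler: Callable[[State, Event], None], from_offset: int = 0) -> State:
    """Single code path: fold events from `from_offset` onward into fresh state."""
    state: State = {}
    for event in log[from_offset:]:
        handler(state, event)
    return state


def order_count_v1(state: State, event: Event) -> None:
    state[event["user"]] = state.get(event["user"], 0) + 1


def revenue_v2(state: State, event: Event) -> None:
    # Changed business logic: sum order amounts instead of counting orders.
    state[event["user"]] = state.get(event["user"], 0.0) + event["amount"]


log = [
    {"user": "u1", "amount": 20.0},
    {"user": "u2", "amount": 5.0},
    {"user": "u1", "amount": 7.5},
]
print(run(log, order_count_v1))  # view built by the currently deployed logic
print(run(log, revenue_v2))      # reprocessing: replay the whole log through the new logic
```

Replaying from offset 0 only works if the log still holds every event, which is why long event retention appears among the disadvantages.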

Data Quality Frameworks

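Great Expectations: automated data validation

# Legacy (pre-1.0) Great Expectations pandas API; GX 1.0 replaced
# ge.read_csv and dataset-level expectations with a context/validator workflow.
import great_expectations as ge

# Load data as a Great Expectations dataset
df = ge.read_csv("data.csv")

# Define expectations; each call also returns its own result immediately
df.expect_column_to_exist("user_id")
df.expect_column_values_to_not_be_null("email")
df.expect_column_values_to_match_regex("email", r".*@.*\..*")  # loose email format check
df.expect_table_row_count_to_be_between(min_value=1000, max_value=100000)

# Re-run all expectations as one suite
results = df.validate()
print(results.success)  # True only if every expectation passed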
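To make the idea behind expectations concrete without depending on any particular Great Expectations version, here is a dependency-free sketch that performs the same four kinds of checks on rows stored as Python dictionaries. The helper names (`column_exists`, `not_null`, `matches_regex`, `row_count_between`, `validate`) are hypothetical and only loosely mirror the methods above; this is not GE's API or implementation.

```python
import re
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]
Check = Callable[[List[Row]], Tuple[bool, str]]


def column_exists(col: str) -> Check:
    return lambda rows: (all(col in r for r in rows), f"column {col!r} exists")


def not_null(col: str) -> Check:
    return lambda rows: (all(r.get(col) is not None for r in rows), f"{col!r} is never null")


def matches_regex(col: str, pattern: str) -> Check:
    rx = re.compile(pattern)
    # Null values are skipped here; not_null reports them separately.
    return lambda rows: (
        all(rx.match(r[col]) for r in rows if r.get(col) is not None),
        f"{col!r} matches {pattern}",
    )


def row_count_between(low: int, high: int) -> Check:
    return lambda rows: (low <= len(rows) <= high, f"row count in [{low}, {high}]")


def validate(rows: List[Row], checks: List[Check]) -> Dict[str, Any]:
    """Run every check; overall success only if all of them pass."""
    results = [check(rows) for check in checks]
    return {"success": all(ok for ok, _ in results), "results": results}


rows = [
    {"user_id": 1, "email": "ana@example.com"},
    {"user_id": 2, "email": None},
]
report = validate(rows, [
    column_exists("user_id"),
    not_null("email"),
    matches_regex("email", r".*@.*\..*"),
    row_count_between(1, 10),
])
print(report["success"])  # False, because one email is null
```

Declaring checks as data, separately from running them, is what lets a framework re-run the same suite on every new batch and report results uniformly.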

Data Monetization and Privacy

Data Marketplace:

  • Sell anonymized datasets
  • API access to premium datasets
  • Revenue source for data-heavy orgs

Privacy-Preserving Techniques:

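  • Differential Privacy: Add calibrated noise to query results so the output reveals little about any single individual
  • Federated Learning: Train models on decentralized devices or data silos, sharing model updates instead of raw data
  • Homomorphic Encryption: Perform computations directly on encrypted data without decrypting it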
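The Laplace mechanism is a classic way to apply differential privacy to a counting query: adding or removing one person changes a count by at most 1 (its sensitivity), so adding Laplace noise with scale `sensitivity / epsilon` satisfies epsilon-differential privacy. Below is a minimal sketch for a single query over a hypothetical churn dataset; it samples Laplace noise as the difference of two exponential draws. Naive floating-point sampling like this is known to leak information in edge cases, so treat it as illustration only and prefer a vetted DP library in production.

```python
import random
from typing import Iterable, Optional


def laplace_noise(scale: float, rng: random.Random) -> float:
    """Draw from Laplace(0, scale) as the difference of two i.i.d. exponentials with mean `scale`."""
    return rng.expovariate(1 / scale) - rng.expovariate(1 / scale)


def private_count(flags: Iterable[bool], epsilon: float, rng: Optional[random.Random] = None) -> float:
    """Release a count under epsilon-differential privacy using the Laplace mechanism."""
    rng = rng if rng is not None else random.Random()
    true_count = sum(1 for flag in flags if flag)
    sensitivity = 1.0  # adding or removing one person changes a count by at most 1
    return true_count + laplace_noise(sensitivity / epsilon, rng)


churned = [True] * 120 + [False] * 880  # hypothetical per-customer churn flags
rng = random.Random(7)
print(round(private_count(churned, epsilon=0.5, rng=rng), 1))  # true count 120 plus noise
```

A smaller `epsilon` gives a larger noise scale and stronger privacy; every additional query against the same data consumes more of the overall privacy budget.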

Conclusion

Modern data engineering is a complex orchestration of:

  • Collection methods tailored to sources
  • Ingestion infrastructure (batch, streaming, API)
  • Storage tiers (hot, warm, cold)
  • Processing frameworks (Spark, Flink, dbt)
  • Serving layers (warehouses, marts, caches)
  • Governance and compliance

Each piece requires careful selection based on:

  • Latency requirements (seconds vs. hours)
  • Throughput (KB/s vs. GB/s)
  • Delivery guarantees (exactly-once vs. at-least-once)
  • Cost constraints
  • Team expertise

Understanding the full data lifecycle, from collection to archival, makes it possible to build systems that are reliable, cost-efficient, and able to serve diverse organizational needs.
