Table of Contents
- Introduction
- Types of Data
- Data Sources and Collection Methods
- Data Ingestion Tools and Strategies
- The Data Lifecycle
- Data Storage Systems
- Data Processing and Transformation
- Data Orchestration and Workflow Management
- Data Serving and Consumption Layers
- Data Governance and Security
- Data Archival and Retention
- Advanced Topics
Introduction
When people discuss "data," they rarely explain precisely what it is or how to judge its relevance. It is easy to treat data as merely "information" or "things that exist." In reality, data can be anything collected by hardware or software tools, and understanding its nuances is fundamental to effective data engineering.
Data isn't just passive information sitting in a database. It's a carefully cultivated, collected, processed, and distributed asset. The journey from raw collection to actionable insights involves numerous specialized systems, architectural decisions, and trade-offs.
This guide dives deep into:
- What data actually is in different forms and contexts
- Where data originates and how it's collected
- How data flows through modern architectures
- Who consumes data and why each needs different representations
- Technical implementation details that matter in production
Types of Data
Data can be categorized into three fundamental types based on structure and organization:
1. Structured Data
Definition: Data organized in predefined schemas with clear relationships and constraints.
Characteristics:
- Organized in rows and columns (tabular format)
- Queryable with SQL
- Enforced data types and constraints
- Predictable schema evolution
- Typically stored in relational databases
Storage Systems:
- PostgreSQL
- MySQL
- Oracle Database
- SQL Server
- Cloud data warehouses (Snowflake, BigQuery, Redshift)
Examples:
Customer Table:
| customer_id | name | email | sign_up_date |
|-------------|-------------|-------------------|--------------|
| 1 | Alice | alice@example.com | 2024-01-15 |
| 2 | Bob | bob@example.com | 2024-01-20 |
Advantages:
- Easy to query and analyze
- Strong consistency guarantees
- Well-understood optimization techniques
- Mature tooling ecosystem
Disadvantages:
- Schema must be predefined
- Difficult to handle unstructured content
- Scaling writes can be challenging
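The characteristics above (a predefined schema, enforced constraints, SQL queries) can be illustrated with a minimal sketch. It assumes SQLite via Python's standard library as a stand-in for any relational database; the third customer row is hypothetical and exists only to trigger the constraint.

```python
import sqlite3

# In-memory database so the sketch has no external dependencies.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE customers (
        customer_id  INTEGER PRIMARY KEY,
        name         TEXT NOT NULL,
        email        TEXT NOT NULL UNIQUE,
        sign_up_date TEXT NOT NULL
    )
    """
)
conn.executemany(
    "INSERT INTO customers VALUES (?, ?, ?, ?)",
    [
        (1, "Alice", "alice@example.com", "2024-01-15"),
        (2, "Bob", "bob@example.com", "2024-01-20"),
    ],
)

# Declarative query: the schema tells the engine what each column means.
recent = conn.execute(
    "SELECT name FROM customers WHERE sign_up_date >= ? ORDER BY customer_id",
    ("2024-01-18",),
).fetchall()

# Constraint enforcement: a duplicate email is rejected by the database itself.
try:
    conn.execute(
        "INSERT INTO customers VALUES (3, 'Eve', 'alice@example.com', '2024-02-01')"
    )
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True
```

The same pattern applies to PostgreSQL or MySQL; only the driver and connection string change.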
2. Unstructured Data
Definition: Data without predefined schemas or organization, requiring specialized processing.
Characteristics:
- No fixed format or schema
- Requires specialized parsers and processors
- Often large in volume
- Stored in data lakes or object storage
- Requires metadata for context
Types of Unstructured Data:
- Text: Documents, emails, logs, PDFs
- Media: Images, audio, video
- Other: Binary files, archives, proprietary formats
Storage Systems:
- Amazon S3 / S3-compatible storage (MinIO)
- Google Cloud Storage (GCS)
- Azure Blob Storage
- Hadoop HDFS
- Data lake table formats layered on object storage (Delta Lake, Apache Iceberg)
Processing Challenges:
- Extracting meaningful information requires ML/NLP techniques
- Storage costs can be significant
- Retrieval and processing are slower than structured queries
- Duplication and quality issues are common
Example Processing Pipeline:
Raw PDFs → PDF Parser → Text Extraction → NLP → Structured Data
↓
Feature Extraction
↓
ML Feature Store
3. Semi-Structured Data
Definition: Self-describing data with some organizational structure but flexible schemas.
Characteristics:
- Uses tags, keys, or hierarchies to describe content
- Schema can evolve without migration
- Queryable but with more flexibility than relational data
- Self-documenting structure
File Formats:
- JSON (JavaScript Object Notation)
  {
    "user_id": 123,
    "name": "Alice",
    "preferences": {
      "theme": "dark",
      "notifications": true
    },
    "tags": ["vip", "early_adopter"]
  }
- XML (eXtensible Markup Language)
  <user>
    <id>123</id>
    <name>Alice</name>
    <preferences>
      <theme>dark</theme>
    </preferences>
  </user>
- YAML (YAML Ain't Markup Language)
  user_id: 123
  name: Alice
  preferences:
    theme: dark
    notifications: true
- CSV (Comma-Separated Values)
  user_id,name,email,signup_date
  1,Alice,alice@example.com,2024-01-15
  2,Bob,bob@example.com,2024-01-20
Storage Systems:
- MongoDB (document database)
- CouchDB
- Elasticsearch (search and analytics)
- DynamoDB (NoSQL)
- Apache HBase
- Cassandra
Processing Advantages:
- Schema flexibility allows rapid iteration
- Self-describing makes parsing easier
- Suitable for logs and event data
- Supports nested structures naturally
Example: API responses are often semi-structured JSON
{
  "status": "success",
  "data": {
    "orders": [
      {"id": 1, "amount": 100, "status": "shipped"},
      {"id": 2, "amount": 200, "status": "pending"}
    ]
  }
}
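To show how a semi-structured response like this is typically turned into tabular rows, here is a small sketch using only the standard library. The function name `flatten_orders` and the output column names are assumptions made for illustration.

```python
import json

response_text = """
{
  "status": "success",
  "data": {
    "orders": [
      {"id": 1, "amount": 100, "status": "shipped"},
      {"id": 2, "amount": 200, "status": "pending"}
    ]
  }
}
"""


def flatten_orders(payload: dict) -> list[dict]:
    """Turn the nested orders array into flat, table-like rows."""
    orders = payload.get("data", {}).get("orders", [])
    return [
        {"order_id": o["id"], "amount": o["amount"], "status": o["status"]}
        for o in orders
    ]


rows = flatten_orders(json.loads(response_text))
```

Using `.get` with defaults tolerates missing keys, which is the usual price of schema flexibility: the reader, not the writer, has to handle absent fields.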
Data Sources and Collection Methods
Comprehensive Data Source Matrix
| Data Source Category | Specific Source | Data Type | Collection Method | Collection Tools | Latency | Volume |
|---|---|---|---|---|---|---|
| Web & App Events | Website clicks | Semi-structured (JSON) | Event tracking SDKs | Segment, Amplitude, Mixpanel | Real-time | High |
| | Page visits | Semi-structured | Webhooks, logs | Google Analytics, Mixpanel | Real-time | High |
| | Form submissions | Structured | API POST requests | Custom scripts, Zapier | Real-time | Medium |
| | Purchases | Structured | Payment gateway webhooks | Stripe, PayPal webhooks | Real-time | Medium |
| Mobile Applications | App opens | Semi-structured | Mobile SDKs | Firebase, Mixpanel | Real-time | High |
| | Screen views | Semi-structured | Event streaming | Firebase, Segment | Real-time | High |
| | In-app purchases | Structured | App SDKs, APIs | App Store Connect, Google Play | Real-time | Medium |
| | User sessions | Semi-structured | Client-side tracking | Firebase Analytics | Real-time | High |
| IoT & Sensors | Temperature readings | Structured | MQTT, HTTP | AWS IoT Core, Mosquitto | Real-time | Very High |
| | GPS coordinates | Structured | MQTT, CoAP | Edge gateways, Raspberry Pi | Real-time | Very High |
| | Device metrics | Structured | Cloud ingestion | Azure IoT Hub, Confluent Cloud | Real-time | Very High |
| Databases | Transaction logs | Structured | CDC (Change Data Capture) | Debezium, Qreplication | Near real-time | High |
| | Customer records | Structured | Database replication | Sqoop, Kafka Connect | Batch/Real-time | Medium |
| | Product inventory | Structured | SQL queries, APIs | Custom scripts, Airbyte | Batch/Real-time | Medium |
| Application Logs | Server logs | Unstructured | Log forwarding agents | Fluentd, Logstash, Filebeat | Real-time | Very High |
| | Application logs | Unstructured | Logging libraries | ELK Stack, Splunk | Real-time | Very High |
| | Access logs | Semi-structured | Log aggregation | Papertrail, Loggly | Real-time | Very High |
| Event Streams | Real-time clicks | Semi-structured | Message brokers | Apache Kafka, Pulsar | Real-time | Very High |
| | Transactions | Structured | Event streams | Kafka, Kinesis | Real-time | High |
| | Notifications | Semi-structured | Message queues | RabbitMQ, SQS | Real-time | Medium |
| Third-Party APIs | Weather data | Semi-structured | HTTP polling | OpenWeather, custom scripts | Batch | Low |
| | Stock prices | Structured | REST/WebSocket APIs | Alpha Vantage, Finnhub | Real-time | Medium |
| | Social media | Semi-structured | API calls | Twitter API, Facebook SDK | Batch/Real-time | High |
| SaaS Platforms | Salesforce records | Structured | SaaS connectors | Fivetran, Stitch | Batch | Medium |
| | HubSpot contacts | Structured | API, connectors | Airbyte, Zapier | Batch | Medium |
| | Stripe payments | Structured | Webhooks, APIs | Stripe CLI, custom scripts | Real-time | Medium |
Real-World Data Collection Scenarios
Scenario 1: E-Commerce Website Click Tracking
Problem: Track user behavior (clicks, page visits, conversions) for analytics and personalization.
Architecture:
User Browser (Website)
↓
JavaScript SDK
(Segment/Analytics.js)
↓
Event Queue
(in-memory buffer)
↓
HTTP POST (batched)
↓
Segment API / Custom Endpoint
↓
Message Broker
(Apache Kafka)
↓
┌─────────────────┬───────────────────┐
↓                 ↓                   ↓
Real-time         Stream Processor    Data Lake
Dashboard         (Spark Streaming)   (S3)
(low latency)     (aggregations)
Technical Details:
- Event Capture (Client-Side)
  - SDK intercepts user actions
  - Batches events (100 events or 5 seconds)
  - Sends HTTP POST with gzip compression
  - Retries with exponential backoff on failure
- Event Schema (JSON)
  {
    "event_id": "uuid",
    "user_id": "12345",
    "session_id": "sess_abc123",
    "event_type": "click",
    "timestamp": "2024-01-15T10:30:45Z",
    "properties": {
      "button_text": "Add to Cart",
      "product_id": "prod_999",
      "price": 29.99
    },
    "context": {
      "ip": "192.168.1.1",
      "user_agent": "Mozilla/5.0...",
      "locale": "en_US"
    }
  }
- Processing Pipeline
  - Kafka topic: web_events (partitioned by user_id)
  - Spark Streaming job aggregates over 1-minute windows
  - Output: metrics, materialized views for dashboards
- Challenges
  - Deduplication: event_id ensures idempotent processing
  - Out-of-order events: window-based aggregation tolerates some skew
  - Late arrivals: grace period allows 15-minute lateness
  - User privacy: PII (email, phone) never sent; use user_id instead
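The client-side batching rule under Event Capture (flush at 100 events or 5 seconds) and the exponential backoff schedule can be sketched as follows. This is an illustrative model, not the Segment SDK's implementation: the class name `EventBatcher`, the `send` callback, and the backoff parameters are assumptions, and the age check runs only when an event is added (a real SDK would also use a timer).

```python
import time
from typing import Callable


class EventBatcher:
    """Buffer events and flush by size or age, whichever comes first."""

    def __init__(self, send: Callable[[list], None], max_events: int = 100,
                 max_age_seconds: float = 5.0, clock=time.monotonic):
        self._send = send
        self._max_events = max_events
        self._max_age = max_age_seconds
        self._clock = clock
        self._buffer: list = []
        self._oldest = None  # Arrival time of the oldest buffered event

    def add(self, event) -> None:
        if not self._buffer:
            self._oldest = self._clock()
        self._buffer.append(event)
        too_many = len(self._buffer) >= self._max_events
        too_old = self._clock() - self._oldest >= self._max_age
        if too_many or too_old:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            self._send(self._buffer)
            self._buffer = []  # Start a fresh list; the sent batch is untouched
            self._oldest = None


def backoff_delays(base: float = 0.5, factor: float = 2.0, attempts: int = 4) -> list:
    """Delays (in seconds) to wait before each retry attempt."""
    return [base * factor ** i for i in range(attempts)]
```

Production clients usually add random jitter to each delay so that many browsers recovering from the same outage do not retry in lockstep.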
Tools Used:
- Client-side: Segment JavaScript SDK
- Transport: HTTPS with gzip
- Queue: Apache Kafka (3 brokers, 3 replicas)
- Processing: Apache Spark (PySpark)
- Storage: Amazon S3 + Delta Lake
- Serving: Looker dashboards
Scenario 2: IoT Sensor Data from Industrial Plant
Problem: Ingest temperature and pressure readings from 10,000 sensors across a factory every second.
Architecture:
Sensors (10k devices)
↓
MQTT Protocol
(publish/subscribe)
↓
MQTT Broker Cluster
(Mosquitto or HiveMQ)
↓
Topic Tree:
factory/line1/temp
factory/line1/pressure
factory/line2/temp
↓
Edge Gateway (stream processor)
↓
Message Queue (Kafka)
↓
┌───────────────┬────────────────┬───────────────┐
↓               ↓                ↓               ↓
Real-time       Time-series      Cold Storage    Anomaly
Alerts          Database         (Parquet)       Detection
(millisec)      (InfluxDB)       (S3 Archive)
Technical Details:
- MQTT Architecture
  - Protocol: MQTT 3.1.1 / 5.0
  - QoS levels: QoS 1 (at least once) for critical sensors
  - Retained messages: last reading persisted on broker
  - Topic hierarchy enables fanout subscriptions
- Sensor Message Format
  {
    "device_id": "sensor_001_line1",
    "device_type": "temperature",
    "timestamp": 1705322445000,
    "value": 72.5,
    "unit": "celsius",
    "location": {
      "facility": "factory_a",
      "line": 1,
      "zone": "assembly"
    },
    "quality": "good",
    "battery_voltage": 4.8
  }
- Stream Processing (Kafka + Flink)
# Pseudo-code for Flink job
input_stream = env.add_source(kafka_source)
# Tumbling window: aggregate over 10 seconds
windowed = (input_stream
.key_by("device_id")
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(compute_stats)) # min, max, avg, p95
# Alert if temperature > 90°C
alerts = windowed.filter(lambda x: x['temp_max'] > 90)
alerts.add_sink(alert_sink)
- Time-Series Storage
  - Database: InfluxDB (or TimescaleDB)
  - Retention: 7 days hot, then compress
  - Compression: 90% reduction via delta-of-delta encoding
  - Indexing: tag-based (device_id, facility, line)
- Challenges
  - Network failures: QoS 1 + local buffering guards against data loss (at-least-once delivery, so duplicates are possible)
  - Clock skew: sensors without NTP; use broker timestamp
  - Hot partitions: if one line sends 10x more data, partition by (facility, line)
  - Storage explosion: 10k sensors × 1 reading/sec = 864M readings/day = ~10TB/year (compressed)
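The storage estimate in the last challenge can be checked with a few lines of arithmetic. The sensor count and rate come from the scenario; the figure of 32 bytes per compressed reading is an assumption chosen here to show what the "~10TB/year" estimate implies, not a measured value.

```python
SENSORS = 10_000
READINGS_PER_SENSOR_PER_SECOND = 1
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400

readings_per_day = SENSORS * READINGS_PER_SENSOR_PER_SECOND * SECONDS_PER_DAY
readings_per_year = readings_per_day * 365

# Assumption: average on-disk size of one reading after compression.
ASSUMED_BYTES_PER_READING = 32

bytes_per_year = readings_per_year * ASSUMED_BYTES_PER_READING
terabytes_per_year = bytes_per_year / 1e12  # Decimal terabytes

print(f"{readings_per_day:,} readings/day")
print(f"{terabytes_per_year:.1f} TB/year at {ASSUMED_BYTES_PER_READING} bytes/reading")
```

Changing `ASSUMED_BYTES_PER_READING` shows how sensitive the yearly total is to the encoding: halving the per-reading size halves the storage bill.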
Tools Used:
- Devices: Arduino, Raspberry Pi with MQTT libraries
- Broker: HiveMQ cluster (3 nodes, HA setup)
- Edge Gateway: Custom Flink job
- Queue: Apache Kafka (6 brokers, replication factor 3)
- Time-series DB: InfluxDB
- Alerting: PagerDuty via webhook
- Archive: Amazon S3 Glacier
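The delta-of-delta encoding named under Time-Series Storage works well because sensors report at nearly fixed intervals. The sketch below shows the idea on timestamps only; it is a simplified illustration (real engines also bit-pack the results), and the function names are assumptions.

```python
def delta_of_delta_encode(timestamps: list[int]) -> list[int]:
    """Store the first value, then the change in the gap between values."""
    if not timestamps:
        return []
    encoded = [timestamps[0]]
    previous, previous_delta = timestamps[0], 0
    for ts in timestamps[1:]:
        delta = ts - previous
        encoded.append(delta - previous_delta)
        previous, previous_delta = ts, delta
    return encoded


def delta_of_delta_decode(encoded: list[int]) -> list[int]:
    """Invert delta_of_delta_encode."""
    if not encoded:
        return []
    decoded = [encoded[0]]
    delta = 0
    for dod in encoded[1:]:
        delta += dod
        decoded.append(decoded[-1] + delta)
    return decoded


# One reading per second, with 1 ms of jitter on the last sample.
readings = [1705322445000, 1705322446000, 1705322447000, 1705322448001]
encoded = delta_of_delta_encode(readings)
```

After the first two entries, a perfectly regular series encodes to zeros, which a bit-level encoder can store in a bit or less per sample.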
Scenario 3: Database Change Capture (CDC)
Problem: Keep a data warehouse in sync with production PostgreSQL database changes in real-time.
Architecture:
Production PostgreSQL
(OLTP)
↓
Transaction Logs
(WAL - Write-Ahead Log)
↓
Debezium Connector
(PostgreSQL reader)
↓
Kafka Topic
(change events)
↓
┌─────────────────┐
↓ ↓
Stream Processor Data Warehouse
(JDBC Sink) (Snowflake)
↓ ↓
Aggregations Materialized Views
(Flink SQL)
Technical Details:
- Debezium Setup
connector.class: io.debezium.connector.postgresql.PostgresConnector
plugin.name: pgoutput # Logical decoding plugin
slot.name: debezium_slot
publication.name: dbz_publication
table.include.list: public.customers,public.orders
transforms: unwrap
transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
- Change Event Format (JSON)
  {
    "before": null,
    "after": {
      "customer_id": 123,
      "name": "Alice Johnson",
      "email": "alice@example.com",
      "created_at": 1705322445000
    },
    "source": {
      "version": "2.5.0",
      "connector": "postgresql",
      "name": "mydb",
      "ts_ms": 1705322445123,
      "txId": 500,
      "lsn": 23456789,
      "xmin": null,
      "table": "customers"
    },
    "op": "c",
    "ts_ms": 1705322445500
  }
  - op: c (create), u (update), d (delete), r (read)
- Kafka Topic Organization
  - Topic: cdc.production.customers
  - Partitioning: by customer_id (ensures order per customer)
  - Retention: 7 days (replay capability)
- Warehouse Sync (Snowflake)
-- Merge statement executed per batch
MERGE INTO staging.customers t
USING (SELECT * FROM cdc_batch) s
ON t.customer_id = s.customer_id
WHEN MATCHED AND s.op = 'd' THEN DELETE
WHEN MATCHED AND s.op IN ('c', 'u') THEN UPDATE
SET name = s.name, email = s.email, updated_at = s.ts_ms
WHEN NOT MATCHED AND s.op IN ('c', 'u') THEN INSERT
VALUES (s.customer_id, s.name, s.email, s.ts_ms);
- Advantages of CDC
  - Low impact: reads the WAL instead of querying production tables
  - Completeness: captures all changes (no missed updates)
  - Ordering: preserves transactional order
  - Efficiency: incremental only (no full table scans)
- Challenges
  - Initial sync: snapshot mode reads table first (locks), then transitions to incremental
  - Large tables: snapshot can take hours; consider parallel readers
  - Transactions: large batch deletes create many events
  - Schema changes: Debezium propagates ALTER TABLE events as special records
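To make the op codes and the merge logic concrete, the sketch below replays change events against an in-memory dictionary standing in for the warehouse table. It assumes the full envelope format shown earlier (with "before" and "after"), not the flattened output of the unwrap transform; the function name and the sample events are hypothetical.

```python
def apply_change(table: dict, event: dict, key: str = "customer_id") -> None:
    """Apply one change event to a dict keyed by primary key."""
    op = event["op"]
    if op in ("c", "u", "r"):
        # Create, update, and snapshot read all upsert the new row image.
        row = event["after"]
        table[row[key]] = row
    elif op == "d":
        # Deletes carry the old row image in "before".
        table.pop(event["before"][key], None)
    else:
        raise ValueError(f"Unknown op code: {op!r}")


alice = {"customer_id": 123, "name": "Alice Johnson", "email": "alice@example.com"}
alice_v2 = {**alice, "email": "alice@new.example.com"}
bob = {"customer_id": 124, "name": "Bob", "email": "bob@example.com"}

events = [
    {"op": "c", "before": None, "after": alice},
    {"op": "u", "before": alice, "after": alice_v2},
    {"op": "c", "before": None, "after": bob},
    {"op": "d", "before": bob, "after": None},
]

warehouse: dict = {}
for event in events:
    apply_change(warehouse, event)
```

Because each step is an upsert or a keyed delete, replaying the same events twice leaves the table in the same state, which is what makes Kafka's retention-based replay safe.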
Tools Used:
- CDC Engine: Debezium 2.x
- Message Queue: Apache Kafka
- Sink: Snowflake JDBC connector
- Orchestration: Kubernetes (Strimzi operator)
Categories of Data Collection Tools
A. Streaming/Real-Time Platforms
Used for continuous, low-latency data ingestion:
| Tool | Protocol | Throughput | Latency | State | Ordering |
|---|---|---|---|---|---|
| Apache Kafka | Custom binary | 100k+ msgs/sec | ~10ms | Stateful (Log) | Per-partition |
| Apache Pulsar | Custom binary | 100k+ msgs/sec | ~10ms | Tiered storage | Per-partition |
| AWS Kinesis | HTTPS | Up to 1k records/sec (1 MB/s) per shard | 100-200ms | Managed state | Per-shard |
| Google Pub/Sub | gRPC | 100k+ msgs/sec | 100-500ms | Managed state | None by default (opt-in ordering keys) |
| RabbitMQ | AMQP | 50k msgs/sec | <10ms | In-memory/disk | Per-queue |
Kafka Example: High-volume clickstream
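import json
import time

from kafka import KafkaProducer  # kafka-python client

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',  # Wait for all in-sync replicas
    compression_type='snappy',
)

# Sample input; timestamps are assumed to be epoch milliseconds
user_events = [
    {'user_id': 12345, 'event_type': 'click', 'timestamp': int(time.time() * 1000)},
]

# Publish events
for event in user_events:
    producer.send(
        'web_events',
        value=event,
        key=str(event['user_id']).encode('utf-8'),  # Partition key
        timestamp_ms=int(event['timestamp']),
    )

producer.flush()  # Block until buffered messages are delivered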
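The partition key in the example matters because every event with the same key lands on the same partition, which is what gives per-user ordering. The sketch below illustrates the principle only: Kafka's default partitioner hashes keys with murmur2, and `zlib.crc32` is used here purely as a stand-in; the function name is hypothetical.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition deterministically (CRC32 stand-in hash)."""
    return zlib.crc32(key) % num_partitions


NUM_PARTITIONS = 6
user_ids = ["12345", "67890", "12345", "24680", "12345"]
assignments = [
    (uid, partition_for(uid.encode("utf-8"), NUM_PARTITIONS)) for uid in user_ids
]

# Every event for user 12345 maps to a single partition.
partitions_for_user = {p for uid, p in assignments if uid == "12345"}
```

One consequence is that changing the number of partitions changes the mapping, so per-key ordering is only guaranteed while the partition count stays fixed.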
B. ETL / ELT Tools
Move and transform data between systems:
| Tool | Type | Scheduling | Transformation | Cloud Support |
|---|---|---|---|---|
| Airbyte | ELT | Connectors | dbt | Yes (managed) |
| Fivetran | ELT | Connectors | dbt | Yes (managed) |
| Talend | ETL | Workflows | Custom jobs | Hybrid |
| Informatica | ETL | Designer | GUI-based | Hybrid |
| Apache Airflow | Orchestration | DAGs | dbt, custom code | Kubernetes-native |
| Prefect | Orchestration | DAGs | Python code | Cloud-native |
Airbyte Example: Sync Salesforce to Snowflake
# connection.yaml
# Illustrative settings only; the exact schema depends on the Airbyte version and deployment
source_type: salesforce
source_config:
  client_id: ${SALESFORCE_CLIENT_ID}
  client_secret: ${SALESFORCE_CLIENT_SECRET}
  refresh_token: ${SALESFORCE_REFRESH_TOKEN}
destination_type: snowflake
destination_config:
  host: mycompany.us-east-1.snowflakecomputing.com
  database: analytics_db
  schema: salesforce_sync
  warehouse: compute_wh
sync_schedule: every_6_hours
C. Log Collection Tools
Specialized for logs, metrics, and traces:
| Tool | Input Protocol | Processing | Output |
|---|---|---|---|
| Fluentd | syslog, HTTP, file tailing | Filtering, parsing, buffering | Kafka, S3, MongoDB |
| Logstash | syslog, HTTP, file | Rich parsing via Grok | Elasticsearch, S3 |
| Filebeat | Log files, stdin | Lightweight, minimal parsing | Logstash, Kafka, Elasticsearch |
| Vector | 200+ sources | High throughput, low memory | 60+ destinations |
Fluentd Example: Parse application logs
# fluent.conf
<source>
  @type tail
  path /var/log/app/*.log
  pos_file /var/log/app.pos
  tag app.log
  <parse>
    @type json
  </parse>
</source>
<match app.log>
  @type kafka
  brokers kafka-1:9092,kafka-2:9092
  # default_topic names the destination topic; topic_key would instead
  # name a record field to read the topic from
  default_topic app_logs
  compression_codec snappy
</match>
D. API Integration Tools
Pull data from SaaS platforms and APIs:
| Tool | Authentication | Scheduling | Error Handling |
|---|---|---|---|
| Postman | OAuth, API keys, mTLS | Manual + CLI | Retries, custom scripts |
| Zapier | OAuth flows | Frequency-based | Built-in retry logic |
| Make | Connectors | Real-time + scheduled | Error handler modules |
Custom API Ingestion (Python)
import json
import os
import time

import requests

# Token is read from the environment rather than hard-coded
GITHUB_TOKEN = os.environ["GITHUB_TOKEN"]


def fetch_github_events(org: str, output_path: str):
    """Fetch GitHub org events via REST API."""
    page = 1
    all_events = []
    url = f"https://api.github.com/orgs/{org}/events"
    headers = {
        'Authorization': f'token {GITHUB_TOKEN}',
        'Accept': 'application/vnd.github+json',
    }
    while True:
        params = {'page': page, 'per_page': 100}
        response = requests.get(url, params=params, headers=headers, timeout=10)
        response.raise_for_status()
        events = response.json()
        if not events:
            break
        all_events.extend(events)
        page += 1
        # Respect rate limits
        remaining = int(response.headers.get('X-RateLimit-Remaining', 0))
        if remaining < 10:
            reset_time = int(response.headers.get('X-RateLimit-Reset', 0))
            wait_seconds = reset_time - time.time()
            if wait_seconds > 0:
                print(f"Rate limited; waiting {wait_seconds:.0f}s")
                time.sleep(wait_seconds + 1)
    # Store as JSONL (one record per line)
    with open(output_path, 'w') as f:
        for event in all_events:
            f.write(json.dumps(event) + '\n')
    print(f"Fetched {len(all_events)} events to {output_path}")


if __name__ == "__main__":
    fetch_github_events('microsoft', 'github_events.jsonl')
Key Principle: Matching Collection Methods to Data Sources
Different sources require fundamentally different collection approaches:
┌─────────────────────────────────────────────────────────────┐
│ Source Type → Collection Method │
├─────────────────────────────────────────────────────────────┤
│ Databases (OLTP) → CDC (Debezium, Qreplication) │
│ APIs (REST, GraphQL) → Connectors, polling, webhooks │
│ Logs (files, syslog) → Log shippers (Fluentd, Filebeat) │
│ Real-time clicks → Event streaming (Kafka, Pulsar) │
│ Sensors (IoT) → MQTT, CoAP, HTTP collectors │
│ SaaS platforms → Native connectors (Fivetran) │
│ Message queues → Direct subscription (Kafka) │
│ Analytics services → Bulk export APIs (S3) │
└─────────────────────────────────────────────────────────────┘
The combination of source + collection method + transport + storage + processing forms a data pipeline—the foundation of modern data systems.
Data Ingestion Tools and Strategies
Push vs. Pull Architectures
Push Architecture
Source systems actively send data to collection endpoints.
Advantages:
- Real-time or near-real-time delivery
- Source system knows delivery status
- Suitable for high-velocity data
- Webhook-based (event-driven)
Disadvantages:
- Requires sender to implement push logic
- Destination must be always available
- Backpressure harder to implement
- Source system load impacts reliability
Example: Payment gateway webhook
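import json
import os

import stripe
from fastapi import FastAPI, HTTPException, Request
from kafka import KafkaProducer

app = FastAPI()
webhook_secret = os.environ["STRIPE_WEBHOOK_SECRET"]  # Assumed variable name
kafka_producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)


@app.post("/webhooks/stripe")
async def handle_stripe_webhook(request: Request):
    """Stripe pushes payment events to our endpoint."""
    payload = await request.body()
    sig_header = request.headers.get("stripe-signature")
    try:
        stripe.Webhook.construct_event(payload, sig_header, webhook_secret)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid payload")
    except stripe.error.SignatureVerificationError:
        raise HTTPException(status_code=400, detail="Invalid signature")
    # Push the verified payload to a message queue for processing
    kafka_producer.send('stripe_events', value=json.loads(payload))
    return {"status": "received"}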
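Push endpoints are publicly reachable, so the receiver has to prove that a request really came from the sender. The sketch below shows the general technique behind webhook signatures of this kind: an HMAC-SHA256 over the timestamp and body, checked with a replay window. It is a simplified illustration, not a replacement for the provider's library; the function names, the secret, and the 300-second tolerance are assumptions.

```python
import hashlib
import hmac
import time
from typing import Optional


def sign(payload: bytes, secret: str, timestamp: int) -> str:
    """Return the hex HMAC-SHA256 of '<timestamp>.<payload>'."""
    signed_payload = f"{timestamp}.".encode("utf-8") + payload
    return hmac.new(secret.encode("utf-8"), signed_payload, hashlib.sha256).hexdigest()


def verify(payload: bytes, header: str, secret: str,
           tolerance_seconds: int = 300, now: Optional[float] = None) -> bool:
    """Check a 't=<timestamp>,v1=<signature>' style header."""
    try:
        parts = dict(item.split("=", 1) for item in header.split(","))
        timestamp = int(parts["t"])
        received = parts["v1"]
    except (KeyError, ValueError):
        return False  # Malformed header
    current = time.time() if now is None else now
    if abs(current - timestamp) > tolerance_seconds:
        return False  # Outside the replay window
    expected = sign(payload, secret, timestamp)
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected, received)
```

Including the timestamp in the signed bytes is what stops an attacker from replaying an old, validly signed request with a fresh timestamp.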
Pull Architecture
Destination systems proactively fetch data from sources.
Advantages:
- Destination controls pace and timing
- Easy backpressure and error handling
- Retries simple (idempotent reads)
- Source system passive
Disadvantages:
- Latency: data not delivered until next pull
- Polling overhead (many polls return nothing new)
- Hard to get all changes (need cursors/timestamps)
- Source must support filtered or cursor-based reads
Example: Polling GitHub API for new commits
import time
from datetime import datetime, timedelta, timezone

import requests


class GitHubPoller:
    def __init__(self, repo: str, token: str):
        self.repo = repo
        self.session = requests.Session()
        self.session.headers['Authorization'] = f'token {token}'
        # Checkpoint: only commits after this instant are requested
        self.last_polled_at = datetime.now(timezone.utc) - timedelta(minutes=5)

    def get_new_commits(self) -> list[dict]:
        """Pull commits since last poll."""
        poll_started_at = datetime.now(timezone.utc)
        url = f"https://api.github.com/repos/{self.repo}/commits"
        params = {
            # 'since' takes an ISO 8601 timestamp (YYYY-MM-DDTHH:MM:SSZ)
            'since': self.last_polled_at.strftime('%Y-%m-%dT%H:%M:%SZ'),
            'per_page': 100,
        }
        response = self.session.get(url, params=params, timeout=10)
        response.raise_for_status()
        commits = response.json()
        # Advance the checkpoint only after a successful fetch
        self.last_polled_at = poll_started_at
        return commits

    def poll_loop(self, interval_seconds: int = 300):
        """Poll every N seconds."""
        while True:
            commits = self.get_new_commits()
            for commit in commits:
                # Process or queue commit
                print(f"Found commit: {commit['sha']}")
            time.sleep(interval_seconds)
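The need for cursors noted in the disadvantages can be shown without any network calls. In this sketch an in-memory list stands in for the remote source, and the puller remembers the highest ID it has seen; the record shape and the function name are hypothetical.

```python
def pull_since(source: list[dict], cursor: int, limit: int = 100) -> tuple[list[dict], int]:
    """Return up to `limit` records with id > cursor, plus the advanced cursor."""
    pending = sorted((r for r in source if r["id"] > cursor), key=lambda r: r["id"])
    batch = pending[:limit]
    new_cursor = batch[-1]["id"] if batch else cursor
    return batch, new_cursor


# Stand-in for a remote API that exposes monotonically increasing IDs.
source = [{"id": i, "sha": f"commit_{i}"} for i in range(1, 6)]

cursor = 0
first, cursor = pull_since(source, cursor, limit=3)
second, cursor = pull_since(source, cursor, limit=3)
third, cursor = pull_since(source, cursor, limit=3)  # Nothing new: an empty poll
```

Persisting the cursor between runs is what makes a restarted puller resume where it stopped instead of re-reading or skipping records.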
Batch vs. Streaming Ingestion
Batch Ingestion
Collect data in fixed intervals and load in bulk.
Use Cases:
- Historical data imports
- End-of-day reports
- Large dataset migrations
- Cost-sensitive (off-peak loading)
Example: Daily database export
#!/bin/bash
# Export PostgreSQL nightly
set -euo pipefail

DUMP_FILE="/data/exports/mydb_$(date +%Y%m%d).dump"

# --jobs is omitted: parallel dumps require --format=directory.
# Credentials come from ~/.pgpass or PGPASSWORD rather than the URL.
pg_dump --format=custom \
    --compress=9 \
    --file="$DUMP_FILE" \
    postgresql://user@prod-db.local/mydb

# Upload to S3
aws s3 cp "$DUMP_FILE" \
    s3://my-data-lake/databases/postgresql/daily/
Advantages:
- Simpler architecture (cron + SQL)
- Cost-effective (offload during low-traffic hours)
- Easier error recovery (entire batch succeeds/fails)
- Well-suited for large migrations
Disadvantages:
- Latency: data can be hours to a day (or more) old
- Inefficient for small deltas
- Storage spikes on load days
- Harder to detect issues quickly
Streaming Ingestion
Continuously ingest small data units as they arrive.
Use Cases:
- Real-time dashboards
- Live fraud detection
- Personalization engines
- Time-sensitive analytics
Example: Kafka producer for application events
import json
import logging
from datetime import datetime, timezone

from kafka import KafkaProducer


class AppEventProducer:
    def __init__(self, brokers: list[str]):
        self.producer = KafkaProducer(
            bootstrap_servers=brokers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            acks='all',  # Ensure durability
            compression_type='snappy',
            retries=3,
            # With retries enabled, several in-flight requests can reorder messages
            max_in_flight_requests_per_connection=5,
        )
        self.logger = logging.getLogger(__name__)

    def send_event(self, event_type: str, **data):
        """Send event to Kafka asynchronously."""
        message = {
            'event_type': event_type,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'data': data,
        }
        try:
            # Async send; callback fires on completion
            self.producer.send(
                f'app_events_{event_type}',
                value=message,
                key=str(data.get('user_id')).encode('utf-8'),
            ).add_callback(
                lambda meta: self.logger.info(
                    f"Event {event_type} sent to partition {meta.partition}"
                )
            ).add_errback(
                lambda exc: self.logger.error(f"Failed to send: {exc}")
            )
        except Exception as e:
            self.logger.error(f"Error sending event: {e}")

    def flush(self):
        """Block until all pending messages sent."""
        self.producer.flush(timeout=30)  # kafka-python takes seconds here


# Usage
producer = AppEventProducer(['kafka-1:9092', 'kafka-2:9092'])
producer.send_event('user_signup', user_id=123, email='user@example.com')
producer.send_event('purchase', user_id=123, amount=99.99)
producer.flush()
Advantages:
- Real-time availability
- Incremental updates only
- Easier to detect anomalies
- Supports complex windowed aggregations
Disadvantages:
- Complex infrastructure (brokers, coordinators)
- Harder to replay/recover from errors
- Requires handling out-of-order events
- Higher operational overhead
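Handling out-of-order events usually means grouping by event time and accepting late arrivals for a bounded period. The sketch below counts events in tumbling windows with an allowed-lateness rule. It is a simplified model: the watermark is just the largest event time seen so far, and the class name and default sizes are assumptions.

```python
from collections import defaultdict


class TumblingCounter:
    """Count events per event-time window, tolerating bounded lateness."""

    def __init__(self, window_seconds: int = 60, allowed_lateness_seconds: int = 900):
        self.window = window_seconds
        self.lateness = allowed_lateness_seconds
        self.counts = defaultdict(int)  # window start -> event count
        self.watermark = 0              # Largest event time seen so far
        self.dropped = 0                # Events that arrived too late

    def add(self, event_time: int) -> None:
        self.watermark = max(self.watermark, event_time)
        window_start = event_time - event_time % self.window
        window_end = window_start + self.window
        if window_end + self.lateness <= self.watermark:
            self.dropped += 1  # Window already closed for good
            return
        self.counts[window_start] += 1


counter = TumblingCounter(window_seconds=60, allowed_lateness_seconds=900)
for event_time in [5, 65, 10, 2000, 20]:
    counter.add(event_time)
```

Here the event at time 10 arrives after the one at 65 but is still counted, while the event at time 20 arrives after the watermark has passed 960 and is dropped. A longer lateness bound drops fewer events at the cost of keeping more window state in memory.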
The Data Lifecycle
Data follows a complete journey from creation to deletion. Understanding each phase is crucial for building resilient systems:
┌──────────────────────────────────────────────────────────────────┐
│ Data Lifecycle Phases │
├──────────────────────────────────────────────────────────────────┤
│ │
│ 1. Generation/Creation │
│ ↓ │
│ 2. Collection/Ingestion │
│ ↓ │
│ 3. Storage (Raw) │
│ ↓ │
│ 4. Processing/Transformation │
│ ↓ │
│ 5. Orchestration & Workflow Management │
│ ↓ │
│ 6. Serving/Consumption │
│ ↓ │
│ 7. Governance & Security │
│ ↓ │
│ 8. Archival or Deletion │
│ │
└──────────────────────────────────────────────────────────────────┘
Phase 1: Data Generation / Creation
Definition: Data originates from business processes and systems.
Sources of Generation:
- User interactions (clicks, form inputs, searches)
- System operations (logs, metrics, traces)
- Business transactions (orders, payments, reservations)
- External events (weather, market data)
- Sensors and IoT devices
Key Considerations:
Data Quality at Source:
- Validate data format at entry point
- Enforce schema constraints
- Assign unique identifiers (UUIDs preferred)
- Include timestamps (in UTC, not local time)
- Add metadata (source system, user context)
Example: E-commerce order generation
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from uuid import uuid4


class OrderStatus(str, Enum):
    PENDING = "pending"
    CONFIRMED = "confirmed"
    SHIPPED = "shipped"
    DELIVERED = "delivered"


@dataclass
class Order:
    order_id: Optional[str] = None
    user_id: Optional[str] = None
    items: list = field(default_factory=list)
    total_amount: float = 0.0
    status: OrderStatus = OrderStatus.PENDING
    created_at: Optional[str] = None
    updated_at: Optional[str] = None

    def __post_init__(self):
        # Assign a unique identifier at the source
        if not self.order_id:
            self.order_id = str(uuid4())
        # Timestamps are timezone-aware and in UTC
        if not self.created_at:
            self.created_at = datetime.now(timezone.utc).isoformat()
        if not self.updated_at:
            self.updated_at = self.created_at

    def to_dict(self):
        return {
            'order_id': self.order_id,
            'user_id': self.user_id,
            'items': self.items,
            'total_amount': round(self.total_amount, 2),
            'status': self.status.value,
            'created_at': self.created_at,
            'updated_at': self.updated_at,
        }
Phase 2: Data Collection / Ingestion
Definition: Data is gathered and moved into centralized storage or processing systems.
Ingestion Methods (Recap):
- Batch - Scheduled uploads
- Real-time streaming - Continuous flow
- API polling - On-demand requests
- Message queues - Event-driven push
- Database CDC - Transactional logs
Critical Design Patterns:
Exactly-Once Semantics:
- Ensure no duplicates even if systems fail
- Use idempotent operations (based on message IDs)
- Implement deduplication in downstream systems
- Example: Kafka + offset management
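Idempotent handling keyed on message IDs can be sketched in a few lines. This is a minimal in-memory illustration under the assumption that every message carries a unique `message_id`; a production consumer would keep the seen IDs in a durable store with expiry. The class and field names are hypothetical.

```python
class IdempotentConsumer:
    """Apply each message at most once, even if it is delivered repeatedly."""

    def __init__(self):
        self.seen_ids = set()
        self.total = 0

    def handle(self, message: dict) -> bool:
        """Return True if the message was applied, False if it was a duplicate."""
        if message["message_id"] in self.seen_ids:
            return False
        self.total += message["amount"]
        self.seen_ids.add(message["message_id"])
        return True


consumer = IdempotentConsumer()
deliveries = [
    {"message_id": "m-1", "amount": 100},
    {"message_id": "m-2", "amount": 200},
    {"message_id": "m-1", "amount": 100},  # Redelivered after a retry
]
results = [consumer.handle(m) for m in deliveries]
```

At-least-once delivery plus an idempotent handler gives the same observable result as exactly-once processing, which is how most pipelines achieve it in practice.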
Backpressure Handling:
- Don't overwhelm destination systems
- Use circuit breakers for downstream failures
- Implement queues with bounded capacity
- Slow consumption automatically slows production
import asyncio
from typing import AsyncIterator


class BackpressureQueue:
    def __init__(self, max_size: int = 1000):
        # A bounded queue is what makes backpressure possible
        self.queue = asyncio.Queue(maxsize=max_size)

    async def produce(self, item):
        """Block if queue full (backpressure)."""
        try:
            self.queue.put_nowait(item)
        except asyncio.QueueFull:
            print("Queue full; applying backpressure")
            await self.queue.put(item)  # Suspends until a consumer frees a slot

    async def consume(self) -> AsyncIterator:
        """Consume items with automatic flow control."""
        while True:
            item = await self.queue.get()
            yield item
            self.queue.task_done()
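The circuit breaker mentioned in the list above complements the bounded queue: instead of slowing down, it stops calling a failing downstream system for a while. The following is a minimal sketch; the class names, thresholds, and the injectable clock are assumptions for illustration.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is skipped because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open; skipping call")
            self.opened_at = None  # Half-open: allow one trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # Trip (or re-trip) the breaker
            raise
        self.failures = 0  # Any success closes the circuit fully
        return result
```

Wrapping the sink write in `breaker.call(...)` lets the pipeline fail fast and buffer upstream while the destination recovers, rather than piling up timeouts.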
Phase 3: Data Storage (Raw)
Definition: Raw, unprocessed data is persisted for future processing and compliance.
Storage Tier Strategy:
┌─────────────────────────────────────────────────────┐
│ Storage Architecture │
├─────────────────────────────────────────────────────┤
│ │
│ Hot Storage (Fast Access) │
│ ├─ Local SSD / NVMe (sub-millisecond latency) │
│ ├─ Cloud block storage (EBS, Persistent Disk) │
│ └─ Cost: $0.10-0.20 / GB / month │
│ │
│ Warm Storage (Moderate Access) │
│ ├─ Object storage (S3, GCS) w/ standard tier │
│ ├─ Data lakes (Delta, Iceberg) on cloud storage │
│ └─ Cost: $0.023 / GB / month │
│ │
│ Cold Storage (Archive) │
│ ├─ S3 Glacier, Azure Archive │
│ ├─ On-premises tape │
│ └─ Cost: $0.004 / GB / month (retrieval in hours) │
│ │
└─────────────────────────────────────────────────────┘
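The per-GB prices in the tiers above translate into monthly costs as sketched below. The prices are the figures quoted in the diagram (using the lower bound for hot storage); the data volumes are hypothetical, and retrieval and request fees are ignored.

```python
# Prices in USD per GB per month, taken from the tier diagram.
PRICE_PER_GB_MONTH = {"hot": 0.10, "warm": 0.023, "cold": 0.004}


def monthly_cost(gb_by_tier: dict) -> dict:
    """Storage-only cost per tier, rounded to cents."""
    return {
        tier: round(gb * PRICE_PER_GB_MONTH[tier], 2)
        for tier, gb in gb_by_tier.items()
    }


# Hypothetical footprint: 1 TB hot, 10 TB warm, 100 TB cold.
costs = monthly_cost({"hot": 1_000, "warm": 10_000, "cold": 100_000})
total = round(sum(costs.values()), 2)
```

In this example the cold tier holds 100 times more data than the hot tier yet costs only four times as much, which is the main argument for moving aged data down the tiers.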
Format and Compression:
| Format | Compression | Use Case | Read Speed |
|---|---|---|---|
| Parquet | Snappy (default) | Analytical queries | ✓ Columnar access |
| ORC | Zstd | Hive/Spark, columnar | ✓ Excellent compression |
| JSONL | None/gzip | Logs, unstructured | ✗ Slower (full scan) |
| Avro | Deflate | Streaming, schema evolution | ~ Row-based |
| CSV | gzip | Legacy systems | ✗ Text parsing overhead |
| Protobuf | None | Microservices, bandwidth | ~ Compact binary |
Example: Write partitioned Parquet dataset
import pandas as pd
import pyarrow as pa
from pyarrow import parquet as pq


def write_data_lake(events: list[dict], output_path: str):
    """Write events to S3 partitioned by date and event type."""
    # Convert to DataFrame
    df = pd.DataFrame(events)
    df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)
    df['event_date'] = df['timestamp'].dt.date
    # Write partitioned dataset; write_to_dataset (not write_table)
    # is the call that accepts partition_cols
    table = pa.Table.from_pandas(df, preserve_index=False)
    pq.write_to_dataset(
        table,
        root_path=output_path,
        partition_cols=['event_date', 'event_type'],
        compression='snappy',
        coerce_timestamps='ms',
    )
    # Result (file names are generated by pyarrow):
    # s3://data-lake/events/
    #   event_date=2024-01-15/
    #     event_type=click/
    #       <generated-name>.parquet
    #     event_type=purchase/
    #       <generated-name>.parquet
    #   event_date=2024-01-16/
    #     ...


write_data_lake(
    [
        {'timestamp': '2024-01-15T10:30:00Z', 'event_type': 'click', 'user_id': 1},
        {'timestamp': '2024-01-15T11:00:00Z', 'event_type': 'purchase', 'user_id': 2},
    ],
    's3://my-data-lake/events/'
)
Phase 4: Data Processing / Transformation
Definition: Raw data is cleaned, transformed, validated, and enriched to make it useful.
Common Transformations:
| Transformation | Purpose | Example |
|---|---|---|
| Deduplication | Remove duplicate records | df.drop_duplicates(subset=['event_id']) |
| Filtering | Remove irrelevant/bad data | df[df['amount'] > 0] |
| Type Casting | Normalize data types | Convert strings to datetime |
| Standardization | Consistent formatting | Email lowercase, phone E.164 |
| Enrichment | Add context/reference data | Join user geolocation |
| Aggregation | Summarize data | Sum sales per day, avg price |
| Pivoting | Reshape for analytics | Dimensions vs. metrics |
| Nullability | Handle missing values | Fill NULLs, drop empty rows |
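Several of the transformations in the table can be combined in one pass. The sketch below applies deduplication, null handling, type casting, filtering, and standardization to plain dictionaries using only the standard library; the field names and the `clean_events` function are assumptions for illustration.

```python
def clean_events(events: list[dict]) -> list[dict]:
    """Deduplicate, fill nulls, cast types, filter, and standardize."""
    seen_ids = set()
    cleaned = []
    for event in events:
        # Deduplication: keep the first record per event_id
        if event["event_id"] in seen_ids:
            continue
        seen_ids.add(event["event_id"])
        # Nullability + type casting: missing amounts become 0.0
        raw_amount = event.get("amount")
        amount = 0.0 if raw_amount is None else float(raw_amount)
        # Filtering: negative amounts are treated as bad data
        if amount < 0:
            continue
        # Standardization: trim and lowercase emails
        email = event["email"].strip().lower()
        cleaned.append({**event, "amount": amount, "email": email})
    return cleaned


raw = [
    {"event_id": "e1", "amount": "19.99", "email": " Alice@Example.com "},
    {"event_id": "e1", "amount": "19.99", "email": " Alice@Example.com "},
    {"event_id": "e2", "amount": None, "email": "BOB@example.com"},
    {"event_id": "e3", "amount": -5, "email": "eve@example.com"},
]
cleaned = clean_events(raw)
```

The order of the steps matters: filling nulls before filtering keeps records with a missing amount, whereas filtering first would silently discard them.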
Processing Styles:
Batch Processing
Process large volumes at scheduled intervals.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("DataProcessing") \
    .getOrCreate()

# Read raw data from S3
raw_events = spark.read.parquet("s3://data-lake/raw/events/event_date=2024-01-15/")

# Clean and transform. Nulls are filled before filtering because a
# NULL amount would otherwise fail the >= 0 predicate and be dropped.
local_ts = F.from_utc_timestamp(F.col("timestamp"), "US/Pacific")
cleaned = raw_events \
    .filter(F.col("event_type").isin(["click", "purchase", "view"])) \
    .fillna({"amount": 0}) \
    .filter(F.col("amount") >= 0) \
    .withColumn("event_date", F.to_date(local_ts)) \
    .withColumn("day_of_week", F.dayofweek(local_ts)) \
    .withColumn("hour", F.hour(local_ts))

# Aggregations (aliases replace the default sum(amount)/count(user_id) names)
daily_summary = cleaned \
    .groupBy("event_date", "event_type") \
    .agg(
        F.sum("amount").alias("total_amount"),
        F.count("user_id").alias("event_count"),
    )

# Write to warehouse
daily_summary.write \
    .mode("overwrite") \
    .parquet("s3://data-warehouse/processed/daily_events/event_date=2024-01-15/")
Stream Processing
Process data as it arrives, maintaining state.
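from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, struct, to_json, window, sum as spark_sum
from pyspark.sql.types import DoubleType, LongType, StringType, StructField, StructType
spark = SparkSession.builder \
    .appName("StreamingAnalysis") \
    .getOrCreate()
# Schema of the JSON payload (assumed fields; adjust to the real event format)
event_schema = StructType([
    StructField("event_type", StringType()),
    StructField("user_id", LongType()),
    StructField("amount", DoubleType()),
])
# Read from Kafka
events = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user_events") \
    .option("startingOffsets", "latest") \
    .load()
# Parse and transform
parsed = events.select(
    col("value").cast("string").alias("value"),
    col("timestamp")
).withColumn("json", from_json(col("value"), event_schema))
# 5-minute tumbling window aggregation
# The watermark bounds how long state is kept for late events (10 minutes is an assumed value)
windowed_agg = parsed \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("json.event_type").alias("event_type")
    ) \
    .agg(spark_sum("json.amount").alias("total_amount"))
# Write to sink (e.g., Kafka, database)
# The Kafka sink expects a string or binary "value" column, so serialize each row to JSON
query = windowed_agg \
    .select(to_json(struct("window", "event_type", "total_amount")).alias("value")) \
    .writeStream \
    .format("kafka") \
    .outputMode("update") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "event_aggregates") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start()
query.awaitTermination()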
Phase 5: Data Orchestration & Workflow Management
Definition: Pipelines are scheduled, monitored, and automated to run reliably.
Orchestration Concepts:
DAG (Directed Acyclic Graph):
Each pipeline is a graph of tasks with dependencies.
┌─────────────────────────────────────────┐
│ Daily ETL Pipeline │
├─────────────────────────────────────────┤
│ │
│ ┌─────────┐ │
│ │ Extract │ (Fetch raw data) │
│ └────┬────┘ │
│ │ │
│ ┌────▼─────┐ │
│ │ Transform │ (Clean & enrich) │
│ └────┬──┬──┘ │
│ │ └─────────────┐ │
│ ┌────▼──┐ ┌─────▼──┐ │
│ │ Load │ │ Validate│ │
│ │Warehouse │Quality │ │
│ └────┬──┘ └─────┬──┘ │
│ │ │ │
│ ┌────▼────────────────▼─┐ │
│ │ Run QA Checks │ │
│ │ (Row counts, nulls) │ │
│ └────┬───────────────────┘ │
│ │ │
│ ┌────▼──────────────┐ │
│ │ Alert (success │ │
│ │ or failure) │ │
│ └───────────────────┘ │
│ │
└─────────────────────────────────────────┘
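To make the "graph of tasks with dependencies" idea concrete, here is a small sketch using Python's standard-library `graphlib`. The task names mirror the diagram above; the `execution_order` and `is_valid_dag` helpers are illustrative names, not part of any orchestrator's API.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "validate": {"transform"},
    "qa_checks": {"load", "validate"},
    "alert": {"qa_checks"},
}

def execution_order(deps):
    """Return one valid run order in which every task follows its dependencies."""
    return list(TopologicalSorter(deps).static_order())

def is_valid_dag(deps):
    """A pipeline is schedulable only if its dependency graph has no cycle."""
    try:
        execution_order(deps)
        return True
    except CycleError:
        return False

order = execution_order(dependencies)
```

Orchestrators perform essentially this ordering step before scheduling, and additionally run independent tasks (here `load` and `validate`) in parallel.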
Example: Apache Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'data-eng',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    # Fixed start date; dynamic values such as days_ago() are deprecated and discouraged
    'start_date': datetime(2024, 1, 1),
}
dag = DAG(
'daily_data_pipeline',
default_args=default_args,
description='Daily ETL pipeline',
schedule_interval='0 2 * * *', # 2 AM daily
catchup=False,
)
def extract_data(**context):
    """Extract raw data from source."""
    print("Extracting data...")
    # Query source system, write to /tmp/raw_data.parquet
def transform_data(**context):
    """Transform and clean data."""
    print("Transforming data...")
    # Read raw, apply transformations, write to /tmp/transformed_data.parquet
def load_data(**context):
    """Load into warehouse."""
    print("Loading to warehouse...")
    # Insert into Snowflake
def run_quality_checks(**context):
    """Validate data quality."""
    print("Running quality checks...")
    # Count rows, check nulls, etc.
extract_task = PythonOperator(
task_id='extract',
python_callable=extract_data,
dag=dag,
)
transform_task = PythonOperator(
task_id='transform',
python_callable=transform_data,
dag=dag,
)
load_task = PythonOperator(
task_id='load',
python_callable=load_data,
dag=dag,
)
quality_task = PythonOperator(
task_id='quality',
python_callable=run_quality_checks,
dag=dag,
)
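# With the default trigger rule this task runs only after all upstream tasks succeed,
# so the command does not need to inspect task state. Failure alerts are usually
# wired through on_failure_callback instead.
# SLACK_TOKEN and SLACK_CHANNEL are placeholder environment variables.
alert_task = BashOperator(
    task_id='alert',
    bash_command="""
    curl -X POST https://slack.com/api/chat.postMessage \
      -H "Authorization: Bearer $SLACK_TOKEN" \
      -d "channel=$SLACK_CHANNEL" \
      -d "text=Pipeline succeeded"
    """,
    dag=dag,
)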
# Define dependencies
extract_task >> transform_task >> load_task >> quality_task >> alert_task
Orchestration Tools Comparison:
| Tool | Paradigm | Language | Scalability | UI |
|---|---|---|---|---|
| Apache Airflow | DAG-based | Python | Kubernetes | Rich web UI |
| Prefect | Flow-based | Python | Cloud-native | Modern UI |
| Dagster | Asset-oriented | Python | Kubernetes | Opinionated UI |
| dbt | SQL transformation | SQL/Jinja (YAML config) | Lightweight | Web dashboard |
| Nextflow | Scientific workflow | Groovy/Java | HPC | Minimal UI |
| Apache Beam | Unified API | Python/Java | Distributed | N/A (library) |
Phase 6: Data Serving / Consumption
Definition: Processed data is made available to end users and systems.
Serving Layers:
A. Data Warehouses (OLAP)
Centralized repository for analytical queries.
Characteristics:
- Columnar storage (compressed)
- SQL queryable
- MPP (Massively Parallel Processing)
- Historical data (5-10 years typical)
- Query latency: 1-100 seconds
Leading Solutions:
- Snowflake: Cloud-native, instant elasticity
- Google BigQuery: Serverless, integrated with Google Cloud ecosystem
- Amazon Redshift: AWS-managed cloud warehouse, PostgreSQL-derived SQL dialect
- Databricks (Lakehouse): Combines data lake + warehouse
Example: BigQuery query
-- Analyze sales trends
SELECT
DATE(created_at) as sale_date,
product_category,
SUM(amount) as daily_revenue,
COUNT(DISTINCT customer_id) as unique_customers,
AVG(amount) as avg_transaction_value,
FROM `project.dataset.orders`
WHERE DATE(created_at) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY) AND CURRENT_DATE()
GROUP BY 1, 2
ORDER BY 1 DESC, 3 DESC
B. Data Marts
Department-specific subsets with distinct definitions.
Finance Data Mart:
CREATE TABLE finance.daily_revenue AS
SELECT
DATE(transaction_date) as revenue_date,
SUM(amount) as total_revenue,
COUNT(*) as transaction_count,
SUM(amount) / COUNT(*) as avg_transaction,
SUM(CASE WHEN refunded THEN amount ELSE 0 END) as refunded_amount
FROM warehouse.transactions
WHERE transaction_type = 'SALE'
GROUP BY 1;
Marketing Data Mart:
CREATE TABLE marketing.daily_funnel AS
SELECT
DATE(created_at) as funnel_date,
campaign_id,
COUNT(DISTINCT CASE WHEN event_type = 'visit' THEN user_id END) as visits,
COUNT(DISTINCT CASE WHEN event_type = 'click' THEN user_id END) as clicks,
COUNT(DISTINCT CASE WHEN event_type = 'signup' THEN user_id END) as signups,
ROUND(100.0 * COUNT(DISTINCT CASE WHEN event_type = 'signup' THEN user_id END)
/ NULLIF(COUNT(DISTINCT CASE WHEN event_type = 'visit' THEN user_id END), 0), 2) as conversion_rate
FROM warehouse.marketing_events
GROUP BY 1, 2;
C. Feature Stores
Engineered features for machine learning models.
Purpose:
- Consistent features between training and serving
- Real-time feature computation
- Feature versioning and lineage
- Reduced model training time
Tools:
- Feast (open-source, pluggable offline and online stores)
- Tecton (enterprise, real-time + batch)
- Hopsworks (open-source + managed)
Example: Feature store definition (Feast)
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int32
# Uses the Field/schema API of recent Feast releases; older releases used Feature/features
# Define entities
user = Entity(name="user", join_keys=["user_id"])
# Define feature sources
transactions = FileSource(
    path="s3://my-data/transactions.parquet",
    timestamp_field="timestamp",
)
# Define features
user_features = FeatureView(
    name="user_transaction_features",
    entities=[user],
    ttl=timedelta(days=30),
    schema=[
        Field(name="total_spent", dtype=Float32),
        Field(name="transaction_count", dtype=Int32),
        Field(name="avg_transaction_value", dtype=Float32),
        Field(name="days_since_last_purchase", dtype=Int32),
    ],
    online=True,  # Serve in real-time
    source=transactions,
    tags={"version": "v1", "owner": "ml-platform"},
)
# At inference time (feature references use the feature view name):
# features = store.get_online_features(
#     features=[
#         "user_transaction_features:total_spent",
#         "user_transaction_features:transaction_count",
#     ],
#     entity_rows=[{"user_id": 123}],
# )
D. BI / Dashboard Tools
Visualization and reporting for non-technical users.
Leading Solutions:
- Tableau: Rich visualizations, enterprise focus
- Power BI: Microsoft ecosystem integration
- Looker: Semantic layer, LookML language
- Superset: Open-source, self-hosted
Example: Looker dashboard definition (simplified, LookML-style pseudocode)
dashboard: sales_performance {
title: "Sales Performance Dashboard"
tile: daily_revenue {
query: daily_revenue_query
title: "Daily Revenue"
visualization: {
type: looker_column
}
}
tile: customer_acquisition {
query: customer_acq_query
title: "New Customers"
visualization: {
type: looker_line
}
}
}
E. Operational Serving
Real-time data for applications and user-facing systems.
Latency Requirements:
- < 100ms for recommendation engines
- < 500ms for search results
- < 1s for personalization
Storage Systems:
- Redis (in-memory cache, sub-millisecond)
- Memcached (distributed cache)
- Elasticsearch (search index, ~10ms queries)
- DynamoDB (NoSQL, single-digit ms)
Example: Redis feature cache for recommendations
import redis
import json
from datetime import datetime, timedelta, timezone
redis_client = redis.Redis(host='redis-cluster', port=6379, db=0)
# compute_recommendations() is assumed to be defined elsewhere (e.g., a model call)
def get_user_recommendations(user_id: int, refresh_if_stale: bool = True) -> list:
    """Get recommendations with cache fallback."""
    cache_key = f"recommendations:{user_id}"
    # Try cache first
    cached = redis_client.get(cache_key)
    if cached:
        data = json.loads(cached)
        if not refresh_if_stale or is_fresh(data['cached_at']):
            return data['recommendations']
    # Compute fresh recommendations
    recommendations = compute_recommendations(user_id)
    # Cache with 1-hour TTL
    redis_client.setex(
        cache_key,
        timedelta(hours=1),
        json.dumps({
            'recommendations': recommendations,
            'cached_at': datetime.now(timezone.utc).isoformat(),
        })
    )
    return recommendations
def is_fresh(cached_at_str: str, max_age_hours: int = 1) -> bool:
    """Return True if the cached entry is younger than max_age_hours."""
    cached_at = datetime.fromisoformat(cached_at_str)
    age = (datetime.now(timezone.utc) - cached_at).total_seconds() / 3600
    return age < max_age_hours
Phase 7: Data Governance & Security
Definition: Ensure data is secure, accurate, compliant, and auditable.
Core Governance Pillars:
A. Data Lineage
Track data flow and transformations.
┌────────────────────────────────────────────┐
│ Data Lineage Example │
├────────────────────────────────────────────┤
│ │
│ PostgreSQL (production) │
│ ├─ table: customers │
│ ├─ table: orders │
│ │ │
│ Kafka (streaming) │
│ ├─ topic: cdc.customers │
│ ├─ topic: cdc.orders │
│ │ │
│ Spark Job (transform) │
│ ├─ Input: Kafka + S3 lookup │
│ ├─ Logic: Enrich, deduplicate │
│ ├─ Output: S3 parquet │
│ │ │
│ Snowflake (warehouse) │
│ ├─ table: raw.orders │
│ ├─ table: analytics.daily_revenue │
│ │ (derived from raw.orders) │
│ │ │
│ Tableau (BI) │
│ └─ Dashboard: Sales Performance │
│ └─ Uses: analytics.daily_revenue │
│ │
└────────────────────────────────────────────┘
Tools:
- Apache Atlas (open-source, Hadoop ecosystem)
- Collibra (enterprise, workflow-based)
- Datafold (automated, CI/CD-integrated)
B. Access Control
Restrict who can see what data.
Role-Based Access Control (RBAC):
-- Snowflake example: Role-based grants
CREATE ROLE analyst_finance;
CREATE ROLE analyst_marketing;
GRANT USAGE ON DATABASE analytics TO ROLE analyst_finance;
GRANT USAGE ON SCHEMA analytics.finance TO ROLE analyst_finance;
GRANT SELECT ON TABLE analytics.finance.revenue TO ROLE analyst_finance;
-- Snowflake has no column-level GRANT; expose only non-sensitive columns through a view
-- (orders_marketing is an illustrative view name)
CREATE VIEW analytics.orders_marketing AS
SELECT id, amount, date FROM analytics.orders;
GRANT SELECT ON VIEW analytics.orders_marketing TO ROLE analyst_marketing;
-- Columns like customer_email, payment_method stay hidden
-- Assign role to user (names containing special characters must be quoted)
GRANT ROLE analyst_finance TO USER "alice@company.com";
Attribute-Based Access Control (ABAC):
- Dynamic column-level masking
- Row filtering based on attributes
- Example: Marketing analysts see only their region's data
-- Snowflake: Dynamic masking
-- Create the policy first, then attach it to the column
CREATE MASKING POLICY email_mask AS
(col_value varchar) returns varchar ->
CASE
WHEN CURRENT_ROLE() IN ('ANALYST', 'ADMIN') THEN col_value
ELSE '***@***.***'
END;
ALTER TABLE analytics.customers
MODIFY COLUMN email
SET MASKING POLICY email_mask;
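The two ABAC ideas listed above, row filtering by attribute and column masking, can also be sketched outside the warehouse. The following is an illustrative plain-Python model only; the `region` and `clearances` attributes and the helper names are assumptions made for the example, not a real policy engine.

```python
def visible_rows(rows, user):
    """Row filtering: keep rows whose region matches the user's region attribute."""
    return [row for row in rows if row["region"] == user["region"]]

def mask_email(email):
    """Column masking: hide the local part of the address, keep the domain."""
    _, _, domain = email.partition("@")
    return f"***@{domain}"

def apply_policy(rows, user):
    """Filter rows by region, then mask emails unless the user holds 'pii' clearance."""
    result = []
    for row in visible_rows(rows, user):
        if "pii" in user["clearances"]:
            result.append(dict(row))
        else:
            result.append({**row, "email": mask_email(row["email"])})
    return result

customers = [
    {"id": 1, "region": "EU", "email": "ana@example.com"},
    {"id": 2, "region": "US", "email": "bob@example.com"},
    {"id": 3, "region": "EU", "email": "eva@example.org"},
]
analyst = {"name": "marketing_eu", "region": "EU", "clearances": set()}
admin = {"name": "admin_eu", "region": "EU", "clearances": {"pii"}}

analyst_view = apply_policy(customers, analyst)
admin_view = apply_policy(customers, admin)
```

The decision depends on attributes of the user and of the data rather than on a fixed role-to-table mapping, which is what distinguishes ABAC from plain RBAC.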
C. Data Encryption
Protect data at rest and in transit.
Encryption Strategies:
| Layer | Method | Key Management | Use Case |
|---|---|---|---|
| In Transit | TLS 1.3 | Certificate authority | Network traffic |
| At Rest | AES-256 | Cloud KMS or HSM | Database storage |
| Column-level | AES-256 | Separate key per column | PII fields |
| Database native | Provider-specific | AWS KMS, Azure Key Vault | Cloud warehouses |
Example: Encrypting sensitive columns in Python
from cryptography.fernet import Fernet
import base64
import hashlib
class ColumnEncryptor:
    def __init__(self, master_key: str, salt: bytes = b'salt'):
        # Derive a Fernet key from master_key (in production, use AWS KMS)
        # b'salt' is a placeholder: use a random salt and store it with the key metadata
        key = base64.urlsafe_b64encode(
            hashlib.pbkdf2_hmac('sha256', master_key.encode(), salt, 100000, dklen=32)
        )
        self.cipher_suite = Fernet(key)
    def encrypt(self, plaintext: str) -> str:
        # Fernet tokens are already URL-safe base64, so no extra encoding is needed
        return self.cipher_suite.encrypt(plaintext.encode()).decode()
    def decrypt(self, ciphertext: str) -> str:
        return self.cipher_suite.decrypt(ciphertext.encode()).decode()
# Usage
encryptor = ColumnEncryptor('my-master-key')
encrypted_email = encryptor.encrypt('user@example.com')
# Store encrypted_email in database
# Decrypt on demand for authorized users
D. Data Quality Monitoring
Detect anomalies and quality issues.
Quality Dimensions:
- Completeness: No unexpected nulls
- Accuracy: Data matches reality
- Consistency: Values align across systems
- Timeliness: Data fresh enough
- Uniqueness: No unintended duplicates
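Three of these dimensions translate directly into simple metrics. The sketch below computes completeness, uniqueness, and timeliness over a list of records; the column names, sample rows, and one-hour freshness threshold are assumptions chosen for illustration.

```python
from datetime import datetime, timedelta, timezone

def completeness(rows, column):
    """Share of rows where `column` is present and not None."""
    if not rows:
        return 1.0
    return sum(row.get(column) is not None for row in rows) / len(rows)

def uniqueness(rows, column):
    """Share of non-null values in `column` that are distinct."""
    values = [row[column] for row in rows if row.get(column) is not None]
    return len(set(values)) / len(values) if values else 1.0

def is_timely(rows, column, now, max_age):
    """True when the newest timestamp in `column` is at most `max_age` old."""
    newest = max(row[column] for row in rows)
    return now - newest <= max_age

now = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
orders = [
    {"order_id": 1, "email": "a@example.com", "loaded_at": now - timedelta(hours=3)},
    {"order_id": 2, "email": None, "loaded_at": now - timedelta(hours=2)},
    {"order_id": 2, "email": "c@example.com", "loaded_at": now - timedelta(hours=1)},
    {"order_id": 4, "email": "d@example.com", "loaded_at": now - timedelta(minutes=30)},
]
report = {
    "email_completeness": completeness(orders, "email"),
    "order_id_uniqueness": uniqueness(orders, "order_id"),
    "fresh_within_1h": is_timely(orders, "loaded_at", now, timedelta(hours=1)),
}
```

Accuracy and consistency are harder to compute in isolation because they require a trusted reference, such as the source system or a second copy of the data.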
Example: dbt tests for data quality
# models/schema.yml
version: 2
models:
  - name: orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: amount
        tests:
          - not_null
      - name: customer_id
        tests:
          - relationships:
              to: ref('customers')
              field: customer_id
-- tests/assert_positive_amount.sql
-- Singular test: amount should be positive; the test fails if any rows are returned
SELECT *
FROM {{ ref('orders') }}
WHERE amount <= 0
E. Auditing and Compliance
Track access, changes, and regulatory requirements.
Audit Log Example:
{
"timestamp": "2024-01-15T10:30:00Z",
"user": "alice@company.com",
"action": "SELECT",
"database": "analytics",
"table": "customers",
"columns": ["id", "name", "email"],
"rows_returned": 1000,
"query_id": "q_abc123",
"client_ip": "192.168.1.100",
"query": "SELECT id, name, email FROM customers WHERE region = 'US' LIMIT 1000"
}
Compliance Standards:
- GDPR (EU): Right to be forgotten, data portability
- HIPAA (healthcare): Encryption, audit trails, access controls
- SOC 2 (finance/SaaS): Security controls, monitoring
- CCPA (California): Consumer privacy rights
Phase 8: Data Archival and Retention
Definition: Old or unused data is preserved cost-effectively or deleted per policy.
Archival Strategy:
Time Passes →
Current Data (hot)
├─ Location: Fast SSD/NVMe
├─ Cost: $0.20/GB/month
├─ Retention: Current + 30 days
└─ Use: Daily dashboards, real-time analytics
30-90 Days Old (warm)
├─ Location: Cloud standard storage (S3)
├─ Cost: $0.023/GB/month
├─ Retention: 90 days
└─ Use: Monthly reports, compliance checks
90+ Days Old (cold)
├─ Location: Glacier, Archive tier
├─ Cost: $0.004/GB/month
├─ Retrieval: 4-12 hours (acceptable for rare access)
└─ Use: Legal holds, annual audits
7+ Years Old (deleted)
├─ Reason: Retention period expired (GDPR requires keeping personal data no longer than necessary)
├─ Method: Secure deletion (NIST 800-88)
└─ Verification: Deletion logs, hash verification
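The tiering above can be expressed as a small lookup. This sketch reuses the age boundaries and per-GB prices listed in the tiers; treating the boundaries as exact day counts (30, 90, and 7 × 365) is an assumption made for the example.

```python
# (upper age bound in days, tier name, USD per GB per month), taken from the tiers above
TIERS = [
    (30, "hot", 0.20),
    (90, "warm", 0.023),
    (7 * 365, "cold", 0.004),
]

def _lookup(age_days):
    """Return (tier name, price per GB) for data of the given age."""
    for max_days, name, price in TIERS:
        if age_days < max_days:
            return name, price
    return "deleted", 0.0   # Past the retention horizon: nothing is stored

def tier_for_age(age_days):
    return _lookup(age_days)[0]

def monthly_cost(size_gb, age_days):
    return size_gb * _lookup(age_days)[1]

# Cost of keeping the same 1,000 GB in each tier
costs = {age: monthly_cost(1000, age) for age in (10, 45, 400, 3000)}
```

At these prices, moving 1,000 GB from hot to cold storage cuts the monthly bill from roughly $200 to roughly $4, which is why age-based tiering is usually automated.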
Lifecycle Policies (AWS S3 Example):
{
"Rules": [
{
"Id": "archive-old-data",
"Status": "Enabled",
"Filter": {
"Prefix": "analytics/"
},
"Transitions": [
{
"Days": 90,
"StorageClass": "STANDARD_IA"
},
{
"Days": 180,
"StorageClass": "GLACIER"
},
{
"Days": 365,
"StorageClass": "DEEP_ARCHIVE"
}
],
"Expiration": {
"Days": 2555
}
}
]
}
Retention Policy Example:
Operational Data
├─ Logs: 90 days
├─ Transactions: 7 years (financial regulatory)
└─ User profiles: Until account deletion + 1 year
Personal Data (GDPR Subject)
├─ Active customer: Retained during relationship
├─ Inactive: 1 year after last activity
└─ Deletion request: 30 days maximum
Compliance Data
├─ Audit logs: 5 years
├─ Access records: 3 years
└─ Deletion certificates: Permanent
Data Storage Systems
Relational Databases (OLTP)
Purpose: Transactional systems (production databases).
Characteristics:
- ACID guarantees
- Normalized schemas
- Row-oriented
- Write-optimized
- Low latency (< 100ms)
Examples: PostgreSQL, MySQL, Oracle, SQL Server
Data Warehouses (OLAP)
Purpose: Analytical queries across historical data.
Characteristics:
- Column-oriented storage
- Denormalized schemas (star/snowflake)
- Read-optimized
- Distributed query execution
- Higher latency (1-100s)
Key Innovations:
- Columnar Storage: Similar values sit together, so data often compresses several times better than row-based storage
- Partitioning: Prune irrelevant partitions
- Query Vectorization: Process batches of values in CPU cache
- MPP: Parallelize queries across nodes
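A toy illustration of why columnar layout helps: once values of one column are stored together, repeated values compress well and an aggregate touches only the column it needs. The rows and the run-length encoding below are a simplified sketch, not how any particular warehouse stores data.

```python
from itertools import groupby

rows = [
    {"order_id": 1, "country": "US", "amount": 10},
    {"order_id": 2, "country": "US", "amount": 20},
    {"order_id": 3, "country": "US", "amount": 5},
    {"order_id": 4, "country": "DE", "amount": 7},
    {"order_id": 5, "country": "DE", "amount": 8},
]

def to_columns(rows):
    """Pivot a list of row dicts into a dict of column lists."""
    return {key: [row[key] for row in rows] for key in rows[0]}

def run_length_encode(values):
    """Collapse consecutive repeats into (value, run_length) pairs."""
    return [(value, len(list(group))) for value, group in groupby(values)]

columns = to_columns(rows)
encoded_country = run_length_encode(columns["country"])
# SUM(amount) reads a single column instead of every field of every row
total_amount = sum(columns["amount"])
```

Five country values shrink to two pairs here; on sorted, low-cardinality columns with millions of rows the effect is far larger.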
Snowflake Architecture:
┌──────────────────────────────────────┐
│ Snowflake Data Warehouse │
├──────────────────────────────────────┤
│ │
│ Compute Layer (Massively Parallel) │
│ ├─ Virtual Warehouses (elastic) │
│ ├─ Query optimization & execution │
│ └─ Auto-scaling │
│ │
│ Storage Layer (Cloud-agnostic) │
│ ├─ Proprietary columnar micro-partitions (compressed) │
│ ├─ Separation of compute & storage │
│ └─ Cross-region replication │
│ │
│ Cloud Services Layer (metadata) │
│ ├─ ACID transactions │
│ ├─ Schema versioning │
│ └─ Time travel │
│ │
└──────────────────────────────────────┘
Data Lakes
Purpose: Store all data (raw + processed) cost-effectively.
Characteristics:
- Flexible schema
- Supports structured + unstructured
- Object storage-based (S3, GCS)
- Cheaper than warehouses
- Requires more processing
Modern Lakehouse (Delta Lake, Apache Iceberg):
- ACID transactions on data lake
- Schema enforcement
- Time travel
- DML operations (UPDATE, DELETE)
Example: Delta Lake
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# Assumes a Spark session configured with the Delta Lake extensions
spark = SparkSession.builder.appName("DeltaExample").getOrCreate()
# Create Delta table
spark.read.parquet("s3://data-lake/raw/") \
    .write \
    .mode("overwrite") \
    .format("delta") \
    .option("mergeSchema", "true") \
    .save("s3://data-lake/orders_delta/")
# ACID update
# Literal values need F.lit(); bare strings in `set` are parsed as SQL expressions
dt = DeltaTable.forPath(spark, "s3://data-lake/orders_delta/")
dt.update(
    condition=F.col("status") == "pending",
    set={"status": F.lit("processed"), "updated_at": F.current_timestamp()}
)
# Time travel (read the table as of a past timestamp)
old_data = spark.read \
    .format("delta") \
    .option("timestampAsOf", "2024-01-15 10:00:00") \
    .load("s3://data-lake/orders_delta/")
# Time travel (read a specific version)
version_5 = spark.read \
    .format("delta") \
    .option("versionAsOf", 5) \
    .load("s3://data-lake/orders_delta/")
# View transaction history
dt.history().show()
Data Processing and Transformation
Processing Paradigms
Batch Processing (Spark)
Process entire datasets at intervals.
Advantages:
- Efficient for large volumes
- Cost-effective (off-peak execution)
- Deterministic, reproducible
Disadvantages:
- Latency: results lag by at least one batch interval (often an hour or more)
- Inefficient for small changes
- Harder to handle late arrivals
Example: Spark word count
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("WordCount").getOrCreate()
# Read text files
text_df = spark.read.text("s3://bucket/documents/*.txt")
# Tokenize and count
word_counts = text_df \
.select(F.explode(F.split(F.col("value"), " ")).alias("word")) \
.filter(F.col("word") != "") \
.groupby("word") \
.count() \
.sort(F.desc("count"))
# Write results
word_counts.write.mode("overwrite").parquet("s3://bucket/output/")
Stream Processing (Kafka Streams, Flink)
Process data continuously as it arrives.
Advantages:
- Real-time results (seconds)
- Handles infinite streams
- Stateful aggregations
Disadvantages:
- Complex semantics (exactly-once, ordering)
- Higher operational overhead
- Debugging harder
Example: Windowed aggregation with a plain Kafka consumer (kafka-python), a hand-rolled stand-in for what Kafka Streams or Flink provide natively
from kafka import KafkaConsumer, KafkaProducer
import json
from collections import defaultdict
class WindowedAggregator:
    def __init__(self, window_seconds: int = 300):
        self.window_seconds = window_seconds
        # Window state lives in memory only and is lost on restart
        self.windows = defaultdict(lambda: {"count": 0, "sum": 0})
        self.consumer = KafkaConsumer(
            'input_topic',
            bootstrap_servers=['kafka:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id='aggregator_group',
        )
        self.producer = KafkaProducer(
            bootstrap_servers=['kafka:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        )
    def get_window_key(self, timestamp: int) -> int:
        """Assign message to window (timestamp in epoch seconds)."""
        return (timestamp // self.window_seconds) * self.window_seconds
    def process(self):
        """Consume and aggregate messages."""
        for message in self.consumer:
            data = message.value
            window_key = self.get_window_key(data['timestamp'])
            # Update window
            self.windows[window_key]['count'] += 1
            self.windows[window_key]['sum'] += data['value']
            # Emit every earlier window once an event from a later window arrives.
            # Event time acts as the watermark, so an event that arrives late for an
            # already-emitted window starts a new partial window.
            for key in sorted(k for k in self.windows if k < window_key):
                window_data = self.windows.pop(key)
                self.producer.send('output_topic', {
                    'window_start': key,
                    'count': window_data['count'],
                    'avg': window_data['sum'] / window_data['count'],
                })
aggregator = WindowedAggregator()
aggregator.process()
Data Serving and Consumption Layers (Detailed)
Why Different Teams Need Different Data
The same raw data serves different purposes when viewed through different lenses:
Data Analysts
What they need:
- Aggregated metrics (sums, averages, percentiles)
- Historical trends (day-over-day, year-over-year)
- SQL-friendly schemas
- Business context (names, labels, hierarchies)
Example query:
SELECT
DATE_TRUNC('month', order_date) as month,
SUM(order_amount) as monthly_revenue,
COUNT(DISTINCT customer_id) as unique_customers,
ROUND(SUM(order_amount) / COUNT(DISTINCT customer_id), 2) as avg_customer_value,
(SUM(CASE WHEN is_returning_customer THEN order_amount ELSE 0 END) /
SUM(order_amount)) * 100 as pct_from_returning
FROM analytics.orders
WHERE order_date >= DATE_TRUNC('month', CURRENT_DATE) - INTERVAL '12 months'
GROUP BY 1
ORDER BY 1 DESC;
ML Engineers / Data Scientists
What they need:
- Numerical features (ready for model input)
- Consistent feature definitions across train/serve
- Low-latency access (real-time inference)
- Feature versioning
Example features:
# Customer churn prediction model features
features = {
"user_id": 12345,
"days_since_signup": 450,
"total_purchases": 8,
"total_spent": 2499.50,
"avg_order_value": 312.44,
"purchase_frequency_30d": 0.5,
"days_since_last_purchase": 45,
"churn_risk_score": 0.72, # Derived feature
"segment": "high_value",
}
Product & Growth Teams
What they need:
- Behavioral metrics (funnel steps, retention)
- Experiment results (A/B test impacts)
- Real-time dashboards (live user counts)
- User segments
Example metrics:
# Growth funnel metrics (daily)
metrics = {
"date": "2024-01-15",
"sign_ups": 450,
"activated_users": 180, # Completed onboarding
"activation_rate": 0.40,
"day_7_retention": 0.65,
"day_30_retention": 0.40,
"paying_customers": 35,
"ltv": 4500, # Lifetime value
}
Executives / Leadership
What they need:
- High-level KPIs (revenue, growth, margins)
- Trends and alerts
- Comparisons (vs. last year, vs. plan)
- Simple visualizations
Example dashboard:
┌──────────────────────────────────────┐
│ Executive Sales Dashboard │
├──────────────────────────────────────┤
│ │
│ Revenue (MTD) │
│ $4.2M ↑ 12% vs. last month │
│ │
│ Units Sold │
│ 12,500 ↑ 8% vs. last month │
│ │
│ Avg. Contract Value │
│ $12,400 ↓ 2% vs. last month │
│ │
│ Sales Pipeline │
│ $18.5M ↑ 15% vs. last quarter │
│ │
│ Win Rate │
│ 38% (target: 40%) │
│ │
└──────────────────────────────────────┘
Advanced Topics
Real-Time Analytics Architectures
Lambda Architecture
Combine batch + stream for accuracy + timeliness.
Data Source
↙ ↘
Batch Layer Speed Layer
↓ ↓
Batch View Real-time View
↘ ↙
Serving Layer
↓
End User
Advantages:
- Accurate batch + timely stream
- Recoverable from failures
Disadvantages:
- Operational complexity
- Duplicate logic (batch + stream)
- Two views to maintain
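The serving layer's job in a Lambda architecture is to answer queries from both views at once. A minimal sketch, assuming the batch view holds totals up to the last batch run and the real-time view holds only increments since then (the page-view counts are made up for illustration):

```python
def merge_views(batch_view, realtime_view):
    """Serving layer: batch totals plus increments seen since the last batch run."""
    merged = dict(batch_view)
    for key, delta in realtime_view.items():
        merged[key] = merged.get(key, 0) + delta
    return merged

batch_view = {"page_a": 1000, "page_b": 250}   # Recomputed from all history
realtime_view = {"page_a": 12, "page_c": 3}    # Events since the last batch run

served = merge_views(batch_view, realtime_view)
```

When the next batch run completes, the real-time view is reset; keeping the two code paths in agreement is the "duplicate logic" cost listed above.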
Kappa Architecture
Single stream processing pipeline.
Data Source
↓
Stream Processing
↓
Serving Layer
↓
End User
Advantages:
- Simpler (one code path)
- Easier replay (reprocessing old data)
Disadvantages:
- Requires long event retention
- Stream processing complexity
- Harder to handle late arrivals
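The replay advantage can be sketched as a fold over the retained event log: changing the logic means re-running the same log through a new function. The events and the two processing functions below are illustrative assumptions.

```python
def build_view(event_log, process):
    """Fold every retained event through `process` to (re)build the serving view."""
    view = {}
    for event in event_log:
        process(view, event)
    return view

def count_per_user(view, event):
    """Version 1 of the logic: number of events per user."""
    view[event["user"]] = view.get(event["user"], 0) + 1

def revenue_per_user(view, event):
    """Version 2 of the logic: purchase revenue per user."""
    if event["type"] == "purchase":
        view[event["user"]] = view.get(event["user"], 0) + event["amount"]

event_log = [
    {"user": "u1", "type": "click", "amount": 0},
    {"user": "u1", "type": "purchase", "amount": 30},
    {"user": "u2", "type": "purchase", "amount": 12},
]

view_v1 = build_view(event_log, count_per_user)
# Reprocessing: replay the same log with the new logic to produce a new view
view_v2 = build_view(event_log, revenue_per_user)
```

This only works while the log still holds the events, which is why long event retention appears among the disadvantages.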
Data Quality Frameworks
Great Expectations - Automated data validation:
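# Legacy pandas-style API; recent Great Expectations releases replace it with a data context and validators
import great_expectations as ge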
# Load data
df = ge.read_csv("data.csv")
# Define expectations
df.expect_column_to_exist("user_id")
df.expect_column_values_to_not_be_null("email")
df.expect_column_values_to_match_regex("email", r".*@.*\..*")
df.expect_table_row_count_to_be_between(1000, 100000)
# Run validation
results = df.validate()
print(results) # Pass/fail summary
Data Monetization and Privacy
Data Marketplace:
- Sell anonymized datasets
- API access to premium datasets
- Revenue source for data-heavy orgs
Privacy-Preserving Techniques:
- Differential Privacy: Add noise to preserve individual privacy
- Federated Learning: Train models without centralizing data
- Homomorphic Encryption: Compute on encrypted data
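As a hedged illustration of the first technique, the sketch below releases a count with Laplace noise, the textbook mechanism for differential privacy. The `epsilon` value, the true count, and the helper names are assumptions for the example; a production system would also track the privacy budget across queries.

```python
import random

def laplace_noise(scale, rng):
    """Sample Laplace(0, scale) as the difference of two exponential draws."""
    return rng.expovariate(1 / scale) - rng.expovariate(1 / scale)

def private_count(true_count, epsilon, rng):
    """Release a count with Laplace noise; a counting query has sensitivity 1."""
    return true_count + laplace_noise(1 / epsilon, rng)

rng = random.Random(42)  # Seeded only to make the example repeatable
samples = [private_count(100, epsilon=0.5, rng=rng) for _ in range(20000)]
mean_release = sum(samples) / len(samples)
```

Each individual release is perturbed, yet the average over many releases stays close to the true count; a smaller `epsilon` means more noise and stronger privacy.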
Conclusion
Modern data engineering is a complex orchestration of:
- Collection methods tailored to sources
- Ingestion infrastructure (batch, streaming, API)
- Storage tiers (hot, warm, cold)
- Processing frameworks (Spark, Flink, dbt)
- Serving layers (warehouses, marts, caches)
- Governance and compliance
Each piece requires careful selection based on:
- Latency requirements (seconds vs. hours)
- Throughput (KB/s vs. GB/s)
- Consistency needs (exactly-once vs. at-least-once)
- Cost constraints
- Team expertise
Understanding the full data lifecycle, from collection to archival, enables building systems that are reliable, cost-efficient, and serve diverse organizational needs.