When I work on analytics pipelines for event-driven systems, one of the biggest mistakes I see is treating ingestion as “just connect source A to sink B.”
In production, ingestion is where a lot of the hard engineering lives:
- deciding which transport is actually right (EventBridge vs Kinesis vs SQS)
- handling ordering, duplication, and replay
- transforming events into a canonical analytics schema
- delivering to multiple sinks like S3, OpenSearch, and Redshift
- keeping the design cost-efficient as volume grows
This is why I like this topic. It is architecture-heavy, it shows real trade-offs, and it comes up constantly in real workloads.
In this post, I will walk through a practical pattern for serverless CDC/event ingestion into analytics pipelines on AWS, including:
- EventBridge vs Kinesis vs SQS decisioning
- Lambda transformations (normalization, enrichment, routing)
- delivery patterns to S3 / OpenSearch / Redshift
- handling ordering, duplication, and replay
- partitioning and cost optimization
- an end-to-end walkthrough and implementation discussion with code
I will focus on patterns that are accurate, scalable, and maintainable rather than “one service solves everything.”
The design principle I start with
I design ingestion in layers:
- Ingress transport for delivery semantics (routing, throughput, ordering, buffering)
- Transformation layer for canonicalization and enrichment
- Durable landing zone (usually S3 first)
- Serving/analytics sinks (OpenSearch, Redshift, dashboards, ML features, etc.)
- Replay and recovery path as a first-class capability
That structure helps me evolve the system without constantly rewriting downstream consumers.
EventBridge vs Kinesis vs SQS decisioning
This is the first architectural decision, and it shapes everything else.
The short version is:
- EventBridge is great for event routing and integration
- Kinesis Data Streams is great for high-throughput ordered streaming plus replay
- SQS is great for buffering and decoupled async processing
I do not treat them as mutually exclusive. In many production designs, I use two or even all three, each for what it is best at.
Quick decision guide
Use Amazon EventBridge when I need
- event routing between services and teams
- content-based filtering and fan-out
- SaaS integrations and AWS service events
- schema governance and event contracts
- archive/replay on the event bus (for supported replay workflows)
- lower-to-moderate throughput domain events where strict ordering is not required
Use Amazon Kinesis Data Streams when I need
- high-throughput event or CDC ingestion
- ordering per partition key
- multiple independent consumers at stream scale
- explicit replay from stream retention
- near-real-time analytics pipelines with controlled parallelism
Use Amazon SQS when I need
- durable buffering and backpressure absorption
- decoupling between producers and consumers
- cheap asynchronous processing
- retry isolation and DLQ handling
- workload smoothing (especially spiky ingest)
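The decision guide above can be sketched as a small helper that treats the three services as complementary rather than mutually exclusive. The `IngestNeeds` fields and selection rules below are illustrative assumptions I am encoding for this post, not official AWS guidance:

```python
from dataclasses import dataclass

@dataclass
class IngestNeeds:
    ordered: bool = False          # per-key ordering required?
    high_throughput: bool = False  # sustained high event volume / CDC?
    replay: bool = False           # must re-read past events from the transport?
    routing: bool = False          # content-based filtering / fan-out to teams?
    buffering: bool = False        # absorb spikes, isolate retries?

def suggest_transports(needs: IngestNeeds) -> list:
    """Return the transports worth considering; they often coexist."""
    picks = []
    if needs.routing:
        picks.append("eventbridge")
    if needs.high_throughput or needs.ordered or needs.replay:
        picks.append("kinesis")
    if needs.buffering:
        picks.append("sqs")
    # Default: start with a routing-friendly bus for low-volume domain events
    return picks or ["eventbridge"]
```

Real architectures need more nuance (throughput numbers, retention windows, team boundaries), but making the criteria explicit like this is useful in reviews.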
My common production pattern
I often use:
- EventBridge for domain routing
- Kinesis for analytics ingestion backbone
- SQS for retry/backpressure side paths
That gives me clean producer contracts and strong ingestion behavior.
What each service is not
I find it useful to say this explicitly during architecture reviews.
EventBridge is not a high-throughput ordered stream
It is excellent for routing, but it does not give me shard-style ordering or stream-style replay semantics like Kinesis retention.
Kinesis is not a drop-in replacement for event bus routing
It gives throughput and ordering, but not the same out-of-the-box event routing and filtering ergonomics as EventBridge.
SQS is not an analytics event backbone by itself
It is amazing for buffering, but replay, retention, and consumer fan-out semantics are different from Kinesis, and standard queues do not preserve ordering.
Reference architecture at a glance
For this post, I will use a practical hybrid pattern that I use often for analytics ingestion:
- application and domain events are published to EventBridge
- CDC or high-volume events go to Kinesis Data Streams (directly or via a CDC bridge)
- Lambda transformer normalizes records into a canonical analytics schema
- canonical events are delivered to:
- S3 (primary durable analytics landing zone, partitioned)
- OpenSearch (near-real-time search and observability use cases)
- Redshift Serverless (warehouse analytics, usually S3-first load pattern)
- SQS is used for retry isolation and backpressure for sink-specific processors
(Reference architecture diagram omitted; the walkthrough below describes the same flow step by step.)
End-to-end walkthrough (what I will build conceptually)
To make this concrete, I will walk through an example using an e-commerce platform:
- domain events like OrderPlaced and OrderShipped are published on EventBridge
- high-volume change events (for example inventory or order status updates) are ingested via Kinesis
- a Lambda transformer converts everything into a canonical analytics event
- events land in S3 as compressed JSON (or Parquet via Firehose conversion)
- selected events are indexed into OpenSearch
- Redshift loads from S3 for warehouse analytics
Why I like this pattern
It lets me separate:
- operational event routing (EventBridge)
- analytics ingestion behavior (Kinesis)
- durable storage and replay (S3 + retention)
- sink-specific delivery (OpenSearch, Redshift)
This gives me a pipeline that is easier to evolve as analytics use cases grow.
Canonical event schema (the contract that keeps the pipeline sane)
Before I write any code, I define a canonical schema. This is one of the highest-leverage things I do in analytics ingestion.
I do not want every downstream consumer decoding a different source format.
Example canonical schema
{
  "event_id": "evt_01HXYZ...",
  "event_type": "order.placed",
  "event_version": 1,
  "source": "commerce.orders",
  "tenant_id": "tenant_123",
  "entity_type": "order",
  "entity_id": "ord_987",
  "occurred_at": "2026-02-25T10:15:30Z",
  "ingested_at": "2026-02-25T10:15:31Z",
  "trace_id": "trace-abc",
  "idempotency_key": "order.placed:tenant_123:ord_987:v1",
  "sequence_key": "tenant_123#ord_987",
  "payload": {
    "customer_id": "cus_1",
    "currency": "USD",
    "amount": 149.90
  },
  "meta": {
    "transport": "eventbridge",
    "schema_name": "order-events",
    "schema_version": "1.4.0"
  }
}
Fields I specifically care about
- event_id: unique event identity for dedupe and tracing
- occurred_at: source event time (for analytics)
- ingested_at: pipeline time (for operations)
- sequence_key: ordering scope (important for Kinesis partitioning and reasoning)
- idempotency_key: sink-safe dedupe key when replaying or retrying
- event_version: schema evolution support
If I skip this step, the pipeline quickly becomes fragile.
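A lightweight guard at the transformer boundary can enforce this contract before anything reaches a sink. This is a sketch against the example schema above, not a full JSON Schema validator; `validate_canonical` and `REQUIRED_FIELDS` are names I am inventing for illustration:

```python
# Required canonical fields and their expected Python types
REQUIRED_FIELDS = {
    "event_id": str, "event_type": str, "event_version": int,
    "source": str, "tenant_id": str, "entity_type": str, "entity_id": str,
    "occurred_at": str, "ingested_at": str,
    "idempotency_key": str, "sequence_key": str, "payload": dict,
}

def validate_canonical(evt: dict) -> list:
    """Return a list of problems; an empty list means the event is acceptable."""
    problems = []
    for field, expected_type in REQUIRED_FIELDS.items():
        if field not in evt:
            problems.append(f"missing: {field}")
        elif not isinstance(evt[field], expected_type):
            problems.append(f"wrong type: {field}")
    return problems
```

Records that fail validation can then be quarantined (for example to an error prefix or DLQ) instead of silently polluting downstream tables.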
Reference implementation pattern (AWS services)
For this walkthrough, the main flow is:
- EventBridge receives domain events
- EventBridge rule forwards analytics-relevant events to Kinesis Data Streams
- High-volume event or CDC sources also publish to Kinesis Data Streams
- Lambda transformer consumes Kinesis batches
- Lambda normalizes and enriches records and writes:
- primary path to Firehose -> S3
- selective path to SQS for OpenSearch indexing
- Redshift Serverless loads from S3 (COPY and MERGE pattern)
- Replay and backfill can occur from Kinesis retention or S3 reprocessing
This keeps the ingestion backbone consistent while allowing different producers.
Infrastructure example (SAM / CloudFormation snippets)
The snippet below shows a minimal but realistic foundation:
- Kinesis Data Stream
- Lambda transformer
- Firehose delivery stream to S3
- SQS queue for indexing
- EventBridge rule that forwards selected events to Kinesis
This is intentionally a reference snippet (not a full production template) so the article stays readable.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Serverless CDC/Event ingestion to analytics pipeline

Resources:
  AnalyticsEventsStream:
    Type: AWS::Kinesis::Stream
    Properties:
      StreamModeDetails:
        StreamMode: ON_DEMAND
      RetentionPeriodHours: 48

  RawAnalyticsBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256

  AnalyticsIndexQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 120
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt AnalyticsIndexDLQ.Arn
        maxReceiveCount: 5

  AnalyticsIndexDLQ:
    Type: AWS::SQS::Queue

  FirehoseToS3Role:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: FirehoseS3Write
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt RawAnalyticsBucket.Arn
                  - !Sub "${RawAnalyticsBucket.Arn}/*"

  AnalyticsFirehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt RawAnalyticsBucket.Arn
        RoleARN: !GetAtt FirehoseToS3Role.Arn
        Prefix: "dataset=events/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
        ErrorOutputPrefix: "errors/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
        CompressionFormat: GZIP
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 16

  AnalyticsTransformerFunction:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.12
      Handler: app.lambda_handler
      CodeUri: src/
      MemorySize: 1024
      Timeout: 120
      Environment:
        Variables:
          FIREHOSE_STREAM_NAME: !Ref AnalyticsFirehose
          INDEX_QUEUE_URL: !Ref AnalyticsIndexQueue
      Policies:
        - Statement:
            - Effect: Allow
              Action:
                - firehose:PutRecordBatch
              Resource: !GetAtt AnalyticsFirehose.Arn
            - Effect: Allow
              Action:
                - sqs:SendMessageBatch
                - sqs:SendMessage
              Resource: !GetAtt AnalyticsIndexQueue.Arn
      Events:
        KinesisIngest:
          Type: Kinesis
          Properties:
            Stream: !GetAtt AnalyticsEventsStream.Arn
            StartingPosition: LATEST
            BatchSize: 500
            MaximumBatchingWindowInSeconds: 5
            FunctionResponseTypes:
              - ReportBatchItemFailures

  EventBridgeToKinesisRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: events.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: PutToKinesis
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: kinesis:PutRecord
                Resource: !GetAtt AnalyticsEventsStream.Arn

  AnalyticsEventRule:
    Type: AWS::Events::Rule
    Properties:
      EventPattern:
        source:
          - commerce.orders
          - commerce.inventory
      Targets:
        - Arn: !GetAtt AnalyticsEventsStream.Arn
          Id: KinesisAnalyticsTarget
          RoleArn: !GetAtt EventBridgeToKinesisRole.Arn
          KinesisParameters:
            PartitionKeyPath: "$.detail.orderId"
Why this foundation works
- Kinesis on-demand is great while volume is still changing
- Firehose to S3 gives durable landing, buffering, and compression
- Lambda centralizes canonicalization and routing
- SQS isolates OpenSearch indexing retries from the main ingest path
- EventBridge feeds analytics without forcing every producer to know about Kinesis directly
Lambda transformation layer (the part that pays for itself)
This is the heart of the pattern.
I use the Lambda transformation layer to:
- normalize different event formats into one canonical schema
- enrich records (tenant, derived dimensions, lookup joins if lightweight)
- attach dedupe metadata and ordering keys
- route records to the right sinks
- drop or quarantine malformed records
Rules I follow for transformations
- Keep it deterministic (same input -> same normalized output)
- Keep it fast (avoid heavy network calls in the hot path)
- Keep it observable (emit counts by event type and error reason)
- Fail records, not whole batches when possible
- Preserve original payload if analytics or debugging needs it
Example Lambda transformer (Kinesis -> Firehose + SQS)
This example:
- reads a Kinesis batch
- normalizes records from multiple sources (EventBridge-shaped or direct JSON)
- writes canonical events to Firehose (S3 path)
- sends selected event types to SQS for OpenSearch indexing
- returns partial batch failures for retriable records
import base64
import json
import os
import time
import hashlib
from datetime import datetime, timezone
from typing import Any, Dict, List

import boto3

# Clients are created once per container and reused across invocations
firehose = boto3.client("firehose")
sqs = boto3.client("sqs")

FIREHOSE_STREAM_NAME = os.environ["FIREHOSE_STREAM_NAME"]
INDEX_QUEUE_URL = os.environ["INDEX_QUEUE_URL"]

INDEXABLE_EVENT_TYPES = {
    "order.placed",
    "order.shipped",
    "product.updated",
}


def utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def sha256_text(value: str) -> str:
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


def normalize_event(source_obj: Dict[str, Any]) -> Dict[str, Any]:
    ingested_at = utc_now_iso()
    if "detail-type" in source_obj and "detail" in source_obj:
        # EventBridge-shaped envelope forwarded into the stream
        detail = source_obj.get("detail") or {}
        event_type = detail.get("eventType") or source_obj["detail-type"].replace(" ", ".").lower()
        entity_id = detail.get("orderId") or detail.get("productId") or detail.get("entityId") or "unknown"
        tenant_id = detail.get("tenantId", "default")
        occurred_at = source_obj.get("time") or ingested_at
        event_id = detail.get("eventId") or source_obj.get("id") or sha256_text(json.dumps(source_obj, sort_keys=True))
        payload = detail
        source_name = source_obj.get("source", "unknown")
    else:
        # Direct JSON producer (for example a CDC bridge)
        event_type = source_obj.get("event_type", "unknown")
        entity_id = source_obj.get("entity_id") or source_obj.get("pk") or "unknown"
        tenant_id = source_obj.get("tenant_id", "default")
        occurred_at = source_obj.get("occurred_at") or source_obj.get("timestamp") or ingested_at
        event_id = source_obj.get("event_id") or sha256_text(json.dumps(source_obj, sort_keys=True))
        payload = source_obj.get("payload", source_obj)
        source_name = source_obj.get("source", "direct-producer")
    event_version = int(source_obj.get("event_version", 1)) if isinstance(source_obj, dict) else 1
    entity_type = payload.get("entityType") or ("order" if "order" in event_type else "unknown")
    sequence_key = f"{tenant_id}#{entity_type}#{entity_id}"
    idempotency_key = f"{event_type}:{tenant_id}:{entity_id}:v{event_version}"
    return {
        "event_id": str(event_id),
        "event_type": str(event_type),
        "event_version": event_version,
        "source": str(source_name),
        "tenant_id": str(tenant_id),
        "entity_type": str(entity_type),
        "entity_id": str(entity_id),
        "occurred_at": str(occurred_at),
        "ingested_at": ingested_at,
        "trace_id": str((payload.get("traceId") or source_obj.get("trace_id") or "")),
        "idempotency_key": idempotency_key,
        "sequence_key": sequence_key,
        "payload": payload,
        "meta": {
            "transport": "kinesis",
            "normalized_by": "analytics-transformer-lambda",
            "schema_name": "canonical-analytics-event",
            "schema_version": "1.0.0",
        },
    }


def parse_kinesis_record(record: Dict[str, Any]) -> Dict[str, Any]:
    raw_bytes = base64.b64decode(record["kinesis"]["data"])
    return json.loads(raw_bytes.decode("utf-8"))


def chunked(items: List, size: int):
    for i in range(0, len(items), size):
        yield items[i:i + size]


def send_to_firehose(events: List[Dict[str, Any]]) -> None:
    records = [{"Data": (json.dumps(evt, separators=(",", ":")) + "\n").encode("utf-8")} for evt in events]
    for batch in chunked(records, 500):  # PutRecordBatch accepts up to 500 records
        resp = firehose.put_record_batch(DeliveryStreamName=FIREHOSE_STREAM_NAME, Records=batch)
        if resp.get("FailedPutCount", 0):
            raise RuntimeError(f"Firehose batch write had {resp['FailedPutCount']} failed records")


def send_index_jobs(events: List[Dict[str, Any]]) -> None:
    if not events:
        return
    for batch in chunked(events, 10):  # SendMessageBatch accepts up to 10 entries
        entries = []
        for idx, evt in enumerate(batch):
            entries.append({
                "Id": str(idx),
                "MessageBody": json.dumps({
                    "event_id": evt["event_id"],
                    "event_type": evt["event_type"],
                    "tenant_id": evt["tenant_id"],
                    "entity_id": evt["entity_id"],
                    "occurred_at": evt["occurred_at"],
                    "payload": evt["payload"],
                    "idempotency_key": evt["idempotency_key"],
                })
            })
        sqs.send_message_batch(QueueUrl=INDEX_QUEUE_URL, Entries=entries)


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    transformed = []
    index_jobs = []
    batch_item_failures = []
    start_ms = int(time.time() * 1000)

    for record in event.get("Records", []):
        sequence_number = record["kinesis"]["sequenceNumber"]
        try:
            src = parse_kinesis_record(record)
            canonical = normalize_event(src)
            transformed.append(canonical)
            if canonical["event_type"] in INDEXABLE_EVENT_TYPES:
                index_jobs.append(canonical)
        except Exception as e:
            batch_item_failures.append({"itemIdentifier": sequence_number})
            print(json.dumps({
                "level": "ERROR",
                "message": "Failed to parse/normalize record",
                "sequence_number": sequence_number,
                "error": str(e),
            }))

    if transformed:
        try:
            send_to_firehose(transformed)
        except Exception as e:
            print(json.dumps({"level": "ERROR", "message": "Firehose write failed", "error": str(e)}))
            # S3 landing is the primary success path: fail the whole batch
            # so the event source mapping retries it.
            for record in event.get("Records", []):
                seq = record["kinesis"]["sequenceNumber"]
                if {"itemIdentifier": seq} not in batch_item_failures:
                    batch_item_failures.append({"itemIdentifier": seq})
            return {"batchItemFailures": batch_item_failures}

    if index_jobs:
        try:
            send_index_jobs(index_jobs)
        except Exception as e:
            # Often I do not fail primary ingest if indexing queue write fails.
            print(json.dumps({"level": "ERROR", "message": "Index queue write failed", "error": str(e)}))

    duration_ms = int(time.time() * 1000) - start_ms
    print(json.dumps({
        "level": "INFO",
        "message": "Batch processed",
        "records_in": len(event.get("Records", [])),
        "records_transformed": len(transformed),
        "records_indexed": len(index_jobs),
        "records_failed": len(batch_item_failures),
        "duration_ms": duration_ms,
    }))
    return {"batchItemFailures": batch_item_failures}
Why this implementation pattern works
- I treat S3 landing as the primary success path
- I isolate OpenSearch indexing via SQS
- I use partial batch failure for source retries
- I preserve enough metadata (event_id, idempotency_key, sequence_key) for dedupe and replay
OpenSearch delivery pattern (what I do in practice)
For OpenSearch, I do not assume the ingest path and indexing path should share the same retry semantics.
That is why I often decouple indexing with SQS.
Why SQS in front of OpenSearch helps
- OpenSearch can throttle under load
- index mapping errors or payload issues should not block S3 landing
- I can tune retry behavior independently
- I can replay index jobs from S3 if needed
Simple SQS -> Lambda -> OpenSearch indexer (illustrative snippet)
import json
import os
from typing import Any, Dict, List

from opensearchpy import OpenSearch, RequestsHttpConnection, helpers

OPENSEARCH_HOST = os.environ["OPENSEARCH_HOST"]
INDEX_NAME = os.environ.get("OPENSEARCH_INDEX", "analytics-events")

client = OpenSearch(
    hosts=[{"host": OPENSEARCH_HOST, "port": 443}],
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)


def to_index_doc(evt: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "event_id": evt["event_id"],
        "event_type": evt["event_type"],
        "tenant_id": evt["tenant_id"],
        "entity_id": evt["entity_id"],
        "occurred_at": evt["occurred_at"],
        "idempotency_key": evt["idempotency_key"],
        "payload": evt["payload"],
    }


def lambda_handler(event, context):
    actions: List[Dict[str, Any]] = []
    for record in event["Records"]:
        evt = json.loads(record["body"])
        actions.append({
            "_op_type": "index",
            "_index": INDEX_NAME,
            "_id": evt["event_id"],  # stable _id keeps retried indexing idempotent
            "_source": to_index_doc(evt),
        })
    if actions:
        helpers.bulk(client, actions, request_timeout=30)
    return {"ok": True, "indexed": len(actions)}
Best practice note
Using _id = event_id gives me idempotent-friendly indexing behavior (retries overwrite the same document rather than creating duplicates). That is usually what I want for analytics and search event documents.
Delivery to Redshift (S3-first is the pattern I recommend most)
For analytics warehouses, I usually prefer S3-first ingestion rather than writing directly to Redshift from the transformation Lambda.
Why:
- S3 is a durable landing zone for replay and audit
- Redshift loads can be batched efficiently
- I can rebuild tables from historical data
- I keep ingestion and warehouse modeling decoupled
Common pattern
- Land canonical events in S3
- Load into a staging table in Redshift
- MERGE into analytics tables (or fact tables)
- Keep a watermark or batch manifest for operations
Example Redshift SQL (staging + merge)
-- Staging table mirrors the canonical event schema
CREATE TABLE IF NOT EXISTS staging_events (
    event_id        VARCHAR(128),
    event_type      VARCHAR(128),
    event_version   INT,
    source          VARCHAR(256),
    tenant_id       VARCHAR(128),
    entity_type     VARCHAR(128),
    entity_id       VARCHAR(128),
    occurred_at     TIMESTAMP,
    ingested_at     TIMESTAMP,
    trace_id        VARCHAR(256),
    idempotency_key VARCHAR(256),
    sequence_key    VARCHAR(256),
    payload         SUPER
);

COPY staging_events
FROM 's3://your-bucket/dataset=events/year=2026/month=02/day=25/'
IAM_ROLE 'arn:aws:iam::<account-id>:role/RedshiftCopyRole'
FORMAT AS JSON 'auto'
TIMEFORMAT 'auto'
GZIP;

CREATE TABLE IF NOT EXISTS fact_order_events (
    event_id    VARCHAR(128) PRIMARY KEY,
    tenant_id   VARCHAR(128),
    order_id    VARCHAR(128),
    event_type  VARCHAR(128),
    occurred_at TIMESTAMP,
    ingested_at TIMESTAMP,
    amount      DECIMAL(18,2),
    currency    VARCHAR(16),
    payload     SUPER
);

-- MERGE keeps the load idempotent across retries and replays
MERGE INTO fact_order_events AS tgt
USING (
    SELECT
        event_id,
        tenant_id,
        entity_id AS order_id,
        event_type,
        occurred_at,
        ingested_at,
        -- SUPER casts are lax by default: incompatible values become NULL
        CAST(payload.amount AS DECIMAL(18,2)) AS amount,
        CAST(payload.currency AS VARCHAR(16)) AS currency,
        payload
    FROM staging_events
    WHERE entity_type = 'order'
) AS src
ON tgt.event_id = src.event_id
WHEN MATCHED THEN UPDATE SET
    tenant_id   = src.tenant_id,
    order_id    = src.order_id,
    event_type  = src.event_type,
    occurred_at = src.occurred_at,
    ingested_at = src.ingested_at,
    amount      = src.amount,
    currency    = src.currency,
    payload     = src.payload
WHEN NOT MATCHED THEN INSERT (
    event_id, tenant_id, order_id, event_type, occurred_at, ingested_at, amount, currency, payload
) VALUES (
    src.event_id, src.tenant_id, src.order_id, src.event_type, src.occurred_at, src.ingested_at, src.amount, src.currency, src.payload
);

TRUNCATE TABLE staging_events;
Why MERGE is important in CDC and event pipelines
Retries and replays happen. MERGE lets me keep warehouse loads idempotent at the table level rather than assuming every batch is perfectly unique.
Ordering, duplication, and replay (the part that breaks naive designs)
This is where I spend a lot of time in reviews because it directly affects data correctness.
Ordering: what I can and cannot guarantee
Kinesis ordering
Kinesis preserves record order within a shard, and practically I think in terms of ordering per partition key.
If I need ordering for orderId, I choose a partition key tied to that ordering scope (for example tenantId#orderId).
EventBridge ordering
I do not assume EventBridge preserves strict ordering across events. If ordering matters for analytics correctness, I enforce it downstream with event timestamps, versions, and conflict resolution logic.
SQS ordering
- Standard queues: no strict ordering, duplicates possible
- FIFO queues: ordered per MessageGroupId, with a bounded dedupe window
My rule of thumb
I preserve ordering only where it matters:
- choose a sequence key
- partition based on that key when using Kinesis or FIFO
- store event_version and occurred_at
- make downstream upserts resilient to out-of-order arrivals
Trying to preserve global ordering everywhere usually makes the system slower and more expensive than it needs to be.
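The upsert resilience I mentioned can be sketched as a last-writer-wins resolver keyed on (event_version, occurred_at). Field names follow the canonical schema; the tie-breaking policy itself is an assumption, so pick one that matches your domain:

```python
def should_apply(current_state: dict, incoming: dict) -> bool:
    """Decide whether an incoming event should overwrite the stored state."""
    if current_state is None:
        return True
    cur_key = (current_state["event_version"], current_state["occurred_at"])
    new_key = (incoming["event_version"], incoming["occurred_at"])
    # Ties re-apply, which is safe for idempotent upserts
    return new_key >= cur_key

def upsert(state_by_entity: dict, evt: dict) -> None:
    """Apply an event to per-entity state, ignoring stale out-of-order arrivals."""
    key = evt["sequence_key"]
    if should_apply(state_by_entity.get(key), evt):
        state_by_entity[key] = evt
```

With this in place, a late-arriving older event is simply dropped instead of clobbering newer state.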
Duplication: assume it will happen
I assume duplicates can appear because of:
- producer retries
- Lambda retries and partial batch retries
- EventBridge target retries
- SQS redrives
- replay and backfill operations
- manual reprocessing
How I handle duplicates
I include in the canonical record:
- event_id
- idempotency_key
Then I make sinks safe:
- S3: duplicates can exist physically, but I dedupe downstream in queries or ETL
- OpenSearch: use _id = event_id to overwrite the same document on retry
- Redshift: MERGE on event_id or a business key
This is the exactly-once myth versus at-least-once reality pattern applied to analytics ingestion.
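For the S3 path, query- or ETL-side dedupe can be as simple as keeping one record per idempotency_key. The keep-latest-ingested policy below is an assumed choice for illustration:

```python
def dedupe(events: list) -> list:
    """Keep one event per idempotency_key, preferring the latest ingested_at."""
    best = {}
    for evt in events:
        key = evt["idempotency_key"]
        kept = best.get(key)
        # ISO-8601 UTC timestamps compare correctly as strings
        if kept is None or evt["ingested_at"] > kept["ingested_at"]:
            best[key] = evt
    return list(best.values())
```

In SQL engines the same idea is usually a `ROW_NUMBER() OVER (PARTITION BY idempotency_key ORDER BY ingested_at DESC)` filter.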
Replay: make it a feature, not an emergency procedure
I design replay paths intentionally from day one.
Replay options in this architecture
- Kinesis retention replay (re-read retained stream window)
- EventBridge archive and replay (for applicable event bus scenarios)
- S3 reprocessing (most flexible for historical rebuilds and backfills)
Why S3 replay matters most
Even if I have Kinesis or EventBridge replay features, S3 is usually my best long-term replay layer because:
- it keeps historical data longer
- it is cheap and durable
- I can reprocess with new transformation logic
- I can rebuild OpenSearch or Redshift if needed
That is why I strongly prefer S3-first landing for analytics pipelines.
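A backfill run usually starts by enumerating the partition prefixes to reprocess. This sketch mirrors the Firehose prefix layout used earlier in this post; actually listing and re-reading the objects (for example with boto3) is left out:

```python
from datetime import date, timedelta

def replay_prefixes(start: date, end: date, dataset: str = "events"):
    """Yield the S3 day-partition prefixes covering an inclusive date range."""
    day = start
    while day <= end:
        yield f"dataset={dataset}/year={day:%Y}/month={day:%m}/day={day:%d}/"
        day += timedelta(days=1)
```

Driving a replay from an explicit prefix list also makes it easy to log and audit exactly what was reprocessed.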
Partitioning and cost optimization (where big savings come from)
This topic matters a lot because ingestion costs scale with volume, and bad partitioning creates pain in both storage and query engines.
S3 partitioning strategy (practical guidance)
A common anti-pattern is over-partitioning too early.
What I usually start with
For general event analytics, I start with time-based partitions:
dataset=events/year=YYYY/month=MM/day=DD/
Then, if query patterns justify it, I add one more selective dimension:
- tenant_id=...
- event_type=...
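This scheme can be generated consistently from the canonical event itself. The sketch below assumes occurred_at drives the time partition and that at most one extra dimension is appended:

```python
from datetime import datetime

def partition_prefix(evt: dict, extra_dim: str = None) -> str:
    """Build the S3 partition prefix for a canonical event."""
    ts = datetime.fromisoformat(evt["occurred_at"].replace("Z", "+00:00"))
    prefix = f"dataset=events/year={ts:%Y}/month={ts:%m}/day={ts:%d}/"
    if extra_dim:
        # At most one selective dimension, added only when query patterns justify it
        prefix += f"{extra_dim}={evt[extra_dim]}/"
    return prefix
```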
What I avoid early on
- highly granular partitions that create too many small files
- partitioning on high-cardinality IDs like order_id or user_id
- changing partition schemes frequently without a migration plan
Small files are a real cost problem
Too many tiny files hurt:
- Athena or Redshift query planning and performance
- metadata overhead
- downstream ETL efficiency
That is why I use batching and buffering (Firehose or app-side batching) and aim for healthy object sizes.
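When producers write to S3 directly (without Firehose), app-side batching along these lines keeps object sizes healthy. `SizedBuffer` and its thresholds are illustrative; the flush callback would do the actual S3 put:

```python
import json

class SizedBuffer:
    """Accumulate JSONL lines and flush when a byte or record threshold is hit."""

    def __init__(self, flush, max_bytes=4_000_000, max_records=500):
        self.flush = flush  # callback receiving one JSONL string per object
        self.max_bytes = max_bytes
        self.max_records = max_records
        self.lines, self.size = [], 0

    def add(self, evt: dict) -> None:
        line = json.dumps(evt, separators=(",", ":")) + "\n"
        self.lines.append(line)
        self.size += len(line)
        if self.size >= self.max_bytes or len(self.lines) >= self.max_records:
            self.drain()

    def drain(self) -> None:
        """Flush whatever is buffered (call this at shutdown or end of batch)."""
        if self.lines:
            self.flush("".join(self.lines))
            self.lines, self.size = [], 0
```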
Firehose and file size optimization
Firehose helps reduce operational overhead, and I use it a lot for S3 landing.
Best practices I apply
- enable compression (GZIP minimum; Parquet or ORC conversion when appropriate)
- tune buffer interval and size
- use error output prefixes for bad records
- keep schemas stable enough if using format conversion
When I choose Parquet conversion
I choose Parquet when:
- analytics queries dominate
- schema is reasonably stable
- I want lower scan cost and faster query performance
I keep JSON initially if:
- schema changes rapidly
- debugging raw payloads is a priority
- multiple downstream consumers still need semi-structured payloads
A common compromise is:
- raw JSON landing plus curated Parquet later
Kinesis cost and throughput optimization
Kinesis can be very cost-effective when used intentionally, but I still tune it.
Decisions I make explicitly
- On-demand vs provisioned
- start with on-demand for uncertain traffic
- move to provisioned when traffic is predictable and steady
- partition key distribution to avoid hot shards
- batch sizes and Lambda windowing to reduce invocation overhead
- consumer count (Enhanced Fan-Out only when justified)
Hot shard warning sign
If one key dominates (for example a single tenant or entity), I can get uneven throughput and throttling.
Fixes include:
- better partition key strategy
- partition key suffixing (only if ordering requirements allow)
- separating noisy tenants or workloads
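Partition key suffixing can be sketched as a deterministic hash over the base key plus some per-record salt (for example the event id). As noted above, this deliberately trades away per-entity ordering, so it only fits keys whose ordering scope allows it:

```python
import hashlib

def suffixed_partition_key(base_key: str, salt: str, buckets: int = 8) -> str:
    """Spread one hot key across N sub-keys; deterministic for the same salt."""
    digest = hashlib.sha256(f"{base_key}:{salt}".encode("utf-8")).hexdigest()
    return f"{base_key}#{int(digest, 16) % buckets}"
```

Replays of the same record land on the same sub-key, which keeps shard assignment stable across retries.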
Lambda cost optimization in ingest pipelines
Lambda is often not the dominant cost at first, but it can become noticeable at scale.
Tuning areas I care about
- batch size and batching window
- memory sizing (to get better CPU and network, and shorter runtime)
- avoiding heavy per-record network calls
- reusing clients across invocations
- minimizing unnecessary JSON serialization churn
A practical optimization
I treat the transformer as a batch processor, not a record-at-a-time handler. That usually improves both throughput and cost.
Redshift cost optimization in event ingestion
When Redshift is the warehouse sink, I optimize the load pattern, not just the compute.
Best practices I use
- load from S3 in batches (COPY), not row-by-row inserts from Lambda
- stage then MERGE
- align file sizes to efficient COPY behavior
- keep raw event retention in S3 so warehouse tables can be rebuilt and re-modeled
For many teams, the biggest cost win is simply moving from ad hoc inserts to an S3 batch load pattern.
End-to-end implementation discussion (how I wire this in production)
This is the part I care about most because architecture decisions show up in operations.
1) I define the source of truth for ingestion success
In this design, the source of truth is:
Successful normalized delivery to S3 (via Firehose).
Why:
- S3 is durable
- S3 supports replay
- downstream sinks can catch up independently
This prevents me from coupling ingestion success to OpenSearch availability, for example.
2) I decouple sink-specific SLAs
Different sinks serve different users:
- OpenSearch may need near-real-time indexing for search or ops views
- Redshift loads may run in micro-batches
- lake consumers may process hourly
By decoupling them, I avoid making the entire pipeline as fragile as the most sensitive sink.
3) I make replay and backfill a documented operation
I document:
- replay source (Kinesis, EventBridge archive, or S3)
- dedupe keys and merge behavior
- expected lag and throughput limits
- how to avoid double-indexing side effects
This turns replay into an operational capability instead of a risky one-off script.
4) I design for schema evolution early
Events change. They always do.
I version:
- event schema (event_version)
- transformation logic (deployable version)
- warehouse model migrations
I also preserve raw payloads so I can re-derive curated data if the schema evolves.
Common mistakes I see (and how I avoid them)
Mistake 1: Using only EventBridge for everything and expecting stream semantics
EventBridge is excellent for routing, but it is not the same as Kinesis when I need sustained high-throughput ordered ingestion.
Fix: use EventBridge for routing, Kinesis for the analytics ingestion backbone when needed.
Mistake 2: Letting sink failures block primary landing
If OpenSearch throttles and that blocks the whole ingest path, the pipeline becomes fragile.
Fix: make S3 landing primary, and decouple secondary sinks with SQS or replay.
Mistake 3: No canonical schema
Every producer emits a different shape, and downstream SQL gets messy fast.
Fix: normalize once in Lambda and publish a canonical analytics contract.
Mistake 4: Ignoring duplication until dashboards look wrong
Retries, redrives, and replay all create duplicates eventually.
Fix: include event_id and dedupe keys, and make each sink idempotent.
Mistake 5: Over-partitioning S3 on day one
This creates small files, metadata overhead, and poor performance.
Fix: start with time partitions and compression, then add dimensions based on real query patterns.
Practical best practices checklist
Transport decisioning
- [ ] EventBridge used for routing and integration use cases
- [ ] Kinesis used where throughput, order, and replay requirements justify it
- [ ] SQS used for buffering and retry isolation where needed
Transformation layer
- [ ] Canonical schema defined and versioned
- [ ] Transformer is deterministic and observable
- [ ] Partial batch failure behavior is configured for stream or queue consumers
- [ ] Original payload preserved when needed for replay and debugging
Sink delivery
- [ ] S3 is durable landing zone (preferred for analytics)
- [ ] OpenSearch indexing path is decoupled from primary ingest
- [ ] Redshift loads are batch-based (COPY and MERGE), not row-by-row Lambda inserts
- [ ] Dedupe and idempotency strategy exists per sink
Ordering / duplication / replay
- [ ] Partition keys align to ordering scope
- [ ] Duplicate handling defined across retries and replays
- [ ] Replay and backfill path documented and tested
- [ ] Metrics and alarms exist for lag, failure, and sink throttling
Cost / performance
- [ ] S3 compression enabled
- [ ] Partitioning strategy avoids small-file explosion
- [ ] Kinesis mode (on-demand or provisioned) chosen intentionally
- [ ] Lambda batching and memory tuned with real metrics
Final thoughts
If I had to summarize this architecture pattern in one line, it would be:
Use the right service for the right ingestion job, normalize once, land durably in S3, and make every downstream sink replay-safe.
That combination gives me:
- cleaner producer integrations
- better analytics correctness
- safer reprocessing
- more predictable scaling and cost
For most teams, the biggest improvement is not a new service. It is adopting a clearer ingestion architecture with explicit semantics for ordering, duplication, replay, and sink ownership.
If you are building serverless analytics pipelines on AWS, this pattern will give you a strong foundation that can grow with both event volume and analytics complexity.
References
- Amazon EventBridge documentation (event buses, rules, targets, archive/replay)
- Amazon Kinesis Data Streams documentation (stream modes, ordering, retention, consumers)
- Amazon SQS documentation (standard vs FIFO, retries, DLQs)
- AWS Lambda documentation (event source mappings, partial batch response)
- Amazon Kinesis Data Firehose documentation (S3/OpenSearch/Redshift delivery, buffering, compression)
- Amazon S3 documentation (partitioning and storage best practices)
- Amazon OpenSearch Service documentation
- Amazon Redshift and Redshift Serverless documentation (COPY, MERGE, SUPER)
