EventBridge
How do I route events from a DynamoDB stream to multiple Lambda functions?
1. Intro
Our AI pipeline consists of 10 modules, each dependent on the previous step, so we need an event-driven trigger system.
At certain stages, some modules require aggregation-based triggering. To support this, I implemented an event system using:
- DynamoDB Streams
- Lambda functions
  - to filter events
  - to check whether a step succeeded
  - to register a fallback process
- Step Functions
  - to handle fallback cases (e.g., cron jobs, condition checks, triggering the next modules)
This setup has worked well so far. However, as we expand to additional AI modules that also require aggregation-based triggers, scalability and flexibility are becoming more important.
To address this, I'm planning to refactor our legacy event system by introducing EventBridge and updating our Lambda structure.
2. EventBridge
2-1. Support
- Fan-out (1:N)
- Filtering
- Routing (Rules) - supports multiple target types
2-2. Structure
EventBridge includes two ways to process and deliver events: event buses and pipes.
Bus (N:M)
Routers that receive events and deliver them to 0 ~ N targets.
- Well suited for routing events (N:M)
- With optional transformation of events prior to delivery
Pipes (1:1)
Intended for point-to-point integrations.
Each pipe receives events from a single source.
- 1:1
- Advanced transformation and enrichment of events prior to delivery.
💡 Pipes and event buses are often used together. A common use case is to create a pipe with an event bus as its target; the pipe sends events to the event bus, which then sends those events on to multiple targets.
3. DynamoDB Stream -> EventBridge
- Stream: the DynamoDB stream emits change events from the table
- Pipe: consumes events from the DynamoDB stream, filters them, and routes them to the target EventBridge bus
- Bus: delivers events to destinations (targets)
3.1. Creating an integration
3.1.1. Enabling a DynamoDB stream
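A minimal sketch of enabling the stream with boto3; the table name is taken from the E2E test in 3.1.7, and NEW_IMAGE matches the StreamViewType in the sample events below:

import boto3

dynamodb = boto3.client("dynamodb", region_name="ap-northeast-2")

# Enable a stream on the events table; NEW_IMAGE means each record
# carries only the new item state, which is all the filter and
# enrichment steps below need.
dynamodb.update_table(
    TableName="PerceptionPipelineEvents-dev",
    StreamSpecification={
        "StreamEnabled": True,
        "StreamViewType": "NEW_IMAGE",
    },
)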
3.1.2. Create a bus
perception-pipeline-event-bus-prod
perception-pipeline-event-bus-dev
- [x] Enable Schema discovery
When you enable schema discovery, the schema (structure) of all events processed by the event bus is stored in the schema registry. Once they are in the registry, you can download them and use them to speed up development of new features.
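A minimal sketch of creating the dev bus and turning on schema discovery with boto3 (repeat with the -prod names for production):

import boto3

events = boto3.client("events", region_name="ap-northeast-2")
schemas = boto3.client("schemas", region_name="ap-northeast-2")

# Create the dev bus
bus = events.create_event_bus(Name="perception-pipeline-event-bus-dev")

# Enable schema discovery, so schemas of processed events land in the registry
schemas.create_discoverer(SourceArn=bus["EventBusArn"])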
3.1.3. Create a pipe
perception-pipeline-pipe-prod
perception-pipeline-pipe-dev
Source
Starting position, choose one of the following:
- Latest – Start reading the stream with the most recent record in the shard.
- Trim horizon – Start reading the stream with the last untrimmed record in the shard. This is the oldest record in the shard.
Batch size - optional:
- Enter a maximum number of records for each batch. The default value is 100.
Filtering: keep only INSERT events with status "COMPLETED" from the DynamoDB stream (according to my schema).
{
  "eventName": ["INSERT"],
  "dynamodb": {
    "NewImage": {
      "status": {
        "S": ["COMPLETED"]
      }
    }
  }
}
Sample DynamoDB event:
{
  "eventID": "a9b64ee3d998207db27d21c60505b64d",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "...ap",
  "dynamodb": {
    "ApproximateCreationDateTime": 1752538268,
    "Keys": {
      "date#event_type": {
        "S": "2025-07-15#pose#sync001#cam3"
      },
      "site": {
        "S": "haemoro_1x2"
      }
    },
    "NewImage": {
      "date#event_type": {
        "S": "2025-07-15#pose#sync001#cam3"
      },
      "site": {
        "S": "haemoro_1x2"
      },
      "expires_at": {
        "N": "1752624667"
      },
      "status": {
        "S": "COMPLETED"
      }
    },
    "SequenceNumber": "2198821900002985607563155842",
    "SizeBytes": 147,
    "StreamViewType": "NEW_IMAGE"
  },
  "eventSourceARN": "..."
}
Enrichment:
- Create a Lambda
"""
This Lambda function is used in the EventBridge enrichment step.
It is shared by both the dev and prod environments.
"""
SORT_KEY = "date#event_type"
TARGET_KEY = "PerceptionPipelineEventType"
def lambda_handler(event, context):
"""
Example sort_key values:
- 2024-10-28#pose#sync001#cam7
- 2024-10-28#recon#sync007
- 2024-10-28#preprocessing
- 2024-10-28#reid
- 2024-10-28#postprocessing
...
"""
sk = event["dynamodb"]["NewImage"][SORT_KEY]["S"]
event_type = "unknown"
match sk:
case s if "#pose#" in s:
event_type = "pose"
case s if "#recon#" in s:
event_type = "recon"
case s if "#preprocessing" in s:
event_type = "preprocessing"
case s if "#reid" in s:
event_type = "reid"
case s if "#postprocessing" in s:
event_type = "postprocessing"
case _:
event_type = "unknown"
event[TARGET_KEY] = event_type
return event
- Set Enrichment (Transformer)
{
  "dynamodb": {
    "NewImage": <$.dynamodb.NewImage>
  }
}
Target
- perception-pipeline-pipe-prod -> perception-pipeline-event-bus-prod
- perception-pipeline-pipe-dev -> perception-pipeline-event-bus-dev
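Putting 3.1.3 together, a minimal sketch of creating the dev pipe with boto3. The role ARN, stream ARN, and enrichment function name are placeholders; the source settings, filter, and input template are the ones above:

import json
import boto3

pipes = boto3.client("pipes", region_name="ap-northeast-2")

# Same filter pattern as above: INSERT events with status COMPLETED
FILTER_PATTERN = {
    "eventName": ["INSERT"],
    "dynamodb": {"NewImage": {"status": {"S": ["COMPLETED"]}}},
}

pipes.create_pipe(
    Name="perception-pipeline-pipe-dev",
    RoleArn="arn:aws:iam::123456789012:role/perception-pipeline-pipe-role",  # placeholder
    Source="arn:aws:dynamodb:...:table/PerceptionPipelineEvents-dev/stream/...",  # placeholder
    SourceParameters={
        "DynamoDBStreamParameters": {
            "StartingPosition": "LATEST",
            "BatchSize": 100,
        },
        "FilterCriteria": {
            "Filters": [{"Pattern": json.dumps(FILTER_PATTERN)}],
        },
    },
    Enrichment="arn:aws:lambda:...:function:perception-pipeline-enrichment",  # hypothetical name
    EnrichmentParameters={
        "InputTemplate": '{"dynamodb": {"NewImage": <$.dynamodb.NewImage>}}'
    },
    Target="arn:aws:events:...:event-bus/perception-pipeline-event-bus-dev",
)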
3.1.4. Role & Pipe setting
The pipe's execution role needs access to the services it touches:
- DynamoDB Streams
- SQS
- Lambda
- EventBridge
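For reference, a sketch of the kind of policy document that role might carry. The action lists and resource ARNs are assumptions based on the services above, and the SQS statement only applies if a dead-letter queue is configured:

import json

# Sketch of the pipe execution role's policy; resources are placeholders
pipe_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:ListStreams",
            ],
            "Resource": "arn:aws:dynamodb:...:table/PerceptionPipelineEvents-dev/stream/*",
        },
        {
            "Effect": "Allow",
            "Action": ["lambda:InvokeFunction"],
            "Resource": "arn:aws:lambda:...:function:*",  # enrichment Lambda
        },
        {
            "Effect": "Allow",
            "Action": ["events:PutEvents"],
            "Resource": "arn:aws:events:...:event-bus/perception-pipeline-event-bus-*",
        },
        {
            "Effect": "Allow",
            "Action": ["sqs:SendMessage"],  # dead-letter queue, if configured
            "Resource": "arn:aws:sqs:...:*",
        },
    ],
}

print(json.dumps(pipe_policy, indent=2))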
3.1.5. Update Lambda
- pose event -> aggregate_pose-dev (Lambda, sketched below) -> trigger_recon
- recon event -> aggregate_recon-dev -> trigger_preprocessing / trigger_reid ...
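A minimal sketch of what an aggregation handler like aggregate_pose-dev might look like. The aggregation check and the trigger are hypothetical stubs, since they depend on each module's completion criteria, which aren't covered here:

def all_pose_events_completed(site: str, sort_key: str) -> bool:
    """Hypothetical aggregation check, e.g. querying DynamoDB to count
    COMPLETED pose events for the same date and sync."""
    raise NotImplementedError


def trigger_recon(site: str, sort_key: str) -> None:
    """Hypothetical trigger for the next module, e.g. starting a Step
    Functions execution."""
    raise NotImplementedError


def lambda_handler(event, context):
    # The rule delivers the enriched pipe event under "detail"
    # (see the sample event in 3.1.6)
    new_image = event["detail"]["dynamodb"]["NewImage"]
    site = new_image["site"]["S"]
    sort_key = new_image["date#event_type"]["S"]
    if all_pose_events_completed(site, sort_key):
        trigger_recon(site, sort_key)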
3.1.6. Create a Rule
Generate Event Rule
aggregate-pose-rule-dev
aggregate-recon-rule-dev
aggregate-pose-rule-prod
aggregate-recon-rule-prod
Build Event pattern
I've already filtered the DynamoDB stream using a pipe to allow only:
- "INSERT" events
- Events where status == "COMPLETED"
And I've already enriched each event with the "PerceptionPipelineEventType" field. So in these event bus rules, I just need to route:
- PerceptionPipelineEventType == "pose" -> the aggregate_pose Lambda
- PerceptionPipelineEventType == "recon" -> the aggregate_recon Lambda
Sample event:
{
  "version": "0",
  "id": "52461f27-a0bd-8365-8d47-866f429e6f57",
  "detail-type": "AWS API Call via CloudTrail",
  "source": "aws.lambda",
  "account": "123456789012",
  "time": "2024-08-21T09:11:10Z",
  "region": "ca-central-1",
  "resources": [],
  "detail": {
    "eventID": "a9b64ee3d998207db27d21c60505b64d",
    "eventName": "INSERT",
    "eventVersion": "1.1",
    "eventSource": "aws:dynamodb",
    "awsRegion": "...ap",
    "dynamodb": {
      "ApproximateCreationDateTime": 1752538268,
      "Keys": {
        "date#event_type": {
          "S": "2025-07-15#pose#sync001#cam3"
        },
        "site": {
          "S": "haemoro_1x2"
        }
      },
      "NewImage": {
        "date#event_type": {
          "S": "2025-07-15#pose#sync001#cam3"
        },
        "site": {
          "S": "haemoro_1x2"
        },
        "expires_at": {
          "N": "1752624667"
        },
        "status": {
          "S": "COMPLETED"
        }
      },
      "SequenceNumber": "2198821900002985607563155842",
      "SizeBytes": 147,
      "StreamViewType": "NEW_IMAGE"
    },
    "eventSourceARN": "...",
    "PerceptionPipelineEventType": "pose" 🔥 e.g., pose, recon, ...
  }
}
- for aggregate-pose
{
  "detail": {
    "PerceptionPipelineEventType": ["pose"]
  }
}
- for aggregate-recon
{
  "detail": {
    "PerceptionPipelineEventType": ["recon"]
  }
}
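A minimal sketch of creating one of these rules with boto3. The Lambda ARN is a placeholder, and the target function additionally needs a resource-based permission allowing events.amazonaws.com to invoke it:

import json
import boto3

events = boto3.client("events", region_name="ap-northeast-2")

# Rule on the custom bus matching enriched "pose" events
events.put_rule(
    Name="aggregate-pose-rule-dev",
    EventBusName="perception-pipeline-event-bus-dev",
    EventPattern=json.dumps({"detail": {"PerceptionPipelineEventType": ["pose"]}}),
    State="ENABLED",
)

# Route matched events to the aggregation Lambda
events.put_targets(
    Rule="aggregate-pose-rule-dev",
    EventBusName="perception-pipeline-event-bus-dev",
    Targets=[
        {
            "Id": "aggregate-pose-dev",
            "Arn": "arn:aws:lambda:...:function:aggregate_pose-dev",  # placeholder
        }
    ],
)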
Verify that the rule pattern matches the sample event.
3.1.7. E2E Test
- Trigger the DynamoDB stream by inserting test items
from datetime import datetime, timedelta
from enum import Enum

import boto3

event_table = boto3.resource("dynamodb", region_name="ap-northeast-2").Table(
    "PerceptionPipelineEvents-dev"
)

EXPIRE_TIME_DELTA = timedelta(days=1)


class EventStatus(str, Enum):
    COMPLETED = "COMPLETED"
    PROCESSING = "PROCESSING"


class PerceptionEventTable:
    partition_key = "site"
    sort_key = "date#event_type"

    @staticmethod
    def save_event(pk: str, sort_key: str, status: EventStatus):
        def get_ttl():
            return int((datetime.now() + EXPIRE_TIME_DELTA).timestamp())

        resp = event_table.put_item(
            Item={
                "site": pk,
                "date#event_type": sort_key,
                "status": status.value,
                "expires_at": get_ttl(),
            }
        )
        return resp


if __name__ == "__main__":
    site = "test_site"
    pose_sort_key = "2025-07-15#pose#sync001#cam6"
    recon_sort_key = "2025-07-15#recon#sync001"

    # Writing COMPLETED items should flow through the pipe, get enriched,
    # and match the aggregate-pose / aggregate-recon rules.
    PerceptionEventTable.save_event(
        pk=site,
        sort_key=pose_sort_key,
        status=EventStatus.COMPLETED,
    )
    PerceptionEventTable.save_event(
        pk=site,
        sort_key=recon_sort_key,
        status=EventStatus.COMPLETED,
    )
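To confirm the flow end to end, one option is to tail the aggregation Lambda's CloudWatch logs; this sketch assumes the default /aws/lambda/<function-name> log group naming:

import boto3

logs = boto3.client("logs", region_name="ap-northeast-2")

# If the pipe, enrichment, and rule all work, writing the pose item above
# should have produced an invocation of aggregate_pose-dev.
resp = logs.filter_log_events(
    logGroupName="/aws/lambda/aggregate_pose-dev",
    limit=20,
)
for log_event in resp["events"]:
    print(log_event["message"], end="")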