
Minwook Je


AWS::EventBridge

Event Bridge

How do I route events from a DynamoDB stream to multiple Lambda functions?

1. Intro

Our AI pipeline consists of 10 modules, each dependent on the previous step, so we need an event-driven trigger system.

At certain stages, some modules require aggregation-based triggering. To support this, I implemented an event system using:

  • DynamoDB Streams
  • Lambda functions
    • to filter events
    • to check whether a step succeeded
    • to register a fallback process
  • Step Functions
    • to handle fallback cases (e.g., cron jobs, condition checks, triggering the next modules)

This setup has worked well so far. However, as we expand to additional AI modules that also require aggregation-based triggers, the need for scalability and flexibility is becoming more important.

To address this, I'm planning to refactor our legacy event system by introducing EventBridge and updating our Lambda structure.


2. EventBridge

2-1. Support

2-2. Structure

EventBridge includes two ways to process and deliver events: event buses and pipes.

Bus (N:M)

Routers that receive events and deliver them to zero or more targets.

  • Well suited for routing events (N:M)
  • With optional transformation of events prior to delivery

Pipes (1:1)

Intended for point-to-point integrations.

Each pipe receives events from a single source.

  • 1:1
  • Advanced transformation and enrichment of events prior to delivery.

💡 Pipes and event buses are often used together. A common use case is to create a pipe with an event bus as its target; the pipe sends events to the event bus, which then sends those events on to multiple targets.


3. DynamoDB Stream -> EventBridge

  • Stream: DynamoDB Stream
  • Pipe: Consumes events from the DynamoDB stream
    • Routes them to the target EventBridge bus
    • Filters events
  • Bus: Delivers events to destinations (targets)

3.1. Creating an integration

3.1.1. Enabling a DynamoDB stream
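
As a rough sketch of this step, the stream could also be enabled from Python with boto3's `update_table`. The helper names `stream_spec_request` and `enable_stream` are hypothetical; the table name and region are the ones used in the E2E test later in this post, and `NEW_IMAGE` is assumed because the sample record reports that view type.

```python
def stream_spec_request(table_name: str) -> dict:
    """Build the arguments for DynamoDB's UpdateTable call that turns on a stream."""
    return {
        "TableName": table_name,
        "StreamSpecification": {
            "StreamEnabled": True,
            # Assumption: NEW_IMAGE, matching the StreamViewType in the sample record.
            "StreamViewType": "NEW_IMAGE",
        },
    }


def enable_stream(table_name: str, region: str = "ap-northeast-2") -> str:
    """Enable the stream and return its ARN (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so the request builder works without AWS access

    client = boto3.client("dynamodb", region_name=region)
    resp = client.update_table(**stream_spec_request(table_name))
    return resp["TableDescription"]["LatestStreamArn"]
```

Calling `enable_stream("PerceptionPipelineEvents-dev")` would return the stream ARN that the pipe later uses as its source.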

3.1.2. Create a bus

  • perception-pipeline-event-bus-prod
  • perception-pipeline-event-bus-dev

  • [x] Enable Schema discovery

When you enable schema discovery, the schema (structure) of every event processed by the event bus is stored in the schema registry. Once the schemas are in the registry, you can download them and use them to speed up development of new features.
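
For reference, a minimal sketch of creating the bus and turning on schema discovery with boto3 instead of the console. The helper names and the `stage` parameter are hypothetical; the bus names follow the list above.

```python
def bus_name(stage: str) -> str:
    """Return the bus name for a stage ("dev" or "prod"), following the naming above."""
    return f"perception-pipeline-event-bus-{stage}"


def create_bus_with_discovery(stage: str, region: str = "ap-northeast-2") -> str:
    """Create the custom bus and attach a schema discoverer to it.

    Needs boto3 and AWS credentials; returns the ARN of the new bus.
    """
    import boto3  # imported lazily so bus_name() can be used without AWS access

    events = boto3.client("events", region_name=region)
    schemas = boto3.client("schemas", region_name=region)

    bus_arn = events.create_event_bus(Name=bus_name(stage))["EventBusArn"]
    # The discoverer infers schemas from events that pass through the bus.
    schemas.create_discoverer(
        SourceArn=bus_arn,
        Description=f"Schema discovery for {bus_name(stage)}",
    )
    return bus_arn
```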

3.1.3. Create a pipe

  • perception-pipeline-pipe-prod
  • perception-pipeline-pipe-dev


Source

Starting position, choose one of the following:

  • Latest – Start reading the stream with the most recent record in the shard.
  • Trim horizon – Start reading the stream with the last untrimmed record in the shard. This is the oldest record in the shard.

Batch size - optional:

  • Enter a maximum number of records for each batch. The default value is 100.

Filtering: Keep only INSERT events whose status is "COMPLETED" in the DynamoDB stream (according to my schema).


{
  "eventName": ["INSERT"],
  "dynamodb": {
    "NewImage": {
      "status": {
        "S": ["COMPLETED"]
      }
    }
  }
}

Sample DynamoDB event

{
  "eventID": "a9b64ee3d998207db27d21c60505b64d",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "...ap",
  "dynamodb": {
    "ApproximateCreationDateTime": 1752538268,
    "Keys": {
      "date#event_type": {
        "S": "2025-07-15#pose#sync001#cam3"
      },
      "site": {
        "S": "haemoro_1x2"
      }
    },
    "NewImage": {
      "date#event_type": {
        "S": "2025-07-15#pose#sync001#cam3"
      },
      "site": {
        "S": "haemoro_1x2"
      },
      "expires_at": {
        "N": "1752624667"
      },
      "status": {
        "S": "COMPLETED"
      }
    },
    "SequenceNumber": "2198821900002985607563155842",
    "SizeBytes": 147,
    "StreamViewType": "NEW_IMAGE"
  },
  "eventSourceARN": "..."
}
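
Before deploying, the filter can be sanity-checked locally against a record like the sample above. The `matches` function below is a hypothetical helper that covers only the exact-match subset of EventBridge patterns used here (nested objects and lists of allowed values); it is not the real matching engine and ignores operators such as `prefix` or `anything-but`.

```python
FILTER = {
    "eventName": ["INSERT"],
    "dynamodb": {"NewImage": {"status": {"S": ["COMPLETED"]}}},
}


def matches(pattern: dict, record: dict) -> bool:
    """Return True if every field named in the pattern matches the record."""
    for key, expected in pattern.items():
        if key not in record:
            return False
        actual = record[key]
        if isinstance(expected, dict):
            # Nested object: descend into the record.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            # Leaf: the pattern lists the allowed values.
            return False
    return True


record = {
    "eventName": "INSERT",
    "dynamodb": {"NewImage": {"status": {"S": "COMPLETED"}}},
}
print(matches(FILTER, record))
```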

Enrichment:

  • Create a Lambda function
"""
This Lambda function is used in the EventBridge enrichment step.
It is shared by both the dev and prod environments.
"""
SORT_KEY = "date#event_type"
TARGET_KEY = "PerceptionPipelineEventType"


def lambda_handler(event, context):
    """
    Example sort_key values:
      - 2024-10-28#pose#sync001#cam7
      - 2024-10-28#recon#sync007
      - 2024-10-28#preprocessing
      - 2024-10-28#reid
      - 2024-10-28#postprocessing
      ...
    """

    sk = event["dynamodb"]["NewImage"][SORT_KEY]["S"]
    event_type = "unknown"

    match sk:
        case s if "#pose#" in s:
            event_type = "pose"
        case s if "#recon#" in s:
            event_type = "recon"
        case s if "#preprocessing" in s:
            event_type = "preprocessing"
        case s if "#reid" in s:
            event_type = "reid"
        case s if "#postprocessing" in s:
            event_type = "postprocessing"
        case _:
            event_type = "unknown"

    event[TARGET_KEY] = event_type
    return event
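
EventBridge Pipes may invoke the enrichment function with a batch (a JSON array of records) rather than a single record. The variant below is a sketch that accepts both shapes. Note one deliberate difference from the handler above: it reads the event type from the second `#`-separated segment of the sort key instead of doing substring checks, which is an assumption about the key layout based on the example values. `KNOWN_TYPES` and `classify` are hypothetical names.

```python
SORT_KEY = "date#event_type"
TARGET_KEY = "PerceptionPipelineEventType"
KNOWN_TYPES = ("pose", "recon", "preprocessing", "reid", "postprocessing")


def classify(sort_key: str) -> str:
    """Map a sort key such as 2024-10-28#pose#sync001#cam7 to its event type."""
    parts = sort_key.split("#")
    if len(parts) > 1 and parts[1] in KNOWN_TYPES:
        return parts[1]
    return "unknown"


def lambda_handler(event, context):
    """Enrich one record or a batch of records without mutating the input."""
    is_batch = isinstance(event, list)
    records = event if is_batch else [event]

    enriched = []
    for record in records:
        sk = record["dynamodb"]["NewImage"][SORT_KEY]["S"]
        enriched.append({**record, TARGET_KEY: classify(sk)})

    # Return the same shape that was received.
    return enriched if is_batch else enriched[0]
```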


  • Set Enrichment (Transformer)


{
  "dynamodb": {
    "NewImage": <$.dynamodb.NewImage>
  }
}

Target

  • perception-pipeline-pipe-prod -> perception-pipeline-event-bus-prod
  • perception-pipeline-pipe-dev -> perception-pipeline-event-bus-dev
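
Put together, the source, filter, enrichment, and target settings above could be expressed as one `create_pipe` request for boto3's `pipes` client. This is a sketch, not the exact configuration used here: `pipe_request` is a hypothetical helper, every ARN is a caller-supplied placeholder, and `LATEST` with a batch size of 100 are assumed choices from the options listed above.

```python
import json


def pipe_request(stage: str, stream_arn: str, enrichment_arn: str,
                 bus_arn: str, role_arn: str) -> dict:
    """Build keyword arguments for pipes.create_pipe(); all ARNs are supplied by the caller."""
    pattern = {
        "eventName": ["INSERT"],
        "dynamodb": {"NewImage": {"status": {"S": ["COMPLETED"]}}},
    }
    return {
        "Name": f"perception-pipeline-pipe-{stage}",
        "RoleArn": role_arn,
        "Source": stream_arn,
        "SourceParameters": {
            # The filter pattern is passed as a JSON string.
            "FilterCriteria": {"Filters": [{"Pattern": json.dumps(pattern)}]},
            "DynamoDBStreamParameters": {
                "StartingPosition": "LATEST",  # assumption; TRIM_HORIZON is the alternative
                "BatchSize": 100,
            },
        },
        "Enrichment": enrichment_arn,
        "EnrichmentParameters": {
            # Same input transformer as shown above.
            "InputTemplate": '{"dynamodb": {"NewImage": <$.dynamodb.NewImage>}}',
        },
        "Target": bus_arn,
    }
```

With credentials available, the request would be sent with `boto3.client("pipes").create_pipe(**pipe_request(...))`.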

3.1.4. Role & Pipe setting

  • DynamoDB Stream
  • SQS
  • Lambda
  • EventBridge
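
A hedged sketch of what the pipe's execution role might need for the four services listed above. The function names are hypothetical, the ARNs are placeholders passed in by the caller, and the SQS statement assumes the queue is used as a dead-letter queue, which this post does not state.

```python
from typing import Optional


def pipe_trust_policy() -> dict:
    """Trust policy that lets EventBridge Pipes assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "pipes.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }


def pipe_role_policy(stream_arn: str, enrichment_arn: str, bus_arn: str,
                     dlq_arn: Optional[str] = None) -> dict:
    """Permissions for reading the stream, invoking the enrichment, and writing to the bus."""
    statements = [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:ListStreams",
            ],
            "Resource": stream_arn,
        },
        {"Effect": "Allow", "Action": ["lambda:InvokeFunction"], "Resource": enrichment_arn},
        {"Effect": "Allow", "Action": ["events:PutEvents"], "Resource": bus_arn},
    ]
    if dlq_arn:
        # Assumption: the SQS queue receives records the pipe failed to process.
        statements.append(
            {"Effect": "Allow", "Action": ["sqs:SendMessage"], "Resource": dlq_arn}
        )
    return {"Version": "2012-10-17", "Statement": statements}
```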

3.1.5. Update Lambda

  1. pose event -> aggregate_pose-dev (lambda) -> trigger_recon
  2. recon event -> aggregate_recon-dev -> trigger_preprocessing / trigger_reid ...
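
After this change the aggregate Lambdas receive an EventBridge envelope, so the DynamoDB record sits under `detail`. The sketch below shows one way an aggregate_pose handler might pull out the fields it needs. `PoseEvent` and `parse_pose_event` are hypothetical names, the four-part sort key layout is assumed from the example keys in this post, and the actual aggregation and trigger_recon logic is not shown.

```python
from typing import NamedTuple, Optional


class PoseEvent(NamedTuple):
    site: str
    date: str
    sync_id: str
    camera: str


def parse_pose_event(event: dict) -> Optional[PoseEvent]:
    """Extract pose fields from an EventBridge event, or return None if it is not a pose event."""
    detail = event.get("detail", {})
    if detail.get("PerceptionPipelineEventType") != "pose":
        return None

    image = detail["dynamodb"]["NewImage"]
    # Assumed layout: <date>#pose#<sync id>#<camera>
    parts = image["date#event_type"]["S"].split("#")
    if len(parts) != 4:
        return None

    date, _event_type, sync_id, camera = parts
    return PoseEvent(site=image["site"]["S"], date=date, sync_id=sync_id, camera=camera)
```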

3.1.6. Create a Rule

Generate Event Rule

  • aggregate-pose-rule-dev
  • aggregate-recon-rule-dev
  • aggregate-pose-rule-prod
  • aggregate-recon-rule-prod

Build Event pattern

I've already filtered the DynamoDB stream using a pipe to allow only:

  1. "INSERT" events
  2. Events where status == COMPLETED

I've also already enriched each event with the "PerceptionPipelineEventType" field.

So in this event bus rule, I just need to route:

  1. PerceptionPipelineEventType == "pose" -> to the aggregate_pose Lambda
  2. PerceptionPipelineEventType == "recon" -> to the aggregate_recon Lambda

Sample event


{
  "version": "0",
  "id": "52461f27-a0bd-8365-8d47-866f429e6f57",
  "detail-type": "AWS API Call via CloudTrail",
  "source": "aws.lambda",
  "account": "123456789012",
  "time": "2024-08-21T09:11:10Z",
  "region": "ca-central-1",
  "resources": [],
  "detail": {
    "eventID": "a9b64ee3d998207db27d21c60505b64d",
    "eventName": "INSERT",
    "eventVersion": "1.1",
    "eventSource": "aws:dynamodb",
    "awsRegion": "...ap",
    "dynamodb": {
      "ApproximateCreationDateTime": 1752538268,
      "Keys": {
        "date#event_type": {
          "S": "2025-07-15#pose#sync001#cam3"
        },
        "site": {
          "S": "haemoro_1x2"
        }
      },
      "NewImage": {
        "date#event_type": {
          "S": "2025-07-15#pose#sync001#cam3"
        },
        "site": {
          "S": "haemoro_1x2"
        },
        "expires_at": {
          "N": "1752624667"
        },
        "status": {
          "S": "COMPLETED"
        }
      },
      "SequenceNumber": "2198821900002985607563155842",
      "SizeBytes": 147,
      "StreamViewType": "NEW_IMAGE"
    },
    "eventSourceARN": "...",
    "PerceptionPipelineEventType": "pose" 🔥 e.g., pose, recon, ...
  }
}
  • for aggregate-pose
{
  "detail": {
    "PerceptionPipelineEventType": ["pose"]
  }
}
  • for aggregate-recon
{
  "detail": {
    "PerceptionPipelineEventType": ["recon"]
  }
}
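
The two rules could also be created with boto3's `put_rule` and `put_targets`. This is a sketch under assumptions: `rule_requests` is a hypothetical helper, the target `Id` is an arbitrary label, and the Lambda ARN is a placeholder supplied by the caller.

```python
import json


def rule_requests(event_type: str, stage: str, lambda_arn: str):
    """Build the put_rule and put_targets arguments for one aggregate rule."""
    bus = f"perception-pipeline-event-bus-{stage}"
    rule = f"aggregate-{event_type}-rule-{stage}"

    put_rule = {
        "Name": rule,
        "EventBusName": bus,
        "State": "ENABLED",
        # Same pattern as above, serialized to a JSON string.
        "EventPattern": json.dumps(
            {"detail": {"PerceptionPipelineEventType": [event_type]}}
        ),
    }
    put_targets = {
        "Rule": rule,
        "EventBusName": bus,
        "Targets": [{"Id": f"aggregate-{event_type}-{stage}", "Arn": lambda_arn}],
    }
    return put_rule, put_targets
```

With an `events` client, the pair would be applied as `events.put_rule(**put_rule)` followed by `events.put_targets(**put_targets)`. The target Lambda also needs a resource-based permission that allows `events.amazonaws.com` to invoke it; the console adds this automatically, but scripted setups have to add it themselves.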

Verify

3.1.7. E2E Test

  • Trigger DynamoDB
from enum import Enum
import boto3
from datetime import datetime, timedelta


event_table = boto3.resource("dynamodb", region_name="ap-northeast-2").Table(
    "PerceptionPipelineEvents-dev"
)
EXPIRE_TIME_DELTA = timedelta(days=1)


class EventStatus(str, Enum):
    COMPLETED = "COMPLETED"
    PROCESSING = "PROCESSING"


class PerceptionEventTable:
    partition_key = "site"
    sort_key = "date#event_type"

    @staticmethod
    def save_event(pk: str, sort_key: str, status: EventStatus):
        def get_ttl():
            return int((datetime.now() + EXPIRE_TIME_DELTA).timestamp())

        resp = event_table.put_item(
            Item={
                "site": pk,
                "date#event_type": sort_key,
                "status": status.value,
                "expires_at": get_ttl(),
            }
        )
        return resp


if __name__ == "__main__":
    site = "test_site"
    pose_sort_key = "2025-07-15#pose#sync001#cam6"
    recon_sort_key = "2025-07-15#recon#sync001"

    PerceptionEventTable.save_event(
        pk=site,
        sort_key=pose_sort_key,
        status=EventStatus.COMPLETED,
    )

    PerceptionEventTable.save_event(
        pk=site,
        sort_key=recon_sort_key,
        status=EventStatus.COMPLETED,
    )



4. Reference
