DEV Community

maso
maso

Posted on

DAY6 -Event-driven

Overview

Today, I’ll build a simple event-driven pipeline using Amazon EventBridge, AWS Step Functions, AWS Lambda, Amazon SQS, Amazon SNS, and Amazon S3.
EventBridge triggers a Step Functions state machine, which invokes a producer Lambda. The producer stores the result in S3 and sends a message to SQS. SQS then triggers a worker Lambda, which publishes a notification to SNS. Finally, SNS delivers the message to my email.

※Using SQS to decouple components is a common exam pattern and a real-world best practice to separate components for each task to account for Lambda's execution time limits and simplify troubleshooting when errors occur.

Hands-on

1. Create a S3 bucket

Block Public Access : ON (default)
Default encryption : SSE-S3 (default)

2. Create a SNS topic

Type : Standard

Check the SNS Topic ARN!

Create a subscription to receive Email notifications.
the SNS topic created in the previous step → Subscriptions
Protocol : Email
Endpoint : your email address

Check your email and confirm subscription!

3. Create SQS queue

Type : Standard
Visibility timeout : 30s

Check the SQS queue URL!

4. Create a Producer Lambda (S3 saving + SQS triggering) invoked by the Step Functions

1. Create IAM role for the Lambda

Create IAM role and attach the following permission using the inline policy to allow lambda to access the S3 bucket and SQS.


{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "PutToS3",
      "Effect": "Allow",
      "Action": ["s3:PutObject"],
      "Resource": "arn:aws:s3:::<s3-bucket-name>/*"
    },
    {
      "Sid": "SendToSQS",
      "Effect": "Allow",
      "Action": ["sqs:SendMessage"],
      "Resource": "arn:aws:sqs:<region>:<account-id>:<queue-name>"
    }
  ]
}

Enter fullscreen mode Exit fullscreen mode

2. Create a Lambda function

Runtime : Python 3.12
Role : the IAM role created in the previous step

Set the following environment variables in the Code → Environment variables section.


BUCKET = <your S3 bucket name>
QUEUE_URL = <URL of SQS>
Enter fullscreen mode Exit fullscreen mode

Set the following code and deploy.

import json, os, time, uuid
import boto3

 # Initialization
s3 = boto3.client("s3")
sqs = boto3.client("sqs")

 # Reading Environment Variables
BUCKET = os.environ["BUCKET"]
QUEUE_URL = os.environ["QUEUE_URL"]

 # Receive the event passed by the Step Functions
def lambda_handler(event, context):
    job_id = str(uuid.uuid4())
    ts = int(time.time())

 # Create the object sent to the S3 bucket
    result = {
        "jobId": job_id,
        "timestamp": ts,
        "input": event
    }

 # Decide the S3 storage destination
    key = f"day6/results/{job_id}.json"

 # Put Object to the S3 bucket
    s3.put_object(
        Bucket=BUCKET,
        Key=key,
        Body=json.dumps(result).encode("utf-8"),
        ContentType="application/json"
    )

 # Create the message sent to the SQS
    msg = {
        "jobId": job_id,
        "bucket": BUCKET,
        "key": key
    }

 # Send message to the SQS 
    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps(msg)
    )

 # Return the information to the Step Functions
    return {
        "jobId": job_id,
        "s3": f"s3://{BUCKET}/{key}",
        "queued": True
    }
Enter fullscreen mode Exit fullscreen mode

5. Create a worker lambda (publish to SNS), triggered by SQS

1. Create IAM role for the lambda

AWSLambdaBasicExecutionRole
Create IAM role and attach the following permission to allow lambda to publish the SNS and receive message by the SQS.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "PublishToSNS",
      "Effect": "Allow",
      "Action": ["sns:Publish"],
      "Resource": "arn:aws:sns:<region>:<account-id>:hs-day6-topic"
    },
    {
      "Sid": "SQSReceiveForWorker",
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": "<QUEUE_ARN>"
    }
  ]
}

Enter fullscreen mode Exit fullscreen mode
2. Create Lambda

Runtime : Python 3.12
Role : the IAM role created in the previous step

Set the following environment variables.

TOPIC_ARN = <your SNS Topic ARN>
Enter fullscreen mode Exit fullscreen mode

Set the following code.

import json, os
import boto3

 # Initialization
sns = boto3.client("sns")
TOPIC_ARN = os.environ["TOPIC_ARN"]

 # Receive the event by the SQS
def lambda_handler(event, context):

 # Loop the record execution
    for r in event.get("Records", []):
        body = r["body"]
        msg = json.loads(body)

 # Create the notification message
        text = (
            f"Day6 completed.\n"
            f"jobId: {msg.get('jobId')}\n"
            f"S3: s3://{msg.get('bucket')}/{msg.get('key')}\n"
        )

 # Publish to the SNS
        sns.publish(
            TopicArn=TOPIC_ARN,
            Subject="Day6 Notification",
            Message=text
        )
    return {"ok": True}
Enter fullscreen mode Exit fullscreen mode

3. Set the SQS as the trigger

Worker Lambda → Add trigger → SQS created in the previous step
Batch size : 1

6. Create Step Functions

Step Functions → State machines → Create state machine
Type : Standard

{
  "Comment": "Day6: Invoke producer lambda",
  "StartAt": "InvokeProducer",
  "States": {
    "InvokeProducer": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "hs-day6-producer",
        "Payload.$": "$"
      },
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 2,
          "MaxAttempts": 2,
          "BackoffRate": 2.0
        }
      ],
      "End": true
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

7. Create an Event Bridge rule

Rule type : EventBridge Scheduled rule (5 minutes)
Target : Step Functions state machine
Input : set the following example

{"source":"eventbridge","note":"day6 test"}
Enter fullscreen mode Exit fullscreen mode

8. Functionality Verification

You've created all resources!
Enable EventBridge rule and wait for few minutes...

➀EventBridge trigger the Step functions every 5 minutes.
※You can ensure Step Functions is executed at the "Executions" tab.

➁Step functions trigger the Producer Lambda.
※You can ensure the producer Lambda is invoked in the "Monitor" tab.

➂Producer Lambda send the data to the S3.
※The json file is created in the S3!

➃Producer Lambda send message to the SQS.
※You can ensure the SQS receives the message from the Lambda at the "Monitoring" tab.

➄SQS trigger the Worker Lambda, which sends the results to the SNS.
※You can ensure the worker Lambda is invoked in the "Monitor" tab.

➅SNS sends an Email notification to the user.
※You should receive the notification Email from SNS!

※At first, My Worker Lambda occurred an error because I set the SNS subscription's ARN (not the SNS topic's ARN!) to the IAM policy attached to the Lambda... If you can't receive the Email, please review the Cloudwatch Logs in the order they occur.

9. Tidying up

  • Delete the EventBridge Rule
  • Delete the Step Functions state machine
  • Delete Lambda (Producer/Worker)
  • Delete SQS queue
  • Delete SNS Topic + Subscription
  • Delete S3 bucket

For test

Key exam points related to today's services.

EventBridge

  • Routing the event to the multiple targets (Step Functions/Lambda/SQS/SNS...) ※EventBridge can route the event. When you want to guarantee order or perform reprocessing, you should use SQS queuing.

Step Functions

  • Orchestration of the processing.
  • should use correct mode. Requires history/observability or Long-running processes → Standard High event volume or Ultra-low latency → Express
  • When processing fails, you can choose retry or catch (set other processing like notification).

Lambda

  • Execute code without needing a server
  • when you need heavy/time-consuming processing, you should use ECS/Batch as Lambda has processing time limits.

SQS

  • message queueing service. Processing can be made asynchronous and loosely coupled.
  • SQS itself does not execute processing; it only invokes functions like Lambda.

Top comments (0)