Recently, I came up with a use case for the application I am currently working on: I wanted to automate email processing to extract key information in a structured format that I could later use in other internal processes. To achieve this in AWS at the lowest possible cost while maintaining scalability and efficiency, I implemented the following architecture:
I am using SES as the entry point for the emails. SES then stores the raw email in an S3 bucket according to the rules defined in the CDK stack below:
import { Duration, RemovalPolicy } from 'aws-cdk-lib';
import { Bucket } from 'aws-cdk-lib/aws-s3';
import { ReceiptRuleSet } from 'aws-cdk-lib/aws-ses';
import { S3 } from 'aws-cdk-lib/aws-ses-actions';

const bucket = new Bucket(sesStack, 'SesBucket', {
  bucketName: `my-bucket-${process.env.AWS_BRANCH}`,
  publicReadAccess: false,
  removalPolicy: RemovalPolicy.DESTROY,
  // Move old emails to cheaper storage tiers automatically
  intelligentTieringConfigurations: [
    {
      name: 'SES-Intelligent-Tiering',
      archiveAccessTierTime: Duration.days(90),
      deepArchiveAccessTierTime: Duration.days(365),
    },
  ],
  // Required so S3 publishes "Object Created" events to EventBridge
  eventBridgeEnabled: true,
});

new ReceiptRuleSet(sesStack, 'SesRuleSet', {
  rules: [
    {
      recipients: ['myemail@myaddress.com'],
      actions: [
        new S3({
          bucket,
          // Trailing slash so keys land under emails/raw/ and match the
          // EventBridge rule's prefix filter below (SES appends the message ID)
          objectKeyPrefix: 'emails/raw/',
        }),
      ],
      // Let SES scan incoming mail for spam and viruses
      scanEnabled: true,
    },
  ],
});
Because the code above enables EventBridge events on the bucket, we can create an EventBridge rule that triggers a Step Functions state machine to process the emails:
import { Duration } from 'aws-cdk-lib';
import { Rule } from 'aws-cdk-lib/aws-events';
import { SfnStateMachine } from 'aws-cdk-lib/aws-events-targets';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import { LogGroup, RetentionDays } from 'aws-cdk-lib/aws-logs';
import { DefinitionBody, LogLevel, StateMachine, StateMachineType } from 'aws-cdk-lib/aws-stepfunctions';

const cleanseEmailFunction = new NodejsFunction(stack, 'CleanseEmailFunction', {
  ...getCommonLambdaProps(),
  entry: './amplify/functions/cleanseEmail.ts',
});
bucket.grantReadWrite(cleanseEmailFunction);

const extractDataFunction = new NodejsFunction(stack, 'ExtractDataFunction', {
  ...getCommonLambdaProps(),
  entry: './amplify/functions/extractDataFunction.ts',
});
table.grantReadWriteData(extractDataFunction);
bucket.grantRead(extractDataFunction);

const stateMachine = new StateMachine(stack, 'EmailProcessingStateMachine', {
  definitionBody: DefinitionBody.fromFile('./amplify/step-functions/processEmails.asl.json'),
  timeout: Duration.minutes(29),
  tracingEnabled: true,
  stateMachineType: StateMachineType.STANDARD,
  logs: {
    level: LogLevel.ALL,
    destination: new LogGroup(stack, 'ProcessEmailsStateMachineLogs', {
      logGroupName: '/aws/vendedlogs/states/ProcessEmailsStateMachine',
      retention: RetentionDays.ONE_WEEK,
    }),
    includeExecutionData: true,
  },
  // Values substituted into the ASL definition at synth time
  definitionSubstitutions: {
    TableName: table.tableName,
    CleanseEmailFunction: cleanseEmailFunction.functionName,
    ExtractDataFunction: extractDataFunction.functionName,
  },
  comment: 'State machine to process emails',
});
extractDataFunction.grantInvoke(stateMachine);
cleanseEmailFunction.grantInvoke(stateMachine);

// Start an execution for every raw email landing in the bucket
const rule = new Rule(stack, 'EmailS3ObjectCreatedRule', {
  eventPattern: {
    source: ['aws.s3'],
    detailType: ['Object Created'],
    detail: {
      bucket: {
        name: [bucket.bucketName],
      },
      object: {
        key: [{ prefix: 'emails/raw/' }],
      },
    },
  },
});
rule.addTarget(new SfnStateMachine(stateMachine));
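The processEmails.asl.json file referenced above isn't shown in this post, but here is a minimal sketch of what it could look like, assuming the EventBridge event is passed straight to the execution. State names, paths, and the payload shape are illustrative, and retries/error handling are omitted; the TableName substitution from the CDK code would slot in wherever the definition talks to DynamoDB directly, though here I assume the Lambda functions handle that instead:
{
  "Comment": "Illustrative sketch only, not the actual definition",
  "StartAt": "CleanseEmail",
  "States": {
    "CleanseEmail": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${CleanseEmailFunction}",
        "Payload": {
          "bucket.$": "$.detail.bucket.name",
          "key.$": "$.detail.object.key"
        }
      },
      "ResultSelector": { "cleansed.$": "$.Payload" },
      "Next": "ExtractData"
    },
    "ExtractData": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${ExtractDataFunction}",
        "Payload.$": "$.cleansed"
      },
      "End": true
    }
  }
}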
This state machine is composed of essentially two functions (rough sketches of both follow below):
CleanseEmailFunction: removes all sensitive data
ExtractDataFunction: uses LangChain and LangSmith to validate and extract structured JSON information through Bedrock and Claude 3.5 Sonnet v2, then stores it in DynamoDB for later use
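Neither handler is included in the post, so here is a sketch of what cleanseEmail.ts might look like. The event shape matches the ASL sketch above, and the regex masking is a deliberately naive placeholder (real sensitive-data removal would need something more robust, such as Amazon Comprehend's PII detection):
import { S3Client, GetObjectCommand, PutObjectCommand } from '@aws-sdk/client-s3';

const s3 = new S3Client({});

// Sketch of cleanseEmail.ts: read the raw email, mask obvious PII,
// and write the cleansed copy under a separate prefix.
export const handler = async (event: { bucket: string; key: string }) => {
  const raw = await s3.send(new GetObjectCommand({ Bucket: event.bucket, Key: event.key }));
  let body = await raw.Body!.transformToString();

  // Naive placeholder masking; real PII removal needs more than regexes
  body = body
    .replace(/[\w.+-]+@[\w-]+(\.[\w-]+)+/g, '[EMAIL]')
    .replace(/\+?\d[\d\s().-]{7,}\d/g, '[PHONE]');

  const cleanKey = event.key.replace('emails/raw/', 'emails/clean/');
  await s3.send(new PutObjectCommand({ Bucket: event.bucket, Key: cleanKey, Body: body }));

  // The state machine passes this on to ExtractDataFunction
  return { bucket: event.bucket, key: cleanKey };
};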
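And a sketch of what extractDataFunction.ts could look like using LangChain's Bedrock integration. The model ID, the Zod schema, and the TABLE_NAME environment variable are all assumptions on my part; LangSmith tracing, which the post mentions, is configured through environment variables and doesn't show up in the code:
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, PutCommand } from '@aws-sdk/lib-dynamodb';
import { ChatBedrockConverse } from '@langchain/aws';
import { z } from 'zod';

const s3 = new S3Client({});
const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// Illustrative schema; adapt the fields to what downstream processes need
const emailSchema = z.object({
  sender: z.string(),
  subject: z.string(),
  intent: z.string(),
  summary: z.string(),
});

// Claude 3.5 Sonnet v2 via the Bedrock Converse API, constrained to the schema
const model = new ChatBedrockConverse({
  model: 'anthropic.claude-3-5-sonnet-20241022-v2:0',
}).withStructuredOutput(emailSchema);

export const handler = async (event: { bucket: string; key: string }) => {
  const obj = await s3.send(new GetObjectCommand({ Bucket: event.bucket, Key: event.key }));
  const email = await obj.Body!.transformToString();

  const extracted = await model.invoke(
    `Extract the key information from this email:\n\n${email}`
  );

  await ddb.send(new PutCommand({
    TableName: process.env.TABLE_NAME!, // assumed to be injected via Lambda env vars
    Item: { pk: event.key, ...extracted },
  }));
  return extracted;
};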
Later, I could use this data to perform summarization and analytics, send tailored push notifications to users, and so on.
This straightforward email-processing architecture can be extended to perform additional tasks, like evaluating the model's responses or applying further transformations. Best of all, I pay only for what I use.
If you want to scale it further and optimize costs, you can add an SQS queue in between and use batch inference to process emails in bulk.
Note that batch inference may not be enabled in your account, and you may need to request it through support (I have been fighting with support for more than a month now to get access, even with a Business support plan).
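If you go that route, the queueing part could look something like this. It's only a sketch: it swaps the rule's target from the state machine to an SQS queue, and the consumer that drains the queue and submits a Bedrock batch inference job is out of scope here:
import { Duration } from 'aws-cdk-lib';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { SqsQueue } from 'aws-cdk-lib/aws-events-targets';

// Buffer "Object Created" events instead of starting one execution per email;
// a scheduled consumer can then drain the queue and submit a whole batch
// of emails to Bedrock batch inference in a single job.
const emailQueue = new Queue(stack, 'EmailQueue', {
  visibilityTimeout: Duration.minutes(5),
});
rule.addTarget(new SqsQueue(emailQueue));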
Thanks for reading! Got any cool ideas or feedback you want to share? Drop a comment, send me a message, or follow me, and let’s keep building!