What we're going to do
Managing and keeping track of receipts is tedious, and we sometimes forget to note down expenses. What if we automated that process? That is exactly what this project does: you send a receipt over WhatsApp, let the pipeline take care of the rest, and get an email summary in your inbox.
1️⃣ Create a Twilio WhatsApp Sandbox
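Once you have joined the sandbox from your phone (by sending the join code shown in the console to the sandbox number), a quick sanity check is to send yourself a message through the API. A minimal sketch using the twilio Python package — the credentials and phone numbers below are placeholders, and the sandbox number should match whatever your console shows:

from twilio.rest import Client

# Placeholder credentials - copy the real values from your Twilio console
client = Client("ACxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", "your_auth_token")

message = client.messages.create(
    from_="whatsapp:+14155238886",   # shared sandbox number shown in the console
    to="whatsapp:+15551234567",      # the phone that joined the sandbox
    body="Sandbox is working!",
)
print(message.sid)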
2️⃣ Create an S3 bucket
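If you prefer to script it, here is a minimal boto3 sketch — the bucket name and region are assumptions, so substitute your own:

import boto3

s3 = boto3.client("s3", region_name="ap-south-1")
s3.create_bucket(
    Bucket="whatsapp-receipts-demo",  # hypothetical name; bucket names must be globally unique
    CreateBucketConfiguration={"LocationConstraint": "ap-south-1"},  # omit this argument for us-east-1
)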
3️⃣ Create a webhook
import express from 'express';
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';
import bodyParser from 'body-parser';
import dotenv from 'dotenv';
import fetch from 'node-fetch';
dotenv.config();
const app = express();
app.use(bodyParser.urlencoded({ extended: false }));
const s3 = new S3Client({
region: process.env.AWS_REGION,
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
},
});
app.post('/whatsapp-webhook', async (req, res) => {
const { Body, From, NumMedia, MediaUrl0, MediaContentType0 } = req.body;
const sender = From.replace('whatsapp:', '');
const timestamp = Date.now();
try {
if (parseInt(NumMedia) > 0) {
const response = await fetch(MediaUrl0); // Twilio-hosted media URL for the attachment
const buffer = Buffer.from(await response.arrayBuffer()); // response.buffer() was removed in node-fetch v3
const extension = MediaContentType0.split('/')[1]; // e.g., image/jpeg → jpeg
const Key = `whatsapp-media/${timestamp}-${sender}.${extension}`;
await s3.send(new PutObjectCommand({
Bucket: process.env.S3_BUCKET_NAME,
Key,
Body: buffer,
ContentType: MediaContentType0,
}));
console.log(`✅ Media uploaded: ${Key}`);
// Send a TwiML reply back to the user (Twilio expects XML, not Express's default text/html)
res.type('text/xml');
return res.send(`
<Response>
<Message>📁 Got your file! Uploaded as ${Key}</Message>
</Response>
`);
} else {
// Handle text messages
const Key = `whatsapp-text/${timestamp}-${sender}.txt`;
await s3.send(new PutObjectCommand({
Bucket: process.env.S3_BUCKET_NAME,
Key,
Body,
ContentType: 'text/plain',
}));
console.log(`✅ Text uploaded: ${Key}`);
res.type('text/xml');
return res.send(`
<Response>
<Message>📝 Got your text and saved it!</Message>
</Response>
`);
}
} catch (err) {
console.error('❌ Error:', err);
res.status(500).type('text/xml').send('<Response><Message>😓 Something went wrong</Message></Response>');
}
});
const PORT = process.env.PORT || 3000; // default matches the ngrok command below
app.listen(PORT, () => {
console.log(`Webhook ready at http://localhost:${PORT}`);
});
4️⃣ Expose it using ngrok
node index.js
npm i -g ngrok
ngrok http 3000
5️⃣ Paste the ngrok URL, followed by the /whatsapp-webhook path, into the sandbox's incoming-message webhook field in the Twilio Console.
6️⃣ Create a DynamoDB table to store the extracted data
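A minimal boto3 sketch for this step — it assumes the table is named Receipts (the default the Lambda code below reads from DYNAMODB_TABLE) with receipt_id as the partition key, matching the items the Lambda writes:

import boto3

dynamodb = boto3.client("dynamodb")
dynamodb.create_table(
    TableName="Receipts",  # matches the DYNAMODB_TABLE default in the Lambda code
    AttributeDefinitions=[{"AttributeName": "receipt_id", "AttributeType": "S"}],
    KeySchema=[{"AttributeName": "receipt_id", "KeyType": "HASH"}],
    BillingMode="PAY_PER_REQUEST",  # on-demand capacity, nothing to provision
)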
7️⃣Set Up Amazon SES (to send emails)
Create an identity in SES to send emails.
If your SES account is still in the sandbox, add and verify the recipient email as well; for non-sandbox (production) accounts this step is not needed.
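The same verification can be triggered from code; a minimal sketch with placeholder addresses that mirror the defaults in the Lambda code below (each address then has to click the link SES emails to it):

import boto3

ses = boto3.client("ses")
ses.verify_email_identity(EmailAddress="your-email@example.com")   # sender
ses.verify_email_identity(EmailAddress="recipient@example.com")    # recipient (sandbox accounts only)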
After verifying your sender email you should see something like this:
8️⃣ Create an IAM Role for Lambda Execution
Create a new role and choose Lambda as the use case.
Attach the following policies (a scripted equivalent is sketched after the list):
- `AmazonS3ReadOnlyAccess`
- `AmazonTextractFullAccess`
- `AmazonDynamoDBFullAccess`
- `AmazonSESFullAccess`
- `AWSLambdaBasicExecutionRole`
Name the role `LambdaReceiptProcessingRole`.
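A minimal boto3 sketch of the same role setup, in case you prefer scripting it; the trust policy simply lets Lambda assume the role:

import json
import boto3

iam = boto3.client("iam")

# Trust policy allowing the Lambda service to assume this role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

iam.create_role(
    RoleName="LambdaReceiptProcessingRole",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Attach the managed policies listed above
managed_policies = [
    "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
    "arn:aws:iam::aws:policy/AmazonTextractFullAccess",
    "arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess",
    "arn:aws:iam::aws:policy/AmazonSESFullAccess",
    "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
]
for arn in managed_policies:
    iam.attach_role_policy(RoleName="LambdaReceiptProcessingRole", PolicyArn=arn)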
9️⃣ Create the Lambda Function (processing engine)
Name the function `ProcessReceiptFunction`.
Choose the existing role we just created (`LambdaReceiptProcessingRole`).
For the runtime, choose Python 3.9.
Go to Configuration > Environment variables and add `DYNAMODB_TABLE`, `SES_SENDER_EMAIL`, and `SES_RECIPIENT_EMAIL` (the table name plus your verified sender and recipient addresses).
Go to the Code tab and paste this code,
import json
import os
import boto3
import uuid
from datetime import datetime
import urllib.parse
# Initialize AWS clients
s3 = boto3.client('s3')
textract = boto3.client('textract')
dynamodb = boto3.resource('dynamodb')
ses = boto3.client('ses')
# Environment variables
DYNAMODB_TABLE = os.environ.get('DYNAMODB_TABLE', 'Receipts')
SES_SENDER_EMAIL = os.environ.get('SES_SENDER_EMAIL', 'your-email@example.com')
SES_RECIPIENT_EMAIL = os.environ.get('SES_RECIPIENT_EMAIL', 'recipient@example.com')
def lambda_handler(event, context):
try:
# Get the S3 bucket and key from the event
bucket = event['Records'][0]['s3']['bucket']['name']
# URL decode the key to handle spaces and special characters
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
print(f"Processing receipt from {bucket}/{key}")
# Verify the object exists before proceeding
try:
s3.head_object(Bucket=bucket, Key=key)
print(f"Object verification successful: {bucket}/{key}")
except Exception as e:
print(f"Object verification failed: {str(e)}")
raise Exception(f"Unable to access object {key} in bucket {bucket}: {str(e)}")
# Step 1: Process receipt with Textract
receipt_data = process_receipt_with_textract(bucket, key)
# Step 2: Store results in DynamoDB
store_receipt_in_dynamodb(receipt_data, bucket, key)
# Step 3: Send email notification
send_email_notification(receipt_data)
return {
'statusCode': 200,
'body': json.dumps('Receipt processed successfully!')
}
except Exception as e:
print(f"Error processing receipt: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps(f'Error: {str(e)}')
}
def process_receipt_with_textract(bucket, key):
"""Process receipt using Textract's AnalyzeExpense operation"""
try:
print(f"Calling Textract analyze_expense for {bucket}/{key}")
response = textract.analyze_expense(
Document={
'S3Object': {
'Bucket': bucket,
'Name': key
}
}
)
print("Textract analyze_expense call successful")
except Exception as e:
print(f"Textract analyze_expense call failed: {str(e)}")
raise
# Generate a unique ID for this receipt
receipt_id = str(uuid.uuid4())
# Initialize receipt data dictionary
receipt_data = {
'receipt_id': receipt_id,
'date': datetime.now().strftime('%Y-%m-%d'), # Default date
'vendor': 'Unknown',
'total': '0.00',
'items': [],
's3_path': f"s3://{bucket}/{key}"
}
# Extract data from Textract response
if 'ExpenseDocuments' in response and response['ExpenseDocuments']:
expense_doc = response['ExpenseDocuments'][0]
# Process summary fields (TOTAL, DATE, VENDOR)
if 'SummaryFields' in expense_doc:
for field in expense_doc['SummaryFields']:
field_type = field.get('Type', {}).get('Text', '')
value = field.get('ValueDetection', {}).get('Text', '')
if field_type == 'TOTAL':
receipt_data['total'] = value
elif field_type == 'INVOICE_RECEIPT_DATE':
# Use the detected date string; the default (today) set above is kept if nothing was detected
if value:
receipt_data['date'] = value
elif field_type == 'VENDOR_NAME':
receipt_data['vendor'] = value
# Process line items
if 'LineItemGroups' in expense_doc:
for group in expense_doc['LineItemGroups']:
if 'LineItems' in group:
for line_item in group['LineItems']:
item = {}
for field in line_item.get('LineItemExpenseFields', []):
field_type = field.get('Type', {}).get('Text', '')
value = field.get('ValueDetection', {}).get('Text', '')
if field_type == 'ITEM':
item['name'] = value
elif field_type == 'PRICE':
item['price'] = value
elif field_type == 'QUANTITY':
item['quantity'] = value
# Add to items list if we have a name
if 'name' in item:
receipt_data['items'].append(item)
print(f"Extracted receipt data: {json.dumps(receipt_data)}")
return receipt_data
def store_receipt_in_dynamodb(receipt_data, bucket, key):
"""Store the extracted receipt data in DynamoDB"""
try:
table = dynamodb.Table(DYNAMODB_TABLE)
# Convert items to a format DynamoDB can store
items_for_db = []
for item in receipt_data['items']:
items_for_db.append({
'name': item.get('name', 'Unknown Item'),
'price': item.get('price', '0.00'),
'quantity': item.get('quantity', '1')
})
# Create item to insert
db_item = {
'receipt_id': receipt_data['receipt_id'],
'date': receipt_data['date'],
'vendor': receipt_data['vendor'],
'total': receipt_data['total'],
'items': items_for_db,
's3_path': receipt_data['s3_path'],
'processed_timestamp': datetime.now().isoformat()
}
# Insert into DynamoDB
table.put_item(Item=db_item)
print(f"Receipt data stored in DynamoDB: {receipt_data['receipt_id']}")
except Exception as e:
print(f"Error storing data in DynamoDB: {str(e)}")
raise
def send_email_notification(receipt_data):
"""Send an email notification with receipt details"""
try:
# Format items for email
items_html = ""
for item in receipt_data['items']:
name = item.get('name', 'Unknown Item')
price = item.get('price', 'N/A')
quantity = item.get('quantity', '1')
items_html += f"<li>{name} - ${price} x {quantity}</li>"
if not items_html:
items_html = "<li>No items detected</li>"
# Create email body
html_body = f"""
<html>
<body>
<h2>Hurray!! Completed My Automation Project</h2>
<p><strong>Receipt ID:</strong> {receipt_data['receipt_id']}</p>
<p><strong>Vendor:</strong> {receipt_data['vendor']}</p>
<p><strong>Date:</strong> {receipt_data['date']}</p>
<p><strong>Total Amount:</strong> ${receipt_data['total']}</p>
<p><strong>S3 Location:</strong> {receipt_data['s3_path']}</p>
<h3>Items:</h3>
<ul>
{items_html}
</ul>
<p>The receipt has been processed and stored in DynamoDB.</p>
</body>
</html>
"""
# Send email using SES
ses.send_email(
Source=SES_SENDER_EMAIL,
Destination={
'ToAddresses': [SES_RECIPIENT_EMAIL]
},
Message={
'Subject': {
'Data': f"Receipt Processed: {receipt_data['vendor']} - ${receipt_data['total']}"
},
'Body': {
'Html': {
'Data': html_body
}
}
}
)
print(f"Email notification sent to {SES_RECIPIENT_EMAIL}")
except Exception as e:
print(f"Error sending email notification: {str(e)}")
# Continue execution even if email fails
print("Continuing execution despite email error")
Go to the Configuration tab > General configuration > Edit.
Increase the timeout from the default 3 seconds to 2 minutes so larger or more complex files have time to process.
Hit Save.
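The same change can be scripted; a minimal sketch, assuming the function name used above:

import boto3

lambda_client = boto3.client("lambda")
lambda_client.update_function_configuration(
    FunctionName="ProcessReceiptFunction",
    Timeout=120,  # seconds (2 minutes)
)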
1️⃣0️⃣ Go back to the S3 bucket
In the Properties tab,
add an event notification:
Prefix: `whatsapp-media/`
Object creation: select "All object create events"
1️⃣1️⃣ Finally, choose the Lambda function `ProcessReceiptFunction` as the destination (a scripted equivalent of steps 10 and 11 is sketched below).
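If you would rather script steps 10 and 11, here is a minimal sketch; the bucket name is a placeholder, and note that the console adds the Lambda invoke permission for S3 automatically, whereas a script has to add it explicitly:

import boto3

lambda_client = boto3.client("lambda")
s3 = boto3.client("s3")

BUCKET = "whatsapp-receipts-demo"  # hypothetical bucket name, use yours
FUNCTION_ARN = lambda_client.get_function(FunctionName="ProcessReceiptFunction")["Configuration"]["FunctionArn"]

# Allow S3 to invoke the Lambda function
lambda_client.add_permission(
    FunctionName="ProcessReceiptFunction",
    StatementId="s3-invoke-receipts",
    Action="lambda:InvokeFunction",
    Principal="s3.amazonaws.com",
    SourceArn=f"arn:aws:s3:::{BUCKET}",
)

# Trigger the function for every object created under whatsapp-media/
s3.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [{
            "LambdaFunctionArn": FUNCTION_ARN,
            "Events": ["s3:ObjectCreated:*"],
            "Filter": {"Key": {"FilterRules": [{"Name": "prefix", "Value": "whatsapp-media/"}]}},
        }]
    },
)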
Wait about 30 seconds and check your spam folder for the mail. If you do not receive the email after 2 minutes, go to the Monitor tab of the Lambda function and check the log groups in CloudWatch.