DEV Community

James Lee
James Lee

Posted on

Serverless Workflows: Orchestrating Multi-Step Pipelines with AWS Step Functions

},

"NotifyDownstream": {
  "Type": "Task",
  "Resource": "arn:aws:states:::sns:publish",
  "Parameters": {
    "TopicArn": "arn:aws:sns:us-east-1:123:brand-asset-processed",
    "Message.$": "States.JsonToString($)"
  },
  "End": true
},

"HandleProcessingError": {
  "Type": "Task",
  "Resource": "arn:aws:lambda:us-east-1:123:function:handle-processing-error",
  "End": true
}
Enter fullscreen mode Exit fullscreen mode

}
}


### Step 2: The Lambda Functions

Each state's Lambda function does one thing well — the orchestration logic lives in the state machine, not the functions.

Enter fullscreen mode Exit fullscreen mode


python

validate_brand_asset.py

import json

class ValidationError(Exception):
pass

def handler(event, context):
"""
Validates incoming brand asset upload.
Input: { brandId, s3Bucket, s3Key, fileSize, contentType }
Output: same event, passed through to next state
"""
brand_id = event.get('brandId')
s3_key = event.get('s3Key')
content_type = event.get('contentType', '')
file_size = event.get('fileSize', 0)

if not brand_id:
    raise ValidationError('Missing required field: brandId')

if not s3_key:
    raise ValidationError('Missing required field: s3Key')

allowed_types = {'image/png', 'image/jpeg', 'image/svg+xml', 'image/webp'}
if content_type not in allowed_types:
    raise ValidationError(f'Unsupported content type: {content_type}')

max_size_bytes = 10 * 1024 * 1024  # 10MB
if file_size > max_size_bytes:
    raise ValidationError(f'File too large: {file_size} bytes (max 10MB)')

print(f'Validation passed for brand {brand_id}: {s3_key}')

# Pass through the event — next state receives this as input
return event
Enter fullscreen mode Exit fullscreen mode

Enter fullscreen mode Exit fullscreen mode


python

detect_logo.py

import boto3
import os

rekognition = boto3.client('rekognition')

def handler(event, context):
"""
Runs Rekognition label detection on the uploaded brand asset.
Input: { brandId, s3Bucket, s3Key, ... }
Output: { logoLabels: [...] } ← merged into parallel results
"""
bucket = event['s3Bucket']
key = event['s3Key']

response = rekognition.detect_labels(
    Image={'S3Object': {'Bucket': bucket, 'Name': key}},
    MaxLabels=15,
    MinConfidence=75
)

labels = [
    {'name': label['Name'], 'confidence': round(label['Confidence'], 2)}
    for label in response['Labels']
]

print(f'Detected {len(labels)} labels for {key}')
return {'logoLabels': labels}
Enter fullscreen mode Exit fullscreen mode

Enter fullscreen mode Exit fullscreen mode


python

generate_color_palette.py

import boto3
from PIL import Image
import io
from collections import Counter

s3 = boto3.client('s3')

def handler(event, context):
"""
Extracts dominant colors from the brand asset.
Input: { brandId, s3Bucket, s3Key, ... }
Output: { colors: ['#FF5733', '#C70039', ...] }
"""
bucket = event['s3Bucket']
key = event['s3Key']

# Download image from S3
obj = s3.get_object(Bucket=bucket, Key=key)
image_data = obj['Body'].read()

# Extract dominant colors
img = Image.open(io.BytesIO(image_data)).convert('RGB')
img = img.resize((100, 100))  # downsample for speed

pixels = list(img.getdata())
color_counts = Counter(pixels)
top_colors = color_counts.most_common(5)

hex_colors = [
    '#{:02x}{:02x}{:02x}'.format(r, g, b)
    for (r, g, b), _ in top_colors
]

print(f'Extracted {len(hex_colors)} dominant colors from {key}')
return {'colors': hex_colors}
Enter fullscreen mode Exit fullscreen mode

### Step 3: Deploy with Serverless Framework

Enter fullscreen mode Exit fullscreen mode


yaml

serverless.yml

service: brand-asset-pipeline

provider:
name: aws
runtime: python3.12
region: us-east-1

plugins:

  • serverless-step-functions

functions:
validateBrandAsset:
handler: validate_brand_asset.handler

detectLogo:
handler: detect_logo.handler
timeout: 30 # Rekognition can be slow

generateColorPalette:
handler: generate_color_palette.handler
timeout: 30

handleValidationError:
handler: error_handlers.handle_validation_error

handleProcessingError:
handler: error_handlers.handle_processing_error

stepFunctions:
stateMachines:
brandAssetPipeline:
name: brand-asset-ingestion-pipeline
definition: ${file(state_machine.json)}
loggingConfig:
level: ALL
includeExecutionData: true
destinations:
- arn:aws:logs:us-east-1:123:log-group:/aws/states/brand-pipeline


---

## Key Patterns in Step Functions

### Pattern 1: Retry with Exponential Backoff

Every `Task` state should define retry behavior for transient failures (Lambda throttling, downstream API timeouts):

Enter fullscreen mode Exit fullscreen mode


json
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.TooManyRequestsException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
},
{
"ErrorEquals": ["RateLimitError"],
"IntervalSeconds": 10,
"MaxAttempts": 5,
"BackoffRate": 1.5
}
]


The retry sequence with `BackoffRate: 2` and `IntervalSeconds: 2`:
- Attempt 1 fails → wait 2s
- Attempt 2 fails → wait 4s
- Attempt 3 fails → wait 8s
- All retries exhausted → `Catch` block fires

### Pattern 2: Map State for Batch Processing

Process a list of items in parallel — like processing multiple brand assets in one workflow execution:

Enter fullscreen mode Exit fullscreen mode


json
"ProcessBrandBatch": {
"Type": "Map",
"ItemsPath": "$.brandIds",
"MaxConcurrency": 10,
"Iterator": {
"StartAt": "ProcessSingleBrand",
"States": {
"ProcessSingleBrand": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:process-brand",
"End": true
}
}
},
"Next": "AggregateResults"
}


Input:
Enter fullscreen mode Exit fullscreen mode


json
{
"brandIds": ["nike", "adidas", "puma", "reebok", "newbalance"]
}


Step Functions processes all 5 brands in parallel (up to `MaxConcurrency: 10`), then waits for all to complete before moving to `AggregateResults`.

### Pattern 3: Wait for Human Approval (Callback Pattern)

For workflows that need human review before proceeding — use the `.waitForTaskToken` integration:

Enter fullscreen mode Exit fullscreen mode


json
"WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
"Parameters": {
"FunctionName": "send-approval-email",
"Payload": {
"taskToken.$": "$$.Task.Token",
"brandId.$": "$.brandId",
"reviewUrl.$": "States.Format('https://admin.yourdomain.com/review/{}', $.brandId)"
}
},
"HeartbeatSeconds": 86400,
"Next": "ProcessApprovedBrand"
}


Enter fullscreen mode Exit fullscreen mode


python

send_approval_email.py

import boto3
import os

ses = boto3.client('ses')
sfn = boto3.client('stepfunctions')

def handler(event, context):
"""
Sends approval email with approve/reject links.
The workflow pauses until the reviewer clicks a link,
which calls SendTaskSuccess or SendTaskFailure.
"""
task_token = event['taskToken']
brand_id = event['brandId']
review_url = event['reviewUrl']

# Store token for later callback
# (in practice, store in DynamoDB keyed by brandId)

ses.send_email(
    Source='noreply@yourdomain.com',
    Destination={'ToAddresses': ['reviewer@yourdomain.com']},
    Message={
        'Subject': {'Data': f'Brand Review Required: {brand_id}'},
        'Body': {'Html': {'Data': f'''
            <p>Brand <b>{brand_id}</b> requires review.</p>
            <p><a href="{review_url}?token={task_token}&action=approve">✅ Approve</a></p>
            <p><a href="{review_url}?token={task_token}&action=reject">❌ Reject</a></p>
        '''}}
    }
)

# Return immediately — workflow is now paused waiting for callback
return {'status': 'approval_email_sent', 'brandId': brand_id}
Enter fullscreen mode Exit fullscreen mode

def handle_approval_callback(task_token: str, approved: bool):
"""Called by your API when reviewer clicks approve/reject"""
if approved:
sfn.send_task_success(
taskToken=task_token,
output=json.dumps({'approved': True})
)
else:
sfn.send_task_failure(
taskToken=task_token,
error='ReviewRejected',
cause='Brand rejected by human reviewer'
)


### Pattern 4: Express vs Standard Workflows

Step Functions offers two workflow types with very different characteristics:

| | Standard Workflows | Express Workflows |
|---|---|---|
| **Max duration** | 1 year | 5 minutes |
| **Execution model** | Exactly-once | At-least-once |
| **Pricing** | Per state transition | Per execution duration |
| **Execution history** | Full (90 days) | CloudWatch Logs only |
| **Best for** | Long-running, auditable workflows | High-volume, short pipelines |

Enter fullscreen mode Exit fullscreen mode


yaml

Express workflow for high-volume, short-lived pipelines

stepFunctions:
stateMachines:
brandMetricsPipeline:
type: EXPRESS # ← Express workflow
name: brand-metrics-pipeline
definition: ${file(express_state_machine.json)}




Use **Standard** for: order processing, approval workflows, data ingestion with audit requirements.

Use **Express** for: real-time data transformation, IoT event processing, high-frequency API backends.

---

## Step Functions vs DIY Orchestration

A common question: why not just chain Lambda functions with SQS queues?

| Concern | SQS + Lambda DIY | Step Functions |
|---|---|---|
| State tracking | You build it | Built-in |
| Retry logic | You build it | Built-in per-state |
| Parallel execution | Complex fan-out/fan-in | Native `Parallel` state |
| Execution history | CloudWatch logs only | Visual execution graph |
| Error routing | Manual DLQ wiring | Native `Catch` blocks |
| Long-running (>15min) | Requires external state | Native support (Standard) |
| Cost | Lower for simple cases | Per state transition |

The tipping point: **if your workflow has more than 3 steps, branching logic, or retry requirements, Step Functions pays for itself in reduced complexity.**

---

## Summary

| Concept | AWS Step Functions Implementation |
|---|---|
| **State machine** | JSON-defined workflow (Amazon States Language) |
| **Sequential steps** | Default — each state transitions to `Next` |
| **Branching** | `Choice` state with condition rules |
| **Parallel execution** | `Parallel` state — branches run simultaneously |
| **Batch processing** | `Map` state — iterate over arrays |
| **Retry logic** | Per-state `Retry` with exponential backoff |
| **Error handling** | Per-state `Catch` with error routing |
| **Human-in-the-loop** | `.waitForTaskToken` callback pattern |
| **Workflow types** | Standard (long-running) vs Express (high-volume) |

Step Functions doesn't replace Lambda — it elevates it. Individual functions stay small and focused; the workflow handles coordination, state, retries, and error routing. The result is a system where each piece is independently testable, observable, and replaceable.

**Build small functions. Orchestrate big workflows.**

---

*Next in this series: **Part 7 — Serverless Best Practices: Production Architecture, Stateless Design & Cost Optimization***
Enter fullscreen mode Exit fullscreen mode

Top comments (0)