Water quality testing normally takes between 24 and 48 hours.
This makes it almost impossible to monitor water quality in real time, especially where water has to be verified clean continuously. I wanted that turnaround cut from days to seconds.
That is why I created a real-time water quality monitoring system using ESP32 sensors, AWS IoT Core, Lambda, and DynamoDB — a production-ready pipeline, not a demo.
The Problem
Most IoT tutorials go this far:
- Sending a temperature reading
- Displaying it on a dashboard
They don’t cover what happens when you need to:
- Handle 100K+ messages per hour reliably
- Run ML inference on every incoming reading
- Trigger alerts in under 5 seconds
- Keep costs under $2 per device/month
The Architecture

This system is built as a fully serverless, event-driven architecture in which each component is triggered by incoming data rather than running continuously.
It eliminates the need for infrastructure management as there are no EC2 instances, container orchestration layers, or manual scaling configurations.
Each service (IoT Core, Lambda, DynamoDB, and API Gateway) scales independently based on workload, enabling the system to handle variable data ingestion rates efficiently while maintaining low operational overhead.
Device Layer: ESP32 + MQTT
Each ESP32 device collects sensor data every 60 seconds:
- pH (0–14)
- Turbidity (0–1000 NTU)
- TDS — Total Dissolved Solids (0–2000 ppm)
- Temperature (-10°C to 50°C)
Example payload:
```json
{
  "deviceId": "ESP32-ABC123",
  "timestamp": "2024-01-15T10:30:00Z",
  "readings": {
    "pH": 7.2,
    "turbidity": 3.5,
    "tds": 450,
    "temperature": 22.5
  },
  "metadata": {
    "firmwareVersion": "2.1.0",
    "batteryLevel": 85,
    "signalStrength": -45
  }
}
```
Why MQTT over HTTP? A fraction of the overhead. MQTT keeps a persistent TCP connection, so each message carries essentially just the payload: no HTTP headers, no TLS handshake per message. At 100K messages/hour that difference matters.
AWS IoT Core: Secure Ingestion Layer
IoT Core acts as the managed MQTT broker. Devices connect with X.509 certificates — no username/password, no API keys. Each device gets its own cert, so you can revoke a single compromised device without touching anything else.
The IoT Rule that routes messages to Lambda is dead simple:
```sql
SELECT * FROM 'aquachain/devices/+/data'
```
The '+' wildcard matches any device ID. IoT Core evaluates the rule against every matching message and invokes the Lambda function directly, so there is no polling and no queue to manage: the pipeline is purely event-driven.
Lambda: Validation and Storage
The data processing Lambda does three things: validate, store, and trigger inference.
```python
import json
import logging
from datetime import datetime, timedelta

import boto3

logger = logging.getLogger()
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('aquachain-readings-dev')  # table name illustrative
lambda_client = boto3.client('lambda')

def lambda_handler(event, context):
    device_id = event['deviceId']
    readings = event['readings']

    # 1. Validate sensor ranges
    if not validate_readings(readings):
        logger.warning(f"Invalid readings from {device_id}", extra={
            'deviceId': device_id,
            'readings': readings
        })
        return  # Drop the message, don't store garbage

    # 2. Store in DynamoDB
    table.put_item(Item={
        'deviceId': device_id,
        'timestamp': event['timestamp'],
        **readings,
        'ttl': int((datetime.utcnow() + timedelta(days=90)).timestamp())
    })

    # 3. Trigger ML inference asynchronously
    lambda_client.invoke(
        FunctionName='aquachain-function-ml-inference-dev',
        InvocationType='Event',  # async — don't wait
        Payload=json.dumps(event)
    )
```
A few things worth calling out here:
1. Validation is non-negotiable. Sensors drift, connectors corrode, and firmware bugs happen. If you store garbage readings, your ML model trains on garbage. I reject anything outside physical bounds — pH can't be 15, temperature can't be -50°C.
2. TTL is free data lifecycle management. DynamoDB's TTL feature automatically deletes items after a timestamp you set. Raw readings expire after 90 days with zero Lambda invocations and zero cost.
3. Async ML invocation. I invoke the inference Lambda with InvocationType='Event' so the data processing function returns immediately. The ML inference runs in parallel. This is what keeps the pipeline fast.
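The `validate_readings` helper referenced in the handler isn't shown in the post. A minimal sketch, assuming the physical bounds from the device section (the dict shape and bounds here are illustrative, not the actual implementation):

```python
# Physical bounds per sensor, matching the ranges in the device section
SENSOR_BOUNDS = {
    'pH': (0.0, 14.0),
    'turbidity': (0.0, 1000.0),    # NTU
    'tds': (0.0, 2000.0),          # ppm
    'temperature': (-10.0, 50.0),  # °C
}

def validate_readings(readings):
    """Accept a reading only if every expected sensor is present,
    numeric, and inside its physical bounds."""
    for sensor, (low, high) in SENSOR_BOUNDS.items():
        value = readings.get(sensor)
        if not isinstance(value, (int, float)) or not (low <= value <= high):
            return False
    return True
```

With something like this in place, a pH of 15 or a temperature of -50°C is rejected before it ever touches DynamoDB.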
The Latency Reality
Here's what the actual CloudWatch data shows for the data processing Lambda:
| Scenario | Duration |
|---|---|
| Warm, no DB write | ~2ms |
| Warm, with DynamoDB PutItem | ~150ms |
| Cold start | ~617ms init + ~2ms execution |
The warm execution with a DynamoDB write averages around 150ms, and cold starts add ~617ms on top. The product target was <5 seconds from sensor reading to dashboard, so we're well inside that. If you need consistent sub-100ms Lambda execution, though, the DynamoDB write is the bottleneck. The fix is to make the write asynchronous via SQS: the Lambda validates and returns in ~2ms, while a separate consumer handles persistence.
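A minimal sketch of that SQS split (the queue URL and helper names are illustrative; the serialization is pulled into a pure function so it's easy to test):

```python
import json

def make_message(event):
    """Serialize the validated reading for the queue."""
    return json.dumps(event, separators=(',', ':'))

def enqueue_reading(event, queue_url):
    """Hand the reading to SQS and return immediately; a separate
    consumer Lambda does the DynamoDB PutItem off the critical path."""
    import boto3  # imported lazily so the module loads without boto3 installed
    sqs = boto3.client('sqs')
    sqs.send_message(QueueUrl=queue_url, MessageBody=make_message(event))
```

The consumer reads from the queue in batches, which also smooths out write spikes against DynamoDB.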
ML Inference: XGBoost on Lambda
The ML model is an XGBoost classifier that outputs a Water Quality Index (WQI) from 0-100. It runs entirely inside Lambda: no SageMaker endpoint to keep warm, no idle costs.
```python
def lambda_handler(event, context):
    # Load model from S3 (cached in /tmp after first load)
    model = load_model()

    features = extract_features(event['readings'])
    wqi = model.predict([features])[0]

    if wqi < 50:
        trigger_alert(event['deviceId'], wqi, event['readings'])

    return {'wqi': float(wqi), 'quality': classify_wqi(wqi)}
```
The model is cached in /tmp after the first load, so subsequent warm invocations skip the S3 download and inference stays under 100ms. On model performance: the classifier hits 99.74% accuracy on the validation set, which sounds great until you see that the data is highly structured and the classes are well separated. XGBoost is still the right choice here: it handles missing sensor values, trains quickly, and is interpretable enough that you can explain why it flagged a particular reading.
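The `load_model` helper isn't shown either. One way to implement the caching described above, sketched with the S3 download and XGBoost deserialization injected as callables (so the cache logic itself is testable; in the real function those would be a `boto3` `download_file` and an XGBoost `load_model` call):

```python
import os

_model = None  # module-global: survives across warm Lambda invocations

def load_model(fetch_fn, deserialize_fn, cache_path='/tmp/wqi_model.json'):
    """Load the model once per container.

    fetch_fn(path):       downloads the artifact on a cold start
    deserialize_fn(path): turns the cached file into a model object
    """
    global _model
    if _model is None:
        if not os.path.exists(cache_path):
            fetch_fn(cache_path)             # cold start: hit S3 once
        _model = deserialize_fn(cache_path)  # warm calls skip everything
    return _model
```

The in-memory global is what makes warm invocations fast; the /tmp copy saves the S3 round trip even when only the Python process was recycled.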
DynamoDB: Schema Design for Time-Series
The readings table uses a composite key:
- Partition key: deviceId
- Sort key: timestamp

This means all readings for a device are co-located on the same partition, and a time-range lookup is a single DynamoDB Query call: no scans, no GSIs needed for the primary access pattern.
```python
from boto3.dynamodb.conditions import Key

response = table.query(
    KeyConditionExpression=Key('deviceId').eq(device_id) &
                           Key('timestamp').between(start, end),
    ScanIndexForward=False,  # newest first
    Limit=100
)
```
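One operational detail worth noting: a single Query returns at most 1MB of data, so longer time ranges have to follow `LastEvaluatedKey`. A generic sketch of that loop, kept independent of boto3 so the pagination logic is easy to test (`query_page` would wrap the `table.query` call above):

```python
def paginate(query_page):
    """Collect all items from a paginated DynamoDB Query.

    query_page(start_key) performs one table.query call, passing
    ExclusiveStartKey=start_key when start_key is not None, and
    returns the raw response dict.
    """
    items, start_key = [], None
    while True:
        resp = query_page(start_key)
        items.extend(resp['Items'])
        start_key = resp.get('LastEvaluatedKey')
        if start_key is None:
            return items
```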
One thing I got wrong early on: I was storing floats directly. DynamoDB doesn't support Python floats; it uses Decimal. The fix is a recursive converter before any put_item call:
```python
from decimal import Decimal

def floats_to_decimal(obj):
    if isinstance(obj, float):
        return Decimal(str(obj))
    if isinstance(obj, dict):
        return {k: floats_to_decimal(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [floats_to_decimal(i) for i in obj]
    return obj
```
Infrastructure as Code: AWS CDK
Everything is defined in Python CDK. No clicking around the console, no manual resource creation. The IoT rule, Lambda functions, DynamoDB tables, and IAM policies are all code.
```python
# IoT Rule → Lambda
iot_rule = iot.CfnTopicRule(self, "DataIngestionRule",
    rule_name="aquachain_data_ingestion_dev",
    topic_rule_payload=iot.CfnTopicRule.TopicRulePayloadProperty(
        sql="SELECT * FROM 'aquachain/devices/+/data'",
        actions=[iot.CfnTopicRule.ActionProperty(
            lambda_=iot.CfnTopicRule.LambdaActionProperty(
                function_arn=data_processing_fn.function_arn
            )
        )]
    )
)

# Grant IoT Core permission to invoke Lambda
data_processing_fn.add_permission("IoTInvoke",
    principal=iam.ServicePrincipal("iot.amazonaws.com"),
    source_arn=iot_rule.attr_arn
)
```
The IAM policy for the data processing Lambda follows least privilege. It can only write to the specific DynamoDB table and invoke the specific ML inference function. Nothing else.
What I'd Do Differently
1. SQS buffer between IoT Core and Lambda. Rule -> Lambda is fine, but if the Lambda gets throttled, IoT Core drops messages. An SQS queue between the two provides buffering and automatic retries.
2. DynamoDB on-demand from day one. I started with provisioned capacity and burned a lot of time tuning read and write units. On-demand costs a little more per request but removes capacity planning entirely, which is worth it given how spiky IoT traffic can be.
3. Structured logging from day one. I added JSON structured logging later, and retrofitting it across 30+ Lambda functions was a nightmare. A small logging utility that stamps every line with a device ID, request ID, and timestamp from the start would have made querying logs with CloudWatch Insights a breeze.
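That logging utility from point 3 could be as small as this: a sketch (field names are illustrative) that emits one JSON object per line, which CloudWatch captures as-is and CloudWatch Insights can then filter by any field:

```python
import json
import time

def make_logger(device_id=None, request_id=None):
    """Return a log function that stamps every line with the
    shared context fields plus any per-call extras."""
    def log(level, message, **fields):
        record = {
            'level': level,
            'message': message,
            'deviceId': device_id,
            'requestId': request_id,
            'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
            **fields,
        }
        print(json.dumps(record))  # stdout → CloudWatch, one JSON doc per line
        return record
    return log
```

A handler would build one logger at the top (`log = make_logger(event['deviceId'], context.aws_request_id)`) and every subsequent line is queryable by device.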
The Numbers
After running in production:
- 712 Lambda invocations over the past week, zero errors
- Average warm execution: ~83ms
- Cold start init: ~615ms (happens rarely after the function is warm)
- DynamoDB write latency: ~148ms average
- Cost: well under $2/device/month at current scale

This is where the serverless model delivers on its promise: no infrastructure to babysit, automatic scaling, and costs that grow linearly with usage instead of a flat monthly charge.
Wrapping Up
The hardest part of this build was not the AWS services themselves – the documentation is good, and the SDKs are solid. The hardest part was the sensor layer – calibration drift, WiFi reconnection logic, etc.
The cloud pipeline is actually the easy part, as long as you get the DynamoDB key schema right upfront, validate at the edge before anything reaches the database, and keep the Lambda functions thin: validate, store, delegate.
The entire stack – ESP32 firmware, Lambda functions, CDK infrastructure, React dashboard – is the AquaChain project. I'd be happy to dive deeper into any of these parts.
_Built with: ESP32, AWS IoT Core, Lambda (Python 3.11), DynamoDB, XGBoost, AWS CDK, React 19_