Every telecoms network has thousands of physical devices — routers, base stations, environmental sensors in data centres — quietly reporting readings every few seconds. When a sensor in a remote data centre starts reporting 52°C, you want to know immediately, not when someone notices the hardware has throttled.
This post walks through an IoT telemetry pipeline I built on AWS: a device simulator that generates realistic sensor data, a Kinesis stream to buffer it, a Lambda consumer that writes every reading to DynamoDB, and an alert handler that fires SNS notifications and CloudWatch metrics the moment a reading breaches a threshold.
Source code: github.com/tsekatm/iot-kinesis-stream
Architecture
```
Device Simulator (Python)
  └── put_record ──► Kinesis Data Stream
                           │
                           ▼
                Lambda: Kinesis Consumer
                (decode → validate → write)
                           │
                           ├──► DynamoDB (iot-sensor-readings)
                           │       PK=device_id, SK=timestamp
                           │
                           └──► Lambda: Alert Handler (async invoke)
                                    │
                                    ├──► SNS Topic → Email / SMS
                                    └──► CloudWatch
                                         IoTPipeline/Alerts :: AnomalyCount
```
Three components. Each one does exactly one thing.
In a telecoms context, these sensors monitor cabinet temperatures at cell tower sites, humidity in outdoor street-cabinet enclosures, and power supply metrics at edge nodes. The same pipeline pattern applies equally to network KPIs — signal strength, packet loss, handover failure rates — anywhere you need a low-latency path from raw telemetry to an ops team notification.
Step 1: Device Simulator
The simulator generates realistic sensor readings and publishes them to Kinesis. Normal readings stay within plausible operating ranges; a configurable anomaly probability injects spikes for demo and testing purposes.
```python
# src/device_simulator.py
import json
import random
from datetime import datetime, timezone

KINESIS_STREAM_NAME = "iot-telemetry-stream"

TEMP_NORMAL = (15.0, 35.0)
TEMP_SPIKE = (46.0, 60.0)            # above the 45°C alert threshold
HUMIDITY_NORMAL = (30.0, 80.0)
HUMIDITY_HIGH_SPIKE = (85.0, 100.0)  # illustrative spike ranges outside the normal band
HUMIDITY_LOW_SPIKE = (0.0, 25.0)
ANOMALY_PROBABILITY = 0.05           # 5% of readings are anomalies

def generate_telemetry(device_id: str, anomaly: bool = False) -> dict:
    if anomaly or random.random() < ANOMALY_PROBABILITY:
        temperature = round(random.uniform(*TEMP_SPIKE), 2)
        humidity_range = random.choice([HUMIDITY_HIGH_SPIKE, HUMIDITY_LOW_SPIKE])
        humidity = round(random.uniform(*humidity_range), 2)
    else:
        temperature = round(random.uniform(*TEMP_NORMAL), 2)
        humidity = round(random.uniform(*HUMIDITY_NORMAL), 2)
    return {
        "device_id": device_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "temperature": temperature,
        "humidity": humidity,
        "pressure": round(random.uniform(950.0, 1050.0), 2),
        "firmware_version": "1.2.0",
    }

def publish_telemetry(client, device_id: str, payload: dict) -> None:
    client.put_record(
        StreamName=KINESIS_STREAM_NAME,
        Data=json.dumps(payload).encode("utf-8"),
        PartitionKey=device_id,  # routes same device to same shard
    )
```
Using device_id as the Kinesis partition key means all readings from the same device land in the same shard and are consumed in order. This matters if you later want to add rate-of-change anomaly detection — you need time-ordered records per device.
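To make that concrete, here is a hypothetical sketch of a per-device rate-of-change check — not code from the repo — which only works because the partition key guarantees each device's readings arrive oldest-first:

```python
from datetime import datetime

# Hypothetical sketch: per-device rate-of-change detection.
# Correct only because the device_id partition key delivers each
# device's records in time order within a shard.
_last_seen: dict[str, tuple[datetime, float]] = {}

def temp_rising_too_fast(device_id: str, ts: datetime, temp: float,
                         max_delta: float = 5.0, window_s: float = 60.0) -> bool:
    """Return True if temperature rose more than max_delta within window_s."""
    prev = _last_seen.get(device_id)
    _last_seen[device_id] = (ts, temp)
    if prev is None:
        return False  # first reading for this device: nothing to compare against
    prev_ts, prev_temp = prev
    elapsed = (ts - prev_ts).total_seconds()
    return 0 < elapsed <= window_s and (temp - prev_temp) > max_delta
```

If readings interleaved out of order across shards, the computed delta would be meaningless — which is exactly why the partition key choice matters.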
Step 2: Kinesis Consumer Lambda
The Decimal gotcha
DynamoDB does not accept Python float. If you try to write {"temperature": 23.5}, you'll get a TypeError: Float types are not supported. The fix is Decimal, and the safest way to convert is Decimal(str(value)) — not Decimal(value) which inherits floating-point imprecision:
```python
from datetime import datetime, timezone
from decimal import Decimal

item = {
    "device_id": payload["device_id"],
    "timestamp": payload["timestamp"],
    "temperature": Decimal(str(payload["temperature"])),
    "humidity": Decimal(str(payload["humidity"])),
    "pressure": Decimal(str(payload["pressure"])),
    "ingestion_timestamp": datetime.now(timezone.utc).isoformat(),
}
table.put_item(Item=item)
```
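A quick sanity check of why the str() round-trip matters — 23.1 has no exact binary representation, so constructing a Decimal directly from the float drags the representation error along:

```python
from decimal import Decimal

# Constructing from the float preserves its binary imprecision:
print(Decimal(23.1))       # prints the float's full binary expansion, not 23.1
# Round-tripping through str() keeps the value the device actually sent:
print(Decimal(str(23.1)))  # 23.1
```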
Partial batch responses
If one record fails, only that record should be retried — not the whole batch of 100. Kinesis supports this via partial batch responses:
```python
def handler(event, context):
    batch_item_failures = []
    for record in event.get("Records", []):
        try:
            payload = decode_record(record)
            validate_payload(payload)
            write_to_dynamodb(payload)
            check_and_invoke_alerts(payload)
        except Exception:
            # Report only this record's sequence number; the rest of the batch succeeds.
            seq = record["kinesis"]["sequenceNumber"]
            batch_item_failures.append({"itemIdentifier": seq})
    return {"batchItemFailures": batch_item_failures}
```
Async alert invocation
When a reading breaches a threshold, the consumer invokes the alert handler Lambda asynchronously (InvocationType="Event"). Fire-and-forget — one slow SNS publish can't back up the Kinesis stream:
```python
def check_and_invoke_alerts(payload: dict) -> None:
    temp = payload.get("temperature", 0)
    humidity = payload.get("humidity", 50)  # defaults sit inside the normal bands
    if temp > TEMP_THRESHOLD or humidity > HUMIDITY_HIGH or humidity < HUMIDITY_LOW:
        lambda_client.invoke(
            FunctionName=ALERT_FUNCTION_NAME,
            InvocationType="Event",  # async: fire-and-forget
            Payload=json.dumps(payload).encode("utf-8"),
        )
```
Step 3: Alert Handler Lambda
```python
def detect_anomalies(payload: dict) -> list:
    anomalies = []
    temp = payload.get("temperature")
    if temp is not None and temp > TEMP_THRESHOLD:
        anomalies.append({
            "field": "temperature", "value": float(temp),
            "threshold": TEMP_THRESHOLD, "direction": "above",
        })
    humidity = payload.get("humidity")
    if humidity is not None:
        if humidity > HUMIDITY_HIGH_THRESHOLD:
            anomalies.append({"field": "humidity", "value": float(humidity),
                              "threshold": HUMIDITY_HIGH_THRESHOLD, "direction": "above"})
        elif humidity < HUMIDITY_LOW_THRESHOLD:
            anomalies.append({"field": "humidity", "value": float(humidity),
                              "threshold": HUMIDITY_LOW_THRESHOLD, "direction": "below"})
    return anomalies
```
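Feeding the 52°C reading from the intro through this logic shows the output shape. The block below repeats the function as a self-contained demo; TEMP_THRESHOLD is 45.0 per the simulator's comment, while the humidity thresholds (80/30, matching the simulator's normal range) are assumptions, not values from the repo:

```python
TEMP_THRESHOLD = 45.0            # the 45°C threshold named in the simulator comment
HUMIDITY_HIGH_THRESHOLD = 80.0   # assumed: upper edge of the simulator's normal range
HUMIDITY_LOW_THRESHOLD = 30.0    # assumed: lower edge of the simulator's normal range

def detect_anomalies(payload: dict) -> list:
    anomalies = []
    temp = payload.get("temperature")
    if temp is not None and temp > TEMP_THRESHOLD:
        anomalies.append({"field": "temperature", "value": float(temp),
                          "threshold": TEMP_THRESHOLD, "direction": "above"})
    humidity = payload.get("humidity")
    if humidity is not None:
        if humidity > HUMIDITY_HIGH_THRESHOLD:
            anomalies.append({"field": "humidity", "value": float(humidity),
                              "threshold": HUMIDITY_HIGH_THRESHOLD, "direction": "above"})
        elif humidity < HUMIDITY_LOW_THRESHOLD:
            anomalies.append({"field": "humidity", "value": float(humidity),
                              "threshold": HUMIDITY_LOW_THRESHOLD, "direction": "below"})
    return anomalies

print(detect_anomalies({"device_id": "dc-sensor-07", "temperature": 52.0, "humidity": 55.0}))
# → [{'field': 'temperature', 'value': 52.0, 'threshold': 45.0, 'direction': 'above'}]
```

A reading within all bands returns an empty list, which is why the consumer can use "any anomalies at all" as its invoke condition.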
The CloudWatch metric goes into a custom namespace so it doesn't get buried among the Lambda system metrics:
```python
def publish_cloudwatch_metric(payload: dict, anomalies: list) -> None:
    cloudwatch_client.put_metric_data(
        Namespace="IoTPipeline/Alerts",
        MetricData=[{
            "MetricName": "AnomalyCount",
            "Dimensions": [{"Name": "DeviceId", "Value": payload["device_id"]}],
            "Value": float(len(anomalies)),
            "Unit": "Count",
            "Timestamp": datetime.now(timezone.utc),
        }],
    )
```
Set a CloudWatch alarm on AnomalyCount > 5 per device per hour and page on-call automatically.
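In the same Terraform style as the Infrastructure section below, that alarm might look like this — a sketch, with the resource names and SNS topic assumed rather than taken from the repo:

```hcl
resource "aws_cloudwatch_metric_alarm" "anomaly_rate" {
  alarm_name          = "iot-anomaly-rate-high"
  namespace           = "IoTPipeline/Alerts"
  metric_name         = "AnomalyCount"
  dimensions          = { DeviceId = "dc-sensor-07" } # per-device alarm
  statistic           = "Sum"
  period              = 3600 # one hour
  evaluation_periods  = 1
  threshold           = 5
  comparison_operator = "GreaterThanThreshold"
  alarm_actions       = [aws_sns_topic.oncall.arn] # assumed topic name
}
```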
Test Suite: 30/30
All tests written before implementation (TDD).
```
tests/test_device_simulator.py     9 passed
tests/test_alert_handler.py        9 passed
tests/test_lambda_consumer.py     12 passed
─────────────────────────────────────────
Total                             30 passed
```
Key tests:
- Temperature, humidity, and pressure stay within realistic ranges across 100 random readings
- Anomaly injection raises temperature above the normal maximum
- put_record is called with PartitionKey=device_id
- Decimal conversion is verified in the DynamoDB write
- High temperature, high humidity, and low humidity are each detected with the correct direction
- CloudWatch namespace is IoTPipeline/Alerts and the metric value equals the anomaly count
- Partial batch failure itemIdentifiers include only the failed sequence number
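The partial-batch test is worth showing in full. This is a hypothetical condensed version — the real one lives in tests/test_lambda_consumer.py — with a minimal stand-in handler whose writer is injected so the test can make one record fail:

```python
import base64
import json

def make_kinesis_event(payloads):
    # Build a minimal Kinesis event: data arrives base64-encoded, as Lambda delivers it.
    return {"Records": [
        {"kinesis": {"sequenceNumber": str(i),
                     "data": base64.b64encode(json.dumps(p).encode()).decode()}}
        for i, p in enumerate(payloads)
    ]}

def handler(event, writer):
    # Condensed stand-in for the consumer; writer is injected for testability.
    failures = []
    for record in event.get("Records", []):
        try:
            payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
            writer(payload)
        except Exception:
            failures.append({"itemIdentifier": record["kinesis"]["sequenceNumber"]})
    return {"batchItemFailures": failures}

def test_only_failed_record_is_reported():
    event = make_kinesis_event([{"device_id": "a"}, {"device_id": "b"}, {"device_id": "c"}])

    def flaky_writer(payload):
        if payload["device_id"] == "b":
            raise RuntimeError("simulated DynamoDB failure")

    result = handler(event, flaky_writer)
    # Only record "b" (sequence number "1") should be retried; "a" and "c" are done.
    assert result == {"batchItemFailures": [{"itemIdentifier": "1"}]}
```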
Infrastructure (Terraform)
```hcl
resource "aws_kinesis_stream" "iot_stream" {
  name             = "iot-telemetry-stream"
  shard_count      = 1
  retention_period = 24
}

resource "aws_lambda_event_source_mapping" "kinesis_trigger" {
  event_source_arn               = aws_kinesis_stream.iot_stream.arn
  function_name                  = aws_lambda_function.consumer.arn
  starting_position              = "LATEST"
  batch_size                     = 100
  bisect_batch_on_function_error = true
  function_response_types        = ["ReportBatchItemFailures"]
}

resource "aws_dynamodb_table" "sensor_readings" {
  name         = "iot-sensor-readings"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "device_id"
  range_key    = "timestamp"

  # Key attributes must be declared explicitly.
  attribute {
    name = "device_id"
    type = "S"
  }

  attribute {
    name = "timestamp"
    type = "S"
  }
}
```
bisect_batch_on_function_error paired with ReportBatchItemFailures gives fine-grained retry — Kinesis bisects failing batches until it isolates the exact bad record.
What I'd Add Next
- Rate-of-change detection — alert when temperature increases >5°C in 60 seconds, not just on absolute threshold
- Dead letter queue — SQS DLQ for alert handler failures so no alert is silently dropped
- Kinesis Data Analytics — SQL tumbling window aggregations for fleet-wide statistics
- Device shadow — track last-known state per device for context-aware alerting
Tebogo Tseka — Cloud Solutions Architect & ML Engineer
GitHub: @tsekatm | Blog: tebogosacloud.blog