Building a Real-Time IoT Telemetry Pipeline with Kinesis, Lambda, and DynamoDB

Every telecoms network has thousands of physical devices — routers, base stations, environmental sensors in data centres — quietly reporting readings every few seconds. When a sensor in a remote data centre starts reporting 52°C, you want to know immediately, not when someone notices the hardware has throttled.

This post walks through an IoT telemetry pipeline I built on AWS: a device simulator that generates realistic sensor data, a Kinesis stream to buffer it, a Lambda consumer that writes every reading to DynamoDB, and an alert handler that fires SNS notifications and CloudWatch metrics the moment a reading breaches a threshold.

Source code: github.com/tsekatm/iot-kinesis-stream


Architecture

Device Simulator (Python)
  └── put_record → Kinesis Data Stream
                         │
                         ▼
              Lambda: Kinesis Consumer
                │  (decode → validate → write)
                │
                ├──► DynamoDB (iot-sensor-readings)
                │    PK=device_id, SK=timestamp
                │
                └──► Lambda: Alert Handler (async invoke)
                              │
                              ├──► SNS Topic → Email / SMS
                              └──► CloudWatch
                                   IoTPipeline/Alerts :: AnomalyCount

Three code components: the device simulator, the Kinesis consumer, and the alert handler. Each one does exactly one thing.

In a telecoms context, these sensors monitor cabinet temperatures at cell tower sites, humidity in outdoor street-cabinet enclosures, and power supply metrics at edge nodes. The same pipeline pattern applies equally to network KPIs — signal strength, packet loss, handover failure rates — anywhere you need a low-latency path from raw telemetry to an ops team notification.


Step 1: Device Simulator

The simulator generates realistic sensor readings and publishes them to Kinesis. Normal readings stay within plausible operating ranges; a configurable anomaly probability injects spikes for demo and testing purposes.

# src/device_simulator.py
import json
import random
from datetime import datetime, timezone

KINESIS_STREAM_NAME = "iot-telemetry-stream"

TEMP_NORMAL   = (15.0, 35.0)
TEMP_SPIKE    = (46.0, 60.0)    # above the 45°C alert threshold
HUMIDITY_NORMAL = (30.0, 80.0)
HUMIDITY_HIGH_SPIKE = (85.0, 100.0)   # example spike range above normal
HUMIDITY_LOW_SPIKE  = (5.0, 25.0)     # example spike range below normal
ANOMALY_PROBABILITY = 0.05      # 5% of readings are anomalies
def generate_telemetry(device_id: str, anomaly: bool = False) -> dict:
    if anomaly or random.random() < ANOMALY_PROBABILITY:
        temperature = round(random.uniform(*TEMP_SPIKE), 2)
        humidity_range = random.choice([HUMIDITY_HIGH_SPIKE, HUMIDITY_LOW_SPIKE])
        humidity = round(random.uniform(*humidity_range), 2)
    else:
        temperature = round(random.uniform(*TEMP_NORMAL), 2)
        humidity = round(random.uniform(*HUMIDITY_NORMAL), 2)

    return {
        "device_id": device_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "temperature": temperature,
        "humidity": humidity,
        "pressure": round(random.uniform(950.0, 1050.0), 2),
        "firmware_version": "1.2.0",
    }

def publish_telemetry(client, device_id: str, payload: dict) -> None:
    client.put_record(
        StreamName=KINESIS_STREAM_NAME,
        Data=json.dumps(payload).encode("utf-8"),
        PartitionKey=device_id,    # routes same device to same shard
    )
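
To exercise the pipeline end to end, a small driver loop is enough. This is a sketch rather than the repo's CLI: the device IDs, fleet size, and reporting interval are placeholder values, and the import path assumes the src/ layout shown above.

import time

import boto3

from src.device_simulator import generate_telemetry, publish_telemetry

kinesis = boto3.client("kinesis")
DEVICES = [f"sensor-{i:03d}" for i in range(1, 6)]   # five simulated devices

while True:
    for device_id in DEVICES:
        payload = generate_telemetry(device_id)
        publish_telemetry(kinesis, device_id, payload)
    time.sleep(5)   # each device reports every few seconds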

Using device_id as the Kinesis partition key means all readings from the same device land in the same shard and are consumed in order. This matters if you later want to add rate-of-change anomaly detection — you need time-ordered records per device.
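
Rate-of-change detection isn't implemented yet (it's on the roadmap at the end of this post), but here is a rough sketch of what a per-device check could look like, assuming the 5°C-in-60-seconds rule mentioned there. It only works because Kinesis hands the consumer each device's readings in order:

from datetime import datetime

RATE_THRESHOLD_C = 5.0    # assumed rule: alert on a >5°C rise
WINDOW_SECONDS   = 60     # within 60 seconds

# device_id -> (timestamp, temperature) of the previous reading.
# NOTE: module-level state only survives a warm Lambda container; a real
# implementation would look up the previous reading in DynamoDB instead.
_last_reading = {}

def exceeds_rate_of_change(payload: dict) -> bool:
    device_id = payload["device_id"]
    ts = datetime.fromisoformat(payload["timestamp"])
    temp = float(payload["temperature"])

    previous = _last_reading.get(device_id)
    _last_reading[device_id] = (ts, temp)
    if previous is None:
        return False

    prev_ts, prev_temp = previous
    elapsed = (ts - prev_ts).total_seconds()
    return elapsed <= WINDOW_SECONDS and (temp - prev_temp) > RATE_THRESHOLD_C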


Step 2: Kinesis Consumer Lambda

The Decimal gotcha

DynamoDB does not accept Python float. If you try to write {"temperature": 23.5}, you'll get a TypeError: Float types are not supported. The fix is Decimal, and the safest way to convert is Decimal(str(value)) — not Decimal(value) which inherits floating-point imprecision:

import boto3
from datetime import datetime, timezone
from decimal import Decimal

table = boto3.resource("dynamodb").Table("iot-sensor-readings")

item = {
    "device_id":   payload["device_id"],
    "timestamp":   payload["timestamp"],
    "temperature": Decimal(str(payload["temperature"])),
    "humidity":    Decimal(str(payload["humidity"])),
    "pressure":    Decimal(str(payload["pressure"])),
    "ingestion_timestamp": datetime.now(timezone.utc).isoformat(),
}
table.put_item(Item=item)
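
A quick REPL check shows why the str() round-trip matters:

from decimal import Decimal

# Constructing from the float captures its exact binary approximation:
Decimal(0.1)
# Decimal('0.1000000000000000055511151231257827021181583404541015625')

# Constructing from the string gives the value you actually meant:
Decimal(str(0.1))
# Decimal('0.1')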

Partial batch responses

If one record fails, you don't want the whole batch of 100 reprocessed. With partial batch responses, the handler reports the failed record's sequence number and Lambda checkpoints just before it, so the retry resumes from that record rather than from the start of the batch:

def handler(event, context):
    batch_item_failures = []
    for record in event.get("Records", []):
        try:
            payload = decode_record(record)
            validate_payload(payload)
            write_to_dynamodb(payload)
            check_and_invoke_alerts(payload)
        except Exception:
            # Report only this record; Lambda checkpoints before it and
            # retries from here on the next invocation.
            seq = record["kinesis"]["sequenceNumber"]
            batch_item_failures.append({"itemIdentifier": seq})

    return {"batchItemFailures": batch_item_failures}
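
The decode_record helper isn't shown above. Since Kinesis delivers record data base64-encoded under record["kinesis"]["data"], a minimal version would look something like this:

import base64
import json

def decode_record(record: dict) -> dict:
    # Kinesis base64-encodes the payload the producer wrote with put_record.
    raw = base64.b64decode(record["kinesis"]["data"])
    return json.loads(raw)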

Async alert invocation

When a reading breaches a threshold, the consumer invokes the alert handler Lambda asynchronously (InvocationType="Event"). Fire-and-forget — one slow SNS publish can't back up the Kinesis stream:

def check_and_invoke_alerts(payload: dict) -> None:
    temp, humidity = payload.get("temperature", 0), payload.get("humidity", 50)

    if temp > TEMP_THRESHOLD or humidity > HUMIDITY_HIGH or humidity < HUMIDITY_LOW:
        lambda_client.invoke(
            FunctionName=ALERT_FUNCTION_NAME,
            InvocationType="Event",
            Payload=json.dumps(payload).encode("utf-8"),
        )

Step 3: Alert Handler Lambda

def detect_anomalies(payload: dict) -> list:
    anomalies = []

    temp = payload.get("temperature")
    if temp is not None and temp > TEMP_THRESHOLD:
        anomalies.append({
            "field": "temperature", "value": float(temp),
            "threshold": TEMP_THRESHOLD, "direction": "above",
        })

    humidity = payload.get("humidity")
    if humidity is not None:
        if humidity > HUMIDITY_HIGH_THRESHOLD:
            anomalies.append({"field": "humidity", "value": float(humidity),
                               "threshold": HUMIDITY_HIGH_THRESHOLD, "direction": "above"})
        elif humidity < HUMIDITY_LOW_THRESHOLD:
            anomalies.append({"field": "humidity", "value": float(humidity),
                               "threshold": HUMIDITY_LOW_THRESHOLD, "direction": "below"})

    return anomalies
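
The SNS notification itself isn't shown above. A minimal sketch, assuming the topic ARN arrives via an SNS_TOPIC_ARN environment variable:

import json
import os

import boto3

sns_client = boto3.client("sns")

def publish_sns_alert(payload: dict, anomalies: list) -> None:
    # One message per anomalous reading, listing every breached threshold.
    sns_client.publish(
        TopicArn=os.environ["SNS_TOPIC_ARN"],
        Subject=f"IoT anomaly: {payload['device_id']}",
        Message=json.dumps({
            "device_id": payload["device_id"],
            "timestamp": payload["timestamp"],
            "anomalies": anomalies,
        }),
    )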

CloudWatch metric in a custom namespace so it doesn't get buried in Lambda system metrics:

def publish_cloudwatch_metric(payload: dict, anomalies: list) -> None:
    cloudwatch_client.put_metric_data(
        Namespace="IoTPipeline/Alerts",
        MetricData=[{
            "MetricName": "AnomalyCount",
            "Dimensions": [{"Name": "DeviceId", "Value": payload["device_id"]}],
            "Value": float(len(anomalies)),
            "Unit": "Count",
            "Timestamp": datetime.now(timezone.utc),
        }],
    )

Set a CloudWatch alarm on AnomalyCount > 5 per device per hour and page on-call automatically.
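
One way to wire that up is a per-device alarm created with boto3; in this sketch the device ID, alarm name, and SNS topic ARN are placeholders:

import boto3

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_alarm(
    AlarmName="iot-anomaly-count-sensor-001",
    Namespace="IoTPipeline/Alerts",
    MetricName="AnomalyCount",
    Dimensions=[{"Name": "DeviceId", "Value": "sensor-001"}],
    Statistic="Sum",              # total anomalies...
    Period=3600,                  # ...per hour
    EvaluationPeriods=1,
    Threshold=5,
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=["arn:aws:sns:eu-west-1:123456789012:oncall-topic"],  # placeholder ARN
)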


Test Suite: 30/30

All tests written before implementation (TDD).

tests/test_device_simulator.py     9 passed
tests/test_alert_handler.py        9 passed
tests/test_lambda_consumer.py     12 passed
─────────────────────────────────────────
Total                             30 passed

Key tests:

  • Temperature/humidity/pressure stay within realistic ranges across 100 random readings
  • Anomaly injection raises temperature above the normal maximum
  • put_record called with PartitionKey=device_id (a minimal version of this test is sketched below)
  • Decimal conversion verified in DynamoDB write
  • High temp, high humidity, low humidity each detected with correct direction
  • CloudWatch namespace is IoTPipeline/Alerts, metric value equals anomaly count
  • Partial batch failure itemIdentifiers include only the failed sequence number
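
For example, the partition-key test can run entirely against a mocked Kinesis client. A minimal pytest sketch, assuming the src/ layout used above:

# tests/test_device_simulator.py (sketch)
from unittest.mock import MagicMock

from src.device_simulator import generate_telemetry, publish_telemetry

def test_put_record_uses_device_id_as_partition_key():
    client = MagicMock()
    payload = generate_telemetry("sensor-001")

    publish_telemetry(client, "sensor-001", payload)

    _, kwargs = client.put_record.call_args
    assert kwargs["PartitionKey"] == "sensor-001"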

Infrastructure (Terraform)

resource "aws_kinesis_stream" "iot_stream" {
  name             = "iot-telemetry-stream"
  shard_count      = 1
  retention_period = 24
}

resource "aws_lambda_event_source_mapping" "kinesis_trigger" {
  event_source_arn               = aws_kinesis_stream.iot_stream.arn
  function_name                  = aws_lambda_function.consumer.arn
  starting_position              = "LATEST"
  batch_size                     = 100
  bisect_batch_on_function_error = true
  function_response_types        = ["ReportBatchItemFailures"]
}

resource "aws_dynamodb_table" "sensor_readings" {
  name         = "iot-sensor-readings"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "device_id"
  range_key    = "timestamp"
}

ReportBatchItemFailures lets the function hand back the failed sequence numbers so Lambda checkpoints just before the first failure instead of retrying the whole batch; if the function throws before it can return that response, bisect_batch_on_function_error splits the batch in half and retries until the bad record is isolated.


What I'd Add Next

  • Rate-of-change detection — alert when temperature increases >5°C in 60 seconds, not just on absolute threshold
  • Dead letter queue — SQS DLQ for alert handler failures so no alert is silently dropped
  • Kinesis Data Analytics — SQL tumbling window aggregations for fleet-wide statistics
  • Device shadow — track last-known state per device for context-aware alerting

Tebogo Tseka — Cloud Solutions Architect & ML Engineer
GitHub: @tsekatm | Blog: tebogosacloud.blog
