Let’s imagine we are building a system to handle continuous streams of IoT sensor data, such as temperature and humidity readings. In this blog, I will create a data pipeline using AWS Kinesis Data Streams and AWS Lambda.
For demonstration purposes, I’ll simulate IoT sensor data using a producer Lambda, which will act as our IoT device and push sensor data into the Kinesis stream. On the other end, a consumer Lambda will process the incoming events in real time, logging them to CloudWatch Logs.
To ensure no data is lost, any failed events will be redirected to an SQS Dead Letter Queue (DLQ) for later analysis.
Step 1: Create the Kinesis Stream
In the AWS Management Console, navigate to Kinesis and select Data streams. Click Create, then provide a name for your stream (for example, iot-sensor-stream). Choose On-demand as the capacity mode or Provisioned if you can predict the amount of data you will be streaming. Finally, click Create to set up the data stream.
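If you prefer to script this step instead of using the console, the same stream can be created with boto3. Here is a minimal sketch; it assumes your AWS credentials and default region are already configured:

import boto3

kinesis = boto3.client("kinesis")

# Create the stream in on-demand capacity mode
kinesis.create_stream(
    StreamName="iot-sensor-stream",
    StreamModeDetails={"StreamMode": "ON_DEMAND"},
)

# Wait until the stream is ACTIVE before writing to it
waiter = kinesis.get_waiter("stream_exists")
waiter.wait(StreamName="iot-sensor-stream")
print("Stream is active")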
Step 2: Create a Producer Lambda that simulates IoT sensors
The code for the producer Lambda function:
import json
import boto3
import random
import time

# Initialize clients once per container (outside the handler)
kinesis = boto3.client('kinesis')
ssm = boto3.client('ssm')

# Stream ARN is stored in Parameter Store
stream_arn = ssm.get_parameter(Name="/kinesis/iot_sensor_stream_arn")['Parameter']['Value']
stream_name = stream_arn.split("/")[-1]  # extract stream name from ARN

def lambda_handler(event, context):
    # Simulate IoT sensor readings
    sensor_data = {
        "sensor_id": f"sensor-{random.randint(1, 5)}",
        "temperature": round(random.uniform(18.0, 30.0), 2),
        "humidity": round(random.uniform(30.0, 60.0), 2),
        "timestamp": int(time.time())
    }

    # Put the record into Kinesis, partitioned by sensor ID so that
    # readings from the same sensor land on the same shard, in order
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(sensor_data),
        PartitionKey=sensor_data["sensor_id"]
    )

    print(f"Produced sensor data: {sensor_data}")
    return {"statusCode": 200, "body": json.dumps(sensor_data)}
We need to attach an IAM role to this Lambda with the following permissions:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:<region>:<account-id>:stream/iot-sensor-stream"
    },
    {
      "Effect": "Allow",
      "Action": [
        "ssm:GetParameter",
        "ssm:GetParameters"
      ],
      "Resource": "arn:aws:ssm:<region>:<account-id>:parameter/<parameter-name>"
    }
  ]
}
Replace <region> with your AWS region, <account-id> with your AWS account ID, and <parameter-name> with the name of the SSM parameter the Lambda reads (here, kinesis/iot_sensor_stream_arn). This grants the Lambda permission to put records into the specific Kinesis stream and to read the parameter from SSM Parameter Store.
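With the role attached, we can verify the producer end to end by invoking it manually. A quick check with boto3, assuming the function is named lambda-producer (matching the log group we inspect later):

import json
import boto3

lambda_client = boto3.client("lambda")

# Synchronously invoke the producer and print the simulated reading it returned
response = lambda_client.invoke(
    FunctionName="lambda-producer",
    InvocationType="RequestResponse",
)
print(json.loads(response["Payload"].read()))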
Step 3: Create a Consumer Lambda that will process IoT data
Code for the consumer Lambda:
import base64
import json

def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis record data arrives base64 encoded
        payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
        sensor_data = json.loads(payload)

        # For the demo we just log it. Example future processing:
        # - store in DynamoDB
        # - send alerts if thresholds are exceeded
        # - forward to an analytics pipeline
        print(f"Consumed sensor data: {sensor_data}")

    return {"statusCode": 200}
We need to attach the AWS-managed AWSLambdaKinesisExecutionRole policy to this Lambda's execution role, plus sqs:SendMessage permission on the DLQ so failed batches can be redirected there.
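The sqs:SendMessage piece can be added as an inline policy. A sketch with the same placeholders as before, assuming the queue iot-sensor-dlq already exists in your account:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:<region>:<account-id>:iot-sensor-dlq"
    }
  ]
}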
Step 4: Connecting Kinesis Stream to the Consumer Lambda
In the Lambda console, we need to go to lambda-consumer. Under the Configuration tab we will find Triggers; add Kinesis as the trigger source and select our stream, iot-sensor-stream. We will keep the Starting position as Latest, and under the on-failure settings we will choose the SQS DLQ and provide the ARN of iot-sensor-dlq.
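The same trigger can be created programmatically. A boto3 sketch, where the stream and queue ARNs are placeholders to fill in with your own values:

import boto3

lambda_client = boto3.client("lambda")

# Wire the Kinesis stream to the consumer, sending failed batches to the DLQ
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:kinesis:<region>:<account-id>:stream/iot-sensor-stream",
    FunctionName="lambda-consumer",
    StartingPosition="LATEST",
    DestinationConfig={
        "OnFailure": {"Destination": "arn:aws:sqs:<region>:<account-id>:iot-sensor-dlq"}
    },
)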
Step 5: Monitoring
Producer Monitoring
We can go to CloudWatch Logs, open the log group /aws/lambda/lambda-producer, and verify that the logs show entries like:
Produced sensor data:
{'sensor_id': 'sensor-3', 'temperature': 24.1, ...}
Next, in the Kinesis console, we can navigate to our data stream and open the Monitoring tab. Watching metrics such as IncomingRecords and PutRecord.Success ensures that the producer is sending data successfully.
Consumer Monitoring
For the consumer, we can go to CloudWatch Logs → /aws/lambda/lambda-consumer and verify that entries like the following appear:
Consumed sensor data:
{'sensor_id': 'sensor-3', 'temperature': 24.1, ...}
In the Kinesis console’s Monitoring tab, we can check metrics such as GetRecords.Success and IteratorAgeMilliseconds. These help us confirm that the consumer is processing records in near real time.
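These checks can also be scripted. A boto3 sketch that pulls the stream-level iterator age (the full CloudWatch metric name is GetRecords.IteratorAgeMilliseconds) for the last hour; a sustained climb means the consumer is falling behind:

from datetime import datetime, timedelta, timezone

import boto3

cloudwatch = boto3.client("cloudwatch")

# Maximum iterator age per 5-minute window over the past hour
stats = cloudwatch.get_metric_statistics(
    Namespace="AWS/Kinesis",
    MetricName="GetRecords.IteratorAgeMilliseconds",
    Dimensions=[{"Name": "StreamName", "Value": "iot-sensor-stream"}],
    StartTime=datetime.now(timezone.utc) - timedelta(hours=1),
    EndTime=datetime.now(timezone.utc),
    Period=300,
    Statistics=["Maximum"],
)
for point in sorted(stats["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], point["Maximum"])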
DLQ Monitoring
Finally, in the SQS console, we can open our dead letter queue (iot-sensor-dlq) and check the Approximate number of messages. If the consumer fails (for example, if you intentionally break JSON parsing in the code), failed batches will appear here for later inspection.
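To inspect what landed in the DLQ, we can pull a few messages without deleting them. A boto3 sketch, assuming the queue name iot-sensor-dlq; note that for a Kinesis on-failure destination the message body contains failure metadata pointing back into the stream, not the raw records:

import boto3

sqs = boto3.client("sqs")
queue_url = sqs.get_queue_url(QueueName="iot-sensor-dlq")["QueueUrl"]

# Peek at up to 10 failure records; since we never delete them, they
# return to the queue once the visibility timeout expires
response = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=5)
for message in response.get("Messages", []):
    print(message["Body"])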
With this setup we have a working IoT sensor streaming pipeline: a Producer Lambda (the IoT simulator) writes into a Kinesis stream that buffers the data, a Consumer Lambda processes the stream in real time, failed batches are redirected to an SQS DLQ, and CloudWatch and Kinesis metrics provide monitoring.