Mohammed Aathil
AWS Lambda functions to process data from Amazon Kinesis streams

AWS Lambda is a serverless computing service that runs your code in response to events and automatically manages the underlying compute resources for you. One use case for AWS Lambda is to process data from Amazon Kinesis streams in real-time.

To get started, you'll need to create an AWS account and set up an IAM role with permissions to access Kinesis streams and Lambda functions.
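As a rough sketch, that role can also be created from the AWS CLI. The role name and file path below are placeholders; the AWSLambdaKinesisExecutionRole managed policy grants the Kinesis read and CloudWatch Logs permissions the function needs:

```shell
# trust-policy.json: allow the Lambda service to assume this role
cat > trust-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": {"Service": "lambda.amazonaws.com"},
    "Action": "sts:AssumeRole"
  }]
}
EOF

aws iam create-role \
  --role-name my-kinesis-lambda-role \
  --assume-role-policy-document file://trust-policy.json

# Managed policy covering Kinesis reads and CloudWatch Logs writes
aws iam attach-role-policy \
  --role-name my-kinesis-lambda-role \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
```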

Once you have these prerequisites in place, you can create a new Lambda function and select "Kinesis" as the trigger. You'll then be asked to specify the Kinesis stream and the batch size for the data that will be processed by your Lambda function.
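The same trigger can be configured from the AWS CLI with an event source mapping. The function name, stream ARN, and batch size below are placeholders; substitute your own values:

```shell
# Connect an existing Kinesis stream to an existing Lambda function
aws lambda create-event-source-mapping \
  --function-name my-function \
  --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/my-stream \
  --batch-size 100 \
  --starting-position LATEST
```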

Here is an example of a simple Python function that processes data from a Kinesis stream and logs the data to Amazon CloudWatch:

import base64
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis delivers record data base64-encoded, so decode before parsing
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        logger.info(payload)


This function iterates over the records in the event object, base64-decodes each record's payload (Kinesis delivers record data base64-encoded), and parses the resulting JSON. It then logs the data to CloudWatch using the logger.info method.

To test this function, you can use the AWS Lambda console to create a test event and specify the sample data that you want to use. You can then invoke the function and view the logs in CloudWatch to see the processed data.
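You can also sanity-check the decoding logic locally, without deploying anything, by building a Kinesis-shaped event by hand. The helper below is hypothetical (not part of any AWS SDK); it just wraps payloads in the record envelope Lambda passes to the handler:

```python
import base64
import json

def make_kinesis_event(payloads):
    # Wrap each payload dict in the record envelope Lambda passes to the handler
    return {
        'Records': [
            {'kinesis': {'data': base64.b64encode(json.dumps(p).encode()).decode()}}
            for p in payloads
        ]
    }

event = make_kinesis_event([{'timestamp': 1623456789, 'message': 'Hello, World!'}])

# Decode one record the same way the handler does
payload = json.loads(base64.b64decode(event['Records'][0]['kinesis']['data']))
print(payload)  # {'timestamp': 1623456789, 'message': 'Hello, World!'}
```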

Here is an example of the output that you might see in CloudWatch:

{'timestamp': 1623456789, 'message': 'Hello, World!'}
{'timestamp': 1623456790, 'message': 'Hello, Again!'}

In addition to logging the data, you may also want to do some additional processing on the data before storing it in a database or triggering other events. Here is an example of how you could modify the above function to filter the data and store it in a DynamoDB table:

import base64
import json
import logging
from decimal import Decimal

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('my-table')

def lambda_handler(event, context):
    for record in event['Records']:
        # Decode the base64-encoded payload; parse numbers with decimal points
        # as Decimal, since DynamoDB does not accept Python floats
        payload = json.loads(base64.b64decode(record['kinesis']['data']),
                             parse_float=Decimal)
        if payload['value'] > 50:
            table.put_item(Item=payload)
            logger.info(f'Stored item: {payload}')


This modified function filters the data based on a specific condition (in this case, the value must be greater than 50) and stores the filtered data in a DynamoDB table. It also logs a message to CloudWatch indicating that the item was stored.
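One gotcha worth calling out when storing parsed JSON in DynamoDB: boto3's DynamoDB resource rejects Python floats, so numeric fields with decimal points should be parsed as Decimal. A minimal illustration (the payload here is made up for the example):

```python
import json
from decimal import Decimal

raw = '{"value": 72.5, "message": "reading"}'

# Default parsing yields float, which the DynamoDB resource will reject
default_item = json.loads(raw)

# Parsing floats as Decimal produces a DynamoDB-compatible item
dynamo_item = json.loads(raw, parse_float=Decimal)

print(type(default_item['value']).__name__)  # float
print(type(dynamo_item['value']).__name__)   # Decimal
```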

By using AWS Lambda and Amazon Kinesis together, you can easily process streaming data in real-time without having to worry about the underlying infrastructure. This makes it easy to build scalable and reliable applications that can process large volumes of data quickly and efficiently.

Explanation of how to use AWS Lambda and Amazon Kinesis to process streaming data

To get started, you'll need to create an AWS account and set up an IAM role with permissions to access Kinesis streams and Lambda functions. You can do this using the AWS Management Console or the AWS CLI.

Once you have these prerequisites in place, you can create a new Lambda function using the AWS Management Console or the AWS CLI. When creating the function, you'll need to specify the runtime (e.g. Python, Node.js, or Java) and the trigger. In this case, you'll select "Kinesis" as the trigger and specify the stream and batch size for the data that will be processed by your Lambda function.

Here is an example of a Python function that processes data from a Kinesis stream and logs the data to Amazon CloudWatch:

import base64
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis delivers record data base64-encoded, so decode before parsing
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        logger.info(payload)


This function iterates over the records in the event object, which is passed to the function as an argument. The event object contains metadata about the event that triggered the function and a list of records that were sent to the stream.

For each record in the event object, the function base64-decodes the data field of the kinesis attribute and parses the resulting JSON. It then logs the data to CloudWatch using the logger.info method.

To test this function, you can use the AWS Management Console or the AWS CLI to create a test event and specify the sample data that you want to use. You can then invoke the function using the AWS Management Console, the AWS CLI, or the AWS SDK.

To view the logs generated by the function, you can use the AWS Management Console or the AWS CLI to access the CloudWatch Logs service. From there, you can view the log streams for your Lambda function and see the logged data.
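The CLI route can be sketched like this (the function name is a placeholder; aws logs tail requires AWS CLI v2):

```shell
# Tail the function's CloudWatch log group live
aws logs tail /aws/lambda/my-function --follow
```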

In conclusion, AWS Lambda and Amazon Kinesis are powerful tools for processing streaming data in real-time. By using these services together, you can easily build scalable and reliable applications that can process large volumes of data quickly and efficiently, without having to worry about the underlying infrastructure.

To get started, you'll need to create an AWS account and set up an IAM role with the necessary permissions. You can then create a new Lambda function and specify the Kinesis stream as the trigger. In the function code, you can iterate over the records in the event object, parse the data contained in each record, and perform any necessary processing or storage tasks.

By tailoring the processing logic in the handler, you can fit your specific requirements and build sophisticated applications that handle complex data processing tasks in real-time.

I hope this summary helps to provide a clear understanding of how to use AWS Lambda and Amazon Kinesis to process streaming data. Let me know if you have any further questions!
