Introduction
Amazon Timestream is a fast, scalable, serverless time series database service for IoT and operational applications. AWS Lambda is a compute service that lets you run code without provisioning or managing servers. This guide walks you through creating a Timestream database and loading data into it from an Excel file stored in an S3 bucket, using a Lambda function.
1. Create an initial Lambda function
Follow the guide "Developing AWS Lambda Functions In Locally" to create the initial Lambda function.
2. Create additional scripts
I have created a Python script that creates an S3 bucket and uploads the Excel file to it.
create_and_upload.py
import boto3
import botocore
import os
def create_s3_bucket(bucket_name, region=None):
    s3_client = boto3.client('s3', region_name=region)
    try:
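        # us-east-1 is the default region; S3 rejects it as an explicit LocationConstraint
        if region is None or region == 'us-east-1':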
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location)
    except botocore.exceptions.ClientError as e:
        print(f"Error occurred while creating bucket: {e}")
        return False
    return True
def upload_file_to_s3(file_name, bucket_name, object_name=None):
    # Check if the file exists
    if not os.path.exists(file_name):
        print(f"Error: The file {file_name} does not exist.")
        return False
    if object_name is None:
        object_name = os.path.basename(file_name)  # Extracts just the file name from the full file path
    s3_client = boto3.client('s3')
    try:
        s3_client.upload_file(file_name, bucket_name, object_name)
        print(f"File '{file_name}' has been uploaded to bucket '{bucket_name}' as '{object_name}'")
        return True
    except Exception as e:
        print(f"Error occurred while uploading file: {str(e)}")
        return False
def main():
    bucket_name = 's3-bucket-name'  # Replace with your unique bucket name
    region = 'region-name'  # Replace with your desired region
    excel_file_path = r'excel-file-path.xlsx'  # Replace with your local Excel file path
    # Create S3 bucket
    if create_s3_bucket(bucket_name, region):
        print(f"Bucket '{bucket_name}' created successfully.")
    # Upload file to S3
    if upload_file_to_s3(excel_file_path, bucket_name):
        print(f"File '{excel_file_path}' uploaded successfully to '{bucket_name}'.")
if __name__ == '__main__':
    main()
3. Write the Lambda function
3.1 File Structure
CSVTimestream
      |
      |--events
      |     |---event.json
      |
      |--timestream
      |     |---app.py
      |
      |---samconfig.toml
      |---template.yaml
3.2 Code
My template.yaml file is as follows.
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  CSVTimestream
  Sample SAM Template for CSVTimestream
# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 300
    MemorySize: 128
Resources:
  TimestreamLambdaFunction:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      CodeUri: timestream/
      Handler: app.lambda_handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Role: !GetAtt LambdaExecutionRole.Arn
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: [ lambda.amazonaws.com ]
            Action: [ 'sts:AssumeRole' ]
      Policies:
        - PolicyName: TimestreamAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: [ 'timestream:*' ]
                Resource: '*'
        - PolicyName: S3BucketAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: [ 's3:GetObject' ]
                Resource: '*'
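My Lambda function is as follows. It imports pandas, which in turn needs openpyxl to read .xlsx files, so both packages must be available to the function (for example, through a requirements.txt in the timestream folder or a Lambda layer).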
app.py
import boto3
import pandas as pd
from botocore.exceptions import ClientError
from io import BytesIO
# Initialize clients
s3_client = boto3.client('s3')
timestream_write = boto3.client('timestream-write')
# Constants
database_name = 'timestream-db-name'
BUCKET_NAME = 's3-bucket-name'
OBJECT_KEY = 'excel-file-name.xlsx'
def create_database(database_name):
    try:
        timestream_write.create_database(DatabaseName=database_name)
        print(f"Database {database_name} created.")
    except ClientError as e:
        print(f"Database creation failed: {e}")
def create_table(table_name):
    try:
        retention_properties = {
            'MemoryStoreRetentionPeriodInHours': 24,
            'MagneticStoreRetentionPeriodInDays': 7
        }
        timestream_write.create_table(DatabaseName=database_name, TableName=table_name,
                            RetentionProperties=retention_properties)
        print(f"Table {table_name} created.")
    except ClientError as e:
        print(f"Table creation failed: {e}")
def get_excel_file(bucket, key):
    # Reuse the module-level S3 client instead of creating a new one on every call
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return BytesIO(response['Body'].read())
def process_excel_file(excel_file):
    # Read the Excel file
    xls = pd.ExcelFile(excel_file)
    # Process each sheet in the Excel file
    for sheet_name in xls.sheet_names:
        df = pd.read_excel(xls, sheet_name=sheet_name)
        # Create a table for each sheet
        create_table(sheet_name)
        # Write records to Timestream
        write_records(df, sheet_name)
def write_records(df, table_name):
    records = []
    version = 1  # Start with a base version number
    for index, row in df.iterrows():
        time_str = row['time'].replace('"', '')
        time_dt = pd.to_datetime(time_str)
        timestamp_ms = int(time_dt.timestamp() * 1000)
        # measure_value = row['measure_value::double']
        # Build the list of dimensions.
        dimensions = [
            {'Name': 'col_1_name', 'Value': str(row['col_1_name'])},
            {'Name': 'col_2_name', 'Value': str(row['col_2_name'])}
            # Add more dimensions here based on your Excel file columns.
        ]
        # Include additional dimensions based on the sheet structure.
        if 'addi_col' in df.columns:
            dimensions.append({'Name': 'addi_col', 'Value': str(row['addi_col'])})
        record = {
            'Dimensions': dimensions,
            'MeasureName': row['col_name'],
            'MeasureValue': str(row['col_name::double']),  # Column name from my Excel file; adjust to match yours
            'MeasureValueType': 'DOUBLE',
            'Time': str(timestamp_ms),
            'Version': version  # Adding a version number
        }
        records.append(record)
        version += 1  # Increment the version for each record
    # Timestream accepts at most 100 records per WriteRecords request, so write in batches.
    batch_size = 100
    for start in range(0, len(records), batch_size):
        batch = records[start:start + batch_size]
        try:
            result = timestream_write.write_records(DatabaseName=database_name, TableName=table_name,
                                          Records=batch, CommonAttributes={})
            print(
                f"Records written to table {table_name} successfully with status: {result['ResponseMetadata']['HTTPStatusCode']}")
        except timestream_write.exceptions.RejectedRecordsException as e:
            print("Error writing records:", e)
            for rejected_record in e.response['RejectedRecords']:
                print("Rejected Record:", rejected_record)
        except ClientError as e:
            print(f"Error writing records: {e}")
def lambda_handler(event, context):
    # Create the Timestream database
    create_database(database_name)
    # Get the Excel file from S3
    excel_file = get_excel_file(BUCKET_NAME, OBJECT_KEY)
    # Process the Excel file
    process_excel_file(excel_file)
    return {
        'statusCode': 200,
        'body': "Data loaded to Timestream successfully."
    }
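To make the record shape built in write_records easier to see, here is a minimal sketch of the same idea as a standalone helper that needs no AWS access. The names to_epoch_ms and row_to_record, the sample row, and its column names (device_id, temperature) are hypothetical and not part of the project above. It also assumes the time column holds an ISO-style string and that timestamps without an offset are in UTC.

```python
from datetime import datetime, timezone


def to_epoch_ms(time_str):
    """Convert an ISO-style timestamp string to epoch milliseconds (as a string)."""
    dt = datetime.fromisoformat(time_str.replace('"', ''))
    if dt.tzinfo is None:
        # Assumption: timestamps without an offset are treated as UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    return str(int(dt.timestamp() * 1000))


def row_to_record(row, dimension_columns, measure_name, measure_column):
    """Build one Timestream record dict from a row (a mapping of column -> value)."""
    # Only columns that are actually present in the row become dimensions.
    dimensions = [
        {'Name': col, 'Value': str(row[col])}
        for col in dimension_columns
        if col in row
    ]
    return {
        'Dimensions': dimensions,
        'MeasureName': measure_name,
        'MeasureValue': str(row[measure_column]),
        'MeasureValueType': 'DOUBLE',
        'Time': to_epoch_ms(row['time']),
        'TimeUnit': 'MILLISECONDS',
    }


# Hypothetical sample row; the column names are placeholders, not from a real file.
sample_row = {'time': '2024-01-01 00:00:00', 'device_id': 'sensor-1', 'temperature': 21.5}
record = row_to_record(sample_row, ['device_id', 'region'], 'temperature', 'temperature')
print(record)
```

Because the helper skips dimension columns that a row does not have, the same call can be reused for sheets with slightly different layouts, which is what the 'addi_col' check in write_records does by hand.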
samconfig.toml will be as follows.
samconfig.toml
# More information about the configuration file can be found here:
# https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-config.html
version = 0.1
[default]
[default.global.parameters]
stack_name = "CSVTimestream"
[default.build.parameters]
cached = true
parallel = true
[default.validate.parameters]
lint = true
[default.deploy.parameters]
capabilities = "CAPABILITY_IAM"
confirm_changeset = true
resolve_s3 = true
s3_prefix = "CSVTimestream"
region = "aws-region"
image_repositories = []
[default.package.parameters]
resolve_s3 = true
[default.sync.parameters]
watch = true
[default.local_start_api.parameters]
warm_containers = "EAGER"
[default.local_start_lambda.parameters]
warm_containers = "EAGER"
4. Finally
Deploy the Lambda function and test it using the local invoke command (sam local invoke). After it runs, you'll see that the Timestream database has been created, along with one table per sheet, each populated with data.
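One way to confirm the load from code, rather than from the console, is to count the rows in each table with the Timestream query API. The sketch below is an illustration, not part of the project above: count_rows and check_load are hypothetical helper names, and it assumes the COUNT(*) result arrives in the first page of the response, which should hold for small tables.

```python
def count_rows(query_client, database, table):
    """Return the number of rows in a Timestream table using a COUNT(*) query."""
    query = f'SELECT COUNT(*) FROM "{database}"."{table}"'
    response = query_client.query(QueryString=query)
    # A COUNT(*) result is a single row with a single scalar column.
    return int(response['Rows'][0]['Data'][0]['ScalarValue'])


def check_load(database, tables):
    """Count rows in each table; requires AWS credentials and a configured region."""
    import boto3  # imported here so count_rows stays usable without boto3 installed

    client = boto3.client('timestream-query')
    return {table: count_rows(client, database, table) for table in tables}
```

With credentials configured, calling check_load('timestream-db-name', ['Sheet1']) would return a dictionary mapping each table name to its row count. 'Sheet1' is a placeholder for one of your own sheet names.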
 


 
    