DEV Community

Cover image for Efficient Batch Writing to DynamoDB with Python: A Step-by-Step Guide
Dmitry Romanoff
Dmitry Romanoff

Posted on

Efficient Batch Writing to DynamoDB with Python: A Step-by-Step Guide

When working with AWS DynamoDB, especially for applications that need to handle large volumes of data, efficient record insertion is crucial. In this post, we'll walk through a Python script that demonstrates how to:

  1. Check if a DynamoDB table exists and create one if it doesn't.
  2. Generate random data for the table.
  3. Batch-write data into DynamoDB to improve performance and reduce costs.

We'll be using the boto3 library to interact with DynamoDB, so make sure you have it installed before proceeding.

pip install boto3
Enter fullscreen mode Exit fullscreen mode

1. Setting Up the DynamoDB Table

First, we initialize a session with AWS using boto3 and specify the region for DynamoDB:

import boto3
from botocore.exceptions import ClientError

# Initialize a session using AWS
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')  # Specify the region

# Specify your DynamoDB table name
table_name = 'My_DynamoDB_Table_Name'
Enter fullscreen mode Exit fullscreen mode

Next, we define a function create_table_if_not_exists() to check if the table exists. If it doesn't, the function creates it. In this example, the table is created with a simple partition key (id).

def create_table_if_not_exists():
    try:
        table = dynamodb.Table(table_name)
        table.load()  # Attempt to load the table metadata
        print(f"Table '{table_name}' already exists.")
        return table
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFoundException':
            print(f"Table '{table_name}' not found. Creating a new table...")
            table = dynamodb.create_table(
                TableName=table_name,
                KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],  # Partition key
                AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],  # String type
                ProvisionedThroughput={'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5}
            )
            # Wait for the table to be created
            table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
            print(f"Table '{table_name}' created successfully.")
            return table
        else:
            print(f"Error checking or creating the table: {e}")
            raise
Enter fullscreen mode Exit fullscreen mode

2. Generating Random Data

For this example, we'll generate random records with an id, name, timestamp, and value. The id will be a random 16-character string, while the value will be a random integer between 1 and 1000.

import random
import string
from datetime import datetime

# Function to generate random string
def generate_random_string(length=10):
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))

# Function to generate random record
def generate_record():
    return {
        'id': generate_random_string(16),  # Unique id for the record
        'name': generate_random_string(8),  # Random name
        'timestamp': str(datetime.utcnow()),  # Timestamp for the record
        'value': random.randint(1, 1000),  # Some random value
    }
Enter fullscreen mode Exit fullscreen mode

3. Batch Writing Data

Now, instead of writing records one-by-one, which can be slow and inefficient, we'll use DynamoDB's batch_writer() to write records in batches. This method allows us to insert up to 25 records in a single batch.

# Function to batch write records
def batch_write(table, records):
    with table.batch_writer() as batch:
        for record in records:
            batch.put_item(Item=record)
Enter fullscreen mode Exit fullscreen mode

4. Main Workflow

Now that we have the functions to create the table and generate records, we can define the main workflow. This will:

  1. Create the table if it doesn't already exist.
  2. Generate 1000 random records.
  3. Write them to DynamoDB in batches of 25.
def main():
    # Create the table if it doesn't exist
    table = create_table_if_not_exists()

    records_batch = []
    for i in range(1, 1001):  # Loop to create 1000 records
        record = generate_record()
        records_batch.append(record)

        # If batch size reaches 25 items, write to DynamoDB and reset
        if len(records_batch) == 25:
            batch_write(table, records_batch)
            records_batch = []
            print(f"Written {i} records")

    # Write any remaining records
    if records_batch:
        batch_write(table, records_batch)
        print(f"Written remaining {len(records_batch)} records")

if __name__ == '__main__':
    main()
Enter fullscreen mode Exit fullscreen mode

5. Summary

By using batch_writer(), we significantly improve the efficiency of writing large volumes of data to DynamoDB. Here's a quick recap of the key steps:

  1. Create the DynamoDB table if it doesn't exist.
  2. Generate random data for testing.
  3. Batch write up to 25 records at a time.

This script helps you automate the process of writing large datasets to DynamoDB and makes your application more efficient.

import boto3
import random
import string
from datetime import datetime
from botocore.exceptions import ClientError

# Initialize a session using AWS
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')  # Specify the region

# Specify your DynamoDB table name
table_name = 'My_DynamoDB_Table_Name'

# Check if the table exists, and if not, create it
def create_table_if_not_exists():
    try:
        # Check if the table exists
        table = dynamodb.Table(table_name)
        table.load()  # Attempt to load the table metadata
        print(f"Table '{table_name}' already exists.")
        return table
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFoundException':
            print(f"Table '{table_name}' not found. Creating a new table...")
            # Create a new table
            table = dynamodb.create_table(
                TableName=table_name,
                KeySchema=[
                    {
                        'AttributeName': 'id',
                        'KeyType': 'HASH'  # Partition key
                    },
                ],
                AttributeDefinitions=[
                    {
                        'AttributeName': 'id',
                        'AttributeType': 'S'  # String type
                    },
                ],
                ProvisionedThroughput={
                    'ReadCapacityUnits': 5,
                    'WriteCapacityUnits': 5
                }
            )
            # Wait for the table to be created
            table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
            print(f"Table '{table_name}' created successfully.")
            return table
        else:
            print(f"Error checking or creating the table: {e}")
            raise

# Function to generate random string
def generate_random_string(length=10):
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))

# Function to generate random record
def generate_record():
    return {
        'id': generate_random_string(16),  # Unique id for the record
        'name': generate_random_string(8),  # Random name
        'timestamp': str(datetime.utcnow()),  # Timestamp for the record
        'value': random.randint(1, 1000),  # Some random value
    }

# Function to batch write records
def batch_write(table, records):
    with table.batch_writer() as batch:
        for record in records:
            batch.put_item(Item=record)

def main():
    # Create the table if it doesn't exist
    table = create_table_if_not_exists()

    records_batch = []
    for i in range(1, 1001):  # Loop to create 1000 records
        record = generate_record()
        records_batch.append(record)

        # If batch size reaches 25 items, write to DynamoDB and reset
        if len(records_batch) == 25:
            batch_write(table, records_batch)
            records_batch = []
            print(f"Written {i} records")

    # Write any remaining records
    if records_batch:
        batch_write(table, records_batch)
        print(f"Written remaining {len(records_batch)} records")

if __name__ == '__main__':
    main()

Enter fullscreen mode Exit fullscreen mode

Conclusion

Handling large-scale data ingestion into DynamoDB can be tricky, but using the right techniques—like checking for table existence, generating data dynamically, and writing in batches—can make the process seamless and efficient. Feel free to modify the script to suit your specific use case, and explore other features of DynamoDB like global secondary indexes or auto-scaling for even more optimized performance.

Top comments (0)