DEV Community

Abhishek Gupta for AWS

Posted on • Originally published at Medium

DynamoDB Go SDK: How to use the Scan and Batch operations efficiently

The DynamoDB Scan API accesses every item in a table (or secondary index). It is the equivalent of a select * from query. One of the things I will cover in this blog is how to use the Scan API with the DynamoDB Go SDK.

To scan a table, we need some data to begin with! So in the process, I will also go into how to use the Batch API to write bulk data to DynamoDB. You can use the BatchWriteItem API to create or delete items in batches (of up to 25), and you can combine these operations across multiple tables.

We will start simple and gradually improve our approach to use the APIs efficiently. I will also go over some of the basic tests that I ran to demonstrate incremental improvements. Finally I will wrap up by highlighting some of the considerations while using these operations.

You can refer to the code on GitHub

Before you proceed...

... make sure to create a DynamoDB table called users with:

  • partition key email (data type String) and
  • On-Demand capacity mode.


Also, there are a few things I want to call out to set the context:

  • The table was created in us-east-1 and tests were executed from an EC2 instance in us-east-1 as well
  • Since these are general tests instead of specialised benchmarks, I did not do any special tuning (at any level). These are just Go functions that were executed with different inputs, keeping things as simple as possible.
  • The tests include marshalling (converting Go struct to DynamoDB data types) for BatchWriteItem operations and un-marshalling (converting from DynamoDB data types back to Go struct) for Scan operation.

Let's start off by exploring the BatchWriteItem API. This way we will have data to work with for the Scan operations as well.

Win-win!

Importing data in batches

Since you can combine up to 25 items in a single invocation, using a batch approach for bulk data imports is much better than invoking PutItem in a loop (or even in parallel).

Here is a basic example of how you would use BatchWriteItem:

func basicBatchImport() {

    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batch := make(map[string][]types.WriteRequest)
    var requests []types.WriteRequest

    for i := 1; i <= 25; i++ {
        user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}
        item, err := attributevalue.MarshalMap(user)
        if err != nil {
            log.Fatal("marshal map failed", err)
        }
        requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
    }

    batch[table] = requests

    op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
        RequestItems: batch,
    })
    if err != nil {
        log.Fatal("batch write error", err)
    } else {
        log.Println("batch insert done")
    }

    // UnprocessedItems is keyed by table name, so count the pending requests for our table
    unprocessed := len(op.UnprocessedItems[table])
    if unprocessed != 0 {
        log.Println("there were", unprocessed, "unprocessed records")
    }

    log.Println("inserted", 25-unprocessed, "records in", time.Since(startTime).Seconds(), "seconds")
}

With BatchWriteItemInput, we can define the operations we want to perform in the batch - here we are just going to perform PutRequests (each of which is wrapped in another type called WriteRequest).

We assemble the WriteRequests in a slice and finally put them in a map with key being the table name - this is exactly what the RequestItems attribute in BatchWriteItemInput needs.

In this case we are dealing with a single table but you could execute operations on multiple tables.

In this example we just dealt with one batch of 25 records (maximum permitted batch size). If we want to import more records, all we need to do is split them into batches of 25 and execute them one (sub)batch at a time. Simple enough - here is an example:

func basicBatchImport2(total int) {

    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batchSize := 25
    processed := total

    for num := 1; num <= total; num = num + batchSize {

        batch := make(map[string][]types.WriteRequest)
        var requests []types.WriteRequest

        start := num
        end := num + batchSize - 1
        if end > total {
            end = total // the last batch may hold fewer than 25 records
        }

        for i := start; i <= end; i++ {
            user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}
            item, err := attributevalue.MarshalMap(user)
            if err != nil {
                log.Fatal("marshal map failed", err)
            }
            requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
        }

        batch[table] = requests

        op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
            RequestItems: batch,
        })

        if err != nil {
            log.Fatal("batch write error", err)
        }

        // UnprocessedItems is keyed by table name, so count the pending requests for our table
        processed -= len(op.UnprocessedItems[table])
    }

    log.Println("all batches finished. inserted", processed, "records in", time.Since(startTime).Seconds(), "seconds")

    if processed != total {
        log.Println("there were", (total - processed), "unprocessed records")
    }
}

I tried this with 50000 records (which means 2000 batches) and it took approximately 15 seconds. But we can do much better!
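The batch arithmetic above (50000 records split into 2000 batches of 25) can also be pulled out into a small helper. This is only a sketch: chunk is a hypothetical helper that is not part of the original code, and the integers in main are placeholders for marshalled items. It uses Go generics, so it assumes Go 1.18 or later.

```go
package main

import "fmt"

// chunk splits items into consecutive slices of at most size elements.
// The last chunk is shorter when len(items) is not a multiple of size.
func chunk[T any](items []T, size int) [][]T {
    if size <= 0 {
        return nil
    }
    chunks := make([][]T, 0, (len(items)+size-1)/size)
    for start := 0; start < len(items); start += size {
        end := start + size
        if end > len(items) {
            end = len(items)
        }
        chunks = append(chunks, items[start:end])
    }
    return chunks
}

func main() {
    // 60 placeholder records stand in for marshalled items.
    records := make([]int, 60)
    batches := chunk(records, 25)
    fmt.Println(len(batches), len(batches[2])) // prints "3 10"
}
```

Each chunk would then become the request list of one BatchWriteItem call, and the final chunk is allowed to hold fewer than 25 items.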

Parallel batch import

Instead of processing each batch sequentially, we can spin up a goroutine for each batch:

func parallelBatchImport(numRecords int) {

    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batchSize := 25

    var wg sync.WaitGroup
    var mu sync.Mutex // guards processed, which is updated from multiple goroutines

    processed := numRecords

    for num := 1; num <= numRecords; num = num + batchSize {
        start := num
        end := num + batchSize - 1
        if end > numRecords {
            end = numRecords // the last batch may hold fewer than 25 records
        }

        wg.Add(1)

        go func(s, e int) {
            defer wg.Done()

            batch := make(map[string][]types.WriteRequest)
            var requests []types.WriteRequest

            for i := s; i <= e; i++ {
                user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}

                item, err := attributevalue.MarshalMap(user)
                if err != nil {
                    log.Fatal("marshal map failed", err)
                }
                requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
            }

            batch[table] = requests

            op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
                RequestItems: batch,
            })

            if err != nil {
                log.Fatal("batch write error", err)
            }

            // UnprocessedItems is keyed by table name, so count the pending requests for our table
            if unprocessed := len(op.UnprocessedItems[table]); unprocessed != 0 {
                mu.Lock()
                processed -= unprocessed
                mu.Unlock()
            }

        }(start, end)
    }

    log.Println("waiting for all batches to finish....")
    wg.Wait()

    log.Println("all batches finished. inserted", processed, "records in", time.Since(startTime).Seconds(), "seconds")

    if processed != numRecords {
        log.Println("there were", (numRecords - processed), "unprocessed records")
    }
}

The results improved by a good margin. Here is what I got, on average:

  • inserted 50000 records in ~ 2.5 seconds
  • inserted 100000 records in ~ 4.5 to 5 seconds
  • inserted 150000 records in less than 9.5 seconds
  • inserted 200000 records in less than 11.5 seconds

There may be unprocessed records in a batch. This example detects these records, but the retry logic has been skipped to keep things simple. Ideally you should have an (exponential back-off based) retry mechanism for handling unprocessed records as well.

To insert more data, I ran the parallelBatchImport function (above) in loops. For example:

for i := 1; i <= 100; i++ {
    parallelBatchImport(50000)
}

Alright, let's move ahead. Now that we have some data, let's try ...

... the Scan API

This is what basic usage looks like:

func scan() {
    startTime := time.Now()

    op, err := client.Scan(context.Background(), &dynamodb.ScanInput{
        TableName:              aws.String(table),
        ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
    })

    if err != nil {
        log.Fatal("scan failed", err)
    }

    for _, i := range op.Items {
        var u User
        err := attributevalue.UnmarshalMap(i, &u)
        if err != nil {
            log.Fatal("unmarshal failed", err)
        }
    }

    if len(op.LastEvaluatedKey) != 0 {
        log.Println("not all items have been scanned")
    }
    log.Println("scanned", op.ScannedCount, "items in", time.Since(startTime).Seconds(), "seconds")
    log.Println("consumed capacity", *op.ConsumedCapacity.CapacityUnits)
}

Just provide the table (or secondary index) name and you are good to go! However, you might not get all the items in one call because of API limits (1 MB worth of data per invocation). In my case, it took about 0.5 seconds to scan approximately 15000 records; the rest of the items were not returned because the 1 MB limit was reached.

Using Pagination

To handle the limitation around data, the Scan API returns LastEvaluatedKey in its output to point to the last processed record. All you need to do is invoke Scan again, with the value for ExclusiveStartKey attribute set to the one for LastEvaluatedKey.

Using the paginated scan approach, it took me approximately 100 seconds to scan ~ 7.5 million records.
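The pagination loop itself is small. The sketch below shows its shape without calling DynamoDB, so that it runs anywhere: scanFunc and fakeTable are hypothetical stand-ins that I made up for illustration, where startKey plays the role of ExclusiveStartKey, lastKey plays the role of LastEvaluatedKey, and the page size (assumed to be at least 1) imitates the 1 MB limit.

```go
package main

import "fmt"

// page mimics the parts of a Scan response that matter for pagination.
type page struct {
    items   []string
    lastKey string // empty when there are no more pages
}

// scanFunc is a hypothetical stand-in for one Scan call.
type scanFunc func(startKey string) (page, error)

// scanAll keeps calling scan, feeding each lastKey back in as the next
// startKey, until a page comes back without a lastKey.
func scanAll(scan scanFunc) ([]string, error) {
    var all []string
    startKey := ""
    for {
        p, err := scan(startKey)
        if err != nil {
            return nil, err
        }
        all = append(all, p.items...)
        if p.lastKey == "" {
            return all, nil
        }
        startKey = p.lastKey
    }
}

// fakeTable returns a scanFunc over keys that hands out at most pageSize
// items per call.
func fakeTable(keys []string, pageSize int) scanFunc {
    return func(startKey string) (page, error) {
        start := 0
        if startKey != "" {
            for i, k := range keys {
                if k == startKey {
                    start = i + 1 // the start key itself is excluded
                    break
                }
            }
        }
        end := start + pageSize
        if end >= len(keys) {
            return page{items: keys[start:]}, nil
        }
        return page{items: keys[start:end], lastKey: keys[end-1]}, nil
    }
}

func main() {
    scan := fakeTable([]string{"a", "b", "c", "d", "e"}, 2)
    items, err := scanAll(scan)
    fmt.Println(items, err) // prints "[a b c d e] <nil>"
}
```

With the real SDK, the loop body would call client.Scan with ExclusiveStartKey set to the previous LastEvaluatedKey and stop once LastEvaluatedKey comes back empty.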

Parallel Scan

Pagination helps, but it's still a sequential process. There is a lot of scope for improvement. Thankfully, Scan allows you to adopt a parallelized approach, i.e. you can use multiple workers (goroutines in this case) to process data in parallel!

func parallelScan(pageSize, totalWorkers int) {
    log.Println("parallel scan with page size", pageSize, "and", totalWorkers, "goroutines")
    startTime := time.Now()

    var total int
    var mu sync.Mutex // guards total, which is updated from multiple goroutines

    var wg sync.WaitGroup
    wg.Add(totalWorkers)

    for i := 0; i < totalWorkers; i++ {
        // start a goroutine for each segment

        go func(segId int) {
            var segTotal int

            defer wg.Done()

            lastEvaluatedKey := make(map[string]types.AttributeValue)

            scip := &dynamodb.ScanInput{
                TableName:     aws.String(table),
                Limit:         aws.Int32(int32(pageSize)),
                Segment:       aws.Int32(int32(segId)),
                TotalSegments: aws.Int32(int32(totalWorkers)),
            }

            for {
                if len(lastEvaluatedKey) != 0 {
                    scip.ExclusiveStartKey = lastEvaluatedKey
                }
                op, err := client.Scan(context.Background(), scip)

                if err != nil {
                    log.Fatal("scan failed", err)
                }

                segTotal = segTotal + int(op.Count)

                for _, i := range op.Items {

                    var u User
                    err := attributevalue.UnmarshalMap(i, &u)
                    if err != nil {
                        log.Fatal("unmarshal failed", err)
                    }
                }

                if len(op.LastEvaluatedKey) == 0 {
                    log.Println("[ segment", segId, "] finished")
                    mu.Lock()
                    total = total + segTotal
                    mu.Unlock()
                    log.Println("total records processed by segment", segId, "=", segTotal)
                    return
                }

                lastEvaluatedKey = op.LastEvaluatedKey
            }
        }(i)
    }

    log.Println("waiting...")
    wg.Wait()

    log.Println("done...")
    log.Println("scanned", total, "items in", time.Since(startTime).Seconds(), "seconds")
}

Segment and TotalSegments attributes are the key to how Scan API enables parallelism. TotalSegments is nothing but the number of threads/goroutines/worker-processes that need to be spawned and Segment is a unique identifier for each of them.

In my tests, the Scan performance remained (almost) constant at 37-40 seconds (on average) for ~ 7.5 million records (I tried a variety of page size and goroutine combinations).

How many TotalSegments do I need to configure?

To tune the appropriate number of parallel threads/workers, you might need to experiment a bit. A lot might depend on your client environment.

  • Do you have enough compute resources?
  • Some environments/runtimes might have managed thread-pools, so you will have to comply with those

So, you will need to try things out to find the optimum parallelism for your use case. One way to think about it could be to choose one segment (single worker/thread/goroutine) per unit of data (say a segment for every GB of data you want to scan).
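As a starting point for those experiments, the "one segment per GB" rule of thumb can be written down as a tiny function. This is only a sketch under my own assumptions: suggestSegments and maxWorkers are hypothetical names, and the table size is passed in by the caller (it could come from the TableSizeBytes value reported by DescribeTable, which is only refreshed periodically).

```go
package main

import "fmt"

// 1 GiB of table data per segment, following the rule of thumb above.
const bytesPerSegment = 1 << 30

// suggestSegments returns one segment per started GiB of table data,
// with a minimum of 1 and capped at maxWorkers (when maxWorkers >= 1).
func suggestSegments(tableSizeBytes int64, maxWorkers int) int {
    segments := int((tableSizeBytes + bytesPerSegment - 1) / bytesPerSegment)
    if segments < 1 {
        segments = 1
    }
    if maxWorkers >= 1 && segments > maxWorkers {
        segments = maxWorkers
    }
    return segments
}

func main() {
    fmt.Println(suggestSegments(5*bytesPerSegment+1, 64)) // prints "6"
    fmt.Println(suggestSegments(200*bytesPerSegment, 64)) // prints "64"
}
```

The result would be used as the totalWorkers argument of parallelScan, and the cap keeps a very large table from spawning more goroutines than your client can handle.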

Wrap up - API considerations

Both Batch and Scan APIs are quite powerful, but there are nuances you should be aware of. My advice is to read the API documentation thoroughly.

With Batch APIs:

  • There are certain limits:
    • No more than 25 requests in a batch
    • An individual item in a batch should not exceed 400KB
    • Total size of items in a single BatchWriteItem cannot be more than 16MB
  • BatchWriteItem cannot update items
  • You cannot specify conditions on individual put and delete requests
  • It does not return deleted items in the response
  • If there are failed operations, you can access them via the UnprocessedItems response parameter

Use Scan wisely

Since a Scan operation goes over the entire table (or secondary index), it's highly likely that it consumes a large chunk of the provisioned throughput, especially if it's a large table. That being said, Scan should be your last resort. Check whether Query API (or BatchGetItem) works for your use-case.

The same applies to parallel Scan.

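There are a few ways to narrow down the results: a FilterExpression, a Limit parameter (as demonstrated earlier) or a ProjectionExpression to return only a subset of attributes. Keep in mind that a filter is applied after the items have been read, so it trims the response but not the read capacity consumed by the Scan.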

That's all for this blog. I hope you found it useful.

Until next time, Happy coding!
