DEV Community

Cover image for (Re)Processing Large S3 Files using Streams
drmikecrowe
drmikecrowe

Posted on

(Re)Processing Large S3 Files using Streams

TL;DR

Need to parse a large file using AWS Lambda in Node and split into individual files for later processing? Sample repo here:

GitHub logo drmikecrowe / serverless-s3-streaming-example

Serverless Project Streaming and Parsing S3 files

Serverless Project Streaming and Parsing S3 files

This repo illustrates how to stream a large file from S3 and split it into separate S3 files after removing prior files

Goals

  1. Parse a large file without loading the whole file into memory
  2. Remove old data when new data arrives
  3. Wait for all these secondary streams to finish uploading to s3

Managing Complex Timing

  • Writing to S3 is slow. You must ensure you wait until the S3 upload is complete
  • We can't start writing to S3 until all the old files are deleted.
  • We don't know how many output files will be created, so we must wait until the input file has finished processing before starting to waiting for the outputs to finish

Demonstration Problem Statement

  • A school district central computer uploads all the grades for the district for a semester
  • The data file is has the following headers
    • School,Semester,Grade,Subject,Class,Student Name,Score
  • Process…

Background

Let's face it, data is sometimes ugly. Sure, it's easy to get data from external systems. But how often is that external system giving the data to you in the right format?

Recently, I had to parse a large CSV file that had been uploaded to S3. This is an ideal fit for using AWS Lambda, and using serverless.com makes that process very smooth.

However, Lambda imposes memory limitations on processing. Fortunately, AWS supports the Node Streaming interface. No need to read the whole file into memory, simply stream it and process it with the excellent Node CSV package.

Here's where this story gets interested. What if the data is updated? To make matters worse, what if you have to replace the processed files with new files when an update comes in?

So, here are the challenges:

  1. Parse a large file without loading the whole file into memory
  2. Remove old data when new data arrives
  3. Wait for all these secondary streams to finish uploading to s3

So what is the difficulty here?

  • Writing to S3 is slow. You must ensure you wait until the S3 upload is complete
  • We can't start writing to S3 until all the old files are deleted.
  • We don't know how many output files will be created, so we must wait until the input file has finished processing before starting to waiting for the outputs to finish

Demo Repository

To simulate this scenario, I contrived the following:

  • A school district central computer uploads all the grades for the district for a semester
  • The data file is has the following headers:
    • School,Semester,Grade,Subject,Class,Student Name,Score
  • Process the uploaded file, splitting it into the following structure:
    • Semester/School/Grade
    • Create a file called Subject-Class.csv with all the grades for that class
  • For this simulation, the central computer can update an entire Semester by uploading a new file. This could be set differently based on the application: For instance, if the central computer could upload the grades for a specific Semester + School, then we could update this line with the revised criteria to only clear that block of data

Here's the general outline of the demo program flow:

  • Open the S3 file as a Stream (readStream)
  • Create a csvStream from the input readStream
  • Pipe readStream to csvStream
  • While we have New Lines
    • Is this line for a new school (i.e. new CSV file)?
      • Start a PassThru stream (passThruStream)
      • Does this line start a new Semester (top-level folder we're replacing) in S3?
        • Start deleting S3 folder
      • Are all files deleted?
        • Use s3.upload with Body=passThruStream to upload the file
    • Write New Line to the passThruStream
  • Loop thru all passThruStream streams and close/end
  • Wait for all passThruStream streams to finish writing to S3

Key Concepts

Don't Call Promise.all() Too Early

First, the main processing loop must wait for all lines to be processed before starting the Promise.all() to wait for the writes to finish. In the above repo, see these lines:

    this.pAllRecordsRead = this.openReadStream();
    await this.pAllRecordsRead;
    const promises: Promise<any>[] = [];
    for (let group of Object.keys(this.outputStreams)) {
        promises.push(this.outputStreams[group].pFinished);
    }
    await Promise.all(promises);
Enter fullscreen mode Exit fullscreen mode

Use s3.upload instead of s3.PutObject

s3.PutObject requires knowing the length of the output. Use s3.upload instead to stream an unknown size to your new file.

Wait for the S3.DeleteObjects to complete

Timing is critical:

  1. Start the file/folder deletion promise
  2. Wait until that completes
  3. Open the output stream

You can see the specific timing here in the demo code.

Boiled down, it looks like the code below. In short:

  • Every line is written to the passThruStream
  • When a new file must be created:
    • If the old contents must be deleted
      • Start the delete promise
    • Otherwise
      • Wait for the ongoing delete promise
    • Open the outputStream
    • Pipe the passThruStream to the outputStream
if (!outputStreams[outputFileName]) {
    const topLevelFolder = ...
    if (!deletePromises[topLevelFolder]) {
        deletePromises[topLevelFolder] = deleteOldFiles(topLevelFolder);
    }
    const passThruStream = ...
    inputStream.on("end", () => passThruStream.end());      // End passThruStream when the reader completes
    const pFinished = new Promise((resolve, reject) => {
        (async () => {
            await deletePromises[topLevelFolder];
            outputStream = ...
            passThruStream.pipe(outputStream);
            ...
        })().catch((err) => {
            reject(err);
        });
    });

    const outputFile: IOutputFile = {
        passThruStream,
        pFinished,
    };
    outputStreams[outputFileName] = outputFile;
}
outputStreams[outputFileName].passThruStream.write(record);


Enter fullscreen mode Exit fullscreen mode

Conclusion

Use Node Streams to buffer your S3 uploads. By using the PassThrough stream, you can perform operations on your S3 bucket/folder prior to actually starting the s3 upload process.

Top comments (0)