DEV Community

Cover image for Efficiently Streaming a Large AWS S3 File via S3 Select
Idris Rampurawala
Idris Rampurawala

Posted on • Updated on

Efficiently Streaming a Large AWS S3 File via S3 Select

AWS S3 is an industry-leading object storage service. We tend to store lots of data files on S3 and at times require processing these files. If the size of the file that we are processing is small, we can basically go with traditional file processing flow, wherein we fetch the file from S3 and then process it row by row level. But the question arises, what if the file is size is more viz. > 1GB? 😓

Importing (reading) a large file leads Out of Memory error. It can also lead to a system crash event. There are libraries viz. Pandas, Dask, etc. which are very good at processing large files but again the file is to be present locally i.e. we will have to import it from S3 to our local machine. But what if we do not want to fetch and store the whole S3 file locally? 🤔

📜 Let's consider some of the use-cases:

  • We want to process a large CSV S3 file (~2GB) every day. It must be processed within a certain time frame (e.g. in 4 hours)
  • We are required to process large S3 files regularly from the FTP server. New files come in certain time intervals and to be processed sequentially i.e. the old file has to be processed before starting to process the newer files.

These are some very good scenarios where local processing may impact the overall flow of the system. Also, if we are running these file processing units in containers, then we have got limited disk space to work with. Hence, a cloud streaming flow is needed (which can also parallelize the processing of multiple chunks of the same file by streaming different chunks of the same file in parallel threads/processes). This is where I came across the AWS S3 Select feature. 😎

📝 This post focuses on streaming a large file into smaller manageable chunks (sequentially). This approach can then be used to parallelize the processing by running in concurrent threads/processes. Check my next post on this.


S3 Select

With Amazon S3 Select, you can use simple structured query language (SQL) statements to filter the contents of Amazon S3 objects and retrieve just the subset of data that you need. Using Amazon S3 Select to filter this data, you can reduce the amount of data that Amazon S3 transfers, reducing the cost and latency to retrieve this data.

Amazon S3 Select works on objects stored in CSV, JSON, or Apache Parquet format. It also works with objects that are compressed with GZIP or BZIP2 (for CSV and JSON objects only) and server-side encrypted objects. You can specify the format of the results as either CSV or JSON, and you can determine how the records in the result are delimited.

📝 We will be using Python's boto3 to accomplish our end goal.

🧱 Constructing SQL expressions

In order to work with S3 Select, boto3 provides select_object_content() function to query S3. You pass SQL expressions to Amazon S3 in the request. Amazon S3 Select supports a subset of SQL. Check this link for more information on this.

response = s3_client.select_object_content(
    Bucket=bucket,
    Key=key,
    ExpressionType='SQL',
    Expression='SELECT * FROM S3Object',
    InputSerialization={
        'CSV': {
            'FileHeaderInfo': 'USE',
            'FieldDelimiter': ',',
            'RecordDelimiter': '\n'
        }
    },
    OutputSerialization={
        'JSON': {
            'RecordDelimiter': ','
        }
    }
)
Enter fullscreen mode Exit fullscreen mode

In above request, InputSerialization determines the S3 file type and related properties, while OutputSerialization determines the response that we get out of this select_object_content().

🌫️ Streaming Chunks

Now, as we have got some idea about how the S3 Select works, let's try to accomplish our use-case of streaming chunks (subset) of a large file like how a paginated API works. 😋

S3 Select supports ScanRange parameter which helps us to stream a subset of an object by specifying a range of bytes to query. S3 Select requests for a series of non-overlapping scan ranges. Scan ranges don't need to be aligned with record boundaries. A record that starts within the scan range specified but extends beyond the scan range will be processed by the query. It means that the row would be fetched within the scan range and it might extend to fetch the whole row. It doesn't fetch a subset of a row, either the whole row is fetched or it is skipped (to be fetched in another scan range).

Let's try to achieve this in 2 simple steps:

1. Find the total bytes of the S3 file

The following code snippet showcases the function that will perform a HEAD request on our S3 file and determines the file size in bytes.

def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size
Enter fullscreen mode Exit fullscreen mode

2. Create a generator to stream the chunks

Now, the logic is to yield the chunks of byte stream of the S3 file until we reach the file size. Rest assured, this continuous scan range won't result in over-lapping of rows in the response 😉 (check the output image / GitHub repo). Simple enough, eh? 😝

import ast
import boto3
from botocore.exceptions import ClientError

def stream_s3_file(bucket: str, key: str, file_size: int, chunk_bytes=5000) -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path
        chunk_bytes (int): Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min(chunk_bytes, file_size)
    while start_range < file_size:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': ',',
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'JSON': {
                    'RecordDelimiter': ','
                }
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )

        """
        select_object_content() response is an event stream that can be looped to concatenate the overall result set
        Hence, we are joining the results of the stream in a string before converting it to a tuple of dict
        """
        result_stream = []
        for event in response['Payload']:
            if records := event.get('Records'):
                result_stream.append(records['Payload'].decode('utf-8'))
        yield ast.literal_eval(''.join(result_stream))
        start_range = end_range
        end_range = end_range + min(chunk_bytes, file_size - end_range)


def s3_file_processing():
    bucket = '<s3-bucket>'
    key = '<s3-key>'
    file_size = get_s3_file_size(bucket=bucket, key=key)
    logger.debug(f'Initiating streaming file of {file_size} bytes')
    chunk_size = 524288  # 512KB or 0.5MB
    for file_chunk in stream_s3_file(bucket=bucket, key=key,
                                     file_size=file_size, chunk_bytes=chunk_size):
        logger.info(f'\n{30 * "*"} New chunk {30 * "*"}')
        id_set = set()
        for row in file_chunk:
            # perform any other processing here
            id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')
Enter fullscreen mode Exit fullscreen mode

Streaming file output

Congratulations! 👏 We have successfully managed to solve one of the key challenges of processing a large S3 file without crashing our system. 🤘

📌 You can check out my GitHub repository for a complete working example of this approach 👇

GitHub logo idris-rampurawala / s3-select-demo

This project showcases the rich AWS S3 Select feature to stream a large data file in a paginated style.

AWS S3 Select Demo

The MIT License

This project showcases the rich AWS S3 Select feature to stream a large data file in a paginated style.

Currently, S3 Select does not support OFFSET and hence we cannot paginate the results of the query. Hence, we use scanrange feature to stream the contents of the S3 file.

Background

Importing (reading) a large file leads Out of Memory error. It can also lead to a system crash event. There are libraries viz. Pandas, Dask, etc. which are very good at processing large files but again the file is to be present locally i.e. we will have to import it from S3 to our local machine. But what if we do not want to fetch and store the whole S3 file locally at once? 🤔

Well, we can make use of AWS S3 Select to stream a large file via it's ScanRange parameter. This approach…


✔️ Benefits of using S3-Select

  • Reduced IO — thus better performance
  • Reduced costs due to smaller data transfer fees
  • Multiple chunks can be run in parallel to expedite the file processing using ScanRange in multiple threads/processes

❗ Limitations of S3 Select

  • The maximum length of a record in the input or result is 1 MB
  • Amazon S3 Select can only emit nested data using the JSON output format
  • S3 select returns a stream of encoded bytes, so we have to loop over the returned stream and decode the output records['Payload'].decode('utf-8')
  • Only works on objects stored in CSV, JSON, or Apache Parquet format. For more flexibility/features, you can go for AWS Athena

📑 Resources


You might also wanna read a sequel of this post 👇


See ya! until my next post 😋

Top comments (4)

Collapse
 
visantanna profile image
Vinícius

Hi Idris, Great post! I tried to do a similar code where I select all data from a s3 file and recreated this same file locally with the same exact format. But after building the file I noticed that the local file had fewer records than the real one. As I increased the chunk size of the scan range the difference between the s3 file and the local file diminished. Do you have any idea why this might happen? Obs: file format: CSV , No compression, 5000 and 20000 bytes chunk range used for the tests. Once again, thank you for the post.

Collapse
 
idrisrampurawala profile image
Idris Rampurawala

Hi,
Glad that you liked the post and it helped you in your use-case.
With this process of streaming the data, you have to keep retrieving the file chunk from S3 until you reach the total file size. I would recommend to clone this repo and compare with your local code to identify if you missed something 😉

Optionally, I would recommend to also check out the sequel to this post for parallel processing 😁

Collapse
 
visantanna profile image
Vinícius

Thank you! I found out what I was missing, I made the start_byte = end_byte + 1. Losing one row per chunk. Your next article was exact what I was looking for for the next step of my program.

Collapse
 
collimarco profile image
Marco Colli

The maximum length of a record in the input or result is 1 MB

Is this the maximum file size of the file on S3?
Is there any size limit on the file that I want to "filter"?