
Bartosz Górski


Checking object existence in large AWS S3 buckets using Python and PySpark (plus some grep comparison)

Introduction

In my recent project, I needed to check whether data from a 3rd-party database corresponds with the documents in an S3 bucket. While this might seem like a straightforward task, the dataset was massive - up to 10 million objects in a single bucket. Traditional iteration over the object list, or requesting a HEAD for every searched file, would take forever. So I took a few more interesting steps, using Python and PySpark, to search through potentially large datasets efficiently.
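
For context, the per-object check that I ruled out looks roughly like this (a sketch, not part of the final solution; error handling trimmed to the 404 case):

import boto3
from botocore.exceptions import ClientError


def object_exists(s3, bucket, key):
    # One HEAD request per key - fine for a handful of objects,
    # hopeless for millions of them.
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as e:
        if e.response["Error"]["Code"] == "404":
            return False
        raise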

Here's a detailed breakdown of my process.

Listing S3 Bucket Contents and Saving Directory Names

The first step was to list the contents of the S3 bucket and save the names of the subdirectories to a text file. For this, I used the Boto3 library in Python, which provides a powerful interface for interacting with Amazon Web Services (AWS).

Here's a snippet of the code used to accomplish this task:

import os
import sys
import boto3

__doc__ = """
Usage: python get_objects.py <bucket_name> <output_file> [prefix]
Example: python get_objects.py my_bucket objects.txt prefix
"""

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print(__doc__)
        sys.exit(1)

    bucket_name = sys.argv[1]
    output_file = sys.argv[2]
    prefix = sys.argv[3] if len(sys.argv) > 3 else ""
    s3 = boto3.client("s3")
    try:
        os.remove(output_file)
    except OSError:
        pass
    continuation_token = None
    while True:
        if continuation_token:
            response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, ContinuationToken=continuation_token, Delimiter='/')
        else:
            response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter='/')
        objects = response.get("CommonPrefixes", [])
        continuation_token = response.get("NextContinuationToken")
        with open(output_file, "a") as f:
            for obj in objects:
                f.write(obj["Prefix"] + "\n")
        print(f"{len(objects)} Objects in {bucket_name} are listed in {output_file}")
        if not continuation_token:
            break

Let's break it down.

  1. os.remove(output_file) - every run should start with an empty file, so we remove the output file if it exists.
  2. s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, ContinuationToken=continuation_token, Delimiter='/') returns up to 1,000 entries from the bucket bucket_name under the "sub-directory" prefix, using the delimiter / so that only directory-level prefixes come back as CommonPrefixes. Since we are not interested in the files inside those directories, we don't need to look at non-directory entries at all. ContinuationToken drives the loop, because a single call returns at most 1,000 entries (a paginator-based sketch follows the list).
  3. The listed "directory" entries have no content of their own, so we save the value of their Prefix property.
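
As a side note, boto3 also ships a paginator that does the ContinuationToken bookkeeping for you; a minimal sketch of the same listing (the bucket name and output path are placeholders):

import boto3

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
with open("objects.txt", "w") as f:
    # Each page corresponds to one list_objects_v2 call; the paginator
    # passes the ContinuationToken between calls automatically.
    for page in paginator.paginate(Bucket="my_bucket", Prefix="", Delimiter="/"):
        for common_prefix in page.get("CommonPrefixes", []):
            f.write(common_prefix["Prefix"] + "\n")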

Using PySpark to Search for the Selected Directory

With the directory names saved in a text file, the next step was to leverage PySpark for efficient searching. PySpark's DataFrame API provides a powerful way to handle large datasets.

Here's an example of how I used PySpark to search for a selected directory:

import os
import sys
import time
from pyspark.sql import SparkSession


def load_input_data(spark):
    if os.path.exists("out.parquet"):
        df = spark.read.load("out.parquet")
        print(f"Loaded {df.count()} records from file out.parquet")
    else:
        df = spark.read.text("out.txt")
        print(f"Loaded {df.count()} records from file out.txt")
        df.write.save("out.parquet", format="parquet")
        print(f"Saved dataframe to out.parquet")
    return df


def find_entry(id, df):
    return df.filter(df.value == id).count() > 0


def find_in_s3(id_to_find):
    spark = SparkSession.builder.appName("S3Find").getOrCreate()
    df = load_input_data(spark)
    print("Loading input data finished after %s seconds ---" % (time.time() - start_time))
    found = find_entry(id_to_find, df)
    print(f"Found entry {id_to_find}" if found else f"Entry {id_to_find} not found")
    spark.stop()


if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit(1)
    id_to_find = sys.argv[1]
    start_time = time.time()
    find_in_s3(id_to_find)
    print("--- %s seconds ---" % (time.time() - start_time))

load_input_data looks for previously saved Parquet data; if it is not available, it loads the selected text file into a DataFrame and writes it out as Parquet for the next run.
find_entry filters the DataFrame to check whether the selected element exists.
find_in_s3 creates a new Spark session, loads the DataFrame, and performs the lookup.
In the main block I added simple execution-time measurement.
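
Worth mentioning: when there are many IDs to check (as in the benchmarks below), a single isin filter (or a join) handles the whole batch in one pass instead of running one Spark job per ID. A minimal sketch on top of the same DataFrame, not part of the original script:

def find_entries(ids_to_find, df):
    # Collect only the matching values; anything absent from the result
    # set does not exist in the bucket listing.
    existing = {
        row.value
        for row in df.filter(df.value.isin(list(ids_to_find))).select("value").collect()
    }
    return {id_: id_ in existing for id_ in ids_to_find}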

Benchmarks

Using a simple test generator, I created 3 collections (100k, 1M and 10M elements) of random UUIDs, plus test cases with 10, 100 and 1000 items, some with randomly added suffixes (for non-existence searches).
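
The generator itself is nothing special; a simplified sketch of the idea (file names and the suffix for the non-existent cases are illustrative):

import random
import uuid


def generate(collection_size, test_size,
             collection_file="test_output", test_file="test_ids.txt"):
    sampled = []
    with open(collection_file, "w") as f:
        for _ in range(collection_size):
            new_id = str(uuid.uuid4())
            f.write(new_id + "\n")
            if len(sampled) < test_size:
                sampled.append(new_id)
    with open(test_file, "w") as f:
        for test_id in sampled:
            # Roughly half of the test cases get a suffix,
            # so the search will not find them.
            suffix = "-x" if random.random() < 0.5 else ""
            f.write(test_id + suffix + "\n")
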
The results shocked me a bit. It was blazing fast (time in seconds):

[Image: Spark computing time (table)]

[Image: Spark computing time (plot)]

As we can see, the search time correlated almost exactly with the number of searched elements, not with the collection size.

Grep

The internet said:

Grep is always faster for searching in files.

OK, let's try. Here is a test script that measures the computing time of grep and Spark on the same datasets and test UUIDs:

import subprocess
import sys
import time
from pyspark.sql import SparkSession


def find_entry(id, df):
    return df.filter(df.value == id).count() > 0


def spark_find(test_file):
    start_time = time.time()
    spark = SparkSession.builder.appName("S3Find").getOrCreate()
    df = spark.read.text("test_output")
    with open(test_file, "r") as f:
        for line in f:
            id_to_find = line.rstrip()
            find_entry(id_to_find, df)
    spark.stop()
    print("SPARK: --- %s seconds ---" % (time.time() - start_time))

def grep_find(test_file):
    start_time = time.time()
    with open(test_file, "r") as f:
        for line in f:
            id_to_find = line.rstrip()
            subprocess.call(['/usr/bin/grep', '-q', id_to_find, 'test_output'])
    print("GREP: --- %s seconds ---" % (time.time() - start_time))

if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit(1)

    num_of_records = sys.argv[1]
    spark_find("test_outputtest" + num_of_records)
    grep_find("test_outputtest" + num_of_records)



And the results for the grep:

[Image: Grep computing time (table)]
[Image: Grep computing time (plot)]

For relatively small collections grep was faster, but once we get into the millions the Spark solution overtakes good ol' grep.

A direct comparison of Spark vs Grep:

[Image: Spark vs grep efficiency comparison]

Summary

To understand why PySpark outperformed grep, let's delve into the differences:

Grep:

  • Traditional command-line utility for searching plain-text data.
  • Efficient for small to medium-sized text files.
  • Performance drops significantly with larger datasets due to linear search.

PySpark:

  • Distributed computing framework, ideal for large-scale data processing.
  • Uses in-memory computation, which speeds up the search process (see the caching snippet after this list).
  • Capable of handling much larger datasets efficiently.
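
One practical note on the in-memory point: explicitly caching the DataFrame keeps it in executor memory after the first action, so repeated lookups do not go back to the source file. The benchmark script above relies on Spark's defaults; a caching variant is a small change (sketch only):

# Cache the DataFrame so repeated filters reuse the in-memory copy
# instead of re-reading the text file for every lookup.
df = spark.read.text("test_output").cache()
df.count()  # force materialization with one full pass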

In summary, while grep is a fantastic tool for quick searches on smaller datasets, PySpark shines when dealing with larger datasets, offering significant performance improvements due to its distributed nature.

By leveraging Python and PySpark, I was able to efficiently determine the existence of directories in an S3 bucket, saving time and computational resources. This method showcases the power of modern data processing tools and their application in real-world scenarios.

All the mentioned code can be found in my Repo

Feel free to share your thoughts or ask questions in the comments below!

Note: The performance results may vary based on the specific configuration and resources of your environment.
