Introduction
In a recent project I needed to check whether data from a 3rd-party database corresponds with the documents in an S3 bucket. While this might seem like a straightforward task, the dataset was massive: up to 10 million objects in a single bucket. Traditional approaches, iterating over the object list or requesting a HEAD for every searched file, would take forever. Instead, I took some interesting steps, using Python and PySpark, to search through potentially large datasets efficiently.
Here's a detailed breakdown of my process.
Listing S3 Bucket Contents and Saving Directory Names
The first step was to list the contents of the S3 bucket and save the names of the subdirectories to a text file. For this, I used the Boto3 library in Python, a powerful interface for interacting with Amazon Web Services (AWS).
Here's a snippet of the code used to accomplish this task:
import os
import sys

import boto3

__doc__ = """
Usage: python get_objects.py <bucket_name> <output_file> [prefix]
Example: python get_objects.py my_bucket objects.txt prefix
"""

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print(__doc__)
        sys.exit(1)

    bucket_name = sys.argv[1]
    output_file = sys.argv[2]
    prefix = sys.argv[3] if len(sys.argv) > 3 else ""

    s3 = boto3.client("s3")

    # Start every run with an empty output file.
    try:
        os.remove(output_file)
    except OSError:
        pass

    continuation_token = None
    while True:
        if continuation_token:
            response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, ContinuationToken=continuation_token, Delimiter='/')
        else:
            response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter='/')

        # With Delimiter='/' the "directories" come back as CommonPrefixes.
        objects = response.get("CommonPrefixes", [])
        continuation_token = response.get("NextContinuationToken")

        with open(output_file, "a") as f:
            for obj in objects:
                f.write(obj["Prefix"] + "\n")
        print(f"{len(objects)} Objects in {bucket_name} are listed in {output_file}")

        if not continuation_token:
            break
Let's break it down.
- os.remove(output_file) removes the output file if it exists, so every run starts with an empty file.
- s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, ContinuationToken=continuation_token, Delimiter='/') gets up to 1000 objects from the bucket bucket_name under the "sub-directory" prefix. The delimiter / makes the call return only directory entries (as CommonPrefixes); since we are not interested in the files inside those directories, we don't need to look at non-directory entries. ContinuationToken is used to retrieve all the elements in the loop, because a single call returns at most 1000 objects.
- The listed "directory" objects have no content, so we save the value of the Prefix property of each entry.
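As a side note, Boto3 also provides a paginator that follows the continuation token for you. Here is a minimal sketch of the same listing built on it, assuming the same bucket_name, prefix, and output_file variables as above; it is an alternative, not the script I actually ran:

import boto3

def list_directories(bucket_name, prefix, output_file):
    # Sketch: write all CommonPrefixes ("directories") under prefix to output_file.
    s3 = boto3.client("s3")
    # The paginator transparently follows NextContinuationToken for us.
    paginator = s3.get_paginator("list_objects_v2")
    # Opening in "w" mode replaces the remove-then-append dance from the script above.
    with open(output_file, "w") as f:
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix, Delimiter='/'):
            for common_prefix in page.get("CommonPrefixes", []):
                f.write(common_prefix["Prefix"] + "\n")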
Using PySpark to Search for the Selected Directory
With the directory names saved in a text file, the next step was to leverage PySpark for efficient searching. PySpark's DataFrame API provides a powerful way to handle large datasets.
Here's an example of how I used PySpark to search for a selected directory:
import os
import sys
import time

from pyspark.sql import SparkSession


def load_input_data(spark):
    # Prefer the Parquet copy if it already exists; otherwise read the text
    # listing and save it as Parquet for the next run.
    if os.path.exists("out.parquet"):
        df = spark.read.load("out.parquet")
        print(f"Loaded {df.count()} records from file out.parquet")
    else:
        df = spark.read.text("out.txt")
        print(f"Loaded {df.count()} records from file out.txt")
        df.write.save("out.parquet", format="parquet")
        print("Saved dataframe to out.parquet")
    return df


def find_entry(id, df):
    # True if any row of the single "value" column matches the searched id.
    return df.filter(df.value == id).count() > 0


def find_in_s3(id_to_find):
    spark = SparkSession.builder.appName("S3Find").getOrCreate()
    df = load_input_data(spark)
    print("Loading input data finished after %s seconds ---" % (time.time() - start_time))
    found = find_entry(id_to_find, df)
    print(f"Found entry {id_to_find}" if found else f"Entry {id_to_find} not found")
    spark.stop()


if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit(1)
    id_to_find = sys.argv[1]
    start_time = time.time()
    find_in_s3(id_to_find)
    print("--- %s seconds ---" % (time.time() - start_time))
- load_input_data looks for previously saved Parquet data; if it is not available, it loads the selected text file into a data frame and saves it as Parquet for later runs.
- find_entry uses a filter on the data frame to check whether the selected element exists (a batch variant for many IDs is sketched below).
- find_in_s3 creates a new Spark session, loads the data frame, and performs the search.
In the main block I added simple execution-time measurement.
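The script above checks a single ID per run. If many IDs need checking, one pass over the data frame can cover them all. Here is a sketch of such a batch variant; batch_find and the isin approach are my additions, not part of the original script:

from pyspark.sql.functions import col

def batch_find(ids, df):
    # Collect all matching values in a single Spark job instead of one filter per ID.
    found = {row["value"] for row in df.filter(col("value").isin(list(ids))).collect()}
    return {id_: (id_ in found) for id_ in ids}

# Usage sketch (same session and data frame as in find_in_s3):
#   results = batch_find(["some-prefix-1/", "some-prefix-2/"], df)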
Benchmarks
Using a simple test generator, I created three collections (100k, 1M, and 10M elements) of random UUIDs, plus test cases with 10, 100, and 1000 items, some of them with randomly added suffixes (to exercise the non-existence search).
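The generator itself isn't the interesting part, but for context, a sketch along these lines is enough to reproduce the setup. The file names and the suffix logic here are my assumptions, not the exact script from the repo:

import random
import uuid

def generate(collection_size, test_sizes, collection_file="test_output"):
    # The searched collection: one random UUID per line.
    ids = [str(uuid.uuid4()) for _ in range(collection_size)]
    with open(collection_file, "w") as f:
        f.write("\n".join(ids) + "\n")
    # Test cases: a sample of existing IDs, some with a suffix appended
    # so that the lookup is guaranteed to miss.
    for n in test_sizes:
        with open(f"{collection_file}test{n}", "w") as f:
            for id_ in random.sample(ids, n):
                f.write(id_ + ("-missing" if random.random() < 0.5 else "") + "\n")

generate(100_000, [10, 100, 1000])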
The results shocked me a bit. It was blazing fast (time in seconds):
As we can see, the search time correlated almost exactly with the number of searched elements, not with the collection size.
Grep
The internet said:
Grep is always faster for searching in files.
OK, let's try. Here is a test script that measures computing time for grep and Spark on the same datasets and test UUIDs:
import subprocess
import sys
import time

from pyspark.sql import SparkSession


def find_entry(id, df):
    return df.filter(df.value == id).count() > 0


def spark_find(test_file):
    start_time = time.time()
    spark = SparkSession.builder.appName("S3Find").getOrCreate()
    df = spark.read.text("test_output")
    with open(test_file, "r") as f:
        for line in f:
            id_to_find = line.rstrip()
            find_entry(id_to_find, df)
    spark.stop()
    print("SPARK: --- %s seconds ---" % (time.time() - start_time))


def grep_find(test_file):
    start_time = time.time()
    with open(test_file, "r") as f:
        for line in f:
            id_to_find = line.rstrip()
            # -q: quiet, we only care about the exit code (found / not found).
            subprocess.call(['/usr/bin/grep', '-q', id_to_find, 'test_output'])
    print("GREP: --- %s seconds ---" % (time.time() - start_time))


if __name__ == "__main__":
    numOfRecords = str(sys.argv[1])
    spark_find("test_outputtest" + numOfRecords)
    grep_find("test_outputtest" + numOfRecords)
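For reference, assuming the script is saved as compare.py (the name is mine) and the files from the generator exist, a run against the 1000-item test case would look like this; the argument is simply appended to the test_outputtest prefix to pick the test file:

python compare.py 1000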
And the results for grep:
For relatively small collections grep was faster, but when it comes to millions of records, the Spark solution overtakes good ol' grep.
A direct comparison of Spark vs. grep:
Summary
To understand why PySpark outperformed grep, let's delve into the differences:
Grep:
- Traditional command-line utility for searching plain-text data.
- Efficient for small to medium-sized text files.
- Performance drops significantly with larger datasets, since every search is a fresh linear scan of the whole file.
PySpark:
- Distributed computing framework, ideal for large-scale data processing.
- It uses in-memory computations, which speed up the search process (see the sketch after this list).
- Capable of handling much larger datasets efficiently.
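The in-memory part is what matters most for repeated lookups. The scripts above don't do this explicitly, so here is a minimal sketch of how it can be made explicit with standard PySpark caching, assuming the same out.txt listing as before:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("S3Find").getOrCreate()
df = spark.read.text("out.txt").cache()  # keep the listing in memory across queries
df.count()                               # materialize the cache once
# Every subsequent df.filter(...).count() now runs against the cached data
# instead of re-reading the file from disk.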
In summary, while grep is a fantastic tool for quick searches on smaller datasets, PySpark shines when dealing with larger datasets, offering significant performance improvements due to its distributed nature.
By leveraging Python and PySpark, I was able to efficiently determine the existence of directories in an S3 bucket, saving time and computational resources. This method showcases the power of modern data processing tools and their application in real-world scenarios.
All the mentioned code can be found in my Repo
Feel free to share your thoughts or ask questions in the comments below!
Note: The performance results may vary based on the specific configuration and resources of your environment.