Tomoya Oda

Spark on AWS Glue: Performance Tuning 1 (CSV vs Parquet)

These are my posts in the Spark on AWS Glue: Performance Tuning series:

  1. Spark on AWS Glue: Performance Tuning 1 (CSV vs Parquet)

  2. Spark on AWS Glue: Performance Tuning 2 (Glue DynamicFrame vs Spark DataFrame)

  3. Spark on AWS Glue: Performance Tuning 3 (Impact of Partition Quantity)

  4. Spark on AWS Glue: Performance Tuning 4 (Spark Join)

  5. Spark on AWS Glue: Performance Tuning 5 (Using Cache)

Introduction

I recently started reading Learning Spark 2nd Edition from O'Reilly.1

According to the book, Spark's performance can vary quite a bit depending on how you use it, so let's take a look.

In this series I casually record the results of comparing execution speeds in my own environment.

Data Preparation

I am going to use some ELB access logs that I had available. The total size of the data is 94.5 GB.
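The logs are loaded into a Spark DataFrame (df) before the experiments below. A minimal sketch of how that might look, assuming classic ELB access logs stored as space-delimited text (the path and options are placeholders, not necessarily what was used here):

# Sketch only: classic ELB access logs are space-delimited text, so one way
# to load them is as CSV with a space separator. Named columns such as
# request_timestamp would still need a schema or renaming afterwards.
df = spark.read.format("csv") \
    .option("sep", " ") \
    .load("s3://.../raw-elb-access-logs/")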

Measurement Function

from contextlib import contextmanager
import time

@contextmanager
def timer(name):
    # measure and print the elapsed wall-clock time of the enclosed block
    t0 = time.time()
    yield
    print(f'[{name}] done in {time.time() - t0:.4f} s')

CSV vs Parquet

Generally, columnar formats such as Parquet offer better compression rates and faster reads, since only the necessary columns have to be retrieved.2
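As a quick illustration of the column-pruning side (a sketch using a column name from this dataset; the path is a placeholder), selecting only the columns you need lets the Parquet reader skip the rest, which you can confirm in the physical plan:

# Read the Parquet data and select a single column; the FileScan node in the
# physical plan then shows a ReadSchema containing only that column, i.e. the
# other columns are never read from S3.
df = spark.read.parquet("s3://.../parquet-chunk-high/")
df.select("request_processing_time").explain()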

Parquet

When reading a Parquet file, Spark first consults the file metadata to find the positions of the blocks to read. The metadata for each block also contains statistics such as its min/max values.

For example, if you only want rows matching the condition value > 5.0, Spark can use those block statistics to skip blocks that cannot contain matching rows, which speeds up the read.
This is called predicate pushdown.
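Whether a filter is actually pushed down can be checked in the physical plan (a sketch; the path is a placeholder and the exact plan text differs between Spark versions):

# If the predicate is pushed down, the FileScan parquet node lists it under
# PushedFilters, e.g. LessThan(request_processing_time,8.0E-4); row groups
# whose min/max statistics cannot satisfy it are skipped.
df = spark.read.parquet("s3://.../parquet-chunk-high/")
df.filter(df["request_processing_time"] < 0.0008).explain()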

Reading Speed Comparison

Let's see whether there is any difference in read speed between the CSV and Parquet formats. We will also create partitioned versions of the data, partitioned by an hours column that is added as shown below.

# add hour column
from pyspark.sql.functions import hour
df = df.withColumn("hours", hour("request_timestamp"))
# unpartitioned CSV; coalesce(1) merges everything into a single output file
df.coalesce(1).write.mode('append').csv('s3://.../csv-chunk-high/')
# CSV partitioned by the hours column (one hours=N subdirectory per value)
df.write.mode('append')\
    .partitionBy('hours')\
    .csv('s3://.../csv-partition-high/')
# unpartitioned Parquet; coalesce(1) merges everything into a single output file
df.coalesce(1).write.mode('append').parquet('s3://..../parquet-chunk-high/')
# Parquet partitioned by the hours column (one hours=N subdirectory per value)
df.write.mode('append')\
    .partitionBy('hours')\
    .parquet('s3://.../parquet-partition-high/')

Reading the Data

with timer('csv'):
    df = spark.read.format("csv").load("s3://.../csv-chunk-high/")
    print(df.count())

with timer('csv partition'): 
    df = spark.read.format("csv").load("s3://.../csv-partition-high/")
    print(df.count())

with timer('parquet'):
    df = spark.read.format("parquet").load("s3://.../parquet-chunk-high/")
    print(df.count())

with timer('parquet partition'): 
    df = spark.read.format("parquet").load("s3://.../parquet-partition-high/")
    print(df.count())      
324917265
[csv] done in 27.1925 s

324917265
[csv partition] done in 36.3690 s

324917265
[parquet] done in 31.8977 s

324917265
[parquet partition] done in 32.5805 s

The result: there is no big difference in read speed between CSV and Parquet when simply counting all rows of the entire dataset.

Reading Part of the Data

The CSV files were written without a header, so their columns can only be referenced by position; _c6 corresponds to request_processing_time in the Parquet data.
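If you would rather filter by name, one option is to rename the positional column first (a sketch; the cast simply makes the comparison explicitly numeric, since header-less CSV columns are read as strings):

from pyspark.sql.functions import col

# _c6 is the request_processing_time field in this layout (see the Parquet
# version of the same filter below); rename it and cast before filtering.
df = spark.read.format("csv").load("s3://.../csv-chunk-high/")
df = df.withColumnRenamed("_c6", "request_processing_time")
print(df.filter(col("request_processing_time").cast("double") < 0.0008).count())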

with timer('csv'):
    df = spark.read.format("csv").load("s3://.../csv-chunk-high/")
    df = df.filter(df['_c6'] < 0.0008)
    print(df.count())

with timer('csv partition'): 
    df = spark.read.format("csv").load("s3://.../csv-partition-high/")
    df = df.filter(df['_c6'] < 0.0008)
    print(df.count())

with timer('parquet'):
    df = spark.read.format("parquet").load("s3://.../parquet-chunk-high/")
    df = df.filter(df['request_processing_time'] < 0.0008)
    print(df.count())

with timer('parquet partition'): 
    df = spark.read.format("parquet").load("s3://.../parquet-partition-high/")
    df = df.filter(df['request_processing_time'] < 0.0008)
    print(df.count())
119627151
[csv] done in 44.2805 s
119627151
[csv partition] done in 48.3934 s

119627151
[parquet] done in 32.7956 s
119627151
[parquet partition] done in 37.8519 s

Parquet is faster!

Data Size Comparison

Parquet with Snappy compression (Spark's default, hence the snappy.parquet file names) is much smaller than CSV:

aws s3 ls s3://.../csv-chunk-high/  --recursive --human --sum
   Total Size:  94.5 GB

aws s3 ls s3://.../parquet-chunk-high/  --recursive --human --sum
   Total Size:  11.7 GB

How big is gzip-compressed CSV?

Since the CSV above is uncompressed, the comparison so far is not entirely fair. Let's compress the CSV with gzip by adding the compression argument as follows:

df.coalesce(1).write.mode('append').csv('s3://.../csv-chunk-high-compress/', compression="gzip")
aws s3 ls s3://.../csv-chunk-high-compress/  --recursive --human --sum
   Total Size: 13.3 GiB

11.7 GB vs 13.3 GiB: Snappy-compressed Parquet is still smaller than gzip-compressed CSV.
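Incidentally, Spark's Parquet writer uses Snappy by default; the codec can be changed via the compression option if you want to trade write-time CPU for size. A minimal sketch (the output path here is hypothetical):

# Write Parquet with gzip instead of the default Snappy codec.
df.write.mode('append') \
    .option('compression', 'gzip') \
    .parquet('s3://.../parquet-chunk-high-gzip/')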

Summary

  • Read speed for a full scan of the entire dataset is about the same for CSV and Parquet
  • Parquet reads faster when filters are applied
  • Snappy-compressed Parquet has good compression efficiency

References


  1. https://learning.oreilly.com/library/view/learning-spark-2nd/9781492050032/ 

  2. https://aws.amazon.com/jp/blogs/news/top-10-performance-tuning-tips-for-amazon-athena/ 
