DEV Community

Beyond CSV files: using Apache Parquet columnar files with Dask to reduce storage and increase performance. Try it now!

Jorge PM
Experienced Solution Architect and Software Developer currently working as a hands-on Chief Technical Officer.
Updated on ・5 min read

Index

  1. Introduction
  2. Environment preparation and data download
  3. Comparing CSV to Apache Parquet
  4. Conclusion and final thoughts

Before we start just a word on why I wrote this...

This first part doesn't contain any technical content, so you can go straight to the introduction if you want.

This post builds on one of my previous posts: https://dev.to/zompro/quick-pandas-and-dask-comparison-processing-large-csv-files-real-world-example-that-you-can-do-now-1n15

That post introduces Dask as an alternative to Pandas. This post presents Apache Parquet as an alternative to CSV files. Columnar files can perform very well and Parquet, in particular, can save a lot of space because of its compression capabilities.

This post is a standalone post (so there's some repetition from the previous one).

1. Introduction

Before we start you must:

  • Have Python 3.5 or newer installed, together with venv
  • For now, have a Linux-based system or a little bit of knowledge to translate the commands (I don't have a Windows machine close by, but give me a shout if you want me to translate the non-compatible commands)
  • That's it!

Apache Parquet is a file format (more specifically, a data storage format) that focuses on handling data by columns (as opposed to rows, as in a CSV file). It isn't the only columnar format, but it is a well-known and popular one. In a nutshell, this format stores information about the data, which allows certain operations to be performed very quickly. It also relies heavily on compression, which allows for smaller file sizes.

If you come from a "normal" database or are used to working with Data Frames, it is normal to think in rows. You have a header row naming each column and then pick up the rows one by one. This structure represents very well the relationship that exists between the columns or fields. However, sometimes you want to run a query that depends heavily on one particular column rather than on each whole row.

Think, for example, of trying to find a particular date or a range of dates in a timestamp column. In a bad scenario, you need to scan every record in the column to find what you need. Formats such as CSV don't know anything about the information they contain. Parquet knows more about your columns, and the data is stored in a way that makes these types of queries perform much better.
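To make the idea of a file that "knows" about its columns more concrete, here is a minimal sketch in plain Python. It is a simplification and not Parquet's actual on-disk layout: the toy dates, the `chunks` list and the `find_in_range` helper are all made up for illustration. The point it shows is that if the smallest and largest value of each chunk of a column are stored up front, a date-range query can skip whole chunks without reading them.

```python
from datetime import date

# Hypothetical toy data: a single "date" column, already split into chunks
# (Parquet calls its chunks "row groups"). The values are illustrative only.
chunks = [
    [date(1995, 1, 3), date(1995, 6, 20), date(1996, 2, 11)],
    [date(2005, 3, 9), date(2006, 7, 1), date(2007, 12, 25)],
    [date(2018, 4, 2), date(2019, 9, 30), date(2020, 1, 15)],
]

# Computed once at write time: the smallest and largest value in each chunk.
stats = [(min(chunk), max(chunk)) for chunk in chunks]


def find_in_range(start, end):
    """Return the matching dates and the number of chunks that were scanned."""
    matches, scanned = [], 0
    for chunk, (lowest, highest) in zip(chunks, stats):
        # Skip the chunk when its min/max range cannot overlap the query.
        if highest < start or lowest > end:
            continue
        scanned += 1
        matches.extend(d for d in chunk if start <= d <= end)
    return matches, scanned


matches, scanned = find_in_range(date(2006, 1, 1), date(2006, 12, 31))
print(matches, scanned)  # one match, found after scanning 1 of the 3 chunks
```

A CSV file has no such summary, so the same query has to parse every line.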

Enough writing! Let's get our hands dirty! By the way, the Wikipedia page on Apache Parquet is a great starting point in case you want to go deeper. It's a very interesting but massive subject: https://en.wikipedia.org/wiki/Apache_Parquet

2. Environment preparation and data download

This section is very similar to my previous post (https://dev.to/zompro/quick-pandas-and-dask-comparison-processing-large-csv-files-real-world-example-that-you-can-do-now-1n15). If you already followed that post, you just need to install pyarrow: activate your virtual environment and run pip install pyarrow

If you are coming to this post first, carry on reading so you can get your environment and data ready.

We are going to create a virtual environment, install Dask, pyarrow and Jupyter Notebooks (this last one just to run our code).

We will now create the main folder called parquetdask and a virtual environment called venv inside:

mkdir parquetdask
cd parquetdask
python3 -m venv venv

Now we will activate the virtual environment and install the packages we are going to need:

source venv/bin/activate
pip install "dask[complete]==2.27.0" pyarrow==1.0.1 jupyter==1.0.0

Before we move on to our notebook, let's download the data we are going to use (this is the same data we used in the previous post, so you don't have to download it again if you already have it). We will use the UK government's housing Price Paid Data. Make sure you read the usage guidelines here: https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads#using-or-publishing-our-price-paid-data

The copyright disclaimer:

Contains HM Land Registry data © Crown copyright and database right 2020. This data is licensed under the Open Government Licence v3.0.

We will download all the data ever recorded.

mkdir data
cd data
wget http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv
cd ..

We created a data folder, went into it, downloaded the data and now we are back at the root of our directory.

3. Comparing CSV to Apache Parquet

Start your notebook

jupyter notebook

Then create a new notebook and copy each of the following snippets into its own cell.

First we import the libraries we are going to need and start Dask (you can read more about this in my previous post)

import time
import os
import subprocess
import dask
import dask.dataframe as dd
from dask.delayed import delayed
from dask.distributed import Client, progress

# Start a local Dask cluster: 4 workers with 4 threads each
client = Client(threads_per_worker=4, n_workers=4)

Let's define some variables we will need

all_data = "data/pp-complete.csv"
columns = ["transaction", "price", "date", "postcode", "prop_type", "old_new", "duration", "paon", "saon", "street", "locality", "city", "district", "county", "ppd_cat", "rec_status"]

Now, we will run some analysis using Dask directly from our csv file.

start = time.time()
df = dd.read_csv(all_data,  blocksize=32 * 1024 * 1024, header=0, names=columns)
df_by_county = df.groupby("county")["price"].sum().compute()
print(df_by_county)
end = time.time()
print("time elapsed {}".format(end-start))

The result I got was:

dask using a csv file - time elapsed 19.78 seconds
(remember from the previous post, pandas took over 50 secs)

Next, we will transform our CSV file to Parquet (it will take some time)

df.to_parquet(all_data + ".parquet", engine='pyarrow')

Before we move on, let's have a look at the size of the two versions in our data folder (the Parquet one and the CSV one). You can use the following commands on a Unix-like system or just look at the sizes in your file browser (note that the Parquet output is a folder, not a single file).

print("{}G".format(round(os.path.getsize(all_data)/1000000000, 1)))
print(subprocess.check_output(['du','-sh', all_data + ".parquet"]).split()[0].decode('utf-8'))

The results I got were:

4.4G
2.2G

This is already showing the power of compression implemented in Parquet.
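One of the techniques behind that saving is dictionary encoding, which Parquet can apply to columns with many repeated values before general-purpose compression. The snippet below is only a hedged, plain-Python sketch of the idea and not how PyArrow implements it; the `county` sample values are made up for illustration.

```python
# Hypothetical sample of a column with few distinct values, such as "county".
county = ["KENT", "ESSEX", "KENT", "KENT", "ESSEX", "DEVON", "KENT", "DEVON"]

# Build the dictionary: each distinct value is stored only once.
dictionary = sorted(set(county))
index_of = {value: i for i, value in enumerate(dictionary)}

# The column itself becomes a list of small integers pointing into the dictionary.
encoded = [index_of[value] for value in county]

# Decoding restores the original column exactly.
decoded = [dictionary[i] for i in encoded]

print(dictionary)  # ['DEVON', 'ESSEX', 'KENT']
print(encoded)     # [2, 1, 2, 2, 1, 0, 2, 0]
```

In a row-based text file the full string is written out on every line, while here each name is stored once and every row only carries a small integer.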

But what about performance? Let's run the same process again, this time reading from the Parquet version:

start = time.time()
# No CSV-specific arguments (blocksize, header, names) are needed here:
# the column names are stored in the Parquet files themselves
df = dd.read_parquet(all_data + ".parquet", engine='pyarrow')
df_by_county = df.groupby("county")["price"].sum().compute()
print(df_by_county)
end = time.time()
print("time elapsed {}".format(end-start))

The result I got was:

dask using a parquet file - time elapsed 13.65 seconds

That's approximately 30% faster than the CSV version (13.65 seconds versus 19.78 seconds). Not bad at all considering you also get the storage space improvement.

4. Conclusion and final thoughts

This article is just a very brief introduction to Apache Parquet. There's a lot more to discover and analyse.

For example, you can explore the folder created as the Parquet "file". In there you can see that the original csv file is split into lots of smaller files allowing for better parallel execution.
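If you prefer to explore the folder from the notebook, a small sketch like the following counts the files and adds up their sizes. It only uses the standard library, so it should also work where du is not available. The `summarise_folder` helper is a name made up for this example, and the path assumes the folder created earlier in this post.

```python
from pathlib import Path


def summarise_folder(folder):
    """Return (number of files, total size in bytes) for everything under folder."""
    files = [p for p in Path(folder).rglob("*") if p.is_file()]
    return len(files), sum(p.stat().st_size for p in files)


# Assumed location: the folder written by df.to_parquet earlier in this post.
parquet_folder = Path("data/pp-complete.csv.parquet")
if parquet_folder.is_dir():
    count, total_bytes = summarise_folder(parquet_folder)
    print("{} files, {}G".format(count, round(total_bytes / 1000000000, 1)))
```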

The size of these smaller files (and therefore how many of them there are) is itself something you can modify to test for performance improvements. The relation between the number of files and the number of Dask workers/threads running will determine the optimum performance.
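As a rough back-of-the-envelope check, you can estimate how many partitions the settings used in this post produce. This is only a sketch under assumptions: it uses the rounded 4.4G size reported earlier, and it assumes one partition per block of the CSV and one Parquet file per partition, so the real count may differ slightly.

```python
import math

csv_size_bytes = 4.4e9        # approximate CSV size reported earlier (4.4G)
blocksize = 32 * 1024 * 1024  # blocksize passed to dd.read_csv
total_threads = 4 * 4         # n_workers * threads_per_worker

# Each block of the CSV becomes one partition.
partitions = math.ceil(csv_size_bytes / blocksize)
partitions_per_thread = partitions / total_threads

print(partitions)             # 132
print(partitions_per_thread)  # 8.25
```

Changing the blocksize changes the number of partitions, which is one knob you can turn when experimenting.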

You should also notice that there are metadata files. While these are not fully human-readable, they do show why this format performs so well in what we did. The file "knows" more about the data.

Finally, we are using PyArrow to handle the file but you could use Fastparquet as an alternative. Again, this is something else you can explore.

I hope you found this post interesting. As I said, this is just an introduction and there is a lot more to explore!

Discussion (4)

Paddy3118

If you are worried about space wouldn't you work from a gzipped CSV file? I wonder what the size of the CSV file is when gzipped - 9?

Jorge PM Author • Edited

I believe Dask doesn't support reading from zip. See here: github.com/dask/dask/issues/2554#i....

Looking around it does seem to be able to read from a single gzip but it doesn't seem to be straightforward. If you need to unzip the file then any gains on storage would be nullified.

I would be very interested to try it if you know a way and compare size and performance. Normally storage is cheap, at least a lot cheaper than other resources so performance is in most cases the priority and the compression is a nice to have (depends on the use case of course)

Paddy3118 • Edited

Sadly I don't use Dask, but in the past I have used zcat on a Linux command line to stream a csv to stdin for a script to then process without needing the whole of the data uncompressed in memory/on disk.

Jorge PM Author

Cool I can totally see a use case for that streaming into something like Apache Kafka. I will prototype a couple of things and see if it can become another little article. Thanks!