Spark: Introduction

This week in Data Engineering Zoomcamp we're working on batch processing with Spark. Spark can be used with Python, Java, or Scala; we used Python in a Jupyter notebook. I already had a virtual machine set up in Google Cloud, so it wasn't difficult to follow the directions to set up the environment, install Spark, and run a Jupyter notebook that imported pyspark and started a session. The only hitch was that when I forwarded the port to my local computer, the Jupyter notebook (localhost:8888) asked for a token or a password. This confused me until I looked at the terminal window where I had started it up, which showed instructions for how to use the token. I wonder why I hadn't seen this before; it doesn't always ask me for a token.

So, having opened a Jupyter notebook, the first thing I did was:

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local[*]") \
        .appName('test') \
        .getOrCreate()

"local[*]" tells Spark to create a local cluster, and * means to use all CPUs.

I had to change the next line, because the NYC-TLC taxi trip data site no longer stores the files in CSV format. I had to download the file from the course's GitHub releases and run gunzip.

!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz
!gunzip fhvhv_tripdata_2021-01.csv.gz

Then I could proceed with the video.

When Spark reads a CSV file, it doesn't guess about the data types; everything comes in as a string. In the video, the instructor read the file into pandas first, and then created a Spark dataframe from the pandas dataframe to see what schema Spark would infer. However, when I did that, I got an error: AttributeError: 'DataFrame' object has no attribute 'iteritems'. The problem was that we're using an older version of Spark with a current version of pandas, and the two are out of sync. I tried installing an older version of pandas, but I still had an error, so I ended up skipping that step and using the schema that the instructor had already created.
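Here's a rough sketch of what reading the CSV with a pre-built schema looks like. The field names and types below are my best guess at the fhvhv columns, so treat them as assumptions rather than the exact schema from the course:

from pyspark.sql import types

# Assumed schema for the fhvhv 2021-01 file; field names/types may differ slightly
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
])

# Read the CSV with an explicit schema instead of letting everything default to strings
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-01.csv')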

Next we had to repartition the Spark dataframe. The reason is that Spark processes data with several executors working in parallel; in this case there are six. If there's only one partition to process, five of them sit idle, so we want the number of partitions to be a multiple of six. The instructor chose 24. Repartitioning is a "lazy" command, which means it doesn't actually move any data until you trigger an action. So we called write, and then the partitioning happened.

df = df.repartition(24)             # lazy: just records the target partition count
df.write.parquet('fhvhv/2021/01/')  # action: triggers the shuffle and writes the parquet files
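To see the effect, you can read the parquet folder back and check the partition count. This isn't from the video, just a quick sanity check using standard PySpark calls:

# Read the written parquet files back and check how many partitions Spark sees
# (the count depends on file sizes and Spark's split settings, so it may not be exactly 24)
df_check = spark.read.parquet('fhvhv/2021/01/')
print(df_check.rdd.getNumPartitions())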
