Study Notes 5.3.1 - First Look at Spark/PySpark
These notes cover the basics and some intermediate topics discussed in DE Zoomcamp 5.3.1 – First Look at Spark/PySpark, with extra details on schema definition, partitioning, and best practices for a well-rounded understanding.
1. Introduction to Spark and PySpark
- Apache Spark Overview:
- Spark is a fast, in-memory distributed computing framework designed for large-scale data processing.
- PySpark is the Python API for Spark, providing a way to work with Spark’s distributed DataFrame and SQL functionalities using Python.
- Why Spark?
- It is designed to handle high-volume data efficiently.
- Supports a range of operations—from data ingestion and processing to machine learning.
- Emphasizes parallel execution through clusters (executors).
- Key Concepts:
- SparkSession: The main entry point to interact with Spark. It is used for reading data, processing, and writing output (see the sketch at the end of this section).
- Lazy Evaluation: Transformations (e.g., reading files, repartitioning) are lazy; they are not executed until an action (e.g., show(), write()) is called.
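As a quick illustration of the entry point, here is a minimal sketch of creating a SparkSession before any of the reads below (the app name and local master are illustrative assumptions, not prescribed by the notes):

```python
from pyspark.sql import SparkSession

# Build (or reuse) a local SparkSession; "test" is an arbitrary app name.
spark = (
    SparkSession.builder
    .master("local[*]")   # use all local cores
    .appName("test")
    .getOrCreate()
)
```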
2. Setting Up the Environment
- Remote Execution:
- The transcript demonstrates connecting via SSH to a remote machine.
- Jupyter Notebooks are used to interact with PySpark code. Remember to configure port forwarding (e.g., port 4040 for Spark UI) correctly.
- Development Tools:
- Terminal commands are used to manage files and run Jupyter.
- For Windows users, alternatives like Git Bash or MinGW can help mimic Linux command behavior.
3. Reading CSV Files with PySpark
- Reading Data:
- Use the spark.read.csv() method to load CSV files into a Spark DataFrame.
- Example code snippet:

```python
df = spark.read.csv("path/to/file.csv", header=True)
df.show()
```
- Type Inference in Spark vs. Pandas:
- Spark: Does not infer data types by default; every column is read as a string unless you pass inferSchema=True (which costs an extra pass over the data) or provide an explicit schema.
- Pandas: Has basic type inference but might not recognize date/time fields accurately.
- Implication: When accuracy is critical, explicitly defining the schema is recommended.
- Defining Schema:
- Use pyspark.sql.types (e.g., StructType, StructField, StringType, IntegerType, TimestampType) to create a schema.
- Example schema definition:

```python
from pyspark.sql import types as T

schema = T.StructType([
    T.StructField("pickup_datetime", T.TimestampType(), True),
    T.StructField("dropoff_datetime", T.TimestampType(), True),
    T.StructField("pickup_location_id", T.IntegerType(), True),
    T.StructField("dropoff_location_id", T.IntegerType(), True),
    T.StructField("sr_flag", T.StringType(), True),
    # Add more fields as needed
])

df = spark.read.csv("path/to/file.csv", header=True, schema=schema)
df.show(10)
```

- Tip: Always check your data and use Pandas or manual inspection if needed to decide on the correct types.
4. Converting Pandas DataFrame to Spark DataFrame
- When and Why:
- Sometimes you might load a small sample with Pandas (which infers types) and then convert to a Spark DataFrame for distributed processing.
- Use spark.createDataFrame(pandas_df, schema) to convert, ensuring the Spark DataFrame has the desired data types (see the sketch at the end of this section).
- Benefits:
- Allows leveraging Pandas’ flexibility for type detection on smaller subsets.
- Transitioning to Spark helps when the dataset size exceeds the memory limits of a single machine.
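A minimal sketch of this workflow, assuming the schema object from section 3 and a hypothetical sample file path:

```python
import pandas as pd

# Load a small sample with Pandas (hypothetical path) to inspect types cheaply.
pdf = pd.read_csv("path/to/sample.csv", nrows=1000)

# Hand the sample to Spark, enforcing the explicit schema defined earlier
# so the distributed DataFrame uses the intended types.
df = spark.createDataFrame(pdf, schema=schema)
df.printSchema()
```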
5. Partitioning in Spark
- Understanding Partitions:
- In Spark, a partition is a chunk of data that can be processed independently on an executor.
- The number of partitions affects parallelism; more partitions allow for better utilization of available executors.
- Practical Example:
- A large CSV file might be read as one single partition, causing only one executor to process it. This creates a bottleneck.
- Repartitioning:
- The repartition(n) method can be used to change the number of partitions.
- Example:

```python
df_repartitioned = df.repartition(24)
```

- Note: Repartitioning is a lazy operation. It does not occur until an action (such as writing data) is executed. A sketch of inspecting partition counts appears at the end of this section.
- Cluster and File Distribution:
- Files stored in a data lake (or cloud storage) might be split into multiple partitions automatically if there are many files.
- Balancing the number of partitions with available executor resources is key for optimal performance.
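To make the partition count concrete, here is a small sketch (assuming df was loaded as above) that inspects the current number of partitions and then repartitions:

```python
# How many partitions does the DataFrame currently have?
print(df.rdd.getNumPartitions())

# Increase parallelism before writing (24 is an arbitrary example value;
# tune it to the number of cores/executors available).
df_repartitioned = df.repartition(24)
print(df_repartitioned.rdd.getNumPartitions())
```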
6. Writing Data in Parquet Format
- Why Parquet?
- Parquet is a columnar storage format that offers efficient data compression and encoding schemes.
- It is optimized for both storage size and query performance.
- Process Overview:
- After reading and processing the data, the DataFrame is written out in Parquet format:

```python
df_repartitioned.write.parquet("path/to/output_folder")
```
- Because repartition() is lazy, the shuffle to the requested number of partitions actually happens during the write, and the output folder contains one part file per partition.
- Benefits:
- Smaller file size (often 4x reduction compared to CSV).
- Faster read times due to the columnar structure.
- Better integration with many big data tools.
- Additional Note:
- Spark writes a _SUCCESS file (or similar marker) to indicate job completion. If rerunning, ensure you manage overwrites appropriately (e.g., using .mode("overwrite")).
7. Monitoring with the Spark Master UI
- Overview of the UI:
- The Spark Master UI (typically accessible on port 4040) provides real-time monitoring of jobs, stages, and tasks.
- It shows information about:
- Active and completed jobs.
- Task execution details, including the number of partitions processed.
- Executor usage and data shuffling (exchange) during repartitioning.
- Usage:
- Refreshing the UI can help you observe the progress of a job, see how many tasks are running, and troubleshoot potential performance issues.
8. Additional Information and Best Practices
8.1 Lazy Evaluation
- Concept:
- Operations like reading data, filtering, and repartitioning are lazily evaluated.
- Actions such as show(), collect(), or writing data trigger the actual execution (see the sketch at the end of this subsection).
- Why It Matters:
- Optimizes the execution plan and can combine multiple transformations into a single job.
- Helps in debugging and performance tuning.
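A small sketch of lazy evaluation, assuming the df from earlier sections (the column names follow the schema example above and are illustrative):

```python
# Transformations only build up the execution plan; nothing runs here.
df_small = (
    df.filter(df.pickup_location_id == 1)
      .select("pickup_datetime", "pickup_location_id")
      .repartition(4)
)

# The action below triggers the whole chain as a single Spark job.
df_small.show(5)
```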
8.2 Memory Management and Type Selection
- Choosing Data Types:
- Be mindful of using the most efficient data types (e.g., IntegerType over LongType if the range allows) to save memory. A casting sketch follows at the end of this subsection.
- Schema Enforcement:
- Explicitly defining the schema not only speeds up reading but also avoids errors later in the processing pipeline.
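For example, a column that arrives as a string or long can be narrowed explicitly. A minimal sketch (the column name is illustrative):

```python
from pyspark.sql import functions as f

# Cast a column to a 32-bit integer when the value range permits.
df = df.withColumn(
    "pickup_location_id",
    f.col("pickup_location_id").cast("int")
)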
8.3 Differences Between Spark DataFrame and Pandas DataFrame
- Spark DataFrame:
- Distributed, supports lazy evaluation, and is ideal for processing large datasets.
- Operations are optimized across a cluster of machines.
- Pandas DataFrame:
- In-memory and more suitable for smaller datasets.
- Rich API for data manipulation but not optimized for scale.
8.4 Troubleshooting Common Issues
- Port Forwarding:
- Ensure correct configuration (e.g., port 4040 for the Spark UI).
- File Overwrite Errors:
- If a write operation complains about existing paths, consider using an overwrite mode:

```python
df.write.mode("overwrite").parquet("path/to/output_folder")
```

- Data Type Discrepancies:
- Use Pandas to inspect a sample when unsure about the correct schema, then enforce it in Spark.
9. Conclusion
- Summary:
- You’ve learned how to read large CSV files using PySpark, define schemas explicitly, and convert data between Pandas and Spark DataFrames.
- The importance of partitioning for parallel processing and the efficiency gains from writing in Parquet format were emphasized.
- The Spark Master UI is a valuable tool for monitoring the execution of your jobs.
- Next Steps:
- Explore further topics such as Spark DataFrame operations, advanced optimizations, and cluster configuration.
- Practice by running sample jobs and observing how partitioning and lazy evaluation affect performance.
Study Notes 5.3.2 - Spark DataFrames
1. Introduction to Spark DataFrames
- Definition:
A Spark DataFrame is a distributed collection of data organized into named columns, similar to a table in a relational database or a Pandas DataFrame. They are part of the higher-level Spark SQL API, which makes data processing more expressive and concise.
- Why Use DataFrames?
- Ease of Use: Offers an intuitive API for data manipulation using both SQL-like operations and Python/Scala/Java code.
- Optimizations: Leverages Spark’s Catalyst optimizer to optimize query plans automatically.
- Scalability: Works well on large-scale data processing tasks by distributing data and computation across clusters.
2. Reading and Writing Data
- File Formats:
- CSV:
- Easy to read but does not store schema information.
- Typically larger in size since values are stored as text.
- Parquet:
- A columnar storage format that embeds schema information.
- More efficient storage (e.g., uses 4 bytes per value for integers) and allows for faster querying due to predicate pushdown.
- Example (from Transcript):
The transcript uses a previously saved Parquet file that automatically includes the schema, so there's no need to specify it again when reading the data (see the sketch at the end of this section).
- Key Takeaway:
Parquet files often yield better performance and reduced storage compared to CSV files because they store metadata (schema) and allow for efficient compression and encoding.
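A minimal sketch of reading Parquet output such as the folder written in the previous notes (the path is illustrative); no schema argument is needed because Parquet embeds it:

```python
# Read Parquet; the schema travels with the files.
df = spark.read.parquet("path/to/output_folder")
df.printSchema()
df.show(5)
```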
3. Transformations vs. Actions
- Transformations:
- Operations that define a new DataFrame from an existing one (e.g., select, filter, withColumn).
- Lazy Evaluation: They are not executed immediately. Spark builds up a DAG (Directed Acyclic Graph) of transformations.
- Examples:

```python
df.select("column1", "column2")
df.filter(df.license_number == "XYZ")
```
- Actions:
- Operations that trigger the execution of the DAG and return a result to the driver program.
- Eager Execution: When an action is called, all the preceding transformations are computed.
- Examples:
- show(), collect(), take(n), write.csv(), write.parquet()
- Visualizing the Concept:
Imagine building a recipe (transformations) that isn’t cooked until you actually serve it (action). This lazy evaluation allows Spark to optimize the whole chain of operations before executing them.
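A small sketch of the "recipe vs. serving" idea, assuming the df and column names from the examples above; explain() prints the plan Spark has built without running anything:

```python
# Building the "recipe": returns instantly, no job is launched.
recipe = df.filter(df.license_number == "XYZ").select("pickup_datetime")

# Inspect the physical plan Spark has prepared (still no job).
recipe.explain()

# "Serving" the recipe: this action triggers the actual computation.
recipe.show(5)
```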
4. Basic DataFrame Operations
- Selecting Columns: Use select() to pick specific columns from a DataFrame.

```python
df_selected = df.select("pickup_datetime", "dropoff_datetime", "pickup_location_id", "dropoff_location_id")
```
- Filtering Rows: Use filter() (or where()) to return rows that meet specific conditions.

```python
df_filtered = df.filter(df.license_number == "XYZ")
```
- Adding or Modifying Columns: Use withColumn() along with built-in functions to add new columns or modify existing ones.
- Example: Adding a date column by converting a datetime:

```python
from pyspark.sql import functions as f

df = df.withColumn("pickup_date", f.to_date("pickup_datetime"))
```
- Note on Overwriting: If you provide a column name that already exists, Spark will replace the old column with the new one. This can change semantics (e.g., from datetime to date).
- Grouping and Aggregation: Although not detailed in the transcript, you can use groupBy() to perform aggregations like sum, count, or average on DataFrame columns (a quick sketch follows below).
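A minimal aggregation sketch, assuming the columns used in the examples above (pickup_location_id is illustrative):

```python
from pyspark.sql import functions as f

# Count rides per pickup location and sort by the busiest locations.
df_counts = (
    df.groupBy("pickup_location_id")
      .agg(f.count("*").alias("num_rides"))
      .orderBy(f.desc("num_rides"))
)
df_counts.show(10)
```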
5. Spark SQL Functions
- Built-In Functions: Spark comes with a rich library of SQL functions that simplify data manipulation. These functions can be imported using:

```python
from pyspark.sql import functions as f
```
- Usage Example: The function to_date() is used to extract the date part from a datetime column:

```python
df = df.withColumn("pickup_date", f.to_date("pickup_datetime"))
```

- Exploring Functions: Typing f. followed by tab completion in an interactive environment (like the PySpark shell or a notebook) typically shows the list of available functions.
6. User Defined Functions (UDFs)
- When to Use UDFs:
Sometimes you need to implement custom business logic that isn’t available through built-in functions or is too complex to express in SQL. UDFs allow you to write custom Python functions and apply them to DataFrame columns.
- Creating a UDF:
- Step 1: Define a standard Python function that encapsulates your custom logic.

```python
def crazy_stuff(dispatching_base):
    # Custom logic: for example, check divisibility and return a formatted string
    base = int(dispatching_base[1:])  # assume the first character is not numeric
    if base % 7 == 0:
        return f"s/{base}"
    else:
        return f"e/{base}"
```
- Step 2: Convert the Python function to a Spark UDF and specify the return type (default is StringType unless specified otherwise).

```python
from pyspark.sql.types import StringType

crazy_stuff_udf = f.udf(crazy_stuff, StringType())
```
- Step 3: Apply the UDF to a DataFrame column:

```python
df = df.withColumn("waste_id", crazy_stuff_udf("dispatching_base"))
```

- Benefits:
- Testability: You can test your Python functions separately (unit tests) before integrating them into Spark.
- Flexibility: Complex business rules can be easily implemented in Python rather than wrestling with complex SQL expressions.
- Considerations:
UDFs may introduce performance overhead compared to native Spark SQL functions. When possible, try to use built-in functions.
7. SQL vs. DataFrame API
- Using SQL in Spark:
- Spark allows you to run SQL queries directly on DataFrames by creating temporary views.
- Example:

```python
df.createOrReplaceTempView("rides")
spark.sql("SELECT pickup_date, COUNT(*) FROM rides GROUP BY pickup_date").show()
```
- When to Use Which:
- SQL:
- Preferred for simple aggregations, joins, or when you need a familiar SQL interface.
- Useful for analysts who are comfortable with SQL syntax.
- DataFrame API (with Python/Scala/Java):
- Offers greater flexibility for complex transformations.
- Easier to integrate with other programming logic and unit tests (especially in Python).
- Integration Benefits:
Spark’s architecture allows you to mix SQL and DataFrame API operations in the same workflow, giving you the best of both worlds.
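To illustrate that mix-and-match, here is a sketch of the same aggregation from the SQL example above expressed with the DataFrame API (assuming the df with the pickup_date column):

```python
from pyspark.sql import functions as f

# DataFrame API equivalent of:
#   SELECT pickup_date, COUNT(*) FROM rides GROUP BY pickup_date
df.groupBy("pickup_date") \
  .agg(f.count("*").alias("count")) \
  .show()
```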
8. Additional Concepts and Best Practices
- Lazy Evaluation and DAG Optimization:
- Spark delays execution of transformations until an action is called. This allows Spark to optimize the entire chain of operations (using the Catalyst optimizer) before executing them.
- Understanding this helps in debugging performance issues and ensuring that you’re not inadvertently triggering multiple jobs.
- Partitioning:
- Data is split into partitions across the cluster.
- Good partitioning strategy is key to performance. Over-partitioning or under-partitioning can affect job speed.
- Caching and Persistence:
- Use caching (df.cache()) to store frequently accessed DataFrames in memory, reducing computation time for iterative operations (see the sketch just below).
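A minimal caching sketch, assuming a DataFrame that is reused across several actions (the sr_flag column is illustrative):

```python
# Mark the DataFrame for in-memory caching; it is materialized on the first action.
df_filtered = df.filter(df.sr_flag.isNull()).cache()

# The first action populates the cache...
print(df_filtered.count())

# ...and subsequent actions reuse the cached data instead of recomputing it.
df_filtered.show(5)
```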
- Testing and Code Quality:
- Unit test your transformation logic and UDFs independently.
- Keep business logic modular so that it can be maintained, reused, and easily tested.
- Monitoring:
- Use the Spark UI (accessed via the Spark master web UI) to monitor job execution, inspect DAGs, and diagnose performance issues.
- Performance Considerations:
- Prefer native Spark functions over UDFs when possible.
- Be mindful of data serialization, shuffling, and network I/O as these can become performance bottlenecks.
- Use Cases in Data Engineering:
- ETL processes: Reading from various sources, transforming data, and writing to efficient storage formats.
- Data Cleaning: Applying transformations and filtering to cleanse raw data.
- Machine Learning: Preprocessing data before feeding it into ML models, often mixing SQL with Python-based UDFs for feature engineering.
9. Summary
- Spark DataFrames provide a powerful and flexible way to process large-scale data.
- Lazy evaluation allows Spark to optimize the execution plan, differentiating between transformations (lazy) and actions (eager).
- Built-in SQL functions and DataFrame API offer a rich toolset for common data manipulation tasks.
- UDFs extend Spark’s functionality by letting you apply custom Python logic when necessary.
- Both SQL and DataFrame API have their strengths, and Spark lets you combine them in a seamless workflow.
10. Further Reading and Resources
- Official Apache Spark Documentation: Spark SQL, DataFrames and Datasets Guide
- DE Zoomcamp Materials: Additional videos and tutorials on Spark and data engineering best practices.
- Community Tutorials: Blogs, courses, and GitHub repositories related to Spark best practices for data engineers.