DEV Community

Cover image for Study Notes 5.3.1-2 First Look at Spark/PySpark & Spark Dataframes
Pizofreude
Pizofreude

Posted on

Study Notes 5.3.1-2 First Look at Spark/PySpark & Spark Dataframes

Study Notes 5.3.1 - on Spark/PySpark

These notes cover the basics and some intermediate topics discussed in DE Zoomcamp 5.3.1 – First Look at Spark/PySpark. Extra details on schema definition, partitioning, and best practices for a well-rounded understanding.


1. Introduction to Spark and PySpark

  • Apache Spark Overview:
    • Spark is a fast, in-memory distributed computing framework designed for large-scale data processing.
    • PySpark is the Python API for Spark, providing a way to work with Spark’s distributed DataFrame and SQL functionalities using Python.
  • Why Spark?
    • It is designed to handle high volume data efficiently.
    • Supports a range of operations—from data ingestion and processing to machine learning.
    • Emphasizes parallel execution through clusters (executors).
  • Key Concepts:
    • SparkSession: The main entry point to interact with Spark. It is used for reading data, processing, and writing output.
    • Lazy Evaluation: Transformations (e.g., reading files, repartitioning) are lazy—they are not executed until an action (e.g., show(), write()) is called.

2. Setting Up the Environment

  • Remote Execution:
    • The transcript demonstrates connecting via SSH to a remote machine.
    • Jupyter Notebooks are used to interact with PySpark code. Remember to configure port forwarding (e.g., port 4040 for Spark UI) correctly.
  • Development Tools:
    • Terminal commands are used to manage files and run Jupyter.
    • For Windows users, alternatives like Git Bash or MinGW can help mimic Linux command behavior.

3. Reading CSV Files with PySpark

  • Reading Data:

    • Use the spark.read.csv() method to load CSV files into a Spark DataFrame.
    • Example code snippet:

      df = spark.read.csv("path/to/file.csv", header=True)
      df.show()
      
      
  • Type Inference in Spark vs. Pandas:

    • Spark: Does not automatically infer data types; by default, every column is read as a string.
    • Pandas: Has basic type inference but might not recognize date/time fields accurately.
    • Implication: When accuracy is critical, explicitly defining the schema is recommended.
  • Defining Schema:

    • Use pyspark.sql.types (e.g., StructType, StructField, StringType, IntegerType, TimestampType) to create a schema.
    • Example schema definition:

      from pyspark.sql import types as T
      
      schema = T.StructType([
          T.StructField("pickup_datetime", T.TimestampType(), True),
          T.StructField("dropoff_datetime", T.TimestampType(), True),
          T.StructField("pickup_location_id", T.IntegerType(), True),
          T.StructField("dropoff_location_id", T.IntegerType(), True),
          T.StructField("sr_flag", T.StringType(), True),
          # Add more fields as needed
      ])
      df = spark.read.csv("path/to/file.csv", header=True, schema=schema)
      df.show(10)
      
      
    • Tip: Always check your data and use Pandas or manual inspection if needed to decide on the correct types.


4. Converting Pandas DataFrame to Spark DataFrame

  • When and Why:
    • Sometimes you might load a small sample with Pandas (which infers types) and then convert to a Spark DataFrame for distributed processing.
    • Use spark.createDataFrame(pandas_df, schema) to convert, ensuring the Spark DataFrame has the desired data types.
  • Benefits:
    • Allows leveraging Pandas’ flexibility for type detection on smaller subsets.
    • Transitioning to Spark helps when the dataset size exceeds the memory limits of a single machine.

5. Partitioning in Spark

  • Understanding Partitions:
    • In Spark, a partition is a chunk of data that can be processed independently on an executor.
    • The number of partitions affects parallelism; more partitions allow for better utilization of available executors.
  • Practical Example:
    • A large CSV file might be read as one single partition, causing only one executor to process it. This creates a bottleneck.
  • Repartitioning:

    • The repartition(n) method can be used to change the number of partitions.
    • Example:

      df_repartitioned = df.repartition(24)
      
      
    • Note: Repartitioning is a lazy operation. It does not occur until an action (such as writing data) is executed.

  • Cluster and File Distribution:

    • Files stored in a data lake (or cloud storage) might be split into multiple partitions automatically if there are many files.
    • Balancing the number of partitions with available executor resources is key for optimal performance.

6. Writing Data in Parquet Format

  • Why Parquet?
    • Parquet is a columnar storage format that offers efficient data compression and encoding schemes.
    • It is optimized for both storage size and query performance.
  • Process Overview:

    • After reading and processing the data, the DataFrame is written out in Parquet format:

      df_repartitioned.write.parquet("path/to/output_folder")
      
      
    • During the write operation, Spark may repartition the data further to meet the specified or default number of partitions.

  • Benefits:

    • Smaller file size (often 4x reduction compared to CSV).
    • Faster read times due to the columnar structure.
    • Better integration with many big data tools.
  • Additional Note:

    • Spark writes a _SUCCESS file (or similar marker) to indicate job completion. If rerunning, ensure you manage overwrites appropriately (e.g., using .mode("overwrite")).

7. Monitoring with the Spark Master UI

  • Overview of the UI:
    • The Spark Master UI (typically accessible on port 4040) provides real-time monitoring of jobs, stages, and tasks.
    • It shows information about:
      • Active and completed jobs.
      • Task execution details, including the number of partitions processed.
      • Executor usage and data shuffling (exchange) during repartitioning.
  • Usage:
    • Refreshing the UI can help you observe the progress of a job, see how many tasks are running, and troubleshoot potential performance issues.

8. Additional Information and Best Practices

8.1 Lazy Evaluation

  • Concept:
    • Operations like reading data, filtering, and repartitioning are lazily evaluated.
    • Actions such as show(), collect(), or writing data trigger the actual execution.
  • Why It Matters:
    • Optimizes the execution plan and can combine multiple transformations into a single job.
    • Helps in debugging and performance tuning.

8.2 Memory Management and Type Selection

  • Choosing Data Types:
    • Be mindful of using the most efficient data types (e.g., IntegerType over LongType if the range allows) to save memory.
  • Schema Enforcement:
    • Explicitly defining the schema not only speeds up reading but also avoids errors later in the processing pipeline.

8.3 Differences Between Spark DataFrame and Pandas DataFrame

  • Spark DataFrame:
    • Distributed, supports lazy evaluation, and is ideal for processing large datasets.
    • Operations are optimized across a cluster of machines.
  • Pandas DataFrame:
    • In-memory and more suitable for smaller datasets.
    • Rich API for data manipulation but not optimized for scale.

8.4 Troubleshooting Common Issues

  • Port Forwarding:
    • Ensure correct configuration (e.g., port 4040 for the Spark UI).
  • File Overwrite Errors:

    • If a write operation complains about existing paths, consider using an overwrite mode:

      df.write.mode("overwrite").parquet("path/to/output_folder")
      
      
  • Data Type Discrepancies:

    • Use Pandas to inspect a sample when unsure about the correct schema, then enforce it in Spark.

9. Conclusion

  • Summary:
    • You’ve learned how to read large CSV files using PySpark, define schemas explicitly, and convert data between Pandas and Spark DataFrames.
    • The importance of partitioning for parallel processing and the efficiency gains from writing in Parquet format were emphasized.
    • The Spark Master UI is a valuable tool for monitoring the execution of your jobs.
  • Next Steps:
    • Explore further topics such as Spark DataFrame operations, advanced optimizations, and cluster configuration.
    • Practice by running sample jobs and observing how partitioning and lazy evaluation affect performance.

Study Notes 5.3.2 - Spark DataFrames

1. Introduction to Spark DataFrames

  • Definition:

    A Spark DataFrame is a distributed collection of data organized into named columns, similar to a table in a relational database or a Pandas DataFrame. They are part of the higher-level Spark SQL API, which makes data processing more expressive and concise.

  • Why Use DataFrames?

    • Ease of Use: Offers an intuitive API for data manipulation using both SQL-like operations and Python/Scala/Java code.
    • Optimizations: Leverages Spark’s Catalyst optimizer to optimize query plans automatically.
    • Scalability: Works well on large-scale data processing tasks by distributing data and computation across clusters.

2. Reading and Writing Data

  • File Formats:
    • CSV:
      • Easy to read but does not store schema information.
      • Typically larger in size since values are stored as text.
    • Parquet:
      • A columnar storage format that embeds schema information.
      • More efficient storage (e.g., uses 4 bytes per value for integers) and allows for faster querying due to predicate pushdown.
  • Example (from Transcript):

    Using a previously saved Parquet file that automatically includes the schema, so there’s no need to specify it again when reading the data.

  • Key Takeaway:

    Parquet files often yield better performance and reduced storage compared to CSV files because they store metadata (schema) and allow for efficient compression and encoding.


3. Transformations vs. Actions

  • Transformations:
    • Operations that define a new DataFrame from an existing one (e.g., select, filter, withColumn).
    • Lazy Evaluation: They are not executed immediately. Spark builds up a DAG (Directed Acyclic Graph) of transformations.
    • Examples:
      • df.select("column1", "column2")
      • df.filter(df.license_number == "XYZ")
  • Actions:
    • Operations that trigger the execution of the DAG and return a result to the driver program.
    • Eager Execution: When an action is called, all the preceding transformations are computed.
    • Examples:
      • show(), collect(), take(n), write.csv(), write.parquet()
  • Visualizing the Concept:

    Imagine building a recipe (transformations) that isn’t cooked until you actually serve it (action). This lazy evaluation allows Spark to optimize the whole chain of operations before executing them.


4. Basic DataFrame Operations

  • Selecting Columns:

    Use select() to pick specific columns from a DataFrame.

    df_selected = df.select("pickup_datetime", "dropoff_datetime", "pickup_location_id", "dropoff_location_id")
    
    
  • Filtering Rows:

    Use filter() (or where()) to return rows that meet specific conditions.

    df_filtered = df.filter(df.license_number == "XYZ")
    
    
  • Adding or Modifying Columns:

    Use withColumn() along with built-in functions to add new columns or modify existing ones.

    • Example: Adding a date column by converting a datetime:

      from pyspark.sql import functions as f
      df = df.withColumn("pickup_date", f.to_date("pickup_datetime"))
      
      
    • Note on Overwriting:
      If you provide a column name that already exists, Spark will replace the old column with the new one. This can change semantics (e.g., from datetime to date).

  • Grouping and Aggregation:

    Although not detailed in the transcript, you can use groupBy() to perform aggregations like sum, count, or average on DataFrame columns.


5. Spark SQL Functions

  • Built-In Functions:
    Spark comes with a rich library of SQL functions that simplify data manipulation. These functions can be imported using:

    from pyspark.sql import functions as f
    
    
  • Usage Example:
    The function to_date() is used to extract the date part from a datetime column:

    df = df.withColumn("pickup_date", f.to_date("pickup_datetime"))
    
    
  • Exploring Functions:
    Typing f. in an interactive environment (like PySpark shell or a notebook) typically shows the list of available functions.


6. User Defined Functions (UDFs)

  • When to Use UDFs:

    Sometimes you need to implement custom business logic that isn’t available through built-in functions or is too complex to express in SQL. UDFs allow you to write custom Python functions and apply them to DataFrame columns.

  • Creating a UDF:

    • Step 1: Define a standard Python function that encapsulates your custom logic.

      def crazy_stuff(dispatching_base):
          # Custom logic: for example, check divisibility and return a formatted string
          base = int(dispatching_base[1:])  # assume the first character is not numeric
          if base % 7 == 0:
              return f"s/{base}"
          else:
              return f"e/{base}"
      
      
    • Step 2: Convert the Python function to a Spark UDF and specify the return type (default is StringType unless specified otherwise).

      from pyspark.sql.types import StringType
      crazy_stuff_udf = f.udf(crazy_stuff, StringType())
      
      
    • Step 3: Apply the UDF to a DataFrame column:

      df = df.withColumn("waste_id", crazy_stuff_udf("dispatching_base"))
      
      
  • Benefits:

    • Testability: You can test your Python functions separately (unit tests) before integrating them into Spark.
    • Flexibility: Complex business rules can be easily implemented in Python rather than wrestling with complex SQL expressions.
  • Considerations:

    UDFs may introduce performance overhead compared to native Spark SQL functions. When possible, try to use built-in functions.


7. SQL vs. DataFrame API

  • Using SQL in Spark:

    • Spark allows you to run SQL queries directly on DataFrames by creating temporary views.
    • Example:

      df.createOrReplaceTempView("rides")
      spark.sql("SELECT pickup_date, COUNT(*) FROM rides GROUP BY pickup_date").show()
      
      
  • When to Use Which:

    • SQL:
      • Preferred for simple aggregations, joins, or when you need a familiar SQL interface.
      • Useful for analysts who are comfortable with SQL syntax.
    • DataFrame API (with Python/Scala/Java):
      • Offers greater flexibility for complex transformations.
      • Easier to integrate with other programming logic and unit tests (especially in Python).
  • Integration Benefits:

    Spark’s architecture allows you to mix SQL and DataFrame API operations in the same workflow, giving you the best of both worlds.


8. Additional Concepts and Best Practices

  • Lazy Evaluation and DAG Optimization:
    • Spark delays execution of transformations until an action is called. This allows Spark to optimize the entire chain of operations (using the Catalyst optimizer) before executing them.
    • Understanding this helps in debugging performance issues and ensuring that you’re not inadvertently triggering multiple jobs.
  • Partitioning:
    • Data is split into partitions across the cluster.
    • Good partitioning strategy is key to performance. Over-partitioning or under-partitioning can affect job speed.
  • Caching and Persistence:
    • Use caching (df.cache()) to store frequently accessed DataFrames in memory, reducing computation time for iterative operations.
  • Testing and Code Quality:
    • Unit test your transformation logic and UDFs independently.
    • Keep business logic modular so that it can be maintained, reused, and easily tested.
  • Monitoring:
    • Use the Spark UI (accessed via the Spark master web UI) to monitor job execution, inspect DAGs, and diagnose performance issues.
  • Performance Considerations:
    • Prefer native Spark functions over UDFs when possible.
    • Be mindful of data serialization, shuffling, and network I/O as these can become performance bottlenecks.
  • Use Cases in Data Engineering:
    • ETL processes: Reading from various sources, transforming data, and writing to efficient storage formats.
    • Data Cleaning: Applying transformations and filtering to cleanse raw data.
    • Machine Learning: Preprocessing data before feeding it into ML models, often mixing SQL with Python-based UDFs for feature engineering.

9. Summary

  • Spark DataFrames provide a powerful and flexible way to process large-scale data.
  • Lazy evaluation allows Spark to optimize the execution plan, differentiating between transformations (lazy) and actions (eager).
  • Built-in SQL functions and DataFrame API offer a rich toolset for common data manipulation tasks.
  • UDFs extend Spark’s functionality by letting you apply custom Python logic when necessary.
  • Both SQL and DataFrame API have their strengths, and Spark lets you combine them in a seamless workflow.

10. Further Reading and Resources

  • Official Apache Spark Documentation:Spark SQL, DataFrames and Datasets Guide
  • DE Zoomcamp Materials: Additional videos and tutorials on Spark and data engineering best practices.
  • Community Tutorials: Blogs, courses, and GitHub repositories related to Spark best practices for data engineers.

Heroku

Build apps, not infrastructure.

Dealing with servers, hardware, and infrastructure can take up your valuable time. Discover the benefits of Heroku, the PaaS of choice for developers since 2007.

Visit Site

Top comments (0)

Qodo Takeover

Introducing Qodo Gen 1.0: Transform Your Workflow with Agentic AI

Rather than just generating snippets, our agents understand your entire project context, can make decisions, use tools, and carry out tasks autonomously.

Read full post

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay