Vinicius Fagundes

DataFrames & SQL in Databricks: Reading, Writing, and Transforming Data

This is where things get real.

So far we've set up our environment, understood clusters, learned how Spark works under the hood, and connected to cloud storage. Now we actually touch data.

In this article you'll learn how to read data in multiple formats, transform it using PySpark and SQL, and write results back to storage. These are the operations you'll use every single day as a data engineer.


What is a DataFrame in Spark?

A DataFrame is Spark's primary data structure — a distributed table with named columns and a defined schema.

If you've used Pandas, the concept is familiar. But there are critical differences:

|                  | Pandas DataFrame   | Spark DataFrame                |
| ---------------- | ------------------ | ------------------------------ |
| Where data lives | Single machine RAM | Distributed across the cluster |
| Data size limit  | Your machine's RAM | Practically unlimited          |
| Execution        | Immediate          | Lazy (runs on action)          |
| API              | pandas             | pyspark.sql                    |
| Best for         | < 1 GB, local work | GBs to PBs, distributed work   |

In Databricks, every DataFrame is backed by Spark. When you call spark.read..., you get a Spark DataFrame — not a Pandas one.

💡 You can convert between them: pandas_df = spark_df.toPandas() — but be careful. This pulls all data to the driver node. Only do this on small datasets.


Reading Data

Reading CSV

```python
# Basic read
df = spark.read.csv("/mnt/raw/sales.csv", header=True, inferSchema=True)

# Better: explicit schema (faster, safer)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

schema = StructType([
    StructField("order_id",    IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product",     StringType(),  True),
    StructField("amount",      DoubleType(),  True),
    StructField("order_date",  DateType(),    True),
    StructField("region",      StringType(),  True)
])

df = spark.read.csv("/mnt/raw/sales.csv", schema=schema, header=True)
df.printSchema()
df.show(5)
```

Reading JSON

```python
df = spark.read.json("/mnt/raw/events/")

# For nested JSON, Spark auto-infers the structure
df.printSchema()

# Access nested fields using dot notation
df.select("user.id", "user.email", "event_type").show()
```

Reading Parquet

```python
# Parquet preserves schema — no need to define it
df = spark.read.parquet("/mnt/processed/transactions/")

# Read a partitioned dataset — Spark reads only relevant partitions
df = spark.read.parquet("/mnt/processed/sales/") \
    .filter("year = 2024 AND month = 1")
```

Reading Delta Tables

```python
# By path
df = spark.read.format("delta").load("/mnt/processed/sales/")

# By table name (if registered in the catalog)
df = spark.read.table("sales_silver")
```

Common DataFrame Transformations

select — Choose Columns

```python
# Select specific columns
df.select("order_id", "product", "amount").show()

# Rename on the fly
from pyspark.sql.functions import col

df.select(
    col("order_id").alias("id"),
    col("amount").alias("revenue"),
    col("order_date").alias("date")
).show()
```

filter / where — Filter Rows

```python
# Filter with column expression
df.filter(col("amount") > 1000).show()

# Filter with SQL string (often cleaner)
df.filter("amount > 1000 AND region = 'North'").show()

# .where() is an alias for .filter()
df.where("region = 'South'").show()

# Multiple conditions
df.filter((col("amount") > 500) & (col("region").isin(["North", "South"]))).show()
```

withColumn — Add or Modify Columns

```python
from pyspark.sql.functions import col, upper, round, when, lit

# Add a new column
df = df.withColumn("amount_eur", col("amount") * 0.92)

# Modify an existing column
df = df.withColumn("product", upper(col("product")))

# Round a numeric column
df = df.withColumn("amount", round(col("amount"), 2))

# Conditional column (like IF/CASE)
df = df.withColumn("tier",
    when(col("amount") >= 5000, "Premium")
    .when(col("amount") >= 1000, "Standard")
    .otherwise("Basic")
)

# Add a constant column
df = df.withColumn("currency", lit("USD"))
```

groupBy + agg — Aggregate Data

```python
# Note: sum, max, and min here shadow Python's built-ins in this notebook.
# An alternative is `from pyspark.sql import functions as F` and F.sum(...).
from pyspark.sql.functions import sum, avg, count, max, min, countDistinct

# Total revenue by region
df.groupBy("region") \
  .agg(sum("amount").alias("total_revenue")) \
  .show()

# Multiple aggregations at once
df.groupBy("region", "product") \
  .agg(
      sum("amount").alias("total_revenue"),
      avg("amount").alias("avg_order_value"),
      count("order_id").alias("total_orders"),
      countDistinct("customer_id").alias("unique_customers")
  ) \
  .orderBy("total_revenue", ascending=False) \
  .show()
```

join — Combine DataFrames

```python
# Load a second DataFrame
customers = spark.read.table("customers_silver")

# Inner join (default)
result = df.join(customers, on="customer_id", how="inner")

# Left join
result = df.join(customers, on="customer_id", how="left")

# Join on multiple columns
result = df.join(
    customers,
    on=["customer_id", "region"],
    how="inner"
)

# Join on different column names
result = df.join(
    customers,
    df.customer_id == customers.id,
    how="inner"
).drop(customers.id)  # drop duplicate column
```

⚠️ Joins are expensive in Spark — they trigger a shuffle (data redistribution across workers). Avoid joining unnecessarily, and filter before joining when possible.

dropDuplicates — Remove Duplicate Rows

```python
# Drop fully duplicate rows
df = df.dropDuplicates()

# Drop duplicates based on specific columns (keep first occurrence)
df = df.dropDuplicates(["order_id"])
```

dropna / fillna — Handle Nulls

```python
# Drop rows with any null value
df = df.dropna()

# Drop rows where specific columns are null
df = df.dropna(subset=["order_id", "amount"])

# Fill nulls with a default value
df = df.fillna({"amount": 0.0, "region": "Unknown"})
```

SQL in Databricks

Everything you can do with PySpark DataFrames, you can also do with SQL. In Databricks, SQL is a first-class citizen.

Creating a Temp View

To run SQL against a DataFrame, register it as a temporary view:

```python
df.createOrReplaceTempView("sales")

# Now query it with SQL
result = spark.sql("""
    SELECT
        region,
        product,
        SUM(amount)        AS total_revenue,
        COUNT(order_id)    AS total_orders,
        AVG(amount)        AS avg_order_value
    FROM sales
    WHERE order_date >= '2024-01-01'
    GROUP BY region, product
    ORDER BY total_revenue DESC
""")

result.show()
```

Using the %sql Magic Command

In notebooks, you can switch an entire cell to SQL mode:

```sql
%sql

SELECT
    region,
    DATE_TRUNC('month', order_date) AS month,
    SUM(amount)                      AS monthly_revenue
FROM sales
WHERE year(order_date) = 2024
GROUP BY region, month
ORDER BY month, monthly_revenue DESC
```

The result renders automatically as an interactive table — you can even switch to a chart view directly in the notebook.

Creating Permanent Tables

Temp views disappear when the session ends. For persistent tables, use CREATE TABLE or save as Delta:

```sql
%sql

CREATE TABLE IF NOT EXISTS gold.monthly_revenue
USING DELTA
AS
SELECT
    region,
    DATE_TRUNC('month', order_date) AS month,
    SUM(amount) AS total_revenue
FROM sales_silver
GROUP BY region, month
```

PySpark vs SQL: When to Use Which

Both PySpark and SQL produce the same result in Databricks — Spark executes them through the same engine. So how do you choose?

| Situation                              | Recommendation                              |
| -------------------------------------- | ------------------------------------------- |
| Simple aggregations and filters        | SQL — more readable                         |
| Complex multi-step transformations     | PySpark — easier to chain and debug         |
| Conditional logic (when/otherwise)     | PySpark — cleaner than nested CASE WHEN     |
| Window functions                       | Either — SQL is often cleaner               |
| Building reusable pipeline functions   | PySpark — easier to parameterize            |
| Ad-hoc exploration                     | SQL — fast to write, results display nicely |
| Working with nested/complex types      | PySpark — more flexible                     |

In practice, experienced data engineers mix both freely. Use whatever makes the code clearest.


Writing Data Back to Storage

After transforming your data, you need to write it somewhere.

Write as Parquet

```python
df.write \
  .mode("overwrite") \
  .parquet("/mnt/processed/sales/")
```

Write as Delta (Recommended)

```python
df.write \
  .format("delta") \
  .mode("overwrite") \
  .save("/mnt/processed/sales/")
```

Write Modes

| Mode            | Behavior                               |
| --------------- | -------------------------------------- |
| overwrite       | Replace existing data entirely         |
| append          | Add new data to existing data          |
| ignore          | Do nothing if data already exists      |
| error (default) | Raise an error if data already exists  |

Write with Partitioning

```python
df.write \
  .format("delta") \
  .mode("overwrite") \
  .partitionBy("year", "month") \
  .save("/mnt/processed/sales/")
```

Save as a Table

```python
df.write \
  .format("delta") \
  .mode("overwrite") \
  .saveAsTable("sales_silver")
```

This registers the table in the Databricks catalog — queryable by name from any notebook or the SQL Editor.


A Full End-to-End Example

Let's put it all together. A realistic transformation pipeline:

```python
from pyspark.sql.functions import col, upper, round, when, year, month, sum, count

# 1. Read raw data
raw = spark.read.csv("/mnt/raw/sales.csv", header=True, inferSchema=True)

# 2. Clean and enrich
cleaned = raw \
    .dropDuplicates(["order_id"]) \
    .dropna(subset=["order_id", "amount", "customer_id"]) \
    .withColumn("product", upper(col("product"))) \
    .withColumn("amount", round(col("amount"), 2)) \
    .withColumn("tier",
        when(col("amount") >= 5000, "Premium")
        .when(col("amount") >= 1000, "Standard")
        .otherwise("Basic")
    ) \
    .withColumn("year", year(col("order_date"))) \
    .withColumn("month", month(col("order_date")))

# 3. Aggregate to gold
gold = cleaned \
    .groupBy("region", "year", "month") \
    .agg(
        sum("amount").alias("total_revenue"),
        count("order_id").alias("total_orders")
    )

# 4. Write Silver layer
cleaned.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .save("/mnt/processed/sales_silver/")

# 5. Write Gold layer
gold.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold.monthly_sales")

print("Pipeline complete.")
```

This is the skeleton of real data engineering work. In articles 9 and 10, we'll build this into a full Medallion Architecture pipeline.


Wrapping Up

Here's what you've learned in this article:

  • A Spark DataFrame is a distributed table — similar to Pandas but built for scale
  • Reading data: CSV, JSON, Parquet, and Delta all have their place — prefer Parquet/Delta in production
  • Core transformations: select, filter, withColumn, groupBy, join, dropDuplicates, fillna
  • SQL works side by side with PySpark — use whichever makes your code clearer
  • Writing data: always use Delta format, choose your write mode carefully, and partition large tables
  • Use saveAsTable to register tables in the catalog for easy SQL access

In the next article, we go deep on the technology that makes all of this reliable: Delta Lake.
