DataFrames & SQL in Databricks: Reading, Writing, and Transforming Data
This is where things get real.
So far we've set up our environment, understood clusters, learned how Spark works under the hood, and connected to cloud storage. Now we actually touch data.
In this article you'll learn how to read data in multiple formats, transform it using PySpark and SQL, and write results back to storage. These are the operations you'll use every single day as a data engineer.
What is a DataFrame in Spark?
A DataFrame is Spark's primary data structure — a distributed table with named columns and a defined schema.
If you've used Pandas, the concept is familiar. But there are critical differences:
| | Pandas DataFrame | Spark DataFrame |
|---|---|---|
| Where data lives | Single machine RAM | Distributed across cluster |
| Data size limit | Your machine's RAM | Practically unlimited |
| Execution | Immediate | Lazy (runs on action) |
| API | `pandas` | `pyspark.sql` |
| Best for | < 1 GB, local work | GBs to PBs, distributed work |
In Databricks, every DataFrame is backed by Spark. When you call spark.read..., you get a Spark DataFrame — not a Pandas one.
💡 You can convert between them:
`pandas_df = spark_df.toPandas()` — but be careful. This pulls all data to the driver node. Only do this on small datasets.
Reading Data
Reading CSV
# Basic read
df = spark.read.csv("/mnt/raw/sales.csv", header=True, inferSchema=True)
# Better: explicit schema (faster, safer)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
schema = StructType([
StructField("order_id", IntegerType(), True),
StructField("customer_id", IntegerType(), True),
StructField("product", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("order_date", DateType(), True),
StructField("region", StringType(), True)
])
df = spark.read.csv("/mnt/raw/sales.csv", schema=schema, header=True)
df.printSchema()
df.show(5)
Reading JSON
df = spark.read.json("/mnt/raw/events/")
# For nested JSON, Spark auto-infers the structure
df.printSchema()
# Access nested fields using dot notation
df.select("user.id", "user.email", "event_type").show()
Reading Parquet
# Parquet preserves schema — no need to define it
df = spark.read.parquet("/mnt/processed/transactions/")
# Read a partitioned dataset — Spark reads only relevant partitions
df = spark.read.parquet("/mnt/processed/sales/") \
.filter("year = 2024 AND month = 1")
Reading Delta Tables
# By path
df = spark.read.format("delta").load("/mnt/processed/sales/")
# By table name (if registered in the catalog)
df = spark.read.table("sales_silver")
Common DataFrame Transformations
select — Choose Columns
# Select specific columns
df.select("order_id", "product", "amount").show()
# Rename on the fly
from pyspark.sql.functions import col
df.select(
col("order_id").alias("id"),
col("amount").alias("revenue"),
col("order_date").alias("date")
).show()
filter / where — Filter Rows
# Filter with column expression
df.filter(col("amount") > 1000).show()
# Filter with SQL string (often cleaner)
df.filter("amount > 1000 AND region = 'North'").show()
# .where() is an alias for .filter()
df.where("region = 'South'").show()
# Multiple conditions
df.filter((col("amount") > 500) & (col("region").isin(["North", "South"]))).show()
withColumn — Add or Modify Columns
from pyspark.sql.functions import col, upper, round, when, lit
# Add a new column
df = df.withColumn("amount_eur", col("amount") * 0.92)
# Modify an existing column
df = df.withColumn("product", upper(col("product")))
# Round a numeric column
df = df.withColumn("amount", round(col("amount"), 2))
# Conditional column (like IF/CASE)
df = df.withColumn("tier",
when(col("amount") >= 5000, "Premium")
.when(col("amount") >= 1000, "Standard")
.otherwise("Basic")
)
# Add a constant column
df = df.withColumn("currency", lit("USD"))
groupBy + agg — Aggregate Data
from pyspark.sql.functions import sum, avg, count, max, min, countDistinct
# Total revenue by region
df.groupBy("region") \
.agg(sum("amount").alias("total_revenue")) \
.show()
# Multiple aggregations at once
df.groupBy("region", "product") \
.agg(
sum("amount").alias("total_revenue"),
avg("amount").alias("avg_order_value"),
count("order_id").alias("total_orders"),
countDistinct("customer_id").alias("unique_customers")
) \
.orderBy("total_revenue", ascending=False) \
.show()
join — Combine DataFrames
# Load a second DataFrame
customers = spark.read.table("customers_silver")
# Inner join (default)
result = df.join(customers, on="customer_id", how="inner")
# Left join
result = df.join(customers, on="customer_id", how="left")
# Join on multiple columns
result = df.join(
customers,
on=["customer_id", "region"],
how="inner"
)
# Join on different column names
result = df.join(
customers,
df.customer_id == customers.id,
how="inner"
).drop(customers.id) # drop duplicate column
⚠️ Joins are expensive in Spark — they trigger a shuffle (data redistribution across workers). Avoid joining unnecessarily, and filter before joining when possible.
dropDuplicates — Remove Duplicate Rows
# Drop fully duplicate rows
df = df.dropDuplicates()
# Drop duplicates based on specific columns (keep first occurrence)
df = df.dropDuplicates(["order_id"])
dropna / fillna — Handle Nulls
# Drop rows with any null value
df = df.dropna()
# Drop rows where specific columns are null
df = df.dropna(subset=["order_id", "amount"])
# Fill nulls with a default value
df = df.fillna({"amount": 0.0, "region": "Unknown"})
SQL in Databricks
Everything you can do with PySpark DataFrames, you can also do with SQL. In Databricks, SQL is a first-class citizen.
Creating a Temp View
To run SQL against a DataFrame, register it as a temporary view:
df.createOrReplaceTempView("sales")
# Now query it with SQL
result = spark.sql("""
SELECT
region,
product,
SUM(amount) AS total_revenue,
COUNT(order_id) AS total_orders,
AVG(amount) AS avg_order_value
FROM sales
WHERE order_date >= '2024-01-01'
GROUP BY region, product
ORDER BY total_revenue DESC
""")
result.show()
Using the %sql Magic Command
In notebooks, you can switch an entire cell to SQL mode:
%sql
SELECT
region,
DATE_TRUNC('month', order_date) AS month,
SUM(amount) AS monthly_revenue
FROM sales
WHERE year(order_date) = 2024
GROUP BY region, month
ORDER BY month, monthly_revenue DESC
The result renders automatically as an interactive table — you can even switch to a chart view directly in the notebook.
Creating Permanent Tables
Temp views disappear when the session ends. For persistent tables, use CREATE TABLE or save as Delta:
%sql
CREATE TABLE IF NOT EXISTS gold.monthly_revenue
USING DELTA
AS
SELECT
region,
DATE_TRUNC('month', order_date) AS month,
SUM(amount) AS total_revenue
FROM sales_silver
GROUP BY region, month
PySpark vs SQL: When to Use Which
Both PySpark and SQL produce the same result in Databricks — Spark executes them through the same engine. So how do you choose?
| Situation | Recommendation |
|---|---|
| Simple aggregations and filters | SQL — more readable |
| Complex multi-step transformations | PySpark — easier to chain and debug |
| Conditional logic (when/otherwise) | PySpark — cleaner than nested CASE WHEN |
| Window functions | Either — SQL is often cleaner |
| Building reusable pipeline functions | PySpark — easier to parameterize |
| Ad-hoc exploration | SQL — fast to write and results display nicely |
| Working with nested/complex types | PySpark — more flexible |
In practice, experienced data engineers mix both freely. Use whatever makes the code clearest.
Writing Data Back to Storage
After transforming your data, you need to write it somewhere.
Write as Parquet
df.write \
.mode("overwrite") \
.parquet("/mnt/processed/sales/")
Write as Delta (Recommended)
df.write \
.format("delta") \
.mode("overwrite") \
.save("/mnt/processed/sales/")
Write Modes
| Mode | Behavior |
|---|---|
| `overwrite` | Replace existing data entirely |
| `append` | Add new data to existing data |
| `ignore` | Do nothing if data already exists |
| `error` (default) | Raise an error if data already exists |
Write with Partitioning
df.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("year", "month") \
.save("/mnt/processed/sales/")
Save as a Table
df.write \
.format("delta") \
.mode("overwrite") \
.saveAsTable("sales_silver")
This registers the table in the Databricks catalog — queryable by name from any notebook or the SQL Editor.
A Full End-to-End Example
Let's put it all together. A realistic transformation pipeline:
from pyspark.sql.functions import col, upper, round, when, year, month, sum, count
# 1. Read raw data
raw = spark.read.csv("/mnt/raw/sales.csv", header=True, inferSchema=True)
# 2. Clean and enrich
cleaned = raw \
.dropDuplicates(["order_id"]) \
.dropna(subset=["order_id", "amount", "customer_id"]) \
.withColumn("product", upper(col("product"))) \
.withColumn("amount", round(col("amount"), 2)) \
.withColumn("tier",
when(col("amount") >= 5000, "Premium")
.when(col("amount") >= 1000, "Standard")
.otherwise("Basic")
) \
.withColumn("year", year(col("order_date"))) \
.withColumn("month", month(col("order_date")))
# 3. Aggregate to gold
gold = cleaned \
.groupBy("region", "year", "month") \
.agg(
sum("amount").alias("total_revenue"),
count("order_id").alias("total_orders")
)
# 4. Write Silver layer
cleaned.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("year", "month") \
.save("/mnt/processed/sales_silver/")
# 5. Write Gold layer
gold.write \
.format("delta") \
.mode("overwrite") \
.saveAsTable("gold.monthly_sales")
print("Pipeline complete.")
This is the skeleton of real data engineering work. In articles 9 and 10, we'll build this into a full Medallion Architecture pipeline.
Wrapping Up
Here's what you've learned in this article:
- A Spark DataFrame is a distributed table — similar to Pandas but built for scale
- Reading data: CSV, JSON, Parquet, and Delta all have their place — prefer Parquet/Delta in production
- Core transformations: `select`, `filter`, `withColumn`, `groupBy`, `join`, `dropDuplicates`, `fillna`
- SQL works side by side with PySpark — use whichever makes your code clearer
- Writing data: always use Delta format, choose your write mode carefully, and partition large tables
- Use `saveAsTable` to register tables in the catalog for easy SQL access
In the next article, we go deep on the technology that makes all of this reliable: Delta Lake.