DEV Community

Manaswini Katari
Manaswini Katari

Posted on

The Day My PySpark DataFrame Changed Its Mind

A Short Story About Lazy Evaluation in Databricks

I was building a small ingestion pipeline in Databricks using PySpark.

The requirement was straightforward:

  1. Read incoming customer data from a staging table
  2. MERGE it into a target table (update a few columns for existing customers)
  3. INSERT brand-new customers afterward

It's a SCD Type2 Table. Pretty standard ETL stuff.

My pipeline looked roughly like this:

incoming_df = spark.table("staging_customers")

Then I reused this same incoming_df for two steps.

Step 1 — MERGE (update existing customers)

MERGE INTO customers tgt
USING staging_customers src
ON tgt.customer_id = src.customer_id
WHEN MATCHED THEN
UPDATE SET
tgt.email = src.email,
tgt.phone = src.phone

This updates only a few columns for customers that already exist.

So far, everything looked fine.

Step 2 — INSERT new customers using the same DataFrame

new_customers = incoming_df.join(
spark.table("customers"),
"customer_id",
"left_anti"
)

new_customers.write.mode("append").saveAsTable("customers")

The idea here is simple:
• Take all incoming rows
• Remove customers that already exist
• Insert only the truly new ones

But here’s what happened.

👉 Some rows that should have been inserted never showed up.

No errors.
No warnings.
Just missing data.

My Assumption (and the Bug)

In my head, the flow was:
• incoming_df = original staging data
• MERGE updates existing customers
• INSERT uses the same original incoming_df

So new customers should still be inserted.

Right?

Boom.. Wrong! 💥

The Reality: incoming_df Was Never Loaded

This is the part most of us forget.

In Apache Spark, DataFrames are lazy.

When I wrote:

incoming_df = spark.table("staging_customers")

Spark did not read the table.

It only stored a plan:

“When needed, read staging_customers.”

No data moved.
No rows loaded.

Just instructions.

What Actually Happened

Let’s replay this in real time:

1️⃣ I defined incoming_df

Spark saved a query plan.

2️⃣ I ran the MERGE

The customers table changed.
Some rows were updated.

3️⃣ I reused incoming_df for INSERT

This is where the first real action happened.

Spark finally executed:

spark.table("staging_customers")

But by now…

The customers table had already been modified.

So when I ran:

incoming_df LEFT ANTI JOIN customers

Spark compared against the updated table.

Rows that were supposed to be “new” now looked like “existing”.

Result?
• They were filtered out
• Old data never got inserted

Same DataFrame variable.
Completely different logical outcome.

Why This Happens

Because:
• DataFrames don’t store rows
• They store execution plans
• Spark evaluates them only at action time
• Table mutations affect downstream logic

This is lazy evaluation — and it’s working exactly as designed.

The Fix That Actually Worked for Me

The obvious solution is usually:

incoming_df = spark.table("staging_customers").cache()
incoming_df.count()

In theory, this should freeze the DataFrame.

In practice, it didn’t work reliably in my pipeline.

Between cluster behavior, memory pressure, and job boundaries, the cached DataFrame wasn’t always reused. The INSERT step still behaved as if it was re-reading fresh data.

So I went with the approach that always works:

👉 Create a physical temporary table.

Not a cached DataFrame.
Not an in-memory trick.

A real table.

✅ The Reliable Approach: Materialize to a Temporary Table

Instead of keeping everything lazy, I explicitly wrote the staging data to a snapshot table:

spark.table("staging_customers") \
.write.mode("overwrite") \
.saveAsTable("tmp_staging_snapshot")

Now I had a true, physical copy.

Then I used this snapshot everywhere.

MERGE (update existing rows)

MERGE INTO customers tgt
USING tmp_staging_snapshot src
ON tgt.customer_id = src.customer_id
WHEN MATCHED THEN
UPDATE SET
tgt.email = src.email,
tgt.phone = src.phone

INSERT (new rows)

snapshot_df = spark.table("tmp_staging_snapshot")

new_customers = snapshot_df.join(
spark.table("customers"),
"customer_id",
"left_anti"
)

new_customers.write.mode("append").saveAsTable("customers")

Because tmp_staging_snapshot is a physical table, Spark can’t lazily reinterpret it.

Both MERGE and INSERT now operate on the exact same frozen data.

No disappearing rows.
No surprises.

Why This Works Better Than cache()

In Spark, cache() is an optimization hint — not a guarantee.

But writing to a table is different:
• Data is physically persisted
• Query plans must read that stored snapshot
• Downstream table changes don’t affect it

Especially in Databricks pipelines, this pattern is far more reliable.

If your tables are Delta (powered by
Delta Lake), you also gain:
• Reproducibility
• Debuggable snapshots
• Optional cleanup afterward

Final Takeaway

A Spark DataFrame is not a snapshot.

It’s a promise to compute later.

So when you:
• Read a table
• Modify that table
• Reuse the same DataFrame

You’re not working with “old data”.

You’re re-reading the table.

One sentence to remember:

In Spark, your DataFrame doesn’t remember yesterday.

And when correctness matters more than cleverness:

Write once. Read many.

Materialize your data temporarily — don’t negotiate with laziness.

Top comments (0)