Spark & Scala Cache Lessons from ETL Project

What is Caching in Spark?

Caching in Spark means storing computed data in memory so you don't have to recalculate it every time you use it. Think of it like saving your work - once you've done the hard calculation, you keep the result handy for later use.
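
A minimal sketch of the mechanics (the path, app name, and column are made up for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cache-demo").getOrCreate()

// Hypothetical source path, just to illustrate the mechanics
val ordersDF = spark.read.parquet("/data/orders")

// cache() is lazy: nothing is stored until the first action runs
val activeDF = ordersDF.filter("estado = 'activo'").cache()

activeDF.count()   // first action: runs the filter and materializes the cache
activeDF.show(10)  // later actions reuse the cached rows instead of re-reading the files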

Cache Usage Patterns Found in This Project

1. Strategic Caching of Transformed Data

What we found:

val sellosDS: Dataset[SelloSource] = SellosSourceTransformer
  .procesaUltimosSellosActivosPorFechaPorPatente(sellosDF).cache()

val contratosPMFDS: Dataset[ContratosPMFSource] = ContratosPMFSourceTransformer
  .processActiveContractsPMF(contratosPmfDF).cache()

Lesson: Cache datasets right after complex transformations that will be used multiple times. This prevents Spark from recalculating the same expensive operations.
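
To see why this helps, here is a hedged sketch (rawDF, lookupDF, and the join key are assumptions, not the project's real inputs). Without the cache() call, every action on the result would re-run the filter and the shuffle join from scratch:

import org.apache.spark.sql.{DataFrame, functions => F}

// rawDF and lookupDF are assumed to be DataFrames loaded earlier in the pipeline
def transformOnce(rawDF: DataFrame, lookupDF: DataFrame): DataFrame =
  rawDF
    .filter(F.lower(F.col("ESTADO")) === "activo")
    .join(lookupDF, Seq("PATENTE"))   // expensive shuffle
    .cache()                          // computed once, on the first action that touches it

// transformOnce(...).count() and later aggregations then reuse the cached rows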

2. Caching Reference Data

What we found:

val empresasDS = EmpresaTransformer.transformEmpresas(sellosDS).cache()
val gerenciasDS = GerenciaTransformer.transformGerencias(sellosDS).cache()  
val participantesDS = ParticipantesTransformer.transformParticipants(sellosDS, gerenciasDS).cache()

Lesson: Cache reference data (like company info, departments, participants) because these small datasets get joined with larger datasets multiple times during the ETL process.
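
A sketch of the reference-data case (the GERENCIA_ID column and these helper names are illustrative, not the project's real code): the small lookup table is cached once and reused by every join that needs it.

import org.apache.spark.sql.DataFrame

// gerenciasDF is a small lookup table; sellosDF and contratosDF are large fact tables
def enrichBoth(sellosDF: DataFrame, contratosDF: DataFrame, gerenciasDF: DataFrame): (DataFrame, DataFrame) = {
  val gerencias = gerenciasDF.cache()   // small, reused in both joins below

  val sellosEnriquecidos    = sellosDF.join(gerencias, Seq("GERENCIA_ID"), "left")
  val contratosEnriquecidos = contratosDF.join(gerencias, Seq("GERENCIA_ID"), "left")

  (sellosEnriquecidos, contratosEnriquecidos)
}

For lookup tables that comfortably fit on a single executor, wrapping them in broadcast(...) (from org.apache.spark.sql.functions) inside the join can avoid the shuffle entirely; that is a separate optimization from caching, but the two combine well.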

3. Cache After Window Operations

What we found in the transformer:

rawData
  .filter(F.lower(F.col("ESTADO")) === "activo")
  .transform(df => {
    val windowSpec = Window.partitionBy(F.col("PATENTE")).orderBy(F.col("FECHA DE ENTREGA").desc)
    df.withColumn("row_num", F.row_number().over(windowSpec))
      .filter(F.col("row_num") === 1)
      .drop("row_num")
  })

Lesson: Window operations are expensive. When you do complex window functions (like getting the latest record per group), cache the result if you'll use it again.
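
A self-contained version of that pattern with the cache added (simplified; the real transformer's name and return type differ):

import org.apache.spark.sql.{DataFrame, functions => F}
import org.apache.spark.sql.expressions.Window

// Keep only the most recent active record per PATENTE, then cache the expensive result
def latestActivePerPatente(rawData: DataFrame): DataFrame = {
  val windowSpec = Window.partitionBy(F.col("PATENTE")).orderBy(F.col("FECHA DE ENTREGA").desc)

  rawData
    .filter(F.lower(F.col("ESTADO")) === "activo")
    .withColumn("row_num", F.row_number().over(windowSpec))
    .filter(F.col("row_num") === 1)
    .drop("row_num")
    .cache()   // the window forces a shuffle and sort; cache so downstream steps don't repeat it
}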

Key Problems We Found

1. Missing Cache Cleanup

Problem: The project caches datasets but never calls unpersist() to free memory.

Impact: Memory keeps growing during ETL execution, which can cause:

  • Out of memory errors
  • Slower performance
  • Resource waste

Better approach:

val cachedDS = someTransformation().cache()
try {
  // Use the cached dataset
  doMultipleOperations(cachedDS)
} finally {
  cachedDS.unpersist() // Clean up memory
}

2. Over-caching Small Operations

Problem: Some cached datasets are only used once or twice.

Rule of thumb: Only cache if you'll reuse the data 3+ times, or if the computation is very expensive and reused at least twice.

3. No Cache Storage Level Management

Problem: Using the default cache() storage level (MEMORY_AND_DISK for Datasets) without considering the memory vs disk trade-offs.

Better approach:

import org.apache.spark.storage.StorageLevel

// For frequently accessed small data that fits in memory (partitions are recomputed if evicted)
dataset.persist(StorageLevel.MEMORY_ONLY)

// For large data that might not fit in memory (overflow partitions spill to disk instead of being recomputed)
dataset.persist(StorageLevel.MEMORY_AND_DISK)

Best Practices from This Project

✅ Good Practices Found

  1. Cache after complex transformations - The project correctly caches after expensive operations like window functions and joins.

  2. Cache reference data - Small lookup tables (companies, departments) are cached since they're used in multiple joins.

  3. Cache early in the pipeline - Transformed source data is cached immediately after processing.

❌ Issues to Fix

  1. Add cache cleanup - Always unpersist cached data when done.

  2. Check cache necessity - Some caches might be overkill if data is only used twice.

  3. Monitor memory usage - Check the Storage tab in the Spark UI to see how much of each cached dataset actually fits in memory and whether partitions are spilling to disk or being evicted.

Simple Rules for ETL Caching

When TO Cache:

  • After expensive transformations (window functions, complex joins)
  • Small reference data used in multiple places
  • Data you'll access 3+ times
  • Results of file reading when processing multiple outputs
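
A minimal sketch of that last point (paths and column names are placeholders): read the source once, cache it, fan out to several outputs, then release the memory.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("multi-output").getOrCreate()

// Read once and cache, because several outputs are derived from the same input
val sourceDF = spark.read.option("header", "true").csv("/in/sellos.csv").cache()

sourceDF.write.mode("overwrite").parquet("/out/sellos_raw")
sourceDF.groupBy("EMPRESA").count().write.mode("overwrite").parquet("/out/sellos_por_empresa")
sourceDF.select("PATENTE", "ESTADO").write.mode("overwrite").parquet("/out/sellos_estado")

sourceDF.unpersist()   // free the memory once all outputs are written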

When NOT to Cache:

  • Data used only once or twice
  • Very large datasets that won't fit in memory
  • Simple transformations (like column renaming)

Cache Cleanup Pattern:

val cachedData = expensiveTransformation().cache()
try {
  // Use cached data multiple times
  val result1 = process1(cachedData)
  val result2 = process2(cachedData)
  val result3 = process3(cachedData)
} finally {
  cachedData.unpersist() // Always clean up!
}

Memory Management:

// Check whether a specific table or view is cached ("mi_tabla" is a placeholder name)
spark.catalog.isCached("mi_tabla")

// Clear everything in the cache if needed
spark.catalog.clearCache()

Real Example from the Project

The project processes stamps data through multiple steps:

  1. Raw data → Cache after filtering active stamps
  2. Companies → Cache because used in joins
  3. Participants → Cache because used in multiple table writes
  4. Final stamps → Cache because written to multiple related tables

This pattern makes sense, but adding cleanup would make it perfect:

def processWithCleanup(spark: SparkSession, sellosDF: DataFrame): Unit = {
  val sellosDS = SellosSourceTransformer.procesaUltimosSellosActivosPorFechaPorPatente(sellosDF).cache()
  val empresasDS = EmpresaTransformer.transformEmpresas(sellosDS).cache()

  try {
    // Process everything
    processEmpresas(empresasDS)
    processSellos(sellosDS, empresasDS)
  } finally {
    // Cleanup
    sellosDS.unpersist()
    empresasDS.unpersist()
  }
}

Remember: Cache smart, clean up always!
