What is Caching in Spark?
Caching in Spark means keeping a computed Dataset or DataFrame around (in memory, spilling to disk if needed) so Spark doesn't have to re-run its whole chain of transformations every time an action uses it. Think of it like saving your work - once you've done the hard calculation, you keep the result handy for later use.
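A minimal sketch of the idea (the data and names here are made up, not from the project). Note that cache() is lazy: nothing is stored until the first action runs.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Minimal sketch with made-up data: cache() marks the result for reuse,
// the first action materializes it, later actions read it from memory.
val spark = SparkSession.builder().appName("cache-demo").getOrCreate()

val expensive = spark.range(0, 10000000L)
  .withColumn("square", col("id") * col("id"))      // stands in for an expensive transformation
  .cache()                                          // lazy: nothing is stored yet

expensive.count()                                   // first action computes AND caches the result
expensive.filter(col("square") % 7 === 0).count()   // served from the cache, no recomputation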
Cache Usage Patterns Found in This Project
1. Strategic Caching of Transformed Data
What we found:
val sellosDS: Dataset[SelloSource] = SellosSourceTransformer
  .procesaUltimosSellosActivosPorFechaPorPatente(sellosDF).cache()

val contratosPMFDS: Dataset[ContratosPMFSource] = ContratosPMFSourceTransformer
  .processActiveContractsPMF(contratosPmfDF).cache()
Lesson: Cache datasets right after complex transformations whose results will be used multiple times. This prevents Spark from recomputing the same expensive operations for every downstream action.
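One way to confirm the cache is actually being used (a small check, not something the project does): once the Dataset is cached and materialized, its physical plan reads from the in-memory cache instead of re-running the original transformation.

// Assumes sellosDS was cached as above.
sellosDS.count()     // first action materializes the cache

// The physical plan now shows an InMemoryTableScan / InMemoryRelation
// instead of the original window + filter lineage.
sellosDS.explain()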
2. Caching Reference Data
What we found:
val empresasDS = EmpresaTransformer.transformEmpresas(sellosDS).cache()
val gerenciasDS = GerenciaTransformer.transformGerencias(sellosDS).cache()
val participantesDS = ParticipantesTransformer.transformParticipants(sellosDS, gerenciasDS).cache()
Lesson: Cache reference data (like company info, departments, participants) because these small datasets get joined with larger datasets multiple times during the ETL process.
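A hypothetical sketch of why this pays off: ventasDF, contratosDF and the empresa_id join key below are illustrative, not from the project. The point is that the small cached dataset is read by several joins instead of being rebuilt for each one.

// empresasDS is small reference data; the fact DataFrames are large.
// Without the cache, transformEmpresas would be recomputed for every join.
val empresasDS = EmpresaTransformer.transformEmpresas(sellosDS).cache()

val ventasConEmpresa    = ventasDF.join(empresasDS, Seq("empresa_id"), "left")
val contratosConEmpresa = contratosDF.join(empresasDS, Seq("empresa_id"), "left")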
3. Cache After Window Operations
What we found in the transformer:
rawData
  .filter(F.lower(F.col("ESTADO")) === "activo")
  .transform(df => {
    val windowSpec = Window.partitionBy(F.col("PATENTE")).orderBy(F.col("FECHA DE ENTREGA").desc)
    df.withColumn("row_num", F.row_number().over(windowSpec))
      .filter(F.col("row_num") === 1)
      .drop("row_num")
  })
Lesson: Window operations are expensive because partitioning and ordering the data forces a shuffle and a sort. When you use a complex window function (like keeping only the latest record per group), cache the result if you'll use it again.
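For reference, here is a self-contained version of that pattern with the imports such a transformer typically needs. The .cache() is applied at the end here for illustration; in the project it is applied at the call site instead.

import org.apache.spark.sql.{functions => F, DataFrame}
import org.apache.spark.sql.expressions.Window

// Keep only the latest active row per PATENTE, then cache the result because
// downstream transformers read it several times.
def ultimosSellosActivos(rawData: DataFrame): DataFrame = {
  val windowSpec = Window.partitionBy(F.col("PATENTE")).orderBy(F.col("FECHA DE ENTREGA").desc)

  rawData
    .filter(F.lower(F.col("ESTADO")) === "activo")
    .withColumn("row_num", F.row_number().over(windowSpec))
    .filter(F.col("row_num") === 1)
    .drop("row_num")
    .cache()
}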
Key Problems We Found
1. Missing Cache Cleanup
Problem: The project caches datasets but never calls unpersist() to free memory.
Impact: Memory keeps growing during ETL execution, which can cause:
- Out of memory errors
- Slower performance
- Resource waste
Better approach:
val cachedDS = someTransformation().cache() // declared outside try so it's in scope in finally
try {
  // Use the cached dataset
  doMultipleOperations(cachedDS)
} finally {
  cachedDS.unpersist() // Clean up memory
}
2. Over-caching Small Operations
Problem: Some cached datasets are only used once or twice.
Rule of thumb: Only cache if you'll use the data 3+ times, or if the computation is very expensive.
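A quick contrast of the two cases (rawDF, paths and the threshold below are made up):

import org.apache.spark.sql.{functions => F}

// Probably NOT worth caching: a cheap projection used exactly once.
val renamed = rawDF.withColumnRenamed("PATENTE", "patente")

// Worth caching: an expensive aggregation reused by three outputs.
val resumen = rawDF.groupBy("PATENTE").count().cache()

resumen.write.mode("overwrite").parquet("/out/resumen")                                        // first action materializes the cache
resumen.filter(F.col("count") > 10).write.mode("overwrite").parquet("/out/frecuentes")         // reuses it
resumen.orderBy(F.col("count").desc).limit(100).write.mode("overwrite").parquet("/out/top100") // reuses it
resumen.unpersist()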
3. No Cache Storage Level Management
Problem: The project always relies on cache(), which uses the default storage level (MEMORY_AND_DISK for Datasets), without considering memory vs disk trade-offs.
Better approach:
import org.apache.spark.storage.StorageLevel

// For frequently accessed small data
dataset.persist(StorageLevel.MEMORY_ONLY)

// For large data that might not fit in memory
dataset.persist(StorageLevel.MEMORY_AND_DISK)
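To confirm which level actually took effect, you can inspect the Dataset's current storage level (a small check; dataset here is any Dataset or DataFrame reference):

import org.apache.spark.storage.StorageLevel

// StorageLevel.NONE means the Dataset is not persisted at all.
if (dataset.storageLevel == StorageLevel.NONE) println("not cached")
else println(s"cached at ${dataset.storageLevel}")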
Best Practices from This Project
✅ Good Practices Found
Cache after complex transformations - The project correctly caches after expensive operations like window functions and joins.
Cache reference data - Small lookup tables (companies, departments) are cached since they're used in multiple joins.
Cache early in the pipeline - Transformed source data is cached immediately after processing.
❌ Issues to Fix
Add cache cleanup - Always unpersist cached data when done.
Check cache necessity - Some caches might be overkill if data is only used twice.
Monitor memory usage - Log what is cached and how much executor memory it occupies; the Spark UI Storage tab shows this per dataset (a small logging sketch follows below).
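A possible logging helper, shown only as a sketch: getRDDStorageInfo is a developer API, and for cached Datasets it reports the RDDs that back them once they have been materialized.

import org.apache.spark.SparkContext

// Sketch: log every cached RDD currently backing the cache, with its footprint.
def logCacheUsage(sc: SparkContext): Unit = {
  sc.getRDDStorageInfo.foreach { info =>
    println(
      s"cached=${info.name} partitions=${info.numCachedPartitions}/${info.numPartitions} " +
        s"memory=${info.memSize}B disk=${info.diskSize}B level=${info.storageLevel}"
    )
  }
}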
Simple Rules for ETL Caching
When TO Cache:
- After expensive transformations (window functions, complex joins)
- Small reference data used in multiple places
- Data you'll access 3+ times
- Results of file reading when processing multiple outputs (sketched just below)
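For that last case, a sketch (paths, options and the filter column are made up): read the input once, cache it, write the different outputs, then unpersist.

import org.apache.spark.sql.functions.col

// Read once, cache, then produce several outputs from the same cached data.
val inputDF = spark.read
  .option("header", "true")
  .csv("/data/input/sellos.csv")                          // hypothetical path
  .cache()

inputDF.write.mode("overwrite").parquet("/data/out/full") // first action materializes the cache
inputDF
  .filter(col("ESTADO") === "activo")                     // hypothetical filter
  .write.mode("overwrite").parquet("/data/out/activos")   // reuses the cache

inputDF.unpersist()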
When NOT to Cache:
- Data used only once or twice
- Very large datasets that won't fit in memory
- Simple transformations (like column renaming)
Cache Cleanup Pattern:
val cachedData = expensiveTransformation().cache()
try {
  // Use cached data multiple times
  val result1 = process1(cachedData)
  val result2 = process2(cachedData)
  val result3 = process3(cachedData)
} finally {
  cachedData.unpersist() // Always clean up!
}
Memory Management:
// listTables() doesn't show cache status; use the Spark UI "Storage" tab,
// or ask the catalog about a named table / temp view:
spark.catalog.isCached("my_temp_view") // hypothetical view name

// Clear all cached data if needed
spark.catalog.clearCache()
Real Example from the Project
The project processes stamps data through multiple steps:
- Raw data → Cache after filtering active stamps
- Companies → Cache because used in joins
- Participants → Cache because used in multiple table writes
- Final stamps → Cache because written to multiple related tables
This pattern makes sense, but adding cleanup would make it perfect:
def processWithCleanup(spark: SparkSession): Unit = {
  val sellosDS = SellosSourceTransformer.procesaUltimosSellosActivosPorFechaPorPatente(sellosDF).cache()
  val empresasDS = EmpresaTransformer.transformEmpresas(sellosDS).cache()

  try {
    // Process everything
    processEmpresas(empresasDS)
    processSellos(sellosDS, empresasDS)
  } finally {
    // Cleanup
    sellosDS.unpersist()
    empresasDS.unpersist()
  }
}
Remember: Cache smart, clean up always!