DEV Community

Cover image for Delta Lake de datos
Andres
Andres

Posted on

Delta Lake de datos

Delta Lake es una capa de almacenamiento que se ejecuta sobre un lago de datos existente y proporciona transacciones ACID, manejo de versiones y gestión de datos unificados. Es una solución ideal para manejar datos en grandes volúmenes y mejorar la confiabilidad y el rendimiento de los pipelines de datos.

Instalación de Delta Lake

Para utilizar Delta Lake, necesitas instalar pyspark y delta-spark:

pip install pyspark delta-spark
Enter fullscreen mode Exit fullscreen mode

Ejemplo de Uso de Delta Lake

A continuación, se muestra un ejemplo de cómo crear y manipular una tabla Delta Lake utilizando PySpark.

Paso 1: Configurar SparkSession

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DeltaLakeExample") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
Enter fullscreen mode Exit fullscreen mode

Paso 2: Crear una Tabla Delta

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable

# Definir el esquema de los datos
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Crear un DataFrame de ejemplo
data = [(1, "Alice", 29), (2, "Bob", 31), (3, "Cathy", 25)]
df = spark.createDataFrame(data, schema)

# Escribir el DataFrame como una tabla Delta
df.write.format("delta").mode("overwrite").save("/tmp/delta-table")
Enter fullscreen mode Exit fullscreen mode

Paso 3: Leer y Consultar la Tabla Delta

# Leer la tabla Delta
delta_df = spark.read.format("delta").load("/tmp/delta-table")

# Mostrar los datos
delta_df.show()
Enter fullscreen mode Exit fullscreen mode

Paso 4: Actualizar Datos en la Tabla Delta

# Crear un DataFrame con datos actualizados
data_update = [(1, "Alice", 30), (3, "Cathy", 26)]
df_update = spark.createDataFrame(data_update, schema)

# Convertir la tabla Delta a un objeto DeltaTable
delta_table = DeltaTable.forPath(spark, "/tmp/delta-table")

# Realizar una operación de upsert (merge)
delta_table.alias("tgt").merge(
    df_update.alias("src"),
    "tgt.id = src.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Enter fullscreen mode Exit fullscreen mode

Paso 5: Eliminar Datos en la Tabla Delta

# Eliminar registros donde la edad es mayor a 30
delta_table.delete("age > 30")
Enter fullscreen mode Exit fullscreen mode

Paso 6: Realizar Time Travel

# Leer la tabla Delta en una versión anterior
old_df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
old_df.show()
Enter fullscreen mode Exit fullscreen mode

Conclusión

Delta Lake proporciona una solución robusta para el manejo de datos en un lago de datos, ofreciendo transacciones ACID, manejo de versiones y operaciones de lectura/escritura eficientes. Con Delta Lake, puedes mejorar la confiabilidad y el rendimiento de tus pipelines de datos.

Top comments (0)