Delta Lake es una capa de almacenamiento que se ejecuta sobre un lago de datos existente y proporciona transacciones ACID, manejo de versiones y gestión de datos unificados. Es una solución ideal para manejar datos en grandes volúmenes y mejorar la confiabilidad y el rendimiento de los pipelines de datos.
Instalación de Delta Lake
Para utilizar Delta Lake, necesitas instalar pyspark
y delta-spark
:
pip install pyspark delta-spark
Ejemplo de Uso de Delta Lake
A continuación, se muestra un ejemplo de cómo crear y manipular una tabla Delta Lake utilizando PySpark.
Paso 1: Configurar SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("DeltaLakeExample") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
Paso 2: Crear una Tabla Delta
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable
# Definir el esquema de los datos
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
# Crear un DataFrame de ejemplo
data = [(1, "Alice", 29), (2, "Bob", 31), (3, "Cathy", 25)]
df = spark.createDataFrame(data, schema)
# Escribir el DataFrame como una tabla Delta
df.write.format("delta").mode("overwrite").save("/tmp/delta-table")
Paso 3: Leer y Consultar la Tabla Delta
# Leer la tabla Delta
delta_df = spark.read.format("delta").load("/tmp/delta-table")
# Mostrar los datos
delta_df.show()
Paso 4: Actualizar Datos en la Tabla Delta
# Crear un DataFrame con datos actualizados
data_update = [(1, "Alice", 30), (3, "Cathy", 26)]
df_update = spark.createDataFrame(data_update, schema)
# Convertir la tabla Delta a un objeto DeltaTable
delta_table = DeltaTable.forPath(spark, "/tmp/delta-table")
# Realizar una operación de upsert (merge)
delta_table.alias("tgt").merge(
df_update.alias("src"),
"tgt.id = src.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Paso 5: Eliminar Datos en la Tabla Delta
# Eliminar registros donde la edad es mayor a 30
delta_table.delete("age > 30")
Paso 6: Realizar Time Travel
# Leer la tabla Delta en una versión anterior
old_df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
old_df.show()
Conclusión
Delta Lake proporciona una solución robusta para el manejo de datos en un lago de datos, ofreciendo transacciones ACID, manejo de versiones y operaciones de lectura/escritura eficientes. Con Delta Lake, puedes mejorar la confiabilidad y el rendimiento de tus pipelines de datos.
Top comments (0)