Delta Table History y Vacuum

Introducción

En este artículo veremos como se puede recuperar información sobre las operaciones, usuario, marca de tiempo, etc. de cada escritura en una tabla Delta ejecutando el comando history. Además, veremos como se pueden eliminar los archivos de datos que ya no hacen referencia a una tabla Delta con el comando VACUUM y las consecuencias de ello.

Preparando el entorno

Partimos de una tabla delta llamada dimcurrency donde se almacena la información referente a las monedas.

from pyspark.sql.functions import desc, asc
df = spark.read.format("delta").load("Tables/dimcurrency")
display(df.orderBy(df.CurrencyKey.desc()))

Creamos un dataframe para insertar la nueva fila a la tabla delta.

Primero obtenemos el valor máximo de la columna CurrencyKey (pk) para sumarle uno a su valor. Luego montamos el dataframe con todas las columnas y valores necesarios.

from pyspark.sql.functions import *
from pyspark.sql.types import *

# Max CurrencyKey
max_currencykey = df.select(max(df.CurrencyKey)+1).first()[0]

# Prepare Data and Schema
data = [(max_currencykey,"VRN","Verne Currency")]

schema = StructType([ \
    StructField("CurrencyKey",IntegerType(),True), \
    StructField("CurrencyAlternateKey",StringType(),True), \
    StructField("CurrencyName",StringType(),True) \
  ])

# Create DataFrame
df2 = spark.createDataFrame(data = data, schema = schema)
df2 = df2.withColumn("load_date",date_format(current_timestamp(),"yyyy-MM-dd HH:mm:ss.S").cast(TimestampType()))
df2.printSchema()
df2.show(truncate=False)

Hacemos un append a la tabla delta para insertar el nuevo registro y comprobamos que se ha insertado

df2.write.format("delta").mode("append").saveAsTable("dimcurrency")

df = spark.read.format("delta").load("Tables/dimcurrency")
df.where(df.CurrencyKey == max_currencykey).show()

Recuperar el historial de una tabla delta

Ahora vemos la historia de la tabla dimcurrency. Podemos ver todas las operaciones que se han hecho, desde su creación, hasta el último insert realizado en este ejemplo

from delta.tables import *
dimcurrency = DeltaTable.forPath(spark, "Tables/dimcurrency")

fullHistoryDF = dimcurrency.history()    # get the full history of the table
display(fullHistoryDF)

Una vez vista la historia y las versiones que tiene, se puede volver a cualquier versión fácilmente.

Volvemos a una versión anterior al último registro insertado.

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "dimcurrency")

df = deltaTable.restoreToVersion(0) # restore table to version 0
display(df)

Si leemos de la tabla el registro insertado en los pasos anteriores, ya no aparece porque hemos vuelto a una versión más antigua de la tabla.

df = spark.read.format("delta").load("Tables/dimcurrency")
df.where(df.CurrencyKey == 106).show()

Vaciado de una tabla delta – VACUUM

Se pueden eliminar los archivos a los que ya no hacen referencia a una tabla Delta y que son más antiguos que el umbral de retención ejecutando el comando VACUUM en la tabla. VACUUM no se activa automáticamente y el umbral de retención predeterminado para los archivos es de 7 días. Para modificar este comportamiento, se debe cambiar la siguiente propiedad de la tabla:

  • delta.deletedFileRetentionDuration = «interval <interval>»: Controla cuánto tiempo hace que un archivo debe haber sido borrado antes de ser candidato a VACUUM. Por defecto, el intervalo es de 7 días.

Para acceder a 30 días de datos históricos aunque ejecute VACUUM en la tabla Delta, configure delta.deletedFileRetentionDuration = «interval 30 days». Esta configuración puede hacer que aumenten los costes de almacenamiento.

Importante

Vacuum elimina todos los archivos de los directorios no gestionados por Delta Lake, ignorando los directorios que comienzan por _.

Vacuum borra sólo los archivos de datos, no los archivos de log. Los archivos de log se eliminan automáticamente y de forma asíncrona después de las operaciones de puntos de control. El periodo de retención predeterminado de los archivos de log es de 30 días, configurable a través de la propiedad delta.logRetentionDuration.

Advertencia

Se recomienda establecer un intervalo de retención de al menos 7 días, ya que las instantáneas antiguas y los archivos no confirmados pueden seguir siendo utilizados por lectores o escritores simultáneos de la tabla. Si VACUUM limpia los archivos activos, los lectores concurrentes pueden fallar o, peor aún, las tablas pueden corromperse cuando VACUUM borre archivos que aún no han sido confirmados.

Una vez clara la teoría, pasamos a la práctica.

Para el ejemplo se va a utilizar la misma tabla delta dimcurrency. Primero, vemos cuantos ficheros existen

Actualmente, existen 2 ficheros parquet para la tabla delta dimcurrency. Realizamos operaciones contra la tabla delta para generar más ficheros

spark.sql("INSERT INTO dimcurrency VALUES (997,'Test','Test Currency',CURRENT_DATE())")
spark.sql("INSERT INTO dimcurrency VALUES (998,'Test2','Test2 Currency',CURRENT_DATE())")
spark.sql("INSERT INTO dimcurrency VALUES (999,'Test3','Test3 Currency',CURRENT_DATE())")
spark.sql("UPDATE dimcurrency SET CurrencyName = 'Blog Vacuum Currency' WHERE CurrencyKey=999")
spark.sql("DELETE FROM dimcurrency WHERE CurrencyKey=106")

Revisamos de nuevo la historia de la tabla y los archivos

from delta.tables import *
dimcurrency = DeltaTable.forPath(spark, "Tables/dimcurrency")
fullHistoryDF = dimcurrency.history()
display(fullHistoryDF)

Antes de ejecutar el VACUUM, podemos ver cuales serán los ficheros que serán borrados con el siguiente comando:

df = spark.sql("VACUUM dimcurrency RETAIN 0 HOURS DRY RUN")
display(df)

Como podemos observar, como se ha especificado un valor muy bajo de retención, aparece un error por haber usado un valor más bajo que 168 horas. Esto ocurre porque hay riesgo de que la tabla delta se corrompa o se quede en un estado corrupto.

No es recomendable hacerlo pero en este ejemplo deshabilitaremos esta comprobación para que se pueda especificar un valor inferior a 168 horas. Para ello, ejecutamos el siguiente código

%%sql
set spark.databricks.delta.retentionDurationCheck.enabled = false

Si volvemos a utilizar la sentencia anterior, ahora si nos mostrará los archivos de datos que se borrarán tras el Vacuum.

Ejecutamos el Vacuum contra la tabla delta

spark.sql("VACUUM dimcurrency RETAIN 0 HOURS")

Al finalizar la ejecución, revisamos de nuevo los archivos de la tabla delta y observamos como los tres archivos de datos se han eliminado

Si vemos de nuevo la historia de la tabla, vemos como se han generado dos versiones por la operación Vacuum realizada

Si se intenta volver a una versión anterior de la tabla delta la cual necesita alguno de los archivos de datos que se han eliminado, recibiremos el siguiente error

from delta.tables import *
deltaTable = DeltaTable.forName(spark, "dimcurrency")
df = deltaTable.restoreToVersion(0)
display(df)

Es importante conocer los riesgos que supone ejecutar el Vacuum porque se puede perder la capacidad de retroceder en el tiempo a una versión anterior al periodo de retención.

0
Subscribe to my newsletter

Read articles from Kilian Baccaro Salinas directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Kilian Baccaro Salinas
Kilian Baccaro Salinas