¿Qué es Liquid Clustering y por qué es un game changer?


Liquid Clustering es una técnica introducida por Delta Lake diseñada para superar las limitaciones de las estrategias de particionamiento y ordenamiento de datos, como Z-Ordering. Esta técnica se centra en maximizar la eficiencia del almacenamiento y la organización de datos al minimizar la necesidad de configuraciones manuales y tareas recurrentes para reescribir los archivos de datos existentes.
Tanto con particionamiento, Z-Order o Liquid Clustering, lo que buscamos es una mayor velocidad en las consultas y un almacenamiento de datos más eficiente. Sin embargo, la gran diferencia de esta última es la flexibilidad que ofrece. Con Liquid Clustering no es necesario saber cuales van a ser las columnas que los usuarios van a utilizar para filtrar datos y además, al cambiar las claves escogidas, no necesita reescribir los datos de la tabla como en el caso de Z-Order.
Además, Liquid Clustering utiliza un algoritmo dinámico basado en las estadísticas del archivo para optimizar la distribución de los datos y minimizar los problemas causados por desbalanceo de archivos o skew.
Liquid Clustering está disponible para Delta Lake 3.1.0 y versiones superiores. Runtime Version 1.3 de Spark en Microsoft Fabric.
¿Cómo almacena y lee Liquid Clustering los archivos Parquet?
Almacenamiento:
Cuando activas Liquid Clustering, Delta Lake organiza los datos en clústeres utilizando rangos de valores (no carpetas físicas como en las particiones).
Estos clústeres están identificados por estadísticas de archivo (mínimos, máximos, etc…) que se almacenan en el transaction log de Delta.
Lectura:
Durante una consulta, Liquid Clustering utiliza las estadísticas almacenadas para hacer un pruning de archivos eficiente, es decir, lee únicamente los archivos relevantes para los valores solicitados en la consulta.
Esto elimina la necesidad de escanear archivos completos innecesariamente y reduce significativamente el tiempo de respuesta.
¿Cuándo utilizar Liquid Clustering?
Liquid Clustering es especialmente útil cuando:
Se insertan nuevos datos regularmente a la tabla
Existen datos con alta cardinalidad o sin una partición clara
Los patrones de consulta cambian con el tiempo
Se necesita soportar escrituras concurrentes
Beneficios de Liquid Clustering
Eficiencia en consultas:
- Mejora el rendimiento al reducir la cantidad de archivos que deben ser leídos gracias a las estadísticas de rangos.
Manejo de cardinalidad alta:
- Es ideal para datos donde las particiones tradicionales generarían demasiadas carpetas pequeñas, lo que afectaría el rendimiento.
Evolución automática:
- Permite reorganizar los datos con operaciones de optimización, manteniéndolos eficientemente organizados a medida que crecen.
Flexibilidad:
- No necesitas decidir de antemano una estructura de partición, lo que simplifica el diseño inicial.
Evita problemas de skew:
- Rebalancea los datos automáticamente para evitar concentraciones de valores en pocos archivos.
¿Cómo utilizar Liquid Clustering?
Creando una tabla vacía
CREATE TABLE liquid_clustering_table
(
id int,
valor int,
categoria string,
fecha date
)
CLUSTER BY
(
categoria,
fecha
);
Utilizando CTAS
spark.sql("CREATE TABLE liquid_clustering_table CLUSTER BY (categoria, fecha) AS SELECT * FROM base_table")
Modificación de las columnas clusterizadas
En el caso de que se quieran modificar las columnas, se puede hacer con el siguiente comando:
ALTER TABLE liquid_clustering_table CLUSTER BY (categoria)
Cuando se cambian las columnas de clustering, todas las nuevas escrituras de datos y operaciones OPTIMIZE seguirán las nuevas columnas de clustering. Los datos existentes no se reescriben.
Desactivar Liquid Clustering
También se puede desactivar la funcionalidad Liquid Clustering pero esta operación no reescribe los datos que ya han sido clusterizados, sino que evita que los nuevos datos y las operaciones OPTIMIZE utilicen las columnas clusterizadas para organizar los datos.
ALTER TABLE liquid_clustering_table CLUSTER BY NONE
Demo Time!
He realizado una comparativa con diferentes técnicas para visualizar el impacto de cada una de ellas a la hora de consultar los datos. Las comparativas son:
Tabla delta
Tabla delta con particionado
Tabla delta con Z-Order
Tabla delta con Liquid Clustering
Comparativas
Dataset
Los datos que he utilizado son los de Yellow Taxi Trip Data que se pueden obtener desde la siguiente web: Raw Data - TLC
El conjunto de datos es de 443 millones de registros con la siguiente estructura:
Creación de las tablas
# Crear tabla Delta sin partición
df.write.format("delta").mode("overwrite").saveAsTable("nyc_yellow_taxi_trip_data_no_partition")
# Crear tabla Delta particionada por nyc_year
df.write.format("delta").partitionBy("nyc_year").mode("overwrite").saveAsTable("nyc_yellow_taxi_trip_data_partitioned")
# Crear tabla Delta con partición y Z-Order
df.write.format("delta").mode("overwrite").saveAsTable("nyc_yellow_taxi_trip_data_zorder")
spark.sql(f"OPTIMIZE nyc_yellow_taxi_trip_data_zorder ZORDER BY (nyc_year)")
# Crear tabla Delta para Liquid Clustering
spark.sql("CREATE TABLE nyc_yellow_taxi_trip_data_liquid_clustering CLUSTER BY (nyc_year, PULocationID, DOLocationID, passenger_count) AS SELECT * FROM nyc_yellow_taxi_trip_data_no_partition")
spark.sql("OPTIMIZE nyc_yellow_taxi_trip_data_liquid_clustering")
Comprobamos que se ha creado correctamente la tabla
Comparativa 1
from time import time
import matplotlib.pyplot as plt
# Crear una función para medir los tiempos de consulta
def measure_query_time(query):
start_time = time()
spark.sql(query).collect() # Ejecutar la consulta
end_time = time()
return end_time - start_time
# Diccionario para almacenar los tiempos
query_times = {}
# Consultas
queries = {
"simple_query": "SELECT count(1) as nrows FROM nyc_yellow_taxi_trip_data_no_partition WHERE nyc_year = 2018",
"partition_query": "SELECT count(1) as nrows FROM nyc_yellow_taxi_trip_data_partitioned WHERE nyc_year = 2018",
"zorder_query": "SELECT count(1) as nrows FROM nyc_yellow_taxi_trip_data_zorder WHERE nyc_year = 2018",
"liquid_clustering_query": "SELECT count(1) as nrows FROM nyc_yellow_taxi_trip_data_liquid_clustering WHERE nyc_year = 2018",
}
# Ejecutar las consultas y medir tiempos
for query_name, query in queries.items():
query_times[query_name] = measure_query_time(query)
# Mostrar los resultados
for query_name, exec_time in query_times.items():
print(f"{query_name}: {exec_time:.2f} segundos")
# Crear un gráfico comparativo
plt.figure(figsize=(10, 6))
plt.bar(query_times.keys(), query_times.values(), color=['blue', 'orange', 'green', 'red'])
plt.ylabel("Tiempo de ejecución (segundos)")
plt.title("Comparativa de tiempos de consulta")
plt.show()
Comparativa 2
from time import time
import matplotlib.pyplot as plt
# Crear una función para medir los tiempos de consulta
def measure_query_time(query):
start_time = time()
spark.sql(query).collect() # Ejecutar la consulta
end_time = time()
return end_time - start_time
# Diccionario para almacenar los tiempos
query_times = {}
# Consultas
queries = {
"simple_query": "SELECT nyc_year, PULocationID, SUM(passenger_count) as passenger_count FROM nyc_yellow_taxi_trip_data_no_partition GROUP BY nyc_year, PULocationID",
"partition_query": "SELECT nyc_year, PULocationID, SUM(passenger_count) as passenger_count FROM nyc_yellow_taxi_trip_data_partitioned GROUP BY nyc_year, PULocationID",
"zorder_query": "SELECT nyc_year, PULocationID, SUM(passenger_count) as passenger_count FROM nyc_yellow_taxi_trip_data_zorder GROUP BY nyc_year, PULocationID",
"liquid_clustering_query": "SELECT nyc_year, PULocationID, SUM(passenger_count) as passenger_count FROM nyc_yellow_taxi_trip_data_liquid_clustering GROUP BY nyc_year, PULocationID",
}
# Ejecutar las consultas y medir tiempos
for query_name, query in queries.items():
query_times[query_name] = measure_query_time(query)
# Mostrar los resultados
for query_name, exec_time in query_times.items():
print(f"{query_name}: {exec_time:.2f} segundos")
# Crear un gráfico comparativo
plt.figure(figsize=(10, 6))
plt.bar(query_times.keys(), query_times.values(), color=['blue', 'orange', 'green', 'red'])
plt.ylabel("Tiempo de ejecución (segundos)")
plt.title("Comparativa de tiempos de consulta")
plt.show()
Comparativa 3
from time import time
import matplotlib.pyplot as plt
# Crear una función para medir los tiempos de consulta
def measure_query_time(query):
start_time = time()
spark.sql(query).collect() # Ejecutar la consulta
end_time = time()
return end_time - start_time
# Diccionario para almacenar los tiempos
query_times = {}
# Consultas
queries = {
"simple_query": "SELECT COUNT(1) as nrows FROM nyc_yellow_taxi_trip_data_no_partition WHERE passenger_count > 2 AND PULocationID = 264",
"partition_query": "SELECT COUNT(1) as nrows FROM nyc_yellow_taxi_trip_data_partitioned WHERE passenger_count > 2 AND PULocationID = 264",
"zorder_query": "SELECT COUNT(1) as nrows FROM nyc_yellow_taxi_trip_data_zorder WHERE passenger_count > 2 AND PULocationID = 264",
"liquid_clustering_query": "SELECT COUNT(1) as nrows FROM nyc_yellow_taxi_trip_data_liquid_clustering WHERE passenger_count > 2 AND PULocationID = 264",
}
# Ejecutar las consultas y medir tiempos
for query_name, query in queries.items():
query_times[query_name] = measure_query_time(query)
# Mostrar los resultados
for query_name, exec_time in query_times.items():
print(f"{query_name}: {exec_time:.2f} segundos")
# Crear un gráfico comparativo
plt.figure(figsize=(10, 6))
plt.bar(query_times.keys(), query_times.values(), color=['blue', 'orange', 'green', 'red'])
plt.ylabel("Tiempo de ejecución (segundos)")
plt.title("Comparativa de tiempos de consulta")
plt.show()
Comparativa 4
from time import time
import matplotlib.pyplot as plt
# Crear una función para medir los tiempos de consulta
def measure_query_time(query):
start_time = time()
spark.sql(query).collect() # Ejecutar la consulta
end_time = time()
return end_time - start_time
# Diccionario para almacenar los tiempos
query_times = {}
# Consultas
queries = {
"simple_query": "SELECT * FROM nyc_yellow_taxi_trip_data_no_partition WHERE PULocationID = 150 AND DOLocationID = 50 AND passenger_count > 1",
"partition_query": "SELECT * FROM nyc_yellow_taxi_trip_data_partitioned WHERE PULocationID = 150 AND DOLocationID = 50 AND passenger_count > 1",
"zorder_query": "SELECT * FROM nyc_yellow_taxi_trip_data_zorder WHERE PULocationID = 150 AND DOLocationID = 50 AND passenger_count > 1",
"liquid_clustering_query": "SELECT * FROM nyc_yellow_taxi_trip_data_liquid_clustering WHERE PULocationID = 150 AND DOLocationID = 50 AND passenger_count > 1",
}
# Ejecutar las consultas y medir tiempos
for query_name, query in queries.items():
query_times[query_name] = measure_query_time(query)
# Mostrar los resultados
for query_name, exec_time in query_times.items():
print(f"{query_name}: {exec_time:.2f} segundos")
# Crear un gráfico comparativo
plt.figure(figsize=(10, 6))
plt.bar(query_times.keys(), query_times.values(), color=['blue', 'orange', 'green', 'red'])
plt.ylabel("Tiempo de ejecución (segundos)")
plt.title("Comparativa de tiempos de consulta")
plt.show()
Conclusiones
Como se ha podido observar, cuando todas las consultas contienen la columna utilizada por todas las técnicas, en este caso la columna nyc_year, podemos observar que Liquid Clustering está por detrás del particionado y Z-Order, aunque no por una diferencia significativa. Sin embargo, cuando esta columna desaparece de la ecuación de las consultas, cosa que es muy probable que ocurra, Liquid Clustering destaca sobre las demás.
En términos generales, utilizar Liquid Clustering ofrece una mayor eficiencia y flexibilidad en comparación con las otras técnicas, lo que se traduce en mejoras significativas en el rendimiento de las consultas y el mantenimiento de datos. Por lo que, se recomienda utilizar esta técnica para todas las tablas Delta que su tamaño sea inferior a 10TB, que en ese caso es más recomendable utilizar particiones en conjunto con Z-Order.
Subscribe to my newsletter
Read articles from Kilian Baccaro Salinas directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
