¿Cuántos datos están siendo almacenados en mi OneLake?

Si estás metido en el mundo de Microsoft Fabric, seguro que ya conoces OneLake. Piensa en él como el "OneDrive de tus datos": un sitio único y para todos donde guardar la información de la empresa. Suena genial, ¿verdad? Centralizarlo todo facilita un montón la vida para organizar, compartir y no tener datos repetidos por todas partes. Pero claro, a medida que metes más y más proyectos y datos, te empiezas a preguntar: ¿Cuánto espacio estoy usando realmente y qué es lo que más pesa?

Y ojo, que responder a eso tiene su miga. Con un montón de workspaces, cada uno con sus cosas (Lakehouses, Warehouses, Modelos Semánticos, Bases de Datos KQL...), saber exactamente qué ocupa cada cosa es un pequeño lío.

Para abordar este desafío, he desarrollado un notebook de PySpark dentro de Microsoft Fabric que nos permite automatizar este proceso, ofreciendo una visión detallada del uso del almacenamiento en OneLake.

El desafío: Ver claro entre tanta cosa

OneLake es la casa de muchos tipos de artefactos de Fabric. Algunos, como las Lakehouses, son como carpetas y archivos que puedes ver y tocar fácilmente. Pero otros, como los Modelos Semánticos o los Warehouses, son un poco más "caja negra"; su tamaño no es solo sumar archivos, sino que Fabric los maneja a su manera.

Ir mirando esto a mano, workspace por workspace, es una locura, y más si tu empresa tiene unos cuantos.

La solución: Un notebook de PySpark al rescate

El notebook aprovecha el poder de PySpark en el entorno de Microsoft Fabric y las capacidades de la librería sempy para interactuar programáticamente con los workspaces y artefactos. El proceso general que sigue el notebook es el siguiente:

  • Listar Workspaces: Utiliza sempy.fabric.list_workspaces() para obtener una lista de todos los workspaces a los que el usuario tiene acceso.

  • Listar artefactos por workspace: Para cada workspace, emplea sempy.fabric.list_items() para inventariar todos los artefactos.

  • Determinar el tipo de artefacto: Identifica el tipo de cada artefacto (Lakehouse, Warehouse, SemanticModel, KQLDatabase, etc.).

  • Calcular el tamaño:

    • Modelos Semánticos: Intenta obtener el tamaño utilizando la función sempy_labs.get_semantic_model_size(). Es importante destacar que el usuario debe disponer de permisos para ejecutar esta función. Si esta función falla o no devuelve un tamaño, se registra un marcador.

    • Artefactos basados en archivos (Lakehouse, Warehouse, etc.): Para artefactos cuyo almacenamiento es directamente accesible como archivos y carpetas en OneLake (a través de rutas ABFSS), el notebook implementa una función recursiva (get_file_details_recursive). Esta función utiliza notebookutils.fs.ls() para navegar por la estructura de directorios del artefacto y sumar el tamaño de cada archivo individual.

    • Otros tipos de artefactos: Para tipos de artefactos donde el tamaño no se obtiene directamente de los archivos ABFSS (por ejemplo, Report, Notebook, DataPipeline), el script actualmente los omite o permite añadir un marcador de tamaño cero, ya que su "tamaño" en OneLake suele ser despreciable en comparación con los artefactos de datos.

  • Almacenar en una tabla delta: Finalmente, el DataFrame se guarda en una tabla Delta dentro de un Lakehouse.

import sempy_labs as labs
import sempy.fabric as fabric
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import col

def get_file_details_recursive(folder_path):
    """
    Escanea recursivamente una carpeta en OneLake y devuelve una lista con detalles
    (ruta completa ABFSS, tamaño en bytes) de cada archivo encontrado.

    Utiliza mssparkutils.fs.ls para listar el contenido.

    Args:
        folder_path (str): La ruta ABFSS de la carpeta a escanear.
                           Ej: "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<item_id>/Files/"

    Returns:
        list: Una lista de diccionarios [{'path': str, 'size_bytes': int}],
              o una lista vacía si ocurre un error al listar la carpeta
              o si la carpeta está vacía o no contiene archivos directamente.
              Los errores en subcarpetas se registran pero no detienen el escaneo general.
    """

    file_details_list = []
    try:
        # print(f"DEBUG: Escaneando carpeta: {folder_path}") # Descomentar para depuración detallada
        items = notebookutils.fs.ls(folder_path)

        for item in items:
            # Asegurarse de que la ruta del item es completa (ABFSS)
            item_path_full = item.path
            if not item_path_full.startswith("abfss://"):
                 # Si la ruta no es completa, intentar reconstruirla (puede no ser siempre necesario/correcto)
                 # Esto es una suposición basada en cómo a veces se devuelven las rutas
                 if folder_path.endswith('/'):
                     item_path_full = folder_path + item.name
                 else:
                     item_path_full = folder_path + '/' + item.name
                 # print(f"DEBUG: Ruta reconstruida: {item_path_full}") # Descomentar para depuración

            if not item.isDir:
                # Es un archivo, añadir sus detalles
                file_details_list.append({'path': item_path_full, 'size_bytes': item.size})
            else:
                # Es un directorio, llamar recursivamente si no es la misma carpeta (evitar bucles)
                # Comprobamos la ruta completa normalizada para evitar errores por barras finales
                current_folder_normalized = folder_path.rstrip('/')
                item_folder_normalized = item_path_full.rstrip('/')

                if item_folder_normalized != current_folder_normalized:
                    # print(f"DEBUG: Entrando recursivamente en: {item_path_full}") # Descomentar para depuración
                    sub_dir_files = get_file_details_recursive(item_path_full)
                    if sub_dir_files:
                        file_details_list.extend(sub_dir_files)

    except Exception as e:
        # Imprime el error específico de esta carpeta pero permite que el proceso general continúe.
        # No se añadirán archivos de esta ruta específica si falla el 'ls'.
        #print(f"WARN: Error al escanear la carpeta '{folder_path}': {e}. Omitiendo esta ruta.")
        print(f"WARN: Error al escanear la carpeta '{folder_path}'. Omitiendo esta ruta.")
        return [] # Devuelve lista vacía en caso de error en esta carpeta específica

    return file_details_list

# Lista para almacenar los datos de todos los archivos encontrados
all_files_data = []

# Contador para mostrar progreso
processed_workspaces = 0
total_workspaces = 0

print("Iniciando escaneo de workspaces...")

try:
    # Obtener todos los workspaces accesibles
    workspaces_pd = fabric.list_workspaces()
    total_workspaces = len(workspaces_pd)
    print(f"Se encontraron {total_workspaces} áreas de trabajo accesibles.")

    # Iterar sobre cada workspace encontrado
    for ws_index, ws_row in workspaces_pd.iterrows():
        workspace_name = ws_row['Name']
        workspace_id = ws_row['Id']
        processed_workspaces += 1
        print(f"\n[{processed_workspaces}/{total_workspaces}] 📂 Procesando Workspace: '{workspace_name}' (ID: {workspace_id})")

        try:
            # Listar todos los artefactos (items) dentro del workspace actual
            items_pd = fabric.list_items(workspace=workspace_id)
            print(f"  -> Se encontraron {len(items_pd)} artefactos escaneables en '{workspace_name}'.")

            # Iterar sobre cada artefacto del workspace
            for item_index, item_row in items_pd.iterrows():
                artifact_type = item_row['Type']
                artifact_name = item_row['Display Name']
                artifact_id = item_row['Id']

                # Construir la ruta raíz ABFSS para el artefacto
                # Nota: La estructura interna puede variar. Generalmente '/Files' o '/Tables' son puntos de entrada comunes.
                #       Probamos escanear desde la raíz del artefacto.
                artifact_root_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{artifact_id}"

                print(f"    -> 🔍 Escaneando artefacto: '{artifact_name}' (Tipo: {artifact_type}, ID: {artifact_id})")
                # print(f"       Ruta base: {artifact_root_path}") # Descomentar para depuración

                # Inicializar la lista de archivos para ESTE artefacto en CADA iteración
                files_in_artifact = []

                # Variable para almacenar el tamaño si se obtiene de forma especial (p.ej. Semantic Model)
                special_size_bytes = -1 # Usar -1 como indicador inicial (no determinado/error)

                try:
                    # Obtener la lista de archivos y sus tamaños recursivamente
                    if artifact_type == 'SemanticModel':
                        print(f"      -> Intentando obtener tamaño para Semantic Model '{artifact_name}'...")
                        size = labs.get_semantic_model_size(artifact_id, workspace_id)
                        if size is not None:
                            special_size_bytes = int(size)

                        print(f"      => Tamaño obtenido de sempy_labs: {special_size_bytes} bytes.")

                        all_files_data.append({
                                "WorkspaceID": workspace_id, 
                                "WorkspaceName": workspace_name,
                                "ArtifactType": artifact_type, 
                                "ArtifactName": artifact_name, 
                                "ArtifactID": artifact_id,
                                "ScannedRootPath": artifact_root_path,
                                "FilePath": f"{artifact_root_path}",
                                "SizeBytes": special_size_bytes,
                                "SizeMB": float(special_size_bytes / (1024 * 1024)) if special_size_bytes > 0 else 0.0
                        })
                    else:
                        files_in_artifact = get_file_details_recursive(artifact_root_path + "/") # Añadir '/' por si acaso es necesario

                    if files_in_artifact and artifact_type != 'SemanticModel':
                        print(f"      => Se encontraron {len(files_in_artifact)} archivos para '{artifact_name}'.")
                        # Procesar y añadir la información de cada archivo a la lista global
                        for file_detail in files_in_artifact:
                            file_size_bytes = int(file_detail['size_bytes'])
                            # Evitar división por cero si el tamaño es 0
                            file_size_mb = float(file_size_bytes / (1024 * 1024)) if file_size_bytes > 0 else 0.0

                            all_files_data.append({
                                "WorkspaceID": workspace_id,
                                "WorkspaceName": workspace_name,
                                "ArtifactType": artifact_type,
                                "ArtifactName": artifact_name,
                                "ArtifactID": artifact_id,
                                "ScannedRootPath": artifact_root_path, # Ruta base desde donde se escaneó
                                "FilePath": file_detail['path'],       # Ruta completa del archivo
                                "SizeBytes": file_size_bytes,
                                "SizeMB": file_size_mb
                            })
                    else:
                        # Si no se encontraron archivos (puede ser normal para ciertos tipos o si está vacío)
                        print(f"      => No se encontraron archivos accesibles vía ABFSS para '{artifact_name}' o está vacío.")

                except Exception as scan_error:
                    # Captura errores durante el escaneo de un artefacto específico
                    print(f"      => ERROR escaneando el artefacto '{artifact_name}' en '{artifact_root_path}'.")
                    #print(f"      => ERROR escaneando el artefacto '{artifact_name}' en '{artifact_root_path}'. Error: {scan_error}")
                    # Opcional: Añadir un registro con error o tamaño -1 para indicar fallo
                    all_files_data.append({
                                "WorkspaceID": workspace_id,
                                "WorkspaceName": workspace_name,
                                "ArtifactType": artifact_type,
                                "ArtifactName": artifact_name,
                                "ArtifactID": artifact_id,
                                "ScannedRootPath": artifact_root_path,
                                "FilePath": "ERROR_SCANNING_ARTIFACT",
                                "SizeBytes": -1, # Indicador de error
                                "SizeMB": -1.0
                            })

        except Exception as item_error:
            # Captura errores al listar los artefactos de un workspace
            print(f"  -> ERROR al listar artefactos para el workspace '{workspace_name}': {item_error}")
            continue # Continuar con el siguiente workspace

except Exception as ws_error:
    # Captura errores al listar los workspaces
    print(f"FATAL: Error crítico al obtener la lista de workspaces: {ws_error}")

print(f"\n📊 Escaneo completado. Se recopilaron {len(all_files_data)} registros de archivos.")

Notebook completo: GitHub

Beneficios

Esta tabla es un recurso valioso para:

  • Gobernanza del almacenamiento: Identificar qué workspaces, artefactos o incluso qué rutas específicas dentro de un Lakehouse están consumiendo más espacio.

  • Mantenimiento y limpieza: Detectar datos obsoletos, innecesariamente grandes o duplicados que podrían ser archivados o eliminados.

  • Informes y dashboards: Conectar Power BI directamente a esta tabla Delta para crear visualizaciones interactivas del uso del almacenamiento.

  • Automatización: Al ser un notebook, puede programarse su ejecución periódica para tener un seguimiento continuo del crecimiento del almacenamiento.

Consideraciones y Limitaciones

Es importante tener en cuenta algunos aspectos:

  • La función get_semantic_model_size puede no devolver el tamaño si el usuario no tiene los permisos suficientes.

  • Tamaño lógico vs. físico: Para artefactos como Modelos Semánticos en modo Direct Lake o Warehouses, la suma de los tamaños de los archivos Delta subyacentes accesibles vía ABFSS puede no coincidir exactamente con el tamaño lógico que Fabric gestiona internamente o reporta en otras interfaces. El notebook proporciona el tamaño de lo "visible" y accesible desde Spark a través del sistema de archivos.

  • Permisos: La identidad que ejecuta el notebook (el usuario o un Service Principal) necesita los permisos adecuados para listar workspaces, ítems dentro de esos workspaces y, crucialmente, acceder a las rutas ABFSS de los artefactos.

  • Rendimiento: En entornos con una cantidad masiva de workspaces, artefactos y archivos, la ejecución del notebook podría llevar un tiempo considerable. Se podrían explorar optimizaciones adicionales si esto se convierte en un problema.

  • Tipos de Artefactos: El script se enfoca en artefactos que almacenan volúmenes significativos de datos. Otros tipos, como informes o flujos de datos, generalmente no se escanean por su tamaño de archivo individual.

Referencias

NotebookUtils (former MSSparkUtils) for Fabric - Microsoft Fabric | Microsoft Learn

How much data is being stored in my Fabric OneLake (Lakehouse files and tables) - FourMoo | Microsoft Fabric | Power BI

Lakehouse Folder Size Calculation

0
Subscribe to my newsletter

Read articles from Kilian Baccaro Salinas directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Kilian Baccaro Salinas
Kilian Baccaro Salinas