Reading Delta Tables With ColumnMapping Using Polars

Sandeep Pawar
2 min read

There was a question a couple of days ago on the r/MicrosoftFabric subreddit about reading Data Warehouse tables shortcutted into a Lakehouse. You can easily query these using Spark or T-SQL in a notebook; the question was how to do it using Polars, since Delta tables created by the Data Warehouse have Column Mapping enabled. Polars' Delta reader is built on delta-rs, which does not yet support reading tables with Column Mapping.
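
To confirm that Column Mapping is actually the culprit, you can inspect the table configuration with delta-rs. This is just a quick sketch; path is the abfss path of the shortcutted table:

from deltalake import DeltaTable

# path is the abfss path of the shortcutted table
dt = DeltaTable(path)

# Returns 'name' or 'id' when Column Mapping is enabled, None otherwise
print(dt.metadata().configuration.get("delta.columnMapping.mode"))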

Below is a crude approach I came up with to map the logical column names to physical column names.

💡
Before you proceed, please note that this is a very inefficient solution and comes with many performance limitations. Unless you have very small data and can verify the output, I would advise using Spark or T-SQL instead. Verify and validate.

The logic:

  • Get the logical and physical column names and build a dictionary mapping one to the other

  • Get the parquet files from the Delta transaction log

  • Apply the column mapping to each file

  • Read and union the files

As you can see from the steps above, you lose parallelization and efficiency in the process.

Code

#Python notebook
import polars as pl
from deltalake import DeltaTable
import os

def scan_delta_cm(path: str) -> pl.LazyFrame:
    delta_table = DeltaTable(path)

    # Build a logical -> physical column name mapping from the schema metadata
    colmaps: dict[str, str] = dict()
    for field in delta_table.schema().fields:
        logical_name = field.name
        physical_name = field.metadata.get("delta.columnMapping.physicalName", field.name)
        colmaps[logical_name] = physical_name

    # Scan each parquet file referenced in the transaction log and rename the
    # physical columns back to their logical names
    all_lazy_frames = []
    for add_action in delta_table.get_add_actions(flatten=True).to_pylist():
        file_path = os.path.join(delta_table.table_uri, add_action["path"])
        lazy_df = pl.scan_parquet(file_path)

        file_schema = lazy_df.collect_schema()
        available_columns = file_schema.names()

        select_exprs = []
        for logical_name, physical_name in colmaps.items():
            if physical_name in available_columns:
                select_exprs.append(pl.col(physical_name).alias(logical_name))

        if select_exprs:
            lazy_df = lazy_df.select(select_exprs)
            all_lazy_frames.append(lazy_df)

    # Union all the per-file frames
    if all_lazy_frames:
        return pl.concat(all_lazy_frames)
    else:
        raise ValueError("No data files found")

# path is the abfss path of the table
df = scan_delta_cm(path).collect()
# df.head()

Think of this more as an experiment than a solution; it will only work in limited cases (tables with deletion vectors won't work either, as expected). For any business-critical job, I would advise using Spark in such scenarios.
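
If you want the function to fail fast rather than silently return wrong results on such tables, one option is to check the table configuration before scanning. This is only a sketch and assumes deletion vectors are surfaced through the delta.enableDeletionVectors table property:

from deltalake import DeltaTable

# path is the same abfss path used above
config = DeltaTable(path).metadata().configuration

# If deletion vectors are enabled, the per-file scan above would also return deleted rows
if config.get("delta.enableDeletionVectors", "false").lower() == "true":
    raise NotImplementedError("Table uses deletion vectors; read it with Spark or T-SQL instead.")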

The other, easier alternative is to use DuckDB, which supports reading Delta tables with Column Mapping.

If you must use Polars, you can zero-copy the DuckDB result to a Polars dataframe.
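
Here is a minimal sketch of that route. It assumes the DuckDB delta extension can be installed in your session and that the abfss path is reachable from it (authentication setup is not shown):

import duckdb

con = duckdb.connect()
con.install_extension("delta")
con.load_extension("delta")

# delta_scan resolves Column Mapping for you; .pl() hands the Arrow
# result to Polars without copying the data
df = con.sql(f"SELECT * FROM delta_scan('{path}')").pl()
df.head()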


Written by

Sandeep Pawar

Principal Program Manager, Microsoft Fabric CAT helping users and organizations build scalable, insightful, secure solutions. Blogs, opinions are my own and do not represent my employer.