Working With Delta Tables In Fabric Python Notebook Using Polars

Sandeep Pawar
5 min read

The much-anticipated Python notebook in Fabric is finally available, and Fabric users have already developed cool libraries and blogged about how useful these notebooks are. DuckDB is everyone’s favorite, but I am a Python guy 😁 so here is a quick overview of how you can use Polars in the Python notebook.

Polars is an open-source library that uses a Rust engine and supports multi-threaded execution. This means it's significantly faster than pandas and, in some cases, even faster than Spark. It can efficiently use the limited resources available in Python notebooks (2 cores, 16 GB RAM). Polars v1.6 is installed in the default Python notebook environment. So, let's see how to perform some common operations:

Reading Delta Table

You can read a delta table in two different ways - eagerly and lazily.

If you have mounted a lakehouse, you can use the relative path "/lakehouse/default/Tables/my_table_name"; to read a table from any lakehouse, use the abfss path "abfss://<ws>@onelake.dfs.fabric.microsoft.com/<lh/wh>/Tables". In the examples below, I stick with the abfss path.

Lazy dataframe:

Similar to Spark, you can load a delta table lazily, i.e. without loading it into memory, using scan_delta. This is a great option for tables that are larger than memory (default 16GB).


import polars as pl

# lazy loading version=0 of the delta table
abfs_path = "abfss://<ws>@onelake.dfs.fabric.microsoft.com/<lh>/Tables"
pl.scan_delta(f"{abfs_path}/<tablename>", version=0)

Eager dataframe

Use read_delta to load the table into memory, which is good for smaller tables. It can also do projection pushdown. Below I am loading only the selected columns.
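The original example isn't reproduced here; a minimal sketch along the same lines, reusing the abfs_path defined earlier (the column names are hypothetical):

# eager read; only the listed columns are fetched thanks to projection pushdown
df = pl.read_delta(
    f"{abfs_path}/<tablename>",
    columns=["country_region", "date"],  # hypothetical column names
)
df.head()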

Read Schema

If you want to look at the schema without loading the data:
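A minimal sketch, again with the abfs_path defined earlier; the lazy scan only touches the delta metadata here:

# inspect the schema without materializing any data
pl.scan_delta(f"{abfs_path}/<tablename>").collect_schema()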

Column names

For a lazy frame, you can get the column names from the schema without collecting the data. You can use .columns too if the dataframe has been loaded in memory.
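A sketch of both, with the same placeholder table:

# lazy: column names from the schema, no data loaded
pl.scan_delta(f"{abfs_path}/<tablename>").collect_schema().names()

# eager: .columns on an in-memory dataframe
pl.read_delta(f"{abfs_path}/<tablename>").columns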

Time Travel

As shown above, you can specify the delta table version to use. Below I am loading only selected columns and 2 rows of version 3 of the delta table. If you apply any filters, the predicate will be pushed down as well before the data is returned.
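The original screenshot isn't reproduced here; a sketch of that pattern, with hypothetical column names:

# two columns and 2 rows from version 3 of the table; the projection
# (and any filters) are pushed down before data is returned
(
    pl.scan_delta(f"{abfs_path}/<tablename>", version=3)
    .select(["country_region", "date"])  # hypothetical columns
    .head(2)
    .collect()
)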

Instead of version, you can also specify the datetime:

from datetime import datetime, timezone

(pl.scan_delta(
    f"{abfs_path}/<tablename>",
    version=datetime(2023, 12, 12, tzinfo=timezone.utc))
)

By default, the latest version is loaded if no version is specified.

Write Delta Table

To create a delta table, use the write_delta method. I am skipping the schema definition here, but as a best practice, always define the schema explicitly. Below I read from the Files section of the lakehouse, do some basic aggregation and save the result as a delta table. Similar to Spark, you can specify the mode, i.e. overwrite, append etc.

Note the date column below. There was a discussion yesterday on Reddit (r/MicrosoftFabric) about saving a delta table with a timestamp column. To do it using Polars, use replace_time_zone to avoid the TIMESTAMP_NTZ error. Below I cast the columns as date and datetime, and it works in the SQL endpoint & Direct Lake.

import polars as pl

# Extract
df = pl.read_csv("abfss://<ws>@onelake.dfs.fabric.microsoft.com/<lh>/Files/Criteo/*.csv")

# Transform
result = (df
    .group_by(["country_region", "date"])
    .agg(
        pl.col("transit_stations_percent_change_from_baseline")
        .mean()
        .alias("mean_transit_change")
    )
)

# Load
(result
    .with_columns([
        pl.col("date").str.strptime(pl.Date, format="%Y-%m-%d").alias("date"),
        pl.col("date").str.strptime(pl.Datetime("us"), format="%Y-%m-%d")
            .dt.replace_time_zone(time_zone="UTC")
            .alias("datetime")
    ])

).write_delta(f"{abfs_path}/my_polars_table", mode="overwrite")

Above, I saved the result as a delta table, but you can save it as parquet similarly with write_parquet (or sink_parquet() on a lazy frame if you are writing a large df).
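For example, a minimal sketch assuming a default lakehouse is mounted (the file name is made up):

# write the aggregated result as a parquet file in the Files section
result.write_parquet("/lakehouse/default/Files/my_polars_result.parquet")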

These are some very high-level details, enough to get started. To learn more about Polars, read my friend Yuki’s book. Also check out blogs and courses by Liam Brannigan. Liam will have a Polars course for Delta Lake soon and I am looking forward to it.

TIP:

Polars supports Altair for interactive visualization, which is also installed in the default Python environment.

import polars as pl
import altair as alt
# Read csv, 110M rows
df = pl.read_csv("abfss://559bed5-54b57fbaaf74@onelake.dfs.fabric.microsoft.com/5e3fa2ff--72812378da12/Files/Criteo/*.csv", columns=["country_region", "transit_stations_percent_change_from_baseline"], use_pyarrow=True)


# Agg
result = (df
    .group_by(["country_region"])
    .agg(
        pl.col("transit_stations_percent_change_from_baseline")
        .mean()
        .alias("mean_transit_change")
    )
)

(alt
    .Chart(result, title="Polars Can Be Used With Altair", width=1200)
    .mark_bar()
    .encode(x="country_region",
            y="mean_transit_change")
).interactive()

Notes

  • Other than Polars, I would also recommend getting familiar with Arrow. If you like SQL, you can pass data between DuckDB and Polars without much overhead, thanks to zero-copy Arrow integration. In fact, Spark 4 will have a .toArrow() method, which will also allow moving data between Spark, Polars and DuckDB efficiently. For example:
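    A toy sketch of querying a Polars dataframe with DuckDB and getting a Polars dataframe back (the data here is made up):

      import duckdb
      import polars as pl

      pl_df = pl.DataFrame({"x": [1, 2, 3]})
      # DuckDB queries the Polars dataframe in place and hands the result
      # back as Polars, with Arrow doing the transfer in both directions
      out = duckdb.sql("SELECT sum(x) AS total FROM pl_df").pl()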

  • scan_csv() does not work in Fabric for some reason

  • If you use scan_delta(), you cannot use write_delta() unless you collect the results.

  • Use scan_delta().collect(streaming=True) for very large tables that are larger than memory, for example:
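    A sketch combining this with the previous note, using placeholder table names:

      (
          pl.scan_delta(f"{abfs_path}/<big_table>")   # lazy scan, nothing loaded yet
          .collect(streaming=True)                    # materialize with the streaming engine
          .write_delta(f"{abfs_path}/<new_table>", mode="overwrite")
      )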

  • I didn’t touch on pandas vs Polars vs Spark etc. There are already a million blogs on this topic, and I would encourage you to do more research on your own. tl;dr: pandas is single-threaded and Spark has overhead for small data; Polars provides a happy medium. Always start with DuckDB/Polars and grow into Spark.

  • You can partition and Z-order a table, but you cannot apply V-Order or Liquid Clustering; those are Spark-only. For example:
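    A sketch using the deltalake package that Polars’ delta support relies on (table and column names are hypothetical):

      from deltalake import DeltaTable

      # partition on write via write_delta's delta_write_options
      result.write_delta(
          f"{abfs_path}/my_partitioned_table",
          mode="overwrite",
          delta_write_options={"partition_by": ["country_region"]},
      )

      # Z-order an existing table through the mounted lakehouse path
      dt = DeltaTable("/lakehouse/default/Tables/my_polars_table")
      dt.optimize.z_order(["country_region"])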

  • Semantic Link and Semantic Link Labs return a pandas df. To convert it to Polars, use pl.from_pandas(df).

  • I joked about SQL above. If you want to, you can use SQL as well:

      df = pl.read_csv("abfss://54b57fbaaf74@onelake.dfs.fabric.microsoft.com/8da12/Files/Criteo/*.csv", columns=["country_region", "transit_stations_percent_change_from_baseline"], use_pyarrow=True)
      df.sql(" SELECT country_region, avg(transit_stations_percent_change_from_baseline) as mean from self group by country_region order by mean")
    
  • FWIW, for the above CSV with 110M rows: Polars: 17s*, DuckDB: 11s, Daft: 5s, Spark: 19s. (*Not a fair comparison: DuckDB and Daft use lazy evaluation, and Polars's scan_csv did not work in Fabric so I had to use read_csv, so the gap is expected, but it's still worth keeping in mind what's working/not working in Fabric.)

      import daft
    
      df = daft.read_csv("/lakehouse/default/Files/Criteo/*.csv")
    
      result = (df
          .groupby(["country_region"])
          .agg(df["transit_stations_percent_change_from_baseline"]
              .mean()
              .alias("mean_transit_change")
          )
      )
      result.collect()
    
  • Polars — DataFrames for the new era

  • Polars - Delta Lake Documentation

  • Modern Polars
