Delta Lake Tables For Optimal Direct Lake Performance In Fabric Python Notebook

Sandeep Pawar

In my last blog, I showed how to use Polars in a Fabric Python notebook to read, write, and transform data. What I did not cover was how to write a Delta Lake table in a Python notebook that's optimal for Direct Lake performance. All engines in Fabric (except for KQL and Python) create a VORDER'd table. So how do you create a Delta table in a Python notebook, given that you need Spark to VORDER a table?

Well, we know that:

  • Power BI semantic models in large format use an 8M-row segment size, so we can create delta tables that use large rowgroups.

  • When you create a delta table with VORDER, massive rowgroups are created so that VORDER is applied to as much data as possible.

  • Large rowgroups help parquet performance.
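To put rough numbers on the bullets above, here is a plain-Python sketch (the rowgroup sizes are illustrative, not defaults of any engine) of how rowgroup size changes the number of rowgroups a reader has to stitch into 8M-row segments:

```python
import math

def rowgroups_needed(total_rows: int, rows_per_group: int) -> int:
    """How many parquet rowgroups a table with total_rows rows will have."""
    return math.ceil(total_rows / rows_per_group)

total = 50_000_000  # rows in the test table used later in this post

# hypothetical ~1M-row rowgroups vs. rowgroups sized near the 8M-row
# segment size that large-format semantic models use
print(rowgroups_needed(total, 1_000_000))  # 50 rowgroups
print(rowgroups_needed(total, 8_000_000))  # 7 rowgroups
```

Fewer, larger rowgroups mean less per-rowgroup overhead when Direct Lake transcodes the data into segments.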

In Polars, we can change the delta_write_options and define the rowgroup size we want. I did that and compared the performance with Spark. Creating large rowgroups can affect write performance, so I measured that as well, along with some Spark and Polars (and Daft) performance.

Setup:

  • Spark: RT 1.3, default Spark pool, 8 cores, 64GB, single node. No Spark conf changes.

  • Python: Polars v1.6; used the default 2 cores/16GB and also tested with 4 cores/32GB.

  • Power BI: custom semantic model with Automatic Direct Lake behavior, no other changes.

  • All metrics are cold cache.

  • I repeated the tests several times to make sure they're reproducible. The durations may vary by ± 1s.

  • Data: TPCH SF100, lineitem table with 600 million rows.

  • This was in my personal tenant with no other active jobs running in the background.

💡
Before you read any further, it's important to note that this was a set of tests done on one single dataset with a specific query and the above configurations. As much as I would like to generalize these results, you should always test based on your use cases. The tests DO NOT mean Spark is better or worse than Python. You should use the right tool for the right job. My goal is to show that Python notebooks provide a lot of benefits for many common scenarios with small jobs. Depending on your use case, you may find otherwise - always test. It's great that Fabric provides so many tools to pick from. If you see any discrepancies, please let me know.

Polars

In Polars, you can change the rowgroup size by defining the min/max rows per group and the max rows per file. (Read this if you would like to learn more about rowgroups.)

Below, I am reading a 600-million-row Delta table, selecting the top 50M rows, and saving it to a delta table with a minimum of 8M and a maximum of 16M rows per rowgroup, and a maximum of 48M rows per file. Since the total number of rows is 50M, two files will be created: one with 48M rows and another with the remaining 2M rows. Why 48M rows? I don't know, I just picked a round number that is a multiple of 16. Note that there is no right or wrong rowgroup size. Each engine behaves differently, and what works best for one engine may not work for another. I first tested with the default 2 vCores, but it ran out of memory, so I configured the notebook to 4 and 8 cores.

#### Python notebook
%%configure
{
    "vCores": 4
}
################# next cell #####################
import polars as pl

# lazily scan the Delta table, take the first 50M rows, stream the collect
df = pl.scan_delta("abfss://ws@onelake.dfs.fabric.microsoft.com/lh/Tables/lineitem").head(50_000_000).collect(streaming=True)

# write with explicit rowgroup and file size caps
df.write_delta(
    "abfss://ws@onelake.dfs.fabric.microsoft.com/lh/Tables/rgtest_polars_4cores_16MRG",
    delta_write_options={
        "max_rows_per_group": 16_000_000,
        "min_rows_per_group": 8_000_000,
        "max_rows_per_file": 48_000_000,
    },
)
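To sanity-check the file/rowgroup arithmetic, here is a stdlib-only sketch (the plan_layout helper is hypothetical, not a deltalake API) that derives the layout these caps should produce, assuming the caps are honored exactly and ignoring min_rows_per_group:

```python
def plan_layout(total_rows: int, max_rows_per_file: int, max_rows_per_group: int):
    """Split total_rows into files, then each file into rowgroups."""
    files = []
    remaining = total_rows
    while remaining > 0:
        file_rows = min(remaining, max_rows_per_file)
        groups, left = [], file_rows
        while left > 0:
            g = min(left, max_rows_per_group)
            groups.append(g)
            left -= g
        files.append(groups)
        remaining -= file_rows
    return files

# 50M rows, 48M rows per file, 16M rows per rowgroup
print(plan_layout(50_000_000, 48_000_000, 16_000_000))
# -> [[16000000, 16000000, 16000000], [2000000]]
# i.e., a 48M-row file plus a second file with the remaining 2M rows
```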

Spark

I tested with and without VORDER, with the Native Execution Engine, with the dataframe repartitioned to use all 8 cores, and with and without OptimizeWrite. Miles Cole has an excellent series of blogs on Spark configurations that can impact Spark write performance (VORDER, OptimizeWrite, Deletion Vectors, etc.). I didn't test all the configurations because the goal was to compare out-of-the-box performance for creating delta tables that are optimal for Direct Lake.

### DEFAULT
df = spark.sql("SELECT * FROM lh.lineitem LIMIT 50000000")
df.write.saveAsTable("rgtest_spark")

### USING NEE
%%configure
{
    "conf": {
        "spark.native.enabled": "true"
    }
}

df = spark.sql("SELECT * FROM lh.lineitem LIMIT 50000000")
df.write.saveAsTable("rgtest_spark_NEE")

### without OptimizeWrite

spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "false")
df = spark.sql("SELECT * FROM lh.lineitem LIMIT 50000000")
df.write.saveAsTable("no_optimizewrite")
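The two remaining variants in the results (repartitioned, and without VORDER) were written along these lines - a sketch, and the VORDER flag name is my assumption based on the Fabric docs at the time, so verify it before relying on it:

```python
### repartitioned (use all 8 cores) - sketch
df = spark.sql("SELECT * FROM lh.lineitem LIMIT 50000000")
df.repartition(8).write.saveAsTable("rgtest_spark_partitioned")

### without VORDER - sketch; flag name is an assumption, check the Fabric docs
spark.conf.set("spark.sql.parquet.vorder.enabled", "false")
df = spark.sql("SELECT * FROM lh.lineitem LIMIT 50000000")
df.write.saveAsTable("rgtest_spark_wo_vorder")
```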

Direct Lake

I created a custom Direct Lake model and tested cold cache performance for a single table. This is not representative of the real world, but to keep things simple and isolate variables, I used only one table and queried it using the DAX below:

### DAX
DEFINE
    MEASURE rgtest_spark_wo_vorder[Sum Qty] =
        SUM(rgtest_spark_wo_vorder[L_quantity])
    MEASURE rgtest_spark_wo_vorder[Sum Base Price] =
        SUM(rgtest_spark_wo_vorder[L_extendedprice])
    MEASURE rgtest_spark_wo_vorder[Sum Disc Price] =
        SUMX(
            rgtest_spark_wo_vorder,
            rgtest_spark_wo_vorder[L_extendedprice] * (1 - rgtest_spark_wo_vorder[L_discount])
        )
    MEASURE rgtest_spark_wo_vorder[Sum Charge] =
        SUMX(
            rgtest_spark_wo_vorder,
            rgtest_spark_wo_vorder[L_extendedprice] * (1 - rgtest_spark_wo_vorder[L_discount]) * (1 + rgtest_spark_wo_vorder[L_tax])
        )
    MEASURE rgtest_spark_wo_vorder[Avg Qty] =
        AVERAGE(rgtest_spark_wo_vorder[L_quantity])
    MEASURE rgtest_spark_wo_vorder[Avg Price] =
        AVERAGE(rgtest_spark_wo_vorder[L_extendedprice])
    MEASURE rgtest_spark_wo_vorder[Avg Disc] =
        AVERAGE(rgtest_spark_wo_vorder[L_discount])
    MEASURE rgtest_spark_wo_vorder[Count Order] =
        COUNTROWS(rgtest_spark_wo_vorder)

EVALUATE
    CALCULATETABLE(
        SUMMARIZECOLUMNS(
            rgtest_spark_wo_vorder[L_returnflag],
            rgtest_spark_wo_vorder[L_linestatus],
            "Sum Qty", rgtest_spark_wo_vorder[Sum Qty],
            "Sum Base Price", rgtest_spark_wo_vorder[Sum Base Price],
            "Sum Disc Price", rgtest_spark_wo_vorder[Sum Disc Price],
            "Sum Charge", rgtest_spark_wo_vorder[Sum Charge],
            "Avg Qty", rgtest_spark_wo_vorder[Avg Qty],
            "Avg Price", rgtest_spark_wo_vorder[Avg Price],
            "Avg Disc", rgtest_spark_wo_vorder[Avg Disc],
            "Count Order", rgtest_spark_wo_vorder[Count Order]
        ),
        YEAR(rgtest_spark_wo_vorder[l_shipdate]) > 1990
    )

Results:

💡
Update 12/08/24: I was reminded, and I agree, that this is not a representative real-world DE job: there are no predicates or transformations, just movement/selection of data, which limits Spark's real power - parallelism - and will affect Spark's read/write performance in the above tests. So, before jumping to conclusions, test your own use cases. The Direct Lake read performance is irrespective of the performance of the engine used to create the delta tables. Check out Miles's blog for his detailed comparison.

Write time: Delta write time in seconds

Read time: DAX read time in Direct Lake, in seconds

Code for below chart

Data Layout :

Using my code from here, I captured how the data is laid out for each of the options.

| Metric | rgtest_polars_default_8cores | rgtest_polars_default_4cores | rgtest_spark_partitioned | rgtest_spark_nee | daft | rgtest_spark_wo_vorder | rgtest_polars_4cores_8MRG | rgtest_polars_4cores_16MRG | rgtest_spark |
|---|---|---|---|---|---|---|---|---|---|
| Num Files | 5 | 5 | 3 | 3 | 7 | 3 | 4 | 2 | 1 |
| Num Rowgroups | 5 | 5 | 3 | 3 | 7 | 3 | 4 | 2 | 1 |
| Num Rows | 50000000 | 50000000 | 50000000 | 50000000 | 50000000 | 50000000 | 50000000 | 50000000 | 50000000 |
| Delta Size MB | 1442 | 1442 | 1320 | 1320 | 1254 | 1190 | 1112 | 1111 | 1107 |
| Median File Size MB | 302 | 302 | 494 | 494 | 208 | 445 | 355 | 555 | 1106 |
| Small Files Percentage | 0.0 | 0.0 | 0.0 | 0.0 | 14.285714 | 0.0 | 0.0 | 0.0 | 0.0 |
| Write Operations | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| Merge Operations | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| Delete Operations | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| Days Since Write | None | None | None | None | None | None | None | None | None |
| Optimize Count | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| Days Since Optimize | None | None | None | None | None | None | None | None | None |
| Vacuum Count | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| Days Since Vacuum | None | None | None | None | None | None | None | None | None |
| Small Writes Percentage | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| Is VORDER | Schema N/A | Schema N/A | true | true | Schema N/A | N/A | Schema N/A | Schema N/A | true |
| Min Rowgroup Size | 8056960 | 8056960 | 12500000 | 12500000 | 187238 | 12500000 | 2000000 | 2000000 | 50000000 |
| Max Rowgroup Size | 10485760 | 10485760 | 18750000 | 18750000 | 8302127 | 18750000 | 16000000 | 48000000 | 50000000 |
| Median Rowgroup Size | 10485760 | 10485760 | 18750000 | 18750000 | 8302127 | 18750000 | 16000000 | 25000000 | 50000000 |
| Partition Columns | None | None | None | None | None | None | None | None | None |
| Last OPTIMIZE Timestamp | None | None | None | None | None | None | None | None | None |
| Last VACUUM Timestamp | None | None | None | None | None | None | None | None | None |
| is_shortcut | True | True | False | False | True | True | True | True | True |
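The rowgroup-size stats in the layout above come from parquet footer metadata; computing them is straightforward once you have the per-rowgroup row counts. A stdlib-only sketch (in practice you would read num_rows per row group from each file's footer, e.g. with pyarrow):

```python
import statistics

def rowgroup_stats(rowgroup_sizes):
    """Min/max/median rowgroup size in rows, as in the layout table."""
    return {
        "min": min(rowgroup_sizes),
        "max": max(rowgroup_sizes),
        "median": statistics.median(rowgroup_sizes),
    }

# e.g. a table written as one 48M-row rowgroup plus one 2M-row rowgroup
print(rowgroup_stats([48_000_000, 2_000_000]))
# -> {'min': 2000000, 'max': 48000000, 'median': 25000000.0}
```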

Observations/Notes:

  • Spark with a VORDER'd table performed the best at 6s, with the Polars delta tables with 8M and 16M rowgroups almost as good at 7s. So, if you are using a Python notebook, configure the rowgroup size for optimal Direct Lake performance. This also means that if you are creating delta tables outside of Fabric (because you cannot use Fabric engines for whatever reason) and shortcutting them into the Lakehouse, tuning the rowgroup size in the external engine may be an option to achieve better Direct Lake performance compared to a default delta table - at least it's worth testing.

  • Spark used 8 cores, Python used 4 cores. So, similar performance for this data and query at lower CU consumption.

  • The 2-vCore Python notebook ran out of memory, but at 4 cores/32GB, peak memory consumption was 50%. If I had, say, 5M fewer rows, 2 vCores would have been sufficient. Note that I read 600M rows - that's 40GB on disk - and yet Polars was able to effectively scan, select, and write with memory to spare. I used scan_delta to run Polars in lazy mode with streaming=True.

  • VORDER (or large rowgroups) helps cold cache performance a lot (at least it did in this case). Spark with and without VORDER: 6s vs 10s. Polars with and without large rowgroups: 7s vs 12s.

  • What's also interesting is that while I didn't show the warm cache performance, VORDER/large-rowgroup tables also performed well there, e.g. Spark with and without VORDER, warm cache: 1s vs 3s. This is because once the data is cached, Direct Lake doesn't perform any additional optimizations, so large rowgroups likely help. Again, at least they did for the query I ran.

  • We know default Spark has some configuration overhead (read Miles' blog). Turning off OptimizeWrite helped. As Miles pointed out, OptimizeWrite makes sense only for partitioned tables (also, don't partition tables in general). So, Spark performance is not as bad as it looks if you follow the best practices Miles shared in his blog. Also, if you are not using NEE, you are leaving some serious money on the table.

  • Daft actually did very well. The average performance was as good as Polars, and at times it created the table in under a minute. But unlike Polars, I couldn't configure the rowgroups; I don't think it's supported. Daft also created many smaller files compared to the Polars default. The "Daft + Polars" result above used Daft for reading (Daft is awesome at reading) and Polars for writing.

  • Spark default took 242s, but of that, the read was 60-70s and the write was the rest, based on the logs.

  • Polars' 16M rowgroups performed as well as the 8M rowgroups, so 8M could be a good starting point. I did not test whether it has any adverse impact on Delta or SQL endpoint read performance.

  • I would expect the performance to be similar, directionally, in a dimensional model with many tables but that has to be tested.

  • I did not test DuckDB. The results should be similar, as you can configure rowgroups in DuckDB as well. Mim tested DuckDB and showed you can stream the entire 600M-row dataset with rowgroups configured 🙌: https://x.com/mim_djo/status/1865336020700647481

  • I also tried testing zstd compression in Polars but couldn't get it to work. Spark, Polars, and Daft use snappy. zstd compresses the data even more but may come with a read/write performance overhead. Since Polars was so fast, I thought of testing that as well. I have tested zstd in Spark on 30+ tables and found the perf to be a mixed bag.

  • Both the VORDER'd and large-rowgroup tables compressed ~9% more.

  • Hopefully Fabric PG/CAT will provide more guidance on this.

  • If you find any discrepancies or have questions, please let me know. Also, curious to see what others find in their tests.
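As an aside, the scan_delta().head().collect(streaming=True) pattern used earlier stays memory-bounded because evaluation is lazy: the head limit is pushed into the scan, so the full table is never materialized. A rough stdlib analogy, with generators standing in for the lazy engine:

```python
from itertools import islice

def scan_rows(n_total):
    """Lazily 'scan' rows one at a time - nothing is materialized up front."""
    for i in range(n_total):
        yield i

# taking the head of a '600M-row' source pulls only 5 rows;
# the rest of the source is never read
head = list(islice(scan_rows(600_000_000), 5))
print(head)  # [0, 1, 2, 3, 4]
```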

Conclusion:

  • If you are using Python notebooks, you have options to create optimal delta tables for Direct Lake performance.

  • The wrong way to interpret the results above is to think that Spark is worse than Python. If you reach that conclusion, I encourage you to do more research to learn how to use these engines effectively. Use the right tool for the right job. At a very large scale, Spark will excel, so always test! I'm glad we have all these tools in Fabric.

  • Polars (and Daft) are awesome, love all the OSS tools.

  • The results above are for a specific dataset and a specific query, perform tests for your use cases to find what works best.

In the next blog, I will show how to use Daft to effortlessly scan a 100M+ row dataset from remote cloud storage in a Python notebook.

References

Tuning Parquet file performance | Dremio

Uri & Menachem: Using Row Groups for fast filtering of large parquet files (HE)|PyData Tel Aviv 2024

Native execution engine for Fabric Spark - Microsoft Fabric | Microsoft Learn

Mim: a one-stop source for all things Python engines

Miles Cole | A Microsoft data & analytics blog

