Microsoft Fabric Delta Lake optimization

In my previous two-part article, I explained how Z-order works. You can read it here and here.

In this article, I will focus on the practical implementation of optimization techniques for a delta table, demonstrating how different techniques can be leveraged to enhance its performance and efficiency.

Setup

Let's start by creating the data. But before that, we will change a couple of default settings to make sure we get a true picture of the performance gains.

spark.conf.set('spark.microsoft.delta.optimizeWrite.enabled', 'false')
spark.conf.set('spark.sql.parquet.vorder.enabled', 'false')
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000)

The first setting turns off optimize write. This ensures that the data is not auto-compacted at write time; otherwise the OPTIMIZE command would have little visible effect.

We also disable V-Order. V-Order is a write-time optimization technique: it applies special sorting, row-group distribution, dictionary encoding and compression to Parquet files, so compute engines need less network, disk and CPU to read them, providing cost efficiency and better performance. V-Order sorting has a roughly 15% impact on average write times but provides up to 50% more compression. Don't forget to turn these features back on after testing.

The last setting limits writes to 1,000 rows per data file. This helps us mimic the small-file scenario: since we create only about 100K rows, it would otherwise be difficult to replicate small files on such a small data size.
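For reference, once you are done testing you can restore the defaults in the same session. This is a minimal sketch using the same configuration keys as above; note that the default value of spark.sql.files.maxRecordsPerFile is 0, i.e. no per-file limit.

# Re-enable optimize write and V-Order, and remove the per-file record cap
spark.conf.set('spark.microsoft.delta.optimizeWrite.enabled', 'true')
spark.conf.set('spark.sql.parquet.vorder.enabled', 'true')
spark.conf.set("spark.sql.files.maxRecordsPerFile", 0)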

Data

Our data has four columns.

  • ID

  • Name

  • Age

  • Category

ID is an auto-incremented value and therefore has the highest cardinality. It is followed by Name with 100 unique values, then Age with 50 unique values, and finally Category, which has the lowest cardinality with only 20 unique values. Each combination of name and age is assigned a random subset of the 20 categories.

Code

Initial configuration settings:

from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
import time
import random

spark = SparkSession.builder.appName("Delta Lake Performance Test").getOrCreate()
spark.conf.set('spark.microsoft.delta.optimizeWrite.enabled', 'false')
spark.conf.set('spark.sql.parquet.vorder.enabled', 'false')
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000)

Create the data with ID, Name, Age and a random set of categories assigned to each combination of name and age.

def create_categories(num_categories):
    return [f"Category_{i+1}" for i in range(num_categories)]

def random_categories(categories_list):
    # Assign a random subset of the available categories to each row
    return random.sample(categories_list, random.randint(1, len(categories_list)))

num_categories = 20
categories_list = create_categories(num_categories)

# ~10K base rows: unique IDs, 100 distinct names, 50 distinct ages and a random
# list of categories per row (exploded to roughly 100K rows in the next step)
data = [(i, f"Name_{i % 100}", i % 50, random_categories(categories_list)) for i in range(1, 10000)]

Next, we write the data list to a delta table called Z_Order:

columns = ["ID", "Name", "Age", "Category"]
df = spark.createDataFrame(data, columns)

# Explode the list of categories so that each row holds a single category value
df = df.withColumn("Category", explode("Category"))

delta_table_path = "Tables/Z_Order"
df.write.format("delta").mode("overwrite").save(delta_table_path)

When executed, multiple files will be created under the delta table.

I recommend using Azure Storage Explorer to view the files. With this tool, you can also preview the data in the Parquet files, which isn't possible through the Fabric UI.
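If you prefer to stay inside the notebook, DESCRIBE DETAIL also reports the file count and total size of the table. A minimal sketch, assuming the table is registered in the lakehouse as Z_Order (as in the SQL queries below):

# numFiles and sizeInBytes give a quick view of how fragmented the table is
spark.sql("DESCRIBE DETAIL Z_Order").select("numFiles", "sizeInBytes", "partitionColumns").show(truncate=False)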

You can watch the video below by Bradley Ball on how to set it up.

https://www.youtube.com/watch?v=xingcILHkc0

We will use the following query to test the performance.

select age,count(category) from Z_Order where age between 20 and 40 group by age

The query fetches the category count for ages between 20 and 40. Recall that Age has 50 distinct values and Category only 20, the two lowest-cardinality columns in the table.

Please note that the distribution of ages across categories is not even: some categories contain a higher concentration of certain age groups, while others have a more diverse or skewed spread of age values.

We will test and compare the following four scenarios:

  • Without optimize and file compaction

  • With optimize and file compaction

  • With optimize and Z-order

  • With data partitioning

Let's test the first scenario: without optimize and file compaction.

Execute the following query.

%%time
spark.sql("select age,count(category) from Z_Order where age between 20 and 40 group by age").show()
spark.sql("select age,count(category) from Z_Order where age between 20 and 40 group by age").collect()

9.23 seconds to return the result on roughly 100K rows of data. That's bad, really bad.

Looking at the query execution plan, we can see the problem.

The reason is quite obvious. The query had to scan about 105 files and that took almost 5 seconds.
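If you want to inspect the plan yourself, explain() prints the physical plan from the notebook; runtime metrics such as the number of files read and the scan time appear in the Spark UI's SQL view once the query has actually run. A minimal sketch:

query = "select age, count(category) from Z_Order where age between 20 and 40 group by age"
# Prints the formatted physical plan, including the Parquet scan node for Z_Order
spark.sql(query).explain("formatted")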

Next, we test with optimize

from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "Tables/Z_Order")
delta_table.optimize()

There is some improvement, with the execution time reduced to 5.22 seconds, but it is still not satisfactory.

The number of files read did not change. That is largely expected: optimize() on its own only returns an optimize builder and does not rewrite any files until an execute method such as executeCompaction() is called, so the modest gain here most likely comes from caching rather than from the command itself.

Next, let's run the optimize command with compaction, i.e. executeCompaction():

from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "Tables/Z_Order")
delta_table.optimize().executeCompaction()

Rerunning our SQL statements:

Performance is much better now compared to the previous two executions.

The number of files read is now 1, instead of the 105 files in the previous two cases. Let's verify it through Azure Storage Explorer.

The command combined and compacted all the files into a single file. The difference in the Last Modified date and File Size confirms it.
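You can also confirm this from the table history: the OPTIMIZE entry records how many files were added and removed in its operationMetrics. A minimal sketch:

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "Tables/Z_Order")

# The most recent entry should be the OPTIMIZE operation; operationMetrics
# includes counts such as the number of files added and removed
delta_table.history(5).select("version", "operation", "operationMetrics").show(truncate=False)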

We can cross-check it in the JSON file in the delta log as well.

The newly created file is marked as "add", while the rest of the files are marked as "remove". To return the queried data, the Spark engine reads only the files marked "add" and ignores those marked "remove".
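If you want to see those actions directly, you can read the commit files yourself. A minimal sketch, assuming the table lives under Tables/Z_Order as above:

# Each commit file under _delta_log records its "add" and "remove" actions
log_df = spark.read.json("Tables/Z_Order/_delta_log/*.json")
log_df.select("add.path", "remove.path").where("add is not null or remove is not null").show(truncate=False)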

If you want to delete the files that will no longer be referenced by Spark, you can use the VACUUM command to remove them periodically.

https://docs.delta.io/latest/delta-utility.html#-delta-vacuum
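For reference, here is a minimal sketch of the two common ways to run it. The default retention period is 7 days (168 hours), and VACUUM refuses shorter windows unless you explicitly relax the retention check.

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "Tables/Z_Order")

# Deletes data files that are no longer referenced by the table and are older
# than the retention threshold (168 hours = 7 days, the default)
delta_table.vacuum(168)

# SQL equivalent:
# spark.sql("VACUUM Z_Order RETAIN 168 HOURS")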

Now let's drop the table, recreate it and test Z-ordering by age.

from delta.tables import DeltaTable
delta_table_path = "Tables/Z_Order"
DeltaTable.forPath(spark, delta_table_path).optimize().executeZOrderBy("age")

This is the best execution time of the setups so far. It also performed a similar file operation to executeCompaction(), compacting everything into a single file.

One noticeable point of interest is that the file compacted by Z-order is smaller than the one created by executeCompaction().

Let's check whether it makes any difference in the query plan.

The number of rows read with Z-order (104,481) equals the rows read with executeCompaction, with a slight improvement in scan time. I suspect that Z-order applies some additional compression on top of its file-skipping behaviour. The file size difference is negligible here, but we are dealing with only a little over 100K records rather than billions; I would expect the performance difference to be far more significant on a huge amount of data.

Next, let's create the data partitioned by Age. In the data generation code, just change the following command from this:

df.write.format("delta").mode("overwrite").save(delta_table_path)

to this

df.write.partitionBy("Age").format("delta").mode("overwrite").save(delta_table_path)

You should now have the data partitioned by Age, with small files created in every partition.
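A quick way to see how the rows spread across the Age partitions, using the DataFrame we already created (a minimal sketch):

# One partition folder is created per distinct Age value; the counts show how
# the exploded rows are distributed across those partitions
df.groupBy("Age").count().orderBy("Age").show(50)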

Let's test our queries without running executeCompaction() or Z-order.

This is by far the fastest execution time. That is expected, as the query touches only the partitions that satisfy the query's predicate.

Only 44K rows were read to return the output, whereas in the previous cases around 100K rows were read.

Let's run executeCompaction() and check whether it improves the execution time.

from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "Tables/Z_Order")
delta_table.optimize().executeCompaction()

executeCompaction() creates one consolidated file per partition and cuts the execution time almost in half.

Checking the execution plan, we see that the number of files read and the scan time dropped dramatically.

Z-ordering the data, however, made a negligible difference to performance here.

One advantage of Z-ordering is that you can specify the columns to Z-order in the ZORDER BY clause.

%%sql
OPTIMIZE Z_Order
WHERE Age Between 20 and 40
ZORDER BY (Category)

Here we Z-order the data by Category, the other column referenced by our query, while the table remains partitioned on Age.

Please note that the conditional Z-order (the WHERE clause in OPTIMIZE) works only on partition columns. Running the above statement with a predicate on a non-partitioned column errors out.

Running the command with the predicate on a partition column succeeds.
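The same conditional Z-order can also be expressed through the Delta Python API. This is a minimal sketch and assumes the table is still partitioned on Age:

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "Tables/Z_Order")

# where() accepts a partition predicate, so only the matching Age partitions
# are rewritten and Z-ordered by Category
delta_table.optimize().where("Age >= 20 AND Age <= 40").executeZOrderBy("Category")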

Conclusion

In conclusion, optimizing read processing with these techniques significantly enhances query performance by improving data locality and reducing execution time. Applying these strategies to delta tables ensures efficient storage and faster data retrieval, especially for large datasets.

These improvements not only save resources but also provide a great user experience for analytical workloads.

Thanks for reading !!!
