Datalake incremental updates using Microsoft Fabric DataFlow Gen2

Incremental updates mean refreshing only the most recent or changed data rather than reloading the entire dataset. This improves performance, reduces resource consumption, and keeps the data up to date without redundancy, which is especially important for large datasets.

Incremental updates for Fabric through DataFlow Gen2 were introduced in the third quarter of 2024.

The detailed steps are listed in this article.

But there is a catch with this process. The destinations supported for incremental updates in DataFlow Gen2 are limited to:

  • Fabric Warehouse

  • Azure SQL Database

  • Azure Synapse Analytics

More details here.

Lakehouse is not supported by this setup. Trying to set up an incremental refresh using the aforementioned process results in the following error:

Incremental Update in DataFlow GEN2

Let's assume that you have a use case where you want to implement incremental updates for a Lakehouse. The above process is not going to work and will error out.

This limitation led me to create a custom process that enables incremental updates on a Lakehouse using DataFlow Gen2.

You can check the detailed videos here and here in case you want to skip the write-up.

Setup

So, I have an Azure SQL DB AdventureWorks database that has a table named SalesOrderHeader. We will use this table as the source for the Dataflow, and any changes (updates/inserts) made to it will be copied over to the Lakehouse destination table.

Here is a sample image of the data that I am pulling.

Incremental Update in DataFlow GEN2
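If you want to reproduce a similar preview yourself, a minimal T-SQL query against the AdventureWorks source should do (the column names below are the standard Sales.SalesOrderHeader columns; adjust them if your copy of the database differs):

-- Preview a few rows of the source table, including the ModifiedDate column
-- that drives the incremental logic later in this article.
SELECT TOP (5) SalesOrderID, OrderDate, CustomerID, SubTotal, TotalDue, ModifiedDate
FROM Sales.SalesOrderHeader
ORDER BY SalesOrderID;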

To get started, first create a new Lakehouse and a new DataFlow. The DataFlow will move data from the Azure SQL DB to the Lakehouse.

Name the Lakehouse as Incremental_Data_Load_POC and the DataFlow as Load Incremental Data.

Steps

Add a new source to the DataFlow and select SQL Server Database as a source.

Incremental Update in DataFlow GEN2

Enter the Azure SQL DB connection details.

Incremental Update in DataFlow GEN2

Next, select the source table and click Create.

Incremental Update in DataFlow GEN2

In the DataFlow canvas, click the destination and select Lakehouse as the destination.

Incremental Update in DataFlow GEN2

Incremental Update in DataFlow GEN2

In the wizard that follows, select your lakehouse destination. Ensure that you select the New table option and that the table name in the lakehouse destination matches the source table name (though this is not a strict requirement).

Incremental Update in DataFlow GEN2

In the next step, under the update method, select Replace.

Incremental Update in DataFlow GEN2

Note: Since this is the first/initial load, selecting either the Append or Replace option would not make a difference in the behavior of the process.

Click Save settings and, once done, we should see the Lakehouse destination visible in the DataFlow for the SalesOrderHeader table.

Incremental Update in DataFlow GEN2

The source data should now be visible in the DataFlow canvas.

Incremental Update in DataFlow GEN2

Note: The above data is the source data and NOT the lakehouse data, as we still haven't published the DataFlow.

In the next step, select the Publish option for the DataFlow under the query settings on the right side of the canvas.

Incremental Update in DataFlow GEN2

Once all the steps are completed you should see the newly created Lakehouse and DataFlow under the workspace.

Incremental Update in DataFlow GEN2

Publishing the Dataflow will move the data to the destination lakehouse.

Query the table through the SQL analytics endpoint of the lakehouse to ensure that the data was transferred.

Incremental Update in DataFlow GEN2
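For reference, a minimal check you might run against the lakehouse's SQL analytics endpoint (assuming the destination table was named SalesOrderHeader, as in the setup above):

-- Confirm the initial load landed and note the current maximum ModifiedDate;
-- this is the same watermark value the Dataflow query will compute later.
SELECT COUNT(*) AS TotalRows, MAX(ModifiedDate) AS MaxModifiedDate
FROM SalesOrderHeader;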

Note: The above steps were for the first/initial load. To implement incremental load/update/refresh, we need to make a few changes to the initial DataFlow setup that we created for the first load.

Open the DataFlow and add a new Blank Query item.

Incremental Update in DataFlow GEN2

and add the following query:

let
  Source = Lakehouse.Contents([]),
  #"Navigation 1" = Source{[workspaceId = "workspaceId"]}[Data], // id of your workspace
  #"Navigation 2" = #"Navigation 1"{[lakehouseId = "lakehouseId"]}[Data], // id of the lakehouse created earlier
  #"Navigation 3" = #"Navigation 2"{[Id = "SalesOrderHeader", ItemKind = "Table"]}[Data],
  _Max = List.Max(#"Navigation 3"[ModifiedDate])
in
  _Max

Rename the query to Return Max Modified Date.

The query returns the maximum value of ModifiedDate from the SalesOrderHeader table in the lakehouse Incremental_Data_Load_POC.

Incremental Update in DataFlow GEN2

Now that the initial/first load is complete, we will have to make some minor changes to the lakehouse destination in the DataFlow to enable incremental updates.

Go back to the lakehouse destination in the DataFlow, change the destination target from New table to Existing table, and select the SalesOrderHeader table from the lakehouse.

Incremental Update in DataFlow GEN2

In the next step, change the update method from Replace to Append and click Save settings.

Incremental Update in DataFlow GEN2

The next step is crucial.

Any database system that has a modified date column across its tables will typically have a built-in mechanism that updates the modified date whenever data changes or new inserts occur in a table. To reproduce that behavior, let's update the ModifiedDate for SalesOrderID 75122 in the source table. You can select any other SalesOrderID from the table.

UPDATE Sales.SalesOrderHeader SET ModifiedDate = GETDATE() WHERE SalesOrderID = 75122;

In the next step, we have to filter the source table down to the rows whose ModifiedDate is greater than the value returned by the Return Max Modified Date query.

To do so, we add a filter to the ModifiedDate column.

Incremental Update in DataFlow GEN2

As the modified date for SalesOrderID 75122 now reflects the timestamp when the update occurred, its value will be greater than the maximum modified date in the lakehouse table.

Incremental Update in DataFlow GEN2

After setting up the filter, as expected, it returns the records (in our example, SalesOrderID 75122) that have a ModifiedDate greater than the value returned by the Return Max Modified Date query.

Next, proceed to publish the Dataflow. The refresh that follows the publish will transfer these filtered records to the lakehouse table.

As a result, there will be two rows for SalesOrderID 75122, each with a different modified date: the first row will have the original modified date from the initial load, and the second row will have the updated modified date.

Querying the SQL analytics endpoint shows two rows for SalesOrderID 75122, each with its respective modified date.

Incremental Update in DataFlow GEN2
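If you want to check this yourself, a simple query on the SQL analytics endpoint (table and column names as assumed throughout this walkthrough) would be:

-- Both the original and the newly appended row for the updated order should appear.
SELECT SalesOrderID, ModifiedDate
FROM SalesOrderHeader
WHERE SalesOrderID = 75122
ORDER BY ModifiedDate;

Re-running the same query after the dedup notebook in the next step should return a single row with the latest ModifiedDate.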

In the next step, create a PySpark notebook to remove the duplicate rows and retain the row with the most recent modified date. You can use any other option to dedup the table.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# In a Fabric notebook a SparkSession already exists; getOrCreate() simply reuses it.
spark = SparkSession.builder \
    .appName("Delete Duplicates") \
    .getOrCreate()

# Read the lakehouse table populated by the Dataflow.
df = spark.table("SalesOrderHeader")

# Number the rows within each SalesOrderId, newest ModifiedDate first.
window = Window.partitionBy("SalesOrderId").orderBy(F.col("ModifiedDate").desc())

df_with_row_number = df.withColumn("row_num", F.row_number().over(window))

# Keep only the most recent row per SalesOrderId and drop the helper column.
df_latest = df_with_row_number.filter(F.col("row_num") == 1).drop("row_num")

# Overwrite the table with the deduplicated data.
df_latest.write.mode("overwrite").saveAsTable("SalesOrderHeader")

In a data pipeline, you can add the DataFlow Gen2 as the first step, followed by a Notebook activity that runs the above script to dedupe the lakehouse table.

This completes the overall process of implementing incremental updates in a Fabric lakehouse using DataFlow Gen2.

The process will also handle newly added rows in the data source without requiring any changes to the setup.

Video Walkthrough

In the video above, I missed showing how newly added data is transferred to the lakehouse. So I've created a follow-up video to explain that process.

Conclusion

Through this article, I have attempted to demonstrate a process that overcomes the limitation of the built-in Dataflow Gen2 incremental refresh, which does not support a lakehouse destination. To address this, I've outlined a customized approach that provides a robust incremental update mechanism for the lakehouse.

Thanks for reading !!!
