Datalake incremental updates using Microsoft Fabric DataFlow Gen2


Incremental updates mean refreshing only the most recent or changed data rather than reloading the entire dataset. This improves performance, reduces resource consumption, and keeps the data up to date without redundancy, which is essential for large datasets.
Incremental refresh for DataFlow Gen2 in Fabric was introduced in the third quarter of 2024.
The detailed steps are listed in this article.
But there is a catch with this process. The destinations supported for incremental refresh in DataFlow Gen2 are limited to only
Fabric Warehouse
Azure SQL Database
Azure Synapse Analytics
More details here.
Lakehouse is not supported by this setup. Trying to configure an incremental refresh for a Lakehouse destination using the aforementioned process results in the following error.
So if you have a use case where you want to implement incremental updates for a Lakehouse, the built-in process is not going to work and will error out.
This drawback led me to create a custom process that enables incremental updates on a Lakehouse using DataFlow Gen2.
You can check the detailed videos here and here in case you want to skip the write-up.
Setup
So, I have an Azure SQL DB AdventureWorks that has a table named SalesOrderHeader. We will use this table as the source for the DataFlow, and any changes (updates/inserts) made to it will be copied over to the Lakehouse destination table.
Here is a sample image of the data that I am pulling.
To get started, first create a new Lakehouse and a new DataFlow. The DataFlow will move data from the Azure SQL DB to the Lakehouse.
Name the Lakehouse Incremental_Data_Load_POC and the DataFlow Load Incremental Data.
Steps
Add a new source to the DataFlow and select SQL Server Database as a source.
Enter the Azure SQL DB connection details.
Next, select the source table and click Create.
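For reference, the M query that the wizard generates for this source should look roughly like the sketch below. The server and database names are placeholders for your own connection details, and the exact step names may differ.
let
    Source = Sql.Database("yourserver.database.windows.net", "AdventureWorks"), // placeholder connection details
    SalesOrderHeader = Source{[Schema = "Sales", Item = "SalesOrderHeader"]}[Data]
in
    SalesOrderHeader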
In the DataFlow canvas, click the destination option and select Lakehouse as the destination.
In the wizard that follows, select your lakehouse destination. Select the New table option and make sure the table name in the lakehouse destination matches the source table name, though this is not a strict requirement.
In the next step, under the update method, select Replace.
Note: Since this is the first/initial load, choosing Append or Replace makes no difference to the behavior of the process.
Click Save settings. Once done, we should see the Lakehouse destination visible in the DataFlow for the SalesOrderHeader table.
The source data should now be visible in the DataFlow canvas.
Note: The above data is the source data and NOT the lakehouse data, as we still haven't published the DataFlow.
In the next step, select the Publish option for the DataFlow under the query settings on the right side of the canvas.
Once all the steps are completed, you should see the newly created Lakehouse and DataFlow under the workspace.
Publishing the DataFlow moves the data to the destination lakehouse.
Query the table through the SQL endpoint of the lakehouse to verify that the data was transferred.
Note: The above steps cover the first/initial load. To implement incremental loads/updates/refreshes, we need to make a few changes to the DataFlow setup created for the first load.
Open the DataFlow, add a new Blank Query item, and add the following query:
let
    Source = Lakehouse.Contents([]),
    #"Navigation 1" = Source{[workspaceId = "workspaceId"]}[Data], // id of your workspace
    #"Navigation 2" = #"Navigation 1"{[lakehouseId = "lakehouseId"]}[Data], // id of the lakehouse created earlier
    #"Navigation 3" = #"Navigation 2"{[Id = "SalesOrderHeader", ItemKind = "Table"]}[Data],
    _Max = List.Max(#"Navigation 3"[ModifiedDate])
in
    _Max
Rename the query to Return Max Modified Date.
The query returns the maximum value of ModifiedDate from the SalesOrderHeader table in the lakehouse Incremental_Data_Load_POC.
Now that the initial/first load is complete, we will have to make some minor changes to the lakehouse destination in the DataFlow to enable incremental updates.
Go back to the lakehouse destination in the DataFlow, change the destination target from New table to Existing table, and select the SalesOrderHeader table from the lakehouse.
In the next step, change the update method from Replace to Append and click Save settings.
The next step is crucial.
Any database system that maintains a modified date column across its tables has some mechanism that updates that column whenever rows are changed or inserted. To reproduce that behavior, let's update the ModifiedDate for SalesOrderID 75122 in the source table. You can pick any other SalesOrderID from the table.
update Sales.SalesOrderHeader set ModifiedDate=GETDATE() where SalesOrderID=75122
In the next step, we have to filter the source table to keep only the rows whose ModifiedDate is greater than the value returned by the Return Max Modified Date query.
To do so, we add a filter on the ModifiedDate column, as sketched below.
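Putting this together with the source query sketched earlier, the filter boils down to a single Table.SelectRows step that references the Return Max Modified Date query by name. This is a minimal sketch, assuming placeholder connection details and step names; your actual query will look slightly different.
let
    Source = Sql.Database("yourserver.database.windows.net", "AdventureWorks"), // placeholder connection details
    SalesOrderHeader = Source{[Schema = "Sales", Item = "SalesOrderHeader"]}[Data],
    // keep only rows modified after the latest ModifiedDate already present in the lakehouse table
    #"Filtered rows" = Table.SelectRows(SalesOrderHeader, each [ModifiedDate] > #"Return Max Modified Date")
in
    #"Filtered rows"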
Since the ModifiedDate for SalesOrderID 75122 now reflects the timestamp of the update, it will be greater than the maximum ModifiedDate in the lakehouse table.
After setting up the filter, it returns, as expected, the records (in our example, SalesOrderID 75122) whose ModifiedDate is greater than the value returned by the Return Max Modified Date query.
Next, publish the DataFlow. The refresh that follows the publish will transfer these filtered records to the lakehouse table.
As a result, there will be two rows for SalesOrderID 75122, each with a different ModifiedDate: the first row has the original value from the initial load and the second row has the updated value.
Querying the SQL endpoint confirms the two rows for SalesOrderID 75122, each with its respective ModifiedDate.
In the next step, create a PySpark notebook to remove the duplicate rows and retain the row with the most recent ModifiedDate. You can use any other approach to dedupe the table.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("Delete Duplicates") \
    .getOrCreate()

# Read the lakehouse table
df = spark.table("SalesOrderHeader")

# Rank rows per SalesOrderId, most recent ModifiedDate first
window = Window.partitionBy("SalesOrderId").orderBy(F.col("ModifiedDate").desc())
df_with_row_number = df.withColumn("row_num", F.row_number().over(window))

# Keep only the latest row for each SalesOrderId
df_latest = df_with_row_number.filter(F.col("row_num") == 1).drop("row_num")

# Overwrite the lakehouse table with the deduplicated data
df_latest.write.mode("overwrite").saveAsTable("SalesOrderHeader")
In a data pipeline, you can add the DataFlow Gen2 as the first activity, followed by the notebook with the above script that dedupes the data in the lakehouse table.
This completes the overall process of implementing incremental updates in a Fabric lakehouse using DataFlow Gen2.
The process also handles newly added rows in the data source without requiring any changes to the setup, since new rows get a ModifiedDate greater than the maximum value already in the lakehouse and therefore pass the filter.
Video Walkthrough
In the video above, I missed showing how newly added data is transferred to the lakehouse. So I've created a follow-up video to explain that process.
Conclusion
Through this article, I have attempted to demonstrate a process that overcomes a limitation of the built-in DataFlow Gen2 incremental refresh, which does not support a lakehouse destination. To address this, I've outlined a customized approach that provides a robust incremental update mechanism for the lakehouse.
Thanks for reading !!!