Implementing SCD Type 2 Using Spark SQL and Delta Tables

Table of contents
- Introduction to SCD Type 2
- Challenges with Spark SQL
- Step-by-Step Implementation
- Step 1: Drop the Existing Employee Table
- Step 2: Remove Files in Databricks Hive Directory (Optional)
- Step 3: Create a Sample Employee Table
- Step 4: Add Sample Data to Employee Table
- Step 5: Implement SCD Type 2 with Merge Statement
- Step 6: Create Sample Source Data
- Step 7: Call the Function
- Step 8: Verify the Results
- Step 9: Add More Sample Data to Verify Logic
- Step 10: Call the Function Again
- Step 11: Verify the Results
- Conclusion
In this blog, I will walk you through a straightforward approach to implementing Slowly Changing Dimension (SCD) Type 2 using Spark SQL and Delta Tables. This method ensures atomicity and maintains the history of records effectively.
Introduction to SCD Type 2
SCD Type 2 is a technique used to track historical data in a data warehouse. When a change occurs in an existing record, the current record is marked as inactive, and the updated record is inserted as a new row with isActive = 1. Additionally, if the source dataset contains a completely new record, it is simply inserted into the destination table. For example, if an employee's salary changes from 70000 to 75000, the old row is closed out (marked inactive with an end date) and a new active row carrying the 75000 salary is inserted, so both values remain queryable.
Challenges with Spark SQL
Many people use the MERGE statement to implement SCD Type 2. However, a common mistake is marking the existing record as inactive without creating a new version of it. This can be addressed by writing a separate INSERT statement after the MERGE statement, but Spark SQL does not support BEGIN and COMMIT transactions, so the two statements cannot be grouped into a single unit of work; the implementation must instead be atomic within one statement. If the MERGE statement succeeds but the INSERT statement fails, the target table may end up with no active record for a particular dimension member. The sketch below illustrates that fragile two-statement pattern.
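To make the failure mode concrete, here is a minimal sketch of the two-statement approach, written against the same dimEmployee table and vwSource view that are created later in this post. It is shown only to illustrate the risk, not as a recommended pattern.
%sql
-- Statement 1: close out the changed records.
MERGE INTO dimEmployee AS tgt
USING vwSource AS src
  ON tgt.employee_id = src.employee_id AND tgt.isActive = true
WHEN MATCHED AND tgt.salary <> src.salary THEN
  UPDATE SET tgt.isActive = false, tgt.end_date = current_date() - 1;

-- Statement 2: insert the new versions. If this step fails after the MERGE
-- above has already committed, the dimension is left with no active row for
-- the changed employees. Detecting "changed" rows here is also awkward,
-- because the MERGE has already flipped the rows it matched.
INSERT INTO dimEmployee
SELECT src.employee_id, src.first_name, src.last_name, src.email,
  src.phone_number, src.hire_date, src.job_id, src.salary,
  src.department_id, true, current_date(), null
FROM vwSource AS src;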
Step-by-Step Implementation
Let's explore how to implement this logic step by step in Databricks Spark SQL.
Step 1: Drop the Existing Employee Table
First, drop the sample Employee table if it already exists. This makes the notebook re-runnable: each run starts from a clean slate before the table is recreated in Step 3.
%sql
drop table if exists dimemployee
Step 2: Remove Files in Databricks Hive Directory (Optional)
This step is usually not required. If you encounter an error stating that files already exist for the Delta table even after dropping it, you can remove the underlying files manually.
# Remove any leftover Delta files for the dropped table.
try:
    if dbutils.fs.ls("dbfs:/user/hive/warehouse/dimemployee"):
        dbutils.fs.rm("dbfs:/user/hive/warehouse/dimemployee", recurse=True)
except Exception as e:
    if "java.io.FileNotFoundException" in str(e):
        print("Directory does not exist, nothing to remove.")
    else:
        raise
Step 3: Create a Sample Employee Table
Create the destination table with the columns isActive, start_date, and end_date, which support the SCD Type 2 changes.
%sql
CREATE OR REPLACE TABLE dimEmployee (
  employee_id INT,
  first_name STRING,
  last_name STRING,
  email STRING,
  phone_number STRING,
  hire_date DATE,
  job_id STRING,
  salary DECIMAL(10, 2),
  department_id INT,
  isActive BOOLEAN,
  start_date DATE,
  end_date DATE
);
Step 4: Add Sample Data to Employee Table
Insert some sample data into the Employee dimension table.
%sql
INSERT INTO dimEmployee VALUES
(1, 'John', 'Doe', 'john.doe@example.com', '555-1234', '2022-01-15', 'DEV', 60000.00, 10, true, '2022-01-15', null),
(2, 'Jane', 'Smith', 'jane.smith@example.com', '555-5678', '2021-03-22', 'HR', 75000.00, 20, true, '2021-03-22', null),
(3, 'Michael', 'Brown', 'michael.brown@example.com', '555-8765', '2020-07-30', 'FIN', 80000.00, 30, true, '2020-07-30', null),
(4, 'Emily', 'Davis', 'emily.davis@example.com', '555-4321', '2019-11-10', 'MKT', 65000.00, 40, true, '2019-11-10', null),
(5, 'David', 'Wilson', 'david.wilson@example.com', '555-6789', '2018-05-25', 'DEV', 70000.00, 10, true, '2018-05-25', null);
Step 5: Implement SCD Type 2 with Merge Statement
Create a function that implements SCD Type 2 with a single MERGE statement. The statement handles the following scenarios:
- Mark the existing active record as inactive when the source dataset contains an update for it.
- Insert the updated record as a new row and mark it as active.
- Insert the record if it does not exist in the dimension table at all.
The note after the function explains how the staged_updates subquery makes all three cases work in one atomic statement.
def scdType2MergeComplete():
    # One atomic MERGE covers all three SCD Type 2 cases: close the old
    # version, insert the new version, and insert brand-new records.
    mergeQuery1 = """
        MERGE INTO dimEmployee
        USING (
            -- Every source row once, keyed on employee_id...
            SELECT SRC.employee_id AS mergeKey, SRC.*
            FROM vwSource SRC
            UNION ALL
            -- ...plus a second copy of each changed row with a NULL mergeKey,
            -- which can never match and therefore always inserts.
            SELECT NULL AS mergeKey, SRC.*
            FROM vwSource SRC
            JOIN dimEmployee EMP ON EMP.employee_id = SRC.employee_id
            WHERE EMP.isActive = true AND SRC.salary <> EMP.salary
        ) staged_updates
        ON dimEmployee.employee_id = staged_updates.mergeKey
        WHEN MATCHED AND dimEmployee.isActive = true AND dimEmployee.salary <> staged_updates.salary THEN
            -- Close out the current version of the record.
            UPDATE SET isActive = false, end_date = current_date() - 1
        WHEN NOT MATCHED THEN
            -- Insert a brand-new record or the new version of a changed one.
            INSERT (
                employee_id, first_name, last_name, email, phone_number,
                hire_date, job_id, salary, department_id,
                isActive, start_date, end_date
            )
            VALUES (
                staged_updates.employee_id, staged_updates.first_name,
                staged_updates.last_name, staged_updates.email,
                staged_updates.phone_number, staged_updates.hire_date,
                staged_updates.job_id, staged_updates.salary,
                staged_updates.department_id,
                true, current_date(), '9999-12-31'
            )
    """
    return spark.sql(mergeQuery1)
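The trick that makes this atomic is the staged_updates subquery. Every source row flows in once with its employee_id as the mergeKey; that copy either matches an existing active row (and closes it out) or, for a brand-new employee, takes the NOT MATCHED branch and is inserted. Each changed row additionally flows in a second time with a NULL mergeKey, which can never match any employee_id, so it always takes the NOT MATCHED branch and inserts the new active version. As an optional check, you can run the subquery on its own once vwSource has been registered (see the next step):
%sql
-- For a changed employee, expect two rows: one with mergeKey = employee_id
-- (drives the UPDATE) and one with mergeKey = NULL (drives the INSERT).
SELECT SRC.employee_id AS mergeKey, SRC.*
FROM vwSource SRC
UNION ALL
SELECT NULL AS mergeKey, SRC.*
FROM vwSource SRC
JOIN dimEmployee EMP ON EMP.employee_id = SRC.employee_id
WHERE EMP.isActive = true AND SRC.salary <> EMP.salary;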
Step 6: Create Sample Source Data
Create sample data to replicate the source dataset, which may contain updated records, new records, or both.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from datetime import datetime

# Define the schema for the source dataset
schema = StructType([
    StructField("employee_id", IntegerType(), nullable=False),
    StructField("first_name", StringType(), nullable=True),
    StructField("last_name", StringType(), nullable=True),
    StructField("email", StringType(), nullable=True),
    StructField("phone_number", StringType(), nullable=True),
    StructField("hire_date", DateType(), nullable=True),
    StructField("job_id", StringType(), nullable=True),
    StructField("salary", DoubleType(), nullable=True),
    StructField("department_id", IntegerType(), nullable=True),
    StructField("isActive", IntegerType(), nullable=True),
    StructField("start_date", DateType(), nullable=True),
    StructField("end_date", DateType(), nullable=True)
])

# One updated record: employee 5's salary rises from 70000.00 to 75000.00
data = [(5, 'David', 'Wilson', 'david.wilson@example.com', '555-6789',
         datetime.strptime('2018-05-25', '%Y-%m-%d').date(), 'DEV',
         75000.00, 10, None, None, None)]

# Create a Spark DataFrame with the defined schema
source_df = spark.createDataFrame(data, schema=schema)

# Register the DataFrame as a temporary view for the merge to read
source_df.createOrReplaceTempView("vwSource")
Step 7: Call the Function
Call the function to apply the SCD Type 2 logic.
display(scdType2MergeComplete())
Step 8: Verify the Results
Verify that the old record for the updated employee has been marked as inactive and the new record with updated details has been inserted and marked as active.
%sql
select * from dimEmployee
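For a more targeted check than a full table scan, the optional query below (an addition to the original walkthrough) narrows the output to the changed employee. You should see the old 70000.00 row with isActive = false and an end_date of yesterday, plus a new 75000.00 row marked active:
%sql
select employee_id, salary, isActive, start_date, end_date
from dimEmployee
where employee_id = 5
order by start_date;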
Step 9: Add More Sample Data to Verify Logic
Add more data to verify the logic, this time with an updated record and a brand-new record in the same source dataset.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from datetime import datetime

# Define the same schema as before
schema = StructType([
    StructField("employee_id", IntegerType(), nullable=False),
    StructField("first_name", StringType(), nullable=True),
    StructField("last_name", StringType(), nullable=True),
    StructField("email", StringType(), nullable=True),
    StructField("phone_number", StringType(), nullable=True),
    StructField("hire_date", DateType(), nullable=True),
    StructField("job_id", StringType(), nullable=True),
    StructField("salary", DoubleType(), nullable=True),
    StructField("department_id", IntegerType(), nullable=True),
    StructField("isActive", IntegerType(), nullable=True),
    StructField("start_date", DateType(), nullable=True),
    StructField("end_date", DateType(), nullable=True)
])

# Two records: employee 1's salary changes from 60000.00 to 65000.00,
# and employee 6 is brand new
data = [
    (1, 'John', 'Doe', 'john.doe@example.com', '555-1234',
     datetime.strptime('2022-01-15', '%Y-%m-%d').date(), 'DEV',
     65000.00, 10, None, None, None),
    (6, 'Junaid', 'Malik', 'malik.junaid@example.com', '555-3465',
     datetime.strptime('2025-02-19', '%Y-%m-%d').date(), 'HR',
     90000.00, 20, None, None, None)
]

# Create a Spark DataFrame with the defined schema
source_df = spark.createDataFrame(data, schema=schema)

# Register the DataFrame as a temporary view, replacing the previous one
source_df.createOrReplaceTempView("vwSource")
Step 10: Call the Function Again
Call the function again to run the merge against the new source data.
display(scdType2MergeComplete())
Step 11: Verify the Results
Verify the destination table to confirm that both the update and the insert succeeded. The merge statement handles all scenarios in a single atomic operation, so the function is safe to run: if it fails partway, no changes are committed to the target Delta table.
%sql
select * from dimEmployee
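As a final optional check, you can confirm the core SCD Type 2 invariant that every employee has exactly one active row. An empty result from the query below means the invariant holds:
%sql
select employee_id, count(*) as active_rows
from dimEmployee
where isActive = true
group by employee_id
having count(*) <> 1;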
Conclusion
By following these steps, you can implement SCD Type 2 in Spark SQL using Delta Tables. This approach ensures atomicity and maintains the history of records effectively.