ETL from SQL Server to Azure Synapse Analytics with Slowly Changing Dimensions (SCD) Type 2 using Python
1. Install the required libraries: You'll need the pyodbc, pandas, and sqlalchemy libraries (pandas relies on SQLAlchemy to write DataFrames to the database). Install them using pip:
pip install pyodbc pandas sqlalchemy
2. Connect to SQL Server and extract data:
import pyodbc
import pandas as pd
# Set up your SQL Server connection string
sql_conn_str = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=your_server;DATABASE=your_database;UID=your_username;PWD=your_password"
# Connect to SQL Server
sql_conn = pyodbc.connect(sql_conn_str)
# Extract data from SQL Server
source_query = "SELECT * FROM your_source_table"
source_data = pd.read_sql(source_query, sql_conn)
# Close the connection
sql_conn.close()
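If the source table is large, you may not want to pull every row on each run. As a minimal sketch of an incremental extract, assuming a hypothetical last-modified column called modified_date and a stored watermark value (neither appears in the original code), you could pull only the rows changed since the previous load:

import pyodbc
import pandas as pd

# Hypothetical watermark: the highest modified_date processed by the previous run.
# In practice you would persist this value (e.g. in a control table) between runs.
last_watermark = "2024-01-01 00:00:00"

incremental_query = """
    SELECT *
    FROM your_source_table
    WHERE modified_date > ?
"""

conn = pyodbc.connect(sql_conn_str)
# The parameter is passed to the driver instead of being formatted into the SQL string
source_data = pd.read_sql(incremental_query, conn, params=[last_watermark])
conn.close()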
3. Implement SCD Type 2 logic:
# Define the SCD Type 2 logic function
def scd2_merge(current_data, new_data, key_columns, scd_columns):
    # Full outer join on the business keys; the indicator column marks whether a key
    # exists only in the destination, only in the new extract, or in both
    merged_data = pd.merge(
        current_data,
        new_data,
        on=key_columns,
        how="outer",
        suffixes=("_current", "_new"),
        indicator=True
    )

    new_records = merged_data[merged_data["_merge"] == "right_only"]       # brand-new keys
    unchanged_records = merged_data[merged_data["_merge"] == "both"]       # keys on both sides
    expired_records = merged_data[merged_data["_merge"] == "left_only"]    # keys missing from the new extract

    # Any key whose tracked attributes changed becomes a new version (SCD Type 2)
    for scd_col in scd_columns:
        changed_records = unchanged_records[unchanged_records[f"{scd_col}_current"] != unchanged_records[f"{scd_col}_new"]]
        unchanged_records = unchanged_records[unchanged_records[f"{scd_col}_current"] == unchanged_records[f"{scd_col}_new"]]
        if not changed_records.empty:
            # DataFrame.append was removed in pandas 2.x, so use pd.concat instead
            new_records = pd.concat([new_records, changed_records])

    # Keep the key columns and the incoming (_new) values, then strip the suffix
    new_records = new_records.drop(columns="_merge")
    new_records = new_records[[col for col in new_records.columns if not col.endswith("_current")]]
    new_records.columns = [col.replace("_new", "") for col in new_records.columns]
    return new_records
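The function above only returns the rows to insert. A complete SCD Type 2 dimension also stamps each version with validity metadata and closes out the version it supersedes. This is a minimal sketch, assuming the destination table has hypothetical effective_date, end_date, and is_current columns (none of these appear in the original code):

from datetime import datetime
import pandas as pd

def add_scd2_metadata(new_records):
    # Stamp the incoming rows as the current version of each key (hypothetical column names)
    new_records = new_records.copy()
    new_records["effective_date"] = datetime.now()   # when this version becomes valid
    new_records["end_date"] = pd.NaT                 # open-ended until a newer version arrives
    new_records["is_current"] = True                 # flags the active version of each key
    return new_records

def build_expire_sql(table, key_columns):
    # UPDATE template that closes the previously current version for one business key;
    # it would run (once per changed key) against the Synapse connection opened in step 4
    predicate = " AND ".join(f"{col} = ?" for col in key_columns)
    return (
        f"UPDATE {table} "
        f"SET end_date = ?, is_current = 0 "
        f"WHERE {predicate} AND is_current = 1"
    )

The UPDATE would be executed for each changed key before the stamped new versions are appended in step 4.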
4. Connect to Azure Synapse Analytics and load data:
import urllib.parse
from sqlalchemy import create_engine

# Azure Synapse dedicated SQL pools are reached with the same ODBC driver as SQL Server
synapse_conn_str = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=your_server;DATABASE=your_database;UID=your_username;PWD=your_password"

# Connect to Azure Synapse Analytics
synapse_conn = pyodbc.connect(synapse_conn_str)

# Load current data from Azure Synapse Analytics
destination_query = "SELECT * FROM your_destination_table"
destination_data = pd.read_sql(destination_query, synapse_conn)

# Apply SCD Type 2 logic and get the new records
key_columns = ["key_column1", "key_column2"]
scd_columns = ["scd_column1", "scd_column2"]
new_records = scd2_merge(destination_data, source_data, key_columns, scd_columns)

# pandas.to_sql needs a SQLAlchemy engine rather than a raw pyodbc connection
engine = create_engine("mssql+pyodbc:///?odbc_connect=" + urllib.parse.quote_plus(synapse_conn_str))

# Load new records into Azure Synapse Analytics
new_records.to_sql("your_destination_table", engine, if_exists="append", index=False)

# Close the connection
synapse_conn.close()
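to_sql inserts rows in parameterized batches, which can be slow against a Synapse dedicated SQL pool for large volumes. One possible tweak, assuming pyodbc is the driver, is to build the engine with fast_executemany enabled and cap the batch size when appending:

import urllib.parse
from sqlalchemy import create_engine

# fast_executemany lets pyodbc send whole parameter batches instead of one round trip per row
engine = create_engine(
    "mssql+pyodbc:///?odbc_connect=" + urllib.parse.quote_plus(synapse_conn_str),
    fast_executemany=True,
)

# chunksize controls how many rows pandas sends per batch
new_records.to_sql("your_destination_table", engine, if_exists="append", index=False, chunksize=1000)

For very large loads, the usual Synapse pattern is to stage the data in Azure Blob Storage or Data Lake and load it with COPY INTO or PolyBase instead of row-by-row inserts.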
Replace the placeholders (your_server, your_database, your_username, your_password, your_source_table, your_destination_table, key_column1, key_column2, scd_column1, and scd_column2) with your actual values.
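To tie the steps together, here is a minimal orchestration sketch; it reuses the connection strings, the scd2_merge function, and the SQLAlchemy engine defined above, and the helper name run_scd2_load is hypothetical:

def run_scd2_load():
    # 1. Extract the source data from SQL Server
    sql_conn = pyodbc.connect(sql_conn_str)
    source_data = pd.read_sql("SELECT * FROM your_source_table", sql_conn)
    sql_conn.close()

    # 2. Read the current dimension rows from Azure Synapse Analytics
    synapse_conn = pyodbc.connect(synapse_conn_str)
    destination_data = pd.read_sql("SELECT * FROM your_destination_table", synapse_conn)
    synapse_conn.close()

    # 3. Work out which rows are new keys or changed versions
    new_records = scd2_merge(
        destination_data, source_data,
        key_columns=["key_column1", "key_column2"],
        scd_columns=["scd_column1", "scd_column2"],
    )

    # 4. Append the new versions via the SQLAlchemy engine created earlier
    if not new_records.empty:
        new_records.to_sql("your_destination_table", engine, if_exists="append", index=False)

run_scd2_load()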