Use data contracts to automate data workflows - part 2

Preface📖
In part 1, we explained
what a data contract is
why we need them, and
what a typical one contains
In this blog, we dive into a demo to explore how they actually work, so we can process data more safely, quickly and effectively.
Goal🎯
The data science team have approached us. They want this Kaggle dataset uploaded into a Postgres database.
Why?
They’ve developed a machine learning model that protects sensitive data belonging to some users, and they need to find out whether this model works for Postgres use cases or whether it needs more work.
Dataset overview 🗂
This dataset currently contains
4434 records
16 fields
Requirements📜
After a few conversations with the team, here’s what they want:
The final output should only include these 5 fields - document, name, email, phone, len
There should be 4434 records in the table
All columns must be strings, except the len field, which must be an integer
No value in the name field can exceed 100 characters
Each value in the document and name fields must be unique
Each value in the len field should be a minimum of 1
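To make these rules concrete before we automate them, here's a rough pandas sketch of what checking them by hand could look like. The function and checks below are purely illustrative - the rest of this blog shows how a data contract handles this for us instead:

import pandas as pd

def check_requirements(df: pd.DataFrame) -> list[str]:
    """Return a list of requirement violations found in the dataset (illustrative only)."""
    failures = []
    expected_columns = ["document", "name", "email", "phone", "len"]

    # 1. Only the five agreed fields should be present
    if list(df.columns) != expected_columns:
        failures.append(f"Unexpected columns: {list(df.columns)}")

    # 2. There should be 4434 records
    if len(df) != 4434:
        failures.append(f"Expected 4434 rows, found {len(df)}")

    # 3. len must be an integer column (the rest are strings)
    if not pd.api.types.is_integer_dtype(df["len"]):
        failures.append("'len' is not an integer column")

    # 4. No name can exceed 100 characters
    if df["name"].astype(str).str.len().gt(100).any():
        failures.append("Some names exceed 100 characters")

    # 5. document and name must be unique
    for column in ["document", "name"]:
        if df[column].duplicated().any():
            failures.append(f"Duplicate values found in '{column}'")

    # 6. Every len value must be at least 1
    if df["len"].lt(1).any():
        failures.append("Some 'len' values are below 1")

    return failures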
Challenges❗
They’ve tried handling these requirements themselves, but sometimes
a new field is added or renamed without anyone’s knowledge,
an upstream job changes a field’s properties without alerting anyone
users enter data that exceeds the allowed character limit, but nothing flags it
These issues cause downstream jobs to break weekly, and stakeholders often raise support tickets to fix them. These tickets only distract the team from higher-value tasks.
Since these problems have persisted for months, some data science team members are losing morale, and a few are considering moving to new environments where they can focus on more meaningful work instead of dealing with boring tech debt daily.
Ideal outcome🏆
The team want to
catch any data issues early in the pipeline
know what’s causing these issues
Meeting these basic requirements will mean
less time spent on troubleshooting, because they can find the root cause of their problems more quickly
more time to spend on developing useful features for end-users
a boost in morale in the data science team, because everyone now gets to work on the projects they were promised
Architectural diagram🗺
Pipeline overview🏭
To show how a data contract works, we’ll create a Python-based data pipeline that processes a CSV file and loads it into a Postgres database
The pipeline is divided into 3 layers
Bronze (raw) layer - captures raw data with minimal validation
Silver (curation) layer - applies strict validation rules and transformations to prepare data for users
Gold (user/data access) layer - serves validated and query-ready data to end users and downstream systems
Data contract overview📑
To keep things simple, we’ll use 2 data contracts:
Bronze to Silver - to confirm the raw data entering the curation layer is structured as expected
Silver to Gold - to confirm the transformed data is high-quality and ready for users to query
Pipeline as a DAG🕸
The pipeline is written as an Airflow DAG (Directed Acyclic Graph). This means the tasks are designed to depend on each other.
Here are the tasks for each layer:
run_bronze_layer
run_silver_layer
run_gold_layer
The chain of dependencies looks like this:
So each task depends on the success of its upstream task.
For example,
if the Bronze layer task fails, the Silver layer task cannot run
likewise, the Gold layer task will not execute if the Silver layer task fails
This means bad data is caught early and doesn’t flow into downstream systems.
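For context, here's a minimal sketch of how that dependency chain is typically wired up in an Airflow DAG file. The DAG arguments and the empty task bodies below are illustrative, not the exact ones from the repo:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def run_bronze_layer(): ...
def run_silver_layer(): ...
def run_gold_layer(): ...

with DAG(
    dag_id="pii_data_pipeline",
    start_date=datetime(2024, 1, 1),  # illustrative value
    schedule_interval=None,           # run manually rather than on a schedule
    catchup=False,
) as dag:
    run_bronze_layer_task = PythonOperator(task_id="run_bronze_layer", python_callable=run_bronze_layer)
    run_silver_layer_task = PythonOperator(task_id="run_silver_layer", python_callable=run_silver_layer)
    run_gold_layer_task = PythonOperator(task_id="run_gold_layer", python_callable=run_gold_layer)

    # Each task only starts once the task to its left has succeeded
    run_bronze_layer_task >> run_silver_layer_task >> run_gold_layer_task

The >> operator is what encodes "depends on the success of its upstream task" for Airflow.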
Pseudocode🧩
Here’s a breakdown of the steps the data pipeline will take:
Ingest CSV data into bronze S3 bucket
Check if bronze (raw) data matches the data quality we expect
Transform raw data if it passes the 1st data validation checks
Check if silver (transformed) data matches the data quality we expect
Load into silver S3 bucket if it passes 2nd data validation checks
Copy transformed data into gold S3 bucket
Upload gold data into Postgres for users to access
Technologies⚙️
Tools
Airflow
Docker
Postgres
GitHub
Libraries
os
boto3
python-dotenv
json
pandas
dataclasses
cryptography.fernet
psycopg2
Folder structure🗃️
│   .env
│   .gitignore
│   docker-compose.yml
│   README.md
│
├───contracts
│       01_B2S_DataContract.json
│       02_S2G_DataContract.json
│
├───dags
│       pii_data_pipeline.py
│
├───data
│   ├───1_bronze
│   │       bronze_sample_dataset.csv
│   │       pii_dataset.csv
│   │
│   ├───2_silver
│   │       silver_sample_dataset.csv
│   │       transformed_pii_dataset.csv
│   │
│   ├───3_gold
│   │       user_access_pii_dataset.csv
│   │
│   └───archive
│           pii_dataset.csv
│
├───init-scripts
└───utils
        aws_utils.py
        data_selector.py
        data_validation.py
        env_utils.py
        generate_keys.py
        postgres_utils.py
        transformer.py
        __init__.py
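Every layer's task starts by calling load_env_variables() from utils/env_utils.py. Here's a minimal sketch of what that helper could look like, assuming python-dotenv and environment variable names I've guessed from how the configs are used later - the repo's actual names may differ:

import os
from dotenv import load_dotenv

def load_env_variables():
    """Read the .env file and group the settings the pipeline tasks need (illustrative sketch)."""
    load_dotenv()

    aws_config = {
        "AWS_ACCESS_KEY_ID": os.getenv("AWS_ACCESS_KEY_ID"),
        "AWS_SECRET_ACCESS_KEY": os.getenv("AWS_SECRET_ACCESS_KEY"),
        "AWS_REGION": os.getenv("AWS_REGION"),
    }
    bucket_config = {
        "BRONZE_BUCKET": os.getenv("BRONZE_BUCKET"),
        "SILVER_BUCKET": os.getenv("SILVER_BUCKET"),
        "GOLD_BUCKET": os.getenv("GOLD_BUCKET"),
    }
    postgres_config = {
        "host": os.getenv("POSTGRES_HOST"),
        "port": os.getenv("POSTGRES_PORT", "5432"),
        "dbname": os.getenv("POSTGRES_DB"),
        "user": os.getenv("POSTGRES_USER"),
        "password": os.getenv("POSTGRES_PASSWORD"),
    }
    return aws_config, bucket_config, postgres_config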
Data source: Kaggle🌐
The CSV file used in this demo contains personal information (emails, phone numbers etc) about random people.
Note: According to the author, this data is AI-generated, so I’m assuming any resemblance to real people is purely coincidental.
You can find the dataset on Kaggle here.
Here’s a little sample of what the data looks like:
1. Bronze layer🥉
This is where the data pipeline begins.
This is the stage where the raw data lands without any transformations applied.
At this stage, we care more about how the data is structured than what it actually contains.
So our goal is to make sure the ingested data
has the right number of rows and columns
includes all the required columns with the correct data types
follows other rules, like certain columns not being empty and others containing only unique values
We use a data contract to enforce these expectations.
Here’s a simple JSON file that defines the rules the data must meet before we can process it:
{
    "contract_name": "01_BronzeToSilver_DataContract",
    "version": "1.1",
    "schema": {
        "columns": [
            {"name": "document", "type": "string", "constraints": {"not_null": true, "unique": true}},
            {"name": "text", "type": "string", "constraints": {"not_null": true}},
            {"name": "tokens", "type": "string"},
            {"name": "trailing_whitespace", "type": "boolean"},
            {"name": "labels", "type": "string"},
            {"name": "prompt", "type": "string"},
            {"name": "prompt_id", "type": "integer", "constraints": {"not_null": true}},
            {"name": "name", "type": "string", "constraints": {"not_null": true}},
            {"name": "email", "type": "string", "constraints": {"not_null": true}},
            {"name": "phone", "type": "string", "constraints": {"not_null": true}},
            {"name": "job", "type": "string", "constraints": {"not_null": true}},
            {"name": "address", "type": "string", "constraints": {"not_null": true}},
            {"name": "username", "type": "string"},
            {"name": "url", "type": "string"},
            {"name": "hobby", "type": "string"},
            {"name": "len", "type": "integer", "constraints": {"not_null": true}}
        ]
    },
    "validation_rules": {
        "row_count_min": 4434,
        "column_count": 16
    }
}
So this contract says
the dataset must have the required columns (e.g. document, email, len)
each column must have the correct data type listed (e.g. document must be a string, len must be an integer)
the dataset must have exactly 16 columns and at least 4434 rows
So the Bronze layer task will
validate the ingested data against the contract
upload the data to an S3 bucket if it passes the checks, otherwise the pipeline will break and tell us the reason why
Airflow will automate these steps for us, so here’s the code to create the run_bronze_layer task:
def run_bronze_layer():
    aws_config, bucket_config, _not_needed = load_env_variables()
    s3_client = initialize_s3_client(aws_config)

    BRONZE_BUCKET = bucket_config["BRONZE_BUCKET"]
    AWS_REGION = aws_config["AWS_REGION"]

    print(f"Checking if bronze bucket '{BRONZE_BUCKET}' exists... ")
    check_if_bucket_exists(s3_client, BRONZE_BUCKET, AWS_REGION)

    # Resolve the full path to the dataset file inside the container
    dataset = PIIDataSet.select_dataset(layer="bronze", use_sample=bronze_use_sample_data)
    file_path = f"/opt/airflow/{dataset.file_path}"

    # Log the resolved file path
    print(f"Resolved dataset file path: {file_path}")

    # Check if the file exists before uploading
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"[ERROR] - Dataset file not found: {file_path}")

    # Read the raw data from the bronze zone inside the container
    print(f"Reading raw data from '{file_path}'... ")
    bronze_df = pd.read_csv(file_path)

    # Validate against the B2S data contract
    print("Validating bronze data with B2S data contract...")
    BronzeToSilverDataContract = "contracts/01_B2S_DataContract.json"
    validate_data(bronze_df, BronzeToSilverDataContract)
    print("Bronze data validation passed successfully ")

    # Upload the dataset file to the Bronze bucket in S3
    upload_file_to_s3(s3_client, file_path, bucket_config["BRONZE_BUCKET"], dataset.file_name)


run_bronze_layer_task = PythonOperator(
    task_id="run_bronze_layer",
    python_callable=run_bronze_layer,
)
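The validate_data() helper comes from utils/data_validation.py. The full version is in the repo, but here's a simplified sketch of the idea - read the JSON contract, compare it against the DataFrame, and fail loudly with every broken rule (data type checks are left out to keep it short):

import json
import pandas as pd

def validate_data(df: pd.DataFrame, contract_path: str) -> None:
    """Raise a ValueError if the DataFrame breaks any rule in the data contract (simplified sketch)."""
    with open(contract_path) as f:
        contract = json.load(f)

    errors = []
    columns = contract["schema"]["columns"]
    rules = contract["validation_rules"]

    # Check the overall shape first
    if "column_count" in rules and len(df.columns) != rules["column_count"]:
        errors.append(f"Expected {rules['column_count']} columns, found {len(df.columns)}")
    if "row_count_min" in rules and len(df) < rules["row_count_min"]:
        errors.append(f"Expected at least {rules['row_count_min']} rows, found {len(df)}")

    # Check each column's presence and constraints
    for col in columns:
        name, constraints = col["name"], col.get("constraints", {})
        if name not in df.columns:
            errors.append(f"Missing column: {name}")
            continue
        if constraints.get("not_null") and df[name].isnull().any():
            errors.append(f"Null values found in '{name}'")
        if constraints.get("unique") and df[name].duplicated().any():
            errors.append(f"Duplicate values found in '{name}'")
        if "max_length" in constraints and df[name].astype(str).str.len().gt(constraints["max_length"]).any():
            errors.append(f"Values in '{name}' exceed {constraints['max_length']} characters")
        if "min_value" in constraints and pd.to_numeric(df[name], errors="coerce").lt(constraints["min_value"]).any():
            errors.append(f"Values in '{name}' fall below {constraints['min_value']}")

    if errors:
        raise ValueError("Data contract validation failed:\n" + "\n".join(errors))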
Test 1: Bronze-to-silver data contract🧪
Let’s check if this process even works.
I navigate to the directory (once I’ve cloned it locally):
git clone https://github.com/sdw-online/data-contract-driven-pipeline.git
cd data-contract-driven-pipeline
…then I re-build my Docker containers (if they previously existed):
docker-compose down --volumes --remove-orphans
docker system prune -f
docker volume prune -f
docker-compose up --build
In the pii_data_pipeline.py file, I created a flag that toggles between different test modes.
Each mode allows you to test any data contract you want:
TESTING_DATA_CONTRACT_MODE = "Both"

# A. Test the B2S data contract only
if TESTING_DATA_CONTRACT_MODE == "Bronze_To_Silver":
    bronze_use_sample_data = True
    silver_use_sample_data = False

# B. Test the S2G data contract only
elif TESTING_DATA_CONTRACT_MODE == "Silver_To_Gold":
    bronze_use_sample_data = False
    silver_use_sample_data = True

# C. Test both contracts
elif TESTING_DATA_CONTRACT_MODE == "Both":
    bronze_use_sample_data = False
    silver_use_sample_data = False

else:
    print(f"[ERROR] -- Unable to detect which test this is...: {TESTING_DATA_CONTRACT_MODE}")
We need to test the “Bronze_To_Silver” data contract:
TESTING_DATA_CONTRACT_MODE = "Bronze_To_Silver"
This tells the pipeline to test the data contract against a sample dataset that contains data issues I’ve deliberately added to it.
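PIIDataSet.select_dataset() comes from utils/data_selector.py. Here's a minimal sketch of the idea, using the file names from the folder structure above; which file the silver layer reads outside of test mode is my assumption:

from dataclasses import dataclass

@dataclass
class PIIDataSet:
    """Describes which CSV file a layer should read (illustrative sketch)."""
    file_name: str
    file_path: str

    @staticmethod
    def select_dataset(layer: str, use_sample: bool) -> "PIIDataSet":
        # Sample files contain deliberately broken data for testing the contracts
        datasets = {
            ("bronze", False): "data/1_bronze/pii_dataset.csv",
            ("bronze", True): "data/1_bronze/bronze_sample_dataset.csv",
            ("silver", False): "data/1_bronze/pii_dataset.csv",  # assumption: silver transforms the raw file
            ("silver", True): "data/2_silver/silver_sample_dataset.csv",
        }
        file_path = datasets[(layer, use_sample)]
        return PIIDataSet(file_name=file_path.split("/")[-1], file_path=file_path)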
Expected results
Here’s what I expect when I run the DAG in Airflow UI:
The pipeline creates a bronze S3 bucket
The pipeline fails at the bronze layer
Data is NOT loaded into Bronze S3 bucket
We want no files loaded, since the sample bronze data does not meet the requirements defined in the contract.
Actual results
After running the DAG, here are the results:
1 - an S3 bronze bucket was created
2 - the pipeline failed at the bronze task
3 - no data was loaded into the bronze S3 bucket
If I check the log, I can see why the pipeline failed - it’s because the number of rows in the dataset did not meet the expected row count:
Because I know what’s causing the issue, I can now begin taking the right steps in resolving it, whether that means
fixing it myself if I know how to,
reaching out to data science on next steps, or
having discussions on what a longer-term fix looks like
Fixing the issues and re-testing
Once I fixed the data issues upstream, I re-ran the Airflow pipeline to confirm they were resolved.
1 - the bronze stage passed:
2 - the CSV file was successfully uploaded into the Bronze S3 bucket:
This confirms we can
capture data issues the moment data is ingested
prevent bad data from reaching later stages
quickly find out what causes the failure and fix it
2. Silver layer 🥈
This is where the raw data gets transformed.
Now we care about how the data is structured and what it contains.
The data needs to be shaped into the requirements the data science team laid out to us, so that means we need to be strict about the expectations we have at this stage.
You can check here if you want to scan through their requirements again.
So the goal here is to make sure
the data contains only the 5 columns listed before: document, name, email, phone, len
all the columns are strings, except len, which should be an integer
each value in the name field is capped to 100 characters max
the document and name fields contain only unique values
Here’s the data contract with these requirements:
{
    "contract_name": "02_SilverToGold_DataContract",
    "version": "1.0",
    "schema": {
        "columns": [
            {"name": "document", "type": "string", "constraints": {"not_null": true}},
            {"name": "name", "type": "string", "constraints": {"not_null": true, "max_length": 100}},
            {"name": "email", "type": "string"},
            {"name": "phone", "type": "string"},
            {"name": "len", "type": "integer", "constraints": {"min_value": 1}}
        ]
    },
    "validation_rules": {
        "row_count_min": 1,
        "column_count": 5
    }
}
In Python, here’s what the silver layer’s task looks like:
def run_silver_layer():
    aws_config, bucket_config, _not_needed = load_env_variables()
    s3_client = initialize_s3_client(aws_config)

    SILVER_BUCKET = bucket_config["SILVER_BUCKET"]
    AWS_REGION = aws_config["AWS_REGION"]

    print(f"Checking if bucket '{SILVER_BUCKET}' exists... ")
    check_if_bucket_exists(s3_client, SILVER_BUCKET, AWS_REGION)

    # Use the main dataset, or the sample one for testing
    silver_dataset = PIIDataSet.select_dataset(layer="silver", use_sample=silver_use_sample_data)
    docker_container_silver_path = f"/opt/airflow/{silver_dataset.file_path}"

    # Read the dataset from the silver zone in the Docker container
    if not os.path.exists(docker_container_silver_path):
        raise FileNotFoundError(f"[ERROR] - Unable to find file in Docker container under this filepath: {docker_container_silver_path}")
    silver_df = pd.read_csv(docker_container_silver_path)

    # Transform the raw data
    print("Transforming the raw data from bronze to silver ... ")
    silver_df = transform_data(silver_df)

    # Validate the transformed data against the S2G data contract
    print("Validating silver data with S2G data contract...")
    SilverToGoldDataContract = "contracts/02_S2G_DataContract.json"
    validate_data(silver_df, SilverToGoldDataContract)
    print("Silver data validation passed successfully ")

    # Write the transformed data to CSV
    docker_container_silver_path = "/opt/airflow/data/2_silver/transformed_pii_dataset.csv"
    silver_df.to_csv(docker_container_silver_path, index=False)

    # Upload the transformed Silver dataset to the Silver bucket in S3
    upload_file_to_s3(s3_client, docker_container_silver_path, SILVER_BUCKET, "transformed_pii_dataset.csv")


run_silver_layer_task = PythonOperator(
    task_id="run_silver_layer",
    python_callable=run_silver_layer,
)
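transform_data() lives in utils/transformer.py. Its job, based on the requirements, is to reshape the 16 raw columns into the 5 the data science team asked for. Here's a simplified sketch - the exact column handling in the repo may differ:

import pandas as pd

def transform_data(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Shape the raw bronze data into the 5-column silver output (simplified sketch)."""
    required_columns = ["document", "name", "email", "phone", "len"]

    # Keep only the columns the data science team asked for
    df = raw_df[required_columns].copy()

    # Cast every column to a string, except len, which must be an integer
    # (the B2S contract has already confirmed len is a non-null integer column)
    for column in ["document", "name", "email", "phone"]:
        df[column] = df[column].astype(str)
    df["len"] = df["len"].astype(int)

    # Uniqueness and the 100-character name cap are then enforced by the
    # S2G contract rather than silently "fixed" here
    return df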
Test 2: Silver-to-Gold data contract🧪
Let’s validate the transformed data by switching our test mode to:
TESTING_DATA_CONTRACT_MODE = "Silver_To_Gold"
This will show us what happens when the Airflow pipeline bumps into data quality issues at the silver layer.
Expected results
So here’s what we expect under this test:
the Silver S3 bucket is created
Airflow fails at the Silver task
Data fails to load into the silver S3 bucket
Actual results
1 - the Silver S3 bucket was created successfully:
2 - the Silver task failed in Airflow
3 - no data was loaded into the Silver S3 bucket
Fixing issues and re-testing
The logs reveal several issues, including:
- the document field was not a string
- the len field was not an integer
- the len field contained non-numeric values
- the document field contained duplicate records
After addressing all these issues, I re-ran the pipeline.
Final results
1 - the silver layer task ran successfully
2 - the silver (transformed) data is now in the Silver S3 bucket
Using a data contract to validate data after transforming it
avoids processing low-quality data by accident
saves debugging time
helps us resolve issues early
These data quality issues were resolved within minutes of finding them, instead of hours trying to figure out all the things that could be wrong with the data manually.
3. Gold layer🥇
This is where users and tools can access the clean, validated data.
The data science team wants this data in Postgres.
Here’s the Python code to handle that:
def run_gold_layer():
    aws_config, bucket_config, postgres_config = load_env_variables()
    s3_client = initialize_s3_client(aws_config)

    GOLD_BUCKET = bucket_config["GOLD_BUCKET"]
    AWS_REGION = aws_config["AWS_REGION"]

    # Check if the Gold S3 bucket exists
    print(f"Checking if bucket '{GOLD_BUCKET}' exists...")
    check_if_bucket_exists(s3_client, GOLD_BUCKET, AWS_REGION)

    # Download the Silver dataset from S3 to a local path
    SILVER_BUCKET = bucket_config["SILVER_BUCKET"]
    silver_file = "transformed_pii_dataset.csv"
    docker_container_silver_path = f"/opt/airflow/data/2_silver/{silver_file}"
    silver_df = download_file_from_s3(
        s3_client, SILVER_BUCKET, silver_file, docker_container_silver_path
    )
    print("Silver dataset successfully loaded into df.")

    # Copy the silver data into the gold S3 bucket
    gold_file = "user_access_pii_dataset.csv"
    docker_container_gold_path = f"/opt/airflow/data/3_gold/{gold_file}"
    silver_df.to_csv(docker_container_gold_path, index=False)
    upload_file_to_s3(s3_client, docker_container_gold_path, GOLD_BUCKET, gold_file)
    print(f"Transformed data successfully copied to gold S3 bucket '{GOLD_BUCKET}' as '{gold_file}' ")

    gold_df = download_file_from_s3(s3_client, GOLD_BUCKET, gold_file, docker_container_gold_path)

    # Ensure the Postgres target objects exist
    print("Ensuring Postgres objects exist...")
    initialize_postgres(postgres_config)

    # Load the data into Postgres
    print("Loading data into Postgres...")
    load_data_into_postgres(postgres_config, gold_df)

    # Validate the data in Postgres
    print("Validating data in Postgres...")
    validate_postgres_load(
        postgres_config,
        expected_row_count=len(gold_df),
        expected_columns=["document", "name", "email", "phone", "len"],
    )
    print("Gold layer process completed successfully.")


run_gold_layer_task = PythonOperator(
    task_id="run_gold_layer",
    python_callable=run_gold_layer,
)
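The Postgres helpers (initialize_postgres, load_data_into_postgres, validate_postgres_load) live in utils/postgres_utils.py. As a rough sketch of the loading step with psycopg2, assuming postgres_config holds standard connection settings and using the gold_layer.pii_records_tbl table we query later (the repo's version may differ):

import psycopg2
import psycopg2.extras
import pandas as pd

def load_data_into_postgres(postgres_config: dict, df: pd.DataFrame) -> None:
    """Insert the gold DataFrame into gold_layer.pii_records_tbl (simplified sketch)."""
    conn = psycopg2.connect(
        host=postgres_config["host"],
        port=postgres_config["port"],
        dbname=postgres_config["dbname"],
        user=postgres_config["user"],
        password=postgres_config["password"],
    )
    try:
        with conn, conn.cursor() as cur:
            # Create the schema and table if this is the first run
            cur.execute("CREATE SCHEMA IF NOT EXISTS gold_layer;")
            cur.execute("""
                CREATE TABLE IF NOT EXISTS gold_layer.pii_records_tbl (
                    document TEXT,
                    name     TEXT,
                    email    TEXT,
                    phone    TEXT,
                    len      INTEGER
                );
            """)
            # Reload the table from scratch so re-runs stay idempotent
            cur.execute("TRUNCATE TABLE gold_layer.pii_records_tbl;")
            rows = list(df[["document", "name", "email", "phone", "len"]].itertuples(index=False, name=None))
            psycopg2.extras.execute_values(
                cur,
                "INSERT INTO gold_layer.pii_records_tbl (document, name, email, phone, len) VALUES %s",
                rows,
            )
    finally:
        conn.close()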
Accessing Postgres via psql
We need to list all running Docker containers:
docker ps
Then copy the CONTAINER ID and replace “your-postgres-container_id” below with it:
docker exec -it {your-postgres-container_id} bash
Then run psql with the username (-U) and database (-d) from your docker-compose.yml or .env file:
psql -U {your-postgres-username} -d {your-postgres-database}
Test 3: Data quality checks 🧪
Let’s find out if the data in Postgres matches what the data science team expected:
The final output should only include these 5 fields - document, name, email, phone, len
There should be 4434 records in the table
All columns must be strings, except the len field, which must be an integer
No value in the name field can exceed 100 characters
Each value in the document and name fields must be unique
Each value in the len field should be a minimum of 1
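The gold task already ran validate_postgres_load() against some of these expectations. Here's a simplified sketch of what that helper could check, assuming psycopg2 and the same table we query below - the rest of this section repeats the checks manually in psql so we can see the results for ourselves:

import psycopg2

def validate_postgres_load(postgres_config: dict, expected_row_count: int, expected_columns: list) -> None:
    """Confirm the gold table has the expected shape after loading (simplified sketch)."""
    conn = psycopg2.connect(
        host=postgres_config["host"],
        port=postgres_config["port"],
        dbname=postgres_config["dbname"],
        user=postgres_config["user"],
        password=postgres_config["password"],
    )
    try:
        with conn.cursor() as cur:
            # Row count check
            cur.execute("SELECT COUNT(*) FROM gold_layer.pii_records_tbl;")
            actual_rows = cur.fetchone()[0]
            if actual_rows != expected_row_count:
                raise ValueError(f"Expected {expected_row_count} rows, found {actual_rows}")

            # Column name check
            cur.execute("""
                SELECT column_name
                FROM information_schema.columns
                WHERE table_schema = 'gold_layer' AND table_name = 'pii_records_tbl';
            """)
            actual_columns = [row[0] for row in cur.fetchall()]
            missing = set(expected_columns) - set(actual_columns)
            if missing:
                raise ValueError(f"Missing columns in Postgres table: {missing}")
    finally:
        conn.close()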
1 - Column name check
There must be 5 columns which include these fields
document
name
email
phone
len
Query:
SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'gold_layer' AND table_name = 'pii_records_tbl';
Result:
column_name
-------------
document
name
email
phone
len
(5 rows)
2 - Row count check
There must be 4434 records
Query:
SELECT COUNT(*) as "PiiRowCount"
FROM gold_layer.pii_records_tbl;
Results:
PiiRowCount
-------------
4434
(1 row)
3 - Data type check
All columns must be strings, except the len field, which must be an integer
Query:
SELECT
column_name,
data_type
FROM information_schema.columns
WHERE table_schema = 'gold_layer'
AND table_name = 'pii_records_tbl';
Results:
column_name | data_type
-------------+-----------
document | text
name | text
email | text
phone | text
len | integer
(5 rows)
4 - Max character count check
No value in the name field can exceed 100 characters
Query:
SELECT name
FROM gold_layer.pii_records_tbl
WHERE LENGTH(name) > 100;
Results:
name
------
(0 rows)
5 - Duplicate records check
Each value in the document and name fields must be unique
5.1. For document field:
Query:
SELECT
'document' AS column_name,
document AS value,
COUNT(*) AS duplicate_count_for_document_field
FROM gold_layer.pii_records_tbl
GROUP BY document
HAVING COUNT(*) > 1;
Results:
column_name | value | duplicate_count_for_document_field
-------------+-------+------------------------------------
(0 rows)
5.2. For name field
Query:
SELECT
'name' AS column_name,
name AS value,
COUNT(*) AS duplicate_count_for_name_field
FROM gold_layer.pii_records_tbl
GROUP BY name
HAVING COUNT(*) > 1;
Results:
column_name | value | duplicate_count_for_name_field
-------------+-------+--------------------------------
(0 rows)
6 - Min value check
Each value in the len field should be a minimum of 1
Query:
SELECT len
FROM gold_layer.pii_records_tbl
WHERE len < 1;
Results:
len
-----
(0 rows)
Final results🏆
All 6 data quality checks have passed successfully.
This confirms we’ve successfully built a cloud data pipeline that processes high-quality data from source to destination.
Resources 🎁
You can find the full code for this project on my GitHub here
Conclusion🏁
If you’d like a step by step video tutorial on how to build this, subscribe to my YouTube channel to see when the video drops.
Let me know if you want me to build any other data project (engineering, analytics, DataOps, etc), and I’ll be happy to write more blogs and create YouTube videos on these -
I’m here to help😁!
Written by
Stephen David-Williams
I am a developer with 5+ years of data engineering experience in the financial + professional services sector. Feel free to drop a message for any questions! https://medium.com/@sdw-online