Use data contracts to automate data workflows - part 2

Preface📖

In part 1, we explained

  • what a data contract is

  • why we need them, and

  • what a typical one contains

In this blog, we dive into a demo to explore how they actually work, so we can process data more safely, quickly and effectively.

Goal🎯

The data science team have approached us. They want this Kaggle dataset uploaded into a Postgres database.

Why?

Because they’ve developed a machine learning model that protects sensitive user data, and they need to find out whether it works for Postgres use cases or needs more work.

Dataset overview 🗂

This dataset currently contains

  • 4434 records

  • 16 fields

Requirements📜

After a few conversations with the team, here’s what they want:

  1. The final output should only include these 5 fields - document, name, email, phone, len

  2. There should be 4434 records in the table

  3. All columns must be strings, except the len field, which must be an integer

  4. No value in the name field can exceed 100 characters

  5. Each value in the document and name fields must be unique

  6. Each value in the len field should be a minimum of 1

Challenges❗

They’ve tried handling these requirements themselves, but sometimes

  • a new field is added or renamed without anyone’s knowledge,

  • an upstream job changes a field’s properties without alerting anyone

  • users enter data that exceeds the allowed character limit, but nothing stops it

These issues cause downstream jobs to break weekly, and stakeholders often raise support tickets to fix them, which only distracts the team from higher-value tasks.

Since these problems have persisted for months, some data science team members are losing morale, and a few are considering moving to environments where they can focus on more meaningful work instead of dealing with tedious tech debt every day.

Ideal outcome🏆

The team want to

  • catch any data issues early in the pipeline

  • know what’s causing these issues

Meeting these basic requirements will mean

  • less time spent on troubleshooting, because they can identify the root cause of problems more quickly

  • more time to spend on developing useful features for end-users

  • a boost in morale across the data science team, because everyone now gets to work on the projects they were promised

Architectural diagram🗺

Pipeline overview🏭

To show how a data contract works, we’ll create a Python-based data pipeline that processes a CSV file and loads it into a Postgres database

The pipeline is divided into 3 layers

  1. Bronze (raw) layer - captures raw data with minimal validation

  2. Silver (curation) layer - applies transformations and strict validation rules to prepare data for users

  3. Gold (user/data access) layer - serves validated and query-ready data to end users and downstream systems

Data contract overview📑

To keep things simple, we’ll use 2 data contracts:

  1. Bronze to Silver - to confirm the raw data entering the curation layer is structured as expected

  2. Silver to Gold - to confirm the transformed data is high-quality and ready for users to query

Pipeline as a DAG🕸

The pipeline is written as an Airflow DAG (Directed Acyclic Graph), which means the tasks are designed to depend on each other.

Here are the tasks for each layer:

  • run_bronze_layer

  • run_silver_layer

  • run_gold_layer

The chain of dependencies looks like this:
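In the DAG file, this chain is expressed with Airflow’s bitshift dependency syntax. Here’s a minimal sketch, using the three PythonOperator task variables defined later in this post:

# Each task only runs if the task before it succeeds
run_bronze_layer_task >> run_silver_layer_task >> run_gold_layer_task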

So each task depends on the success of its upstream task.

For example,

  • if the Bronze layer task fails, the Silver layer task cannot run

  • the Gold layer task will not execute if the Silver layer task fails

This means bad data is caught early and doesn’t flow into downstream systems.

Pseudocode🧩

Here’s a breakdown of the steps the data pipeline will take:

  • Ingest CSV data into bronze S3 bucket

  • Check if bronze (raw) data matches the data quality we expect

  • Transform raw data if it passes the 1st data validation checks

  • Check if silver (transformed) data matches the data quality we expect

  • Load into silver S3 bucket if it passes 2nd data validation checks

  • Copy transformed data into gold S3 bucket

  • Upload gold data into Postgres for users to access
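Each of these S3 steps is a thin wrapper around boto3 (listed under Technologies below). The actual helpers live in utils/aws_utils.py and are called from the task code later on; here’s a minimal sketch of what the bucket check and upload could look like - the function names match the calls used later, but the implementations here are assumptions:

# utils/aws_utils.py (sketch - simplified, assumed implementations)
import boto3
from botocore.exceptions import ClientError

def initialize_s3_client(aws_config):
    """Create a boto3 S3 client from the AWS settings loaded from .env"""
    return boto3.client(
        "s3",
        aws_access_key_id=aws_config["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=aws_config["AWS_SECRET_ACCESS_KEY"],
        region_name=aws_config["AWS_REGION"],
    )

def check_if_bucket_exists(s3_client, bucket_name, region):
    """Check the bucket exists; create it if it doesn't (assumed behaviour)"""
    try:
        s3_client.head_bucket(Bucket=bucket_name)
        print(f"Bucket '{bucket_name}' already exists")
    except ClientError:
        print(f"Bucket '{bucket_name}' not found - creating it...")
        s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={"LocationConstraint": region},
        )

def upload_file_to_s3(s3_client, file_path, bucket_name, object_key):
    """Upload a local file to the given S3 bucket"""
    s3_client.upload_file(file_path, bucket_name, object_key)
    print(f"Uploaded '{file_path}' to 's3://{bucket_name}/{object_key}'")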

Technologies⚙️

Tools

  • Airflow

  • Docker

  • Postgres

  • GitHub

Libraries

  • os

  • boto3

  • python-dotenv

  • json

  • pandas

  • dataclasses

  • cryptography.fernet

  • psycopg2

Folder structure🗃️

│   .env
│   .gitignore
│   docker-compose.yml
│   README.md
│
├───contracts
│       01_B2S_DataContract.json
│       02_S2G_DataContract.json
│
├───dags
│       pii_data_pipeline.py
│
├───data
│   ├───1_bronze
│   │       bronze_sample_dataset.csv
│   │       pii_dataset.csv
│   │
│   ├───2_silver
│   │       silver_sample_dataset.csv
│   │       transformed_pii_dataset.csv
│   │
│   ├───3_gold
│   │       user_access_pii_dataset.csv
│   │
│   └───archive
│           pii_dataset.csv
│
├───init-scripts
└───utils
        aws_utils.py
        data_selector.py
        data_validation.py
        env_utils.py
        generate_keys.py
        postgres_utils.py
        transformer.py
        __init__.py
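Most of the plumbing sits in the utils folder. For example, every task starts by calling load_env_variables() from utils/env_utils.py to pull its settings from the .env file. The real implementation isn’t shown in this post, but a minimal sketch using python-dotenv might look like this (the individual variable names are assumptions based on how the configs are used later):

# utils/env_utils.py (sketch - variable names are assumed)
import os
from dotenv import load_dotenv

def load_env_variables():
    """Load AWS, S3 bucket and Postgres settings from the .env file"""
    load_dotenv()

    aws_config = {
        "AWS_ACCESS_KEY_ID": os.getenv("AWS_ACCESS_KEY_ID"),
        "AWS_SECRET_ACCESS_KEY": os.getenv("AWS_SECRET_ACCESS_KEY"),
        "AWS_REGION": os.getenv("AWS_REGION"),
    }
    bucket_config = {
        "BRONZE_BUCKET": os.getenv("BRONZE_BUCKET"),
        "SILVER_BUCKET": os.getenv("SILVER_BUCKET"),
        "GOLD_BUCKET": os.getenv("GOLD_BUCKET"),
    }
    postgres_config = {
        "host": os.getenv("POSTGRES_HOST"),
        "port": os.getenv("POSTGRES_PORT", "5432"),
        "dbname": os.getenv("POSTGRES_DB"),
        "user": os.getenv("POSTGRES_USER"),
        "password": os.getenv("POSTGRES_PASSWORD"),
    }
    return aws_config, bucket_config, postgres_config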

Data source: Kaggle🌐

The CSV file used in this demo contains personal information (emails, phone numbers etc) about random people.

Note: According to the author, this data is AI generated, so I’m assuming any resemblance to real people is purely coincidental.

You can find the dataset on Kaggle here.

Here’s a little sample of what the data looks like:

1. Bronze layer🥉

This is where the data pipeline begins.

This is the stage where the raw data lands without any transformations applied.

At this stage, we care more about how the data is structured than what it actually contains.

So our goal is to make sure the ingested data

  • has the right number of rows and columns

  • includes all the required columns with the correct data types

  • follows other rules, e.g. some columns must not be empty, others must contain only unique values

We use a data contract to enforce these expectations.

Here’s a simple JSON file that defines the rules the data must meet before we can process it:

{
    "contract_name": "01_BronzeToSilver_DataContract",
    "version": "1.1",
    "schema": {
        "columns": [
            {"name": "document", "type": "string", "constraints": {"not_null": true, "unique": true}},
            {"name": "text", "type": "string", "constraints": {"not_null": true}},
            {"name": "tokens", "type": "string"},
            {"name": "trailing_whitespace", "type": "boolean"},
            {"name": "labels", "type": "string"},
            {"name": "prompt", "type": "string"},
            {"name": "prompt_id", "type": "integer", "constraints": {"not_null": true}},
            {"name": "name", "type": "string", "constraints": {"not_null": true}},
            {"name": "email", "type": "string", "constraints": {"not_null": true}},
            {"name": "phone", "type": "string", "constraints": {"not_null": true}},
            {"name": "job", "type": "string", "constraints": {"not_null": true}},
            {"name": "address", "type": "string", "constraints": {"not_null": true}},
            {"name": "username", "type": "string"},
            {"name": "url", "type": "string"},
            {"name": "hobby", "type": "string"},
            {"name": "len", "type": "integer", "constraints": {"not_null": true}}
        ]
    },
    "validation_rules": {
        "row_count_min": 4434,
        "column_count": 16
    }
}

So this contract says

  • the dataset must have the required columns (e.g. document, email, len)

  • each column must have the correct data type listed (e.g. document must be a string, len must be an integer)

  • the dataset must have at least 4434 rows and exactly 16 columns

So the Bronze layer task will

  • validate the ingested data against the contract

  • upload the data to an S3 bucket if it passes the checks, otherwise the pipeline will break and tell us the reason why

Airflow will automate these steps for us, so here’s the code to create the run_bronze_layer task:

    def run_bronze_layer():
        aws_config, bucket_config, _not_needed = load_env_variables()
        s3_client       =   initialize_s3_client(aws_config)

        BRONZE_BUCKET   =   bucket_config["BRONZE_BUCKET"]
        AWS_REGION      =   aws_config["AWS_REGION"]

        print(f"Checking if bronze bucket '{BRONZE_BUCKET}' exists... ")
        check_if_bucket_exists(s3_client, BRONZE_BUCKET, AWS_REGION)

        # Resolve the full path to the dataset file inside the container
        dataset     =   PIIDataSet.select_dataset(layer="bronze", use_sample=bronze_use_sample_data)
        file_path   =   f"/opt/airflow/{dataset.file_path}"

        # Log the resolved file path
        print(f"Resolved dataset file path: {file_path}")

        # Check if the file exists before uploading
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"[ERROR] - Dataset file not found: {file_path}")

        # Read the raw CSV data from the local bronze path in the container
        print(f"Reading raw data from '{file_path}'... ")
        bronze_df = pd.read_csv(file_path)

        # Validate against the B2S data contract
        print(f"Validating bronze data with B2S data contract...")
        BronzeToSilverDataContract = "contracts/01_B2S_DataContract.json"

        validate_data(bronze_df, BronzeToSilverDataContract)
        print(f"Bronze data validation passed successfully ")

        # Upload the dataset file to the Bronze bucket in S3
        upload_file_to_s3(s3_client, file_path, bucket_config["BRONZE_BUCKET"], dataset.file_name)

    run_bronze_layer_task = PythonOperator(
        task_id="run_bronze_layer",
        python_callable=run_bronze_layer,
    )
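The contract enforcement itself happens inside validate_data (from utils/data_validation.py), which isn’t reproduced above. To give an idea of what a contract-driven validator can look like, here’s a minimal sketch that checks the schema, data types, row/column counts and the not_null, unique, max_length and min_value constraints from the JSON contract - the utility in the repo may differ in its details:

# utils/data_validation.py (sketch - a simplified contract validator)
import json
import pandas as pd

# Map contract types to pandas dtype checks (assumed mapping)
TYPE_CHECKS = {
    "string":  lambda s: pd.api.types.is_string_dtype(s) or pd.api.types.is_object_dtype(s),
    "integer": lambda s: pd.api.types.is_integer_dtype(s),
    "boolean": lambda s: pd.api.types.is_bool_dtype(s),
}

def validate_data(df: pd.DataFrame, contract_path: str) -> None:
    """Raise a ValueError listing every rule in the contract the data breaks"""
    with open(contract_path) as f:
        contract = json.load(f)

    errors = []

    # 1. Row and column count rules
    rules = contract.get("validation_rules", {})
    if "row_count_min" in rules and len(df) < rules["row_count_min"]:
        errors.append(f"Expected at least {rules['row_count_min']} rows, found {len(df)}")
    if "column_count" in rules and len(df.columns) != rules["column_count"]:
        errors.append(f"Expected {rules['column_count']} columns, found {len(df.columns)}")

    # 2. Per-column schema and constraint rules
    for col in contract["schema"]["columns"]:
        name, col_type = col["name"], col["type"]
        constraints = col.get("constraints", {})

        if name not in df.columns:
            errors.append(f"Missing required column: '{name}'")
            continue
        if not TYPE_CHECKS[col_type](df[name]):
            errors.append(f"Column '{name}' is not of type '{col_type}'")
        if constraints.get("not_null") and df[name].isnull().any():
            errors.append(f"Column '{name}' contains null values")
        if constraints.get("unique") and df[name].duplicated().any():
            errors.append(f"Column '{name}' contains duplicate values")
        if "max_length" in constraints and df[name].astype(str).str.len().gt(constraints["max_length"]).any():
            errors.append(f"Column '{name}' has values longer than {constraints['max_length']} characters")
        if "min_value" in constraints and pd.to_numeric(df[name], errors="coerce").lt(constraints["min_value"]).any():
            errors.append(f"Column '{name}' has values below {constraints['min_value']}")

    if errors:
        raise ValueError("Data contract validation failed:\n- " + "\n- ".join(errors))

This is also why a failed run tells us exactly what’s wrong: the error raised by a validator like this surfaces directly in the Airflow task logs.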

Test 1: Bronze-to-silver data contract🧪

Let’s check if this process even works.

I clone the repo locally and navigate into the directory:

git clone https://github.com/sdw-online/data-contract-driven-pipeline.git
cd data-contract-driven-pipeline

…then I re-build my Docker containers (if they previously existed):

docker-compose down --volumes --remove-orphans
docker system prune -f
docker volume prune -f
docker-compose up --build

In the pii_data_pipeline.py file, I created a flag that toggles between different test modes.

Each mode allows you to test any data contract you want:

TESTING_DATA_CONTRACT_MODE      = "Both"

# A. Test the B2S data contract only
if TESTING_DATA_CONTRACT_MODE == "Bronze_To_Silver":
    bronze_use_sample_data = True
    silver_use_sample_data = False

# B. Test the S2G data contract only
elif TESTING_DATA_CONTRACT_MODE == "Silver_To_Gold":
    bronze_use_sample_data = False
    silver_use_sample_data = True

# C. Test both contracts 
elif TESTING_DATA_CONTRACT_MODE == "Both":
    bronze_use_sample_data = False
    silver_use_sample_data = False

else:
    print(f"[ERROR] -- Unable to detect which test this is...: {TESTING_DATA_CONTRACT_MODE}")

We need to test the “Bronze_To_Silver” data contract:

TESTING_DATA_CONTRACT_MODE      = "Bronze_To_Silver"

This tells the pipeline to test the data contract against a sample dataset that contains data issues I’ve deliberately added to it.

Expected results

Here’s what I expect when I run the DAG in the Airflow UI:

  • The pipeline creates a bronze S3 bucket

  • The pipeline fails at the bronze layer

  • Data is NOT loaded into Bronze S3 bucket

We want no files loaded, since the sample bronze data fails to meet the requirements defined in the contract.

Actual results

After running the DAG, here are the results:

1 - an S3 bronze bucket was created

2 - the pipeline failed at the bronze task

3 - no data was loaded into the bronze S3 bucket

If I check the log, I can see why the pipeline failed - it’s because the number of rows in the dataset did not meet the expected row count:

Because I know what’s causing the issue, I can now take the right steps to resolve it, whether that means

  • fixing it myself if I know how to,

  • reaching out to data science on next steps, or

  • having discussions on what a longer-term fix looks like

Fixing the issues and re-testing

Once I fixed the data issues upstream, I re-ran the Airflow pipeline to confirm the fixes worked.

1 - the bronze stage passed:

2 - the CSV file was successfully uploaded into the Bronze S3 bucket:

This confirms we can

  • capture data issues the moment data is ingested

  • prevent bad data from reaching later stages

  • quickly find out what causes a failure and fix it

2. Silver layer 🥈

This is where the raw data gets transformed.

Now we care about how the data is structured and what it contains.

The data needs to be shaped to the requirements the data science team laid out, which means we need to be strict about our expectations at this stage.

You can check here if you want to scan through their requirements again.

So the goal here is to make sure

  • the data contains only the 5 columns listed before: document, name, email, phone, len

  • all the columns are strings, except len, which should be an integer

  • each value in the name field is capped at 100 characters

  • the document and name fields contain only unique values

  • each value in the len field is at least 1

Here’s the data contract with these requirements:

{
    "contract_name": "02_SilverToGold_DataContract",
    "version": "1.0",
    "schema": {
        "columns": [
            {"name": "document", "type": "string", "constraints": {"not_null": true} },
            {"name": "name", "type": "string", "constraints": {"not_null": true, "max_length": 100 }},
            {"name": "email", "type": "string"},
            {"name": "phone", "type": "string"},
            {"name": "len", "type": "integer", "constraints": {"min_value": 1}}
        ]
    },
    "validation_rules": {
        "row_count_min": 1,
        "column_count": 5
    }
}

In Python, here’s what the silver layer’s task looks like:

    def run_silver_layer():
        aws_config, bucket_config, _not_needed = load_env_variables()
        s3_client       = initialize_s3_client(aws_config)

        SILVER_BUCKET   = bucket_config["SILVER_BUCKET"]
        AWS_REGION      = aws_config["AWS_REGION"]

        print(f"Checking if bucket '{SILVER_BUCKET}' exists... ")
        check_if_bucket_exists(s3_client, SILVER_BUCKET, AWS_REGION)

        # Use the main dataset, or sample one for testing 
        silver_dataset                  = PIIDataSet.select_dataset(layer="silver", use_sample=silver_use_sample_data)
        docker_container_silver_path    = f"/opt/airflow/{silver_dataset.file_path}"

        # Read dataset from silver zone in Docker container 

        if not os.path.exists(docker_container_silver_path):
            raise FileNotFoundError(f"[ERROR] - Unable to find file in Docker container under this filepath: {docker_container_silver_path}")

        silver_df = pd.read_csv(docker_container_silver_path)

        # Transform the raw data
        print(f"Transforming the raw data from bronze to silver ... ")
        silver_df = transform_data(silver_df) 

        # Validate the transformed data against the S2G data contract
        print(f"Validating silver data with S2G data contract...")
        SilverToGoldDataContract = "contracts/02_S2G_DataContract.json"

        validate_data(silver_df, SilverToGoldDataContract)
        print(f"Silver data validation passed successfully ")

        # Convert transformed data to CSV
        docker_container_silver_path = f"/opt/airflow/data/2_silver/transformed_pii_dataset.csv"
        silver_df.to_csv(docker_container_silver_path, index=False)

        # Upload the transformed Silver dataset to the Silver bucket in S3
        upload_file_to_s3(s3_client, docker_container_silver_path, SILVER_BUCKET, "transformed_pii_dataset.csv")

    run_silver_layer_task = PythonOperator(
        task_id="run_silver_layer",
        python_callable=run_silver_layer,
    )
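The transform_data call (from utils/transformer.py) does the reshaping, but it isn’t shown above either. Based on the team’s requirements, a simplified sketch of that step could look like the one below. (The repo also lists cryptography.fernet and a generate_keys.py utility, which suggests sensitive fields may additionally be encrypted; that part is omitted here.)

# utils/transformer.py (sketch - shapes the raw data towards the S2G contract)
import pandas as pd

REQUIRED_COLUMNS = ["document", "name", "email", "phone", "len"]

def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """Keep only the 5 required fields and enforce the silver-layer rules"""
    # 1. Keep only the columns the data science team asked for
    df = df[REQUIRED_COLUMNS].copy()

    # 2. Cast types: everything is a string except 'len', which must be an integer
    for col in ["document", "name", "email", "phone"]:
        df[col] = df[col].astype(str)
    df["len"] = pd.to_numeric(df["len"], errors="coerce")
    df = df.dropna(subset=["len"])
    df["len"] = df["len"].astype(int)

    # 3. Cap the 'name' field at 100 characters
    df["name"] = df["name"].str.slice(0, 100)

    # 4. Drop duplicates on the fields that must be unique
    df = df.drop_duplicates(subset=["document", "name"])

    return df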

Test 2: Silver-to-Gold data contract🧪

Let’s validate the transformed data by switching our test mode to:

TESTING_DATA_CONTRACT_MODE      = "Silver_To_Gold"

This will show us what happens when the Airflow pipeline bumps into data quality issues at the silver layer.

Expected results

So here’s what we expect under this test:

  • the Silver S3 bucket is created

  • Airflow fails at the Silver task

  • no data is loaded into the Silver S3 bucket

Actual results

1 - the Silver S3 bucket was created successfully:

2 - the Silver task failed in Airflow

3 - no data was loaded into the Silver S3 bucket

Fixing issues and re-testing

The logs reveal several issues, including

  1. The document field was not a string

  2. The len field was not an integer

  3. The len field contained non-numeric values

  4. The document field contained duplicate records

After addressing all these issues, I re-ran the pipeline.

Final results

1 - the silver layer task ran successfully

2 - the silver (transformed) data is now in the Silver S3 bucket

Using a data contract to validate data after transforming it

  • avoids processing low-quality data by accident

  • saves debugging time

  • resolves issues early

These data quality issues were resolved within minutes of finding them, instead of spending hours manually figuring out everything that could be wrong with the data.

3. Gold layer🥇

This is where users and tools can access the clean, validated data.

The data science team wants this data in Postgres.

Here’s the Python code to handle that:

    def run_gold_layer():
        aws_config, bucket_config, postgres_config = load_env_variables()
        s3_client       = initialize_s3_client(aws_config)

        GOLD_BUCKET     = bucket_config["GOLD_BUCKET"]
        AWS_REGION      = aws_config["AWS_REGION"]

        # Check if Gold S3 bucket exists
        print(f"Checking if bucket '{GOLD_BUCKET}' exists...")
        check_if_bucket_exists(s3_client, GOLD_BUCKET, AWS_REGION)

        # Download Silver dataset from S3 to local path
        SILVER_BUCKET       = bucket_config["SILVER_BUCKET"]
        silver_file         = "transformed_pii_dataset.csv"
        docker_container_silver_path   = f"/opt/airflow/data/2_silver/{silver_file}"
        silver_df           = download_file_from_s3(
            s3_client, SILVER_BUCKET, silver_file, docker_container_silver_path
        )

        print("Silver dataset successfully loaded into df.")

        # Copy silver data into gold S3 bucket
        gold_file = "user_access_pii_dataset.csv"
        docker_container_gold_path = f"/opt/airflow/data/3_gold/{gold_file}"
        silver_df.to_csv(docker_container_gold_path, index=False)
        upload_file_to_s3(s3_client, docker_container_gold_path, GOLD_BUCKET, gold_file)

        print(f"Transformed data successfully copied to gold S3 bucket '{GOLD_BUCKET}' as '{gold_file}' ")

        gold_df = download_file_from_s3(s3_client, GOLD_BUCKET, gold_file, docker_container_gold_path)

        # Ensure Postgres target objects exist
        print("Ensuring Postgres objects exist...")
        initialize_postgres(postgres_config)

        # Load data into Postgres
        print("Loading data into Postgres...")
        load_data_into_postgres(postgres_config, gold_df)

        # Validate data in Postgres
        print("Validating data in Postgres...")
        validate_postgres_load(
            postgres_config,
            expected_row_count=len(gold_df),
            expected_columns=["document", "name", "email", "phone", "len"],
        )

        print("Gold layer process completed successfully.")

    run_gold_layer_task = PythonOperator(
            task_id="run_gold_layer",
            python_callable=run_gold_layer
        )
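The Postgres helpers (initialize_postgres, load_data_into_postgres and validate_postgres_load from utils/postgres_utils.py) aren’t reproduced in this post. A minimal psycopg2 sketch of the first two, assuming the gold_layer.pii_records_tbl table queried in the next section and a postgres_config dict holding the usual connection settings, might look like this:

# utils/postgres_utils.py (sketch - simplified, assumed implementations)
import psycopg2

def initialize_postgres(postgres_config):
    """Create the gold schema and target table if they don't already exist"""
    with psycopg2.connect(**postgres_config) as conn, conn.cursor() as cur:
        cur.execute("CREATE SCHEMA IF NOT EXISTS gold_layer;")
        cur.execute("""
            CREATE TABLE IF NOT EXISTS gold_layer.pii_records_tbl (
                document TEXT,
                name     TEXT,
                email    TEXT,
                phone    TEXT,
                len      INTEGER
            );
        """)

def load_data_into_postgres(postgres_config, df):
    """Replace the table contents with the gold DataFrame (fine at ~4.4k rows)"""
    insert_sql = """
        INSERT INTO gold_layer.pii_records_tbl (document, name, email, phone, len)
        VALUES (%s, %s, %s, %s, %s);
    """
    rows = df[["document", "name", "email", "phone", "len"]].values.tolist()
    with psycopg2.connect(**postgres_config) as conn, conn.cursor() as cur:
        cur.execute("TRUNCATE TABLE gold_layer.pii_records_tbl;")
        cur.executemany(insert_sql, rows)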

Accessing Postgres via psql

We need to list all running Docker containers:

docker ps

Then copy the CONTAINER ID and replace “your-postgres-container_id” below with it:

docker exec -it {your-postgres-container_id} bash

Then run psql with the username (-U) and database (-d) from your docker-compose.yml or .env file:

psql -U {your-postgres-username} -d {your-postgres-database}

Test 3: Data quality checks 🧪

Let’s find out if the data in Postgres matches what the data science team expected:

  1. The final output should only include these 5 fields - document, name, email, phone, len

  2. There should be 4434 records in the table

  3. All columns must be strings, except the len field, which must be an integer

  4. No value in the name field can exceed 100 characters

  5. Each value in the document and name fields must be unique

  6. Each value in the len field should be a minimum of 1

1 - Column name check

There must be 5 columns which include these fields

  • document

  • name

  • email

  • phone

  • len

Query:

SELECT column_name 
FROM information_schema.columns 
WHERE table_schema = 'gold_layer' AND table_name = 'pii_records_tbl';

Result:

 column_name
-------------
 document
 name
 email
 phone
 len
(5 rows)

2 - Row count check

There must be 4434 records

Query:

SELECT COUNT(*) as "PiiRowCount" 
FROM gold_layer.pii_records_tbl;

Results:

 PiiRowCount
-------------
        4434
(1 row)

3 - Data type check

All columns must be strings, except the len field, which must be an integer

Query:

SELECT 
    column_name, 
    data_type 
FROM information_schema.columns 
WHERE table_schema = 'gold_layer' 
    AND table_name = 'pii_records_tbl';

Results:

 column_name | data_type
-------------+-----------
 document    | text
 name        | text
 email       | text
 phone       | text
 len         | integer
(5 rows)

4 - Max character count check

No value in the name field can exceed 100 characters

Query:

SELECT name 
FROM gold_layer.pii_records_tbl 
WHERE LENGTH(name) > 100;

Results:

 name
------
(0 rows)

5 - Duplicate records check

Each value in the document and name fields must be unique

5.1. For document field:

Query:

SELECT
    'document' AS column_name, 
    document AS value, 
    COUNT(*) AS duplicate_count_for_document_field
FROM gold_layer.pii_records_tbl
GROUP BY document
HAVING COUNT(*) > 1;

Results:

 column_name | value | duplicate_count_for_document_field
-------------+-------+------------------------------------
(0 rows)

5.2. For name field

Query:

SELECT
    'name' AS column_name, 
    name AS value, 
    COUNT(*) AS duplicate_count_for_name_field
FROM gold_layer.pii_records_tbl
GROUP BY name
HAVING COUNT(*) > 1;

Results:

 column_name | value | duplicate_count_for_name_field
-------------+-------+--------------------------------
(0 rows)

6 - Min value check

Each value in the len field should be a minimum of 1

Query:

SELECT len 
FROM gold_layer.pii_records_tbl 
WHERE len < 1;

Results:

 len
-----
(0 rows)

Final results🏆

All 6 data quality checks have passed successfully.

This confirms we’ve successfully built a cloud data pipeline that processes high-quality data from source to destination.

Resources 🎁

You can find the full code for this project on my GitHub here

Conclusion🏁

If you’d like a step by step video tutorial on how to build this, subscribe to my YouTube channel to see when the video drops.

Let me know if you want me to build any other data project (engineering, analytics, DataOps, etc), and I’ll be happy to write more blogs and create YouTube videos on these -

I’m here to help😁!

Written by

Stephen David-Williams

I am a developer with 5+ years of data engineering experience in the financial + professional services sector. Feel free to drop a message for any questions! https://medium.com/@sdw-online