dbt Data Quality Audit, But on Steroids

Ahmad Muhammad
15 min read

Data quality auditing is the process of ensuring that data is fit for a given purpose. Poor data quality leads to wrong business decisions and can degrade corporate performance.

Data quality is commonly measured with a set of metrics; some of these are:

| Metric | Description | Formula |
| --- | --- | --- |
| Completeness | Percentage of complete (non-null) values in a column. | number of non-null values / total rows |
| Consistency | Percentage of values that follow a defined format (e.g., date, phone number). | number of consistent values / total rows |
| Data Timeliness | Describes how up-to-date or fresh the data is relative to the current time. | Context-dependent; may use timestamp difference |
| Uniqueness | Describes how many values are unique (non-duplicate) in a column. | number of distinct values / total rows |
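
To make these concrete, completeness and uniqueness can be computed with plain SQL. A minimal sketch, assuming a hypothetical customers table with an email column:

SELECT
    COUNT(email) * 100.0 / NULLIF(COUNT(*), 0)          AS completeness_pct, -- share of non-null values
    COUNT(DISTINCT email) * 100.0 / NULLIF(COUNT(*), 0) AS uniqueness_pct    -- share of distinct values
FROM customers;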

The Problem With dbt Tests

In dbt, you get some data tests out of the box, in addition to user-defined tests, and you get results with a severity by configuring each test's behavior (error or warn) and its thresholds.
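
For example, severity and thresholds are configured per test in the model's YAML. A minimal sketch with a hypothetical orders model:

models:
  - name: orders
    columns:
      - name: status
        data_tests:
          - not_null:
              config:
                severity: warn   # warn instead of failing the run
                warn_if: ">10"   # warn when more than 10 rows fail
                error_if: ">100" # error when more than 100 rows fail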

However, this is not enough for a good auditing strategy, because we have no way to keep these failures historically, even when using --store-failures.

A test's results will always replace previous failures for the same test. - dbt docs
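
You can see this for yourself: with stored failures enabled, dbt writes each test's failing rows to one table per test in a dedicated audit schema (prefex__dbt_test__audit in this demo), and that table is replaced on the next run.

dbt test --store-failures   # persist failing rows, one table per test, replaced on every run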

But why store errors historically in the first place?

  • Trend Analysis: Historical failures help identify persistent issues and measure improvement over time.

  • Root Cause Tracking: Recurring errors often stem from upstream source systems or processes; historical data enables RCA.

  • Audit & Compliance: In regulated environments, it's essential to track and prove how data quality evolved.

  • Business Impact Assessment: Helps correlate poor data quality with operational or financial consequences.

Without a historical data audit, you are flying blind!

🀗
The upcoming sections assume familiarity with concepts such as data quality, dbt tests, and dbt macros.

Proposed Solution

  • Additional work is needed to move stored-failures data to a dedicated location and mark each batch with a timestamp (hence the term "historical").

  • Calculate some KPIs (Key Performance Indicators) based on this data, with detailed information at the column level.

Here, I will define a simple core schema for the proposed table that will store our test data.

| Field Name | Data Type | Description |
| --- | --- | --- |
| batch_id | TEXT | Batch unique identifier |
| model_id | TEXT | The tested model |
| column_name | TEXT | The tested column |
| kpi_name | TEXT | Type of test that was performed |
| failed_rows | INT | Number of rows which failed the test |
| total_rows | INT | Total number of rows (including nulls) |
| success_pct | DECIMAL(5, 2) | passed_rows * 100 / total_rows |
| failure_pct | DECIMAL(5, 2) | failed_rows * 100 / total_rows |
| created_at | TIMESTAMP | The timestamp at which stored failures were moved to this table |

A good start, but it needs additional fields, such as a failed sample and the query that loaded this data! Let's add these extra fields to our table.

| Field Name | Data Type | Description |
| --- | --- | --- |
| failed_sample | JSON | Sample (up to 100 rows) of the failed population. |
| query_text | TEXT[] | The query used to insert data into this table. |
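
Putting both parts together, the audit table can be created with DDL along these lines. A Postgres sketch matching the demo's names (a relation_name field is included as well, since it shows up in the demo's final table; adapt types to your warehouse):

CREATE SCHEMA IF NOT EXISTS audit__reports;

CREATE TABLE IF NOT EXISTS audit__reports.audit_report (
    batch_id      TEXT,          -- batch unique identifier
    model_id      TEXT,          -- the tested model
    relation_name TEXT,          -- fully qualified relation of the model
    column_name   TEXT,          -- the tested column
    kpi_name      TEXT,          -- type of test that was performed
    failed_rows   INT,           -- rows that failed the test
    total_rows    INT,           -- total rows (including nulls)
    success_pct   DECIMAL(5, 2), -- passed_rows * 100 / total_rows
    failure_pct   DECIMAL(5, 2), -- failed_rows * 100 / total_rows
    failed_sample JSON,          -- up to 100 failed rows
    query_text    TEXT[],        -- query used to insert this record
    created_at    TIMESTAMP      -- when stored failures were moved here
);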

Although failure tables are expected to follow the naming convention <test_name>_<model_name>_<column_name>, you might encounter names like audit__consistency_mart_order__dd46b64be0a4d8ce4b4eb322540fd7fd, which makes extracting this data from table names challenging.

Fortunately, there is a way to extract this information directly inside dbt, from the graph context.

The graph context variable is a dictionary which maps node ids onto dictionary representations of those nodes. - dbt docs

πŸ₯΄
Clone this repository for the demo code.

Some built-in dbt tests don't return failed rows, so new tests need to be implemented. Note that every test name starts with audit__, as this naming convention will be important later!

-- some test examples from my demo

-- ./macros/audit/consistency.sql
{% test audit__consistency(model, column_name, regex, case_sensitive=false) %}
    {% if case_sensitive %}
        {% set operator = '!~' %}
    {% else %}
        {% set operator = '!~*' %}
    {% endif %}

    -- return rows whose value does NOT match the expected pattern
    SELECT * FROM {{ model }}
    WHERE {{ column_name }} {{ operator }} '{{ regex }}'
{% endtest %}


-- ./macros/audit/greater_than_zero.sql
{% test audit__greater_than_zero(model, column_name) %}
    -- flags negative values; zero itself passes
    SELECT * FROM {{ model }}
    WHERE {{ column_name }} < 0
{% endtest %}



-- ./macros/audit/not_null.sql
{% test audit__not_null(model, column_name) %}
    SELECT * FROM {{ model }}
    WHERE {{ column_name }} IS NULL
{% endtest %}



-- ./macros/audit/unique.sql
{% test audit__unique(model, column_name) %}
    with duplicates as (
        select {{ column_name }}
        from {{ model }}
        group by {{ column_name }}
        having count(*) > 1
    )

    select *
    from {{ model }}
    where {{ column_name }} in (select {{ column_name }} from duplicates)
{% endtest %}

Exploring Graph Context

To access the graph context in dbt, all you need to do is call {{ graph }}. That's it! But what does it contain?

Let's begin by exploring the graph's top-level keys.

dbt compile --inline "{{ graph.keys() }}"        
17:00:27  Running with dbt=1.9.6
17:00:27  Registered adapter: postgres=1.9.0
17:00:28  Found 9 models, 3 seeds, 6 data tests, 1 sql operation, 440 macros
17:00:28  
17:00:28  Concurrency: 1 threads (target='dev')
17:00:28  
Compiled inline node is:
dict_keys(['exposures', 'groups', 'metrics', 'nodes', 'sources', 'semantic_models', 'saved_queries'])

Now, let's go a level deeper and check the nodes keys. And surprise! We get all of our models, seeds, and tests:

dbt compile --inline "{{ graph.nodes.keys() }}"
17:04:28  Running with dbt=1.9.6
17:04:29  Registered adapter: postgres=1.9.0
17:04:29  Found 9 models, 3 seeds, 6 data tests, 1 sql operation, 440 macros
17:04:29  
17:04:29  Concurrency: 1 threads (target='dev')
17:04:29  
Compiled inline node is:
dict_keys(['model.audit_on_steroid.stg_orders', 'model.audit_on_steroid.stg_products', 'model.audit_on_steroid.stg_customers', 'model.audit_on_steroid.int_products', 'model.audit_on_steroid.int_order_customer_joined', 'model.audit_on_steroid.int_customers', 'model.audit_on_steroid.int_orders', 'model.audit_on_steroid.mart_order_summary', 'seed.audit_on_steroid.raw_orders', 'seed.audit_on_steroid.raw_products', 'seed.audit_on_steroid.raw_customers', 'model.audit_on_steroid.mart_order_summary_wap', 'test.audit_on_steroid.audit__not_null_mart_order_summary_wap_customer_id.7adbd28cc6', 'test.audit_on_steroid.audit__unique_mart_order_summary_wap_customer_id.bb8cd4e984', 'test.audit_on_steroid.audit__not_null_mart_order_summary_wap_total_spent.d0468db7f7', 'test.audit_on_steroid.audit__greater_than_zero_mart_order_summary_wap_total_spent.15fc2665f0', 'test.audit_on_steroid.audit__not_null_mart_order_summary_wap_first_order.22b1d319a6', 'test.audit_on_steroid.audit__consistency_mart_order_summary_wap_email___A_Za_z0_9___A_Za_z0_9_A_Za_z_.5e07113309'])

Let's pick a test and see what it includes:

dbt compile --inline "{{ graph.nodes.get('test.audit_on_steroid.audit__greater_than_zero_mart_order_summary_wap_total_spent.15fc2665f0') }}"
{
    "database": "data_warehouse",
    "schema": "prefex__dbt_test__audit",
    "name": "audit__greater_than_zero_mart_order_summary_wap_total_spent",
    "resource_type": "test",
    "package_name": "audit_on_steroid",
    "path": "audit__greater_than_zero_mart_order_summary_wap_total_spent.sql",
    "original_file_path": "models/mart/_mart_order_summary.yml",
    "unique_id": "test.audit_on_steroid.audit__greater_than_zero_mart_order_summary_wap_total_spent.15fc2665f0",
    "fqn": [
        "audit_on_steroid",
        "mart",
        "audit__greater_than_zero_mart_order_summary_wap_total_spent",
    ],
    "alias": "audit__greater_than_zero_mart_order_summary_wap_total_spent",
    "checksum": {"name": "none", "checksum": ""},
    "config": {
        "enabled": True,
        "alias": None,
        "schema": "dbt_test__audit",
        "database": None,
        "tags": [],
        "meta": {},
        "group": None,
        "materialized": "test",
        "severity": "warn",
        "store_failures": True,
        "store_failures_as": "table",
        "where": None,
        "limit": None,
        "fail_calc": "count(*)",
        "warn_if": "!= 0",
        "error_if": "!= 0",
    },
    "tags": [],
    "description": "",
    "columns": {},
    "meta": {},
    "group": None,
    "docs": {"show": True, "node_color": None},
    "patch_path": None,
    "build_path": None,
    "unrendered_config": {"store_failures": True, "severity": "warn"},
    "created_at": 1749301264.3145826,
    "config_call_dict": {},
    "unrendered_config_call_dict": {},
    "relation_name": '"data_warehouse"."prefex__dbt_test__audit"."audit__greater_than_zero_mart_order_summary_wap_total_spent"',
    "raw_code": "{{ test_audit__greater_than_zero(**_dbt_generic_test_kwargs) }}",
    "language": "sql",
    "refs": [{"name": "mart_order_summary_wap", "package": None, "version": None}],
    "sources": [],
    "metrics": [],
    "depends_on": {
        "macros": [
            "macro.audit_on_steroid.test_audit__greater_than_zero",
            "macro.dbt.get_where_subquery",
        ],
        "nodes": ["model.audit_on_steroid.mart_order_summary_wap"],
    },
    "compiled_path": None,
    "contract": {"enforced": False, "alias_types": True, "checksum": None},
    "column_name": "total_spent",
    "file_key_name": "models.mart_order_summary_wap",
    "attached_node": "model.audit_on_steroid.mart_order_summary_wap",
    "test_metadata": {
        "name": "audit__greater_than_zero",
        "kwargs": {
            "column_name": "total_spent",
            "model": "{{ get_where_subquery(ref('mart_order_summary_wap')) }}",
        },
        "namespace": None,
    },
}

That's awesome! You have access to all the metadata related to any dbt resource.

But in our case, the interest lies in the following section of that dictionary. By extracting tests from graph, it's possible to get the model each test is attached to, in addition to the column it tests:

    "column_name": "total_spent",
    "file_key_name": "models.mart_order_summary_wap",
    "attached_node": "model.audit_on_steroid.mart_order_summary_wap",

Utilizing the Graph Context to Automate Data Quality Auditing

Collect Test Nodes

Since every node has a resource_type property, it's possible to filter nodes by type and then extract the needed metadata from them.

{% macro __get_test_metadata() %}
    {% set test_metadata = [] %}
    {% if execute %}
        {% for test_node in graph.nodes.values()
            | selectattr("resource_type", "equalto", "test") %}
            {% set model_node = graph.nodes.get(test_node.attached_node) %}
            {% if model_node and
                test_node.test_metadata.name and
                test_node.test_metadata.name.startswith('audit_') %}
                {% do test_metadata.append({
                'test_name': test_node.test_metadata.name,
                'column_name': test_node.column_name | default(''),
                'test_relation': test_node.relation_name,
                'model_relation': model_node.relation_name,
                'model_id': model_node.unique_id
                }) %}
            {% endif %}
        {% endfor %}
    {% endif %}
    {{ return(test_metadata) }}
{% endmacro %}

Let's compile the __get_test_metadata() macro to see its result:

dbt compile --inline "{{ __get_test_metadata() }}"
[
    {
        "test_name": "audit__not_null",
        "column_name": "customer_id",
        "test_relation": '"data_warehouse"."prefex__dbt_test__audit"."audit__not_null_mart_order_summary_wap_customer_id"',
        "model_relation": '"data_warehouse"."prefex__mart"."mart_order_summary_wap"',
        "model_id": "model.audit_on_steroid.mart_order_summary_wap",
    },
    {
        "test_name": "audit__unique",
        "column_name": "customer_id",
        "test_relation": '"data_warehouse"."prefex__dbt_test__audit"."audit__unique_mart_order_summary_wap_customer_id"',
        "model_relation": '"data_warehouse"."prefex__mart"."mart_order_summary_wap"',
        "model_id": "model.audit_on_steroid.mart_order_summary_wap",
    },
    {
        "test_name": "audit__not_null",
        "column_name": "total_spent",
        "test_relation": '"data_warehouse"."prefex__dbt_test__audit"."audit__not_null_mart_order_summary_wap_total_spent"',
        "model_relation": '"data_warehouse"."prefex__mart"."mart_order_summary_wap"',
        "model_id": "model.audit_on_steroid.mart_order_summary_wap",
    },
    {
        "test_name": "audit__greater_than_zero",
        "column_name": "total_spent",
        "test_relation": '"data_warehouse"."prefex__dbt_test__audit"."audit__greater_than_zero_mart_order_summary_wap_total_spent"',
        "model_relation": '"data_warehouse"."prefex__mart"."mart_order_summary_wap"',
        "model_id": "model.audit_on_steroid.mart_order_summary_wap",
    },
    {
        "test_name": "audit__not_null",
        "column_name": "first_order",
        "test_relation": '"data_warehouse"."prefex__dbt_test__audit"."audit__not_null_mart_order_summary_wap_first_order"',
        "model_relation": '"data_warehouse"."prefex__mart"."mart_order_summary_wap"',
        "model_id": "model.audit_on_steroid.mart_order_summary_wap",
    },
    {
        "test_name": "audit__consistency",
        "column_name": "email",
        "test_relation": '"data_warehouse"."prefex__dbt_test__audit"."audit__consistency_mart_order__dd46b64be0a4d8ce4b4eb322540fd7fd"',
        "model_relation": '"data_warehouse"."prefex__mart"."mart_order_summary_wap"',
        "model_id": "model.audit_on_steroid.mart_order_summary_wap",
    },
]

Constructing The Audit Table

In the previous array of dictionaries, each element can be turned into a SELECT statement whose result is inserted into the audit table.

🀯
A batch ID can be any string that uniquely identifies a batch: a timestamp, an integer, a UUID, or dbt's invocation_id, among others.
πŸ’‘
The query_text field value will be injected after the query is constructed in the Jinja macro.
SELECT 
    '{{ batch_id }}'::TEXT as batch_id,
    '{{ test.model_id }}'::TEXT as model_id,
    '{{ test.model_relation }}'::TEXT as relation_name,
    '{{ test.column_name }}'::TEXT as column_name,
    '{{ test.test_name }}'::TEXT as kpi_name,
    ft.failed_count::INT as failed_rows,
    tt.total_count::INT as total_rows,

    CASE 
        WHEN tt.total_count = 0 THEN 0 
        ELSE ROUND((ft.failed_count * 100.0) / tt.total_count, 2)
    END as failure_pct,

    CASE 
        WHEN tt.total_count = 0 THEN 100 
        ELSE ROUND(100 - (ft.failed_count * 100.0) / tt.total_count, 2)
    END as success_pct,

    fs.failed_sample,

    CURRENT_TIMESTAMP as created_at
FROM 
    -- Failed rows count for this specific batch
    (
        SELECT COUNT(*) as failed_count
        FROM {{ test.test_relation }}
        WHERE {{ batch_id_column }} = '{{ batch_id }}'
    ) ft,

    -- Total rows count for this batch from the model
    (
        SELECT COUNT(*) as total_count 
        FROM {{ test.model_relation }}
        WHERE {{ batch_id_column }} = '{{ batch_id }}'
    ) tt,

    -- Sample of failed records for this batch
    (
        SELECT JSON_AGG(sample_data) as failed_sample
        FROM (
            SELECT row_to_json(t) AS sample_data
            FROM {{ test.test_relation }} t
            WHERE t.{{ batch_id_column }} = '{{ batch_id }}'
            LIMIT 100
        ) sample
    ) AS fs

The next step is to extract batches based on a given column that could be a batch identifier.

Test results will always be replaced after running the test, but:

  • If a batch passes the test, the stored-failures table is not emptied or replaced.

  • So, to avoid duplicate records in our historical audit table, it's crucial to verify whether a given batch_id for a specific test and model has already been recorded.

SELECT DISTINCT 
    tr.{{ batch_id_column }}
FROM 
    {{ test.test_relation }} tr -- where stored failures occur
WHERE NOT EXISTS (
    SELECT 1
    FROM {{ audit_schema }}.{{ audit_table_name }} ar
    WHERE ar.batch_id = tr.{{ batch_id_column }}::TEXT
        AND ar.model_id = '{{ test.model_id }}'
        AND ar.kpi_name = '{{ test.test_name }}'
        AND ar.column_name = '{{ test.column_name }}'
)
ORDER BY tr.{{ batch_id_column }}

The workflow will look similar to this flowchart.

flowchart TD     
A[Start macro: audit__post_hook] --> B[Create audit schema and table if not exists]     
B --> C[Call macro: __get_test_metadata]     
C --> D[Loop through each test in metadata]     
D --> E{test_relation & model_relation exist?}     
E -- No --> D     
E -- Yes --> F[Run query to find unprocessed batch_ids]     
F --> G[Log found batch_ids]      
G --> H[Get first and only batch_id]     
H --> I[Build SQL to calculate KPIs and failed sample]     
I --> J[Wrap SQL with query_text and add to audit_select_queries]                          
J --> M[Construct INSERT INTO audit_report]     
M --> L{More tests?}
L -- Yes --> D
L -- no --> N[Run audit_insert_query]     
N --> O[Log insert query and result status]     
O --> P[End]
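
To make the flow concrete, here is a condensed Jinja sketch of how audit__post_hook could be wired together. The helpers __unprocessed_batches_sql and __kpi_select_sql are hypothetical names standing in for the two queries shown above; the full macro lives in the demo repository:

{% macro audit__post_hook(audit_schema, audit_table_name, batch_id_column) %}
    {% if execute %}
        {# 1. make sure the audit schema (and table) exist #}
        {% do run_query('CREATE SCHEMA IF NOT EXISTS ' ~ audit_schema) %}

        {% set audit_select_queries = [] %}

        {# 2. walk over every audit__ test found in the graph #}
        {% for test in __get_test_metadata() %}
            {% if test.test_relation and test.model_relation %}
                {# 3. find batch_ids not yet recorded for this test/model/column #}
                {% set batches = run_query(__unprocessed_batches_sql(test, audit_schema, audit_table_name, batch_id_column)) %}
                {% for row in batches.rows %}
                    {# 4. build the KPI SELECT for this batch #}
                    {% do audit_select_queries.append(__kpi_select_sql(test, row[0], batch_id_column)) %}
                {% endfor %}
            {% endif %}
        {% endfor %}

        {# 5. insert all collected SELECTs in one statement #}
        {% if audit_select_queries %}
            {% set insert_sql %}
                INSERT INTO {{ audit_schema }}.{{ audit_table_name }}
                {{ audit_select_queries | join('\nUNION ALL\n') }}
            {% endset %}
            {% do run_query(insert_sql) %}
        {% endif %}
    {% endif %}
{% endmacro %}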

Demo

Setup The Project

docker compose up -d # run the postgres container
cd audit_on_steroid
dbt seed --profiles-dir . # this loads sample data into the db

Exploring The Demo

In our models directory, you'll find the three familiar layers: stg, int, and mart. Our goal is to use the Write Audit Publish (WAP) pattern. Check out the model mart_order_summary_wap.sql, which will store our audited data before it's published to production.

❯ tree ./models                                                 
./models
β”œβ”€β”€ int
β”‚   β”œβ”€β”€ int_customers.sql
β”‚   β”œβ”€β”€ int_order_customer_joined.sql
β”‚   β”œβ”€β”€ int_orders.sql
β”‚   └── int_products.sql
β”œβ”€β”€ mart
β”‚   β”œβ”€β”€ mart_order_summary.sql
β”‚   β”œβ”€β”€ mart_order_summary_wap.sql
β”‚   └── _mart_order_summary.yml
└── stg
    β”œβ”€β”€ stg_customers.sql
    β”œβ”€β”€ stg_orders.sql
    └── stg_products.sql

3 directories, 10 files

Let's check how _mart_order_summary.yml is written. Remember, the goal of using custom generic tests is to return the entire failed rows instead of a failure summary.

version: 2

models:
  - name: mart_order_summary_wap
    config:
      tags: ['wap'] # Notice the tag here!
    description: "Customer-level order aggregation WAP table"
    columns:
      - name: customer_id
        data_tests: [audit__not_null, audit__unique]

      - name: total_spent
        data_tests: [audit__not_null, audit__greater_than_zero]

      - name: first_order
        data_tests:
          - audit__not_null

      - name: email
        data_tests: 
          - audit__consistency:
              regex: "^[A-Za-z0-9._%-]+@[A-Za-z0-9.-]+[.][A-Za-z]+$"

Now, it’s time to run the workflow!

dbt run -s +tag:wap &&   # run wap model and everything upstream
dbt test -s tag:wap &&   # run tests step
dbt run-operation audit__post_hook \
  --args '{audit_schema: "audit__reports", audit_table_name: "audit_report", batch_id_column: "created_at"}' &&   # move test results to our audit table!
dbt run -s tag:wap+ --exclude tag:wap   # publish data to prod table

This is the data you should expect in the mart_order_summary_wap table:

| customer_id | customer_name | email | total_orders | total_spent | first_order | last_order | created_at |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 101 | ali | invalid_email.com | 1 | | 2024-06-01 | 2024-06-01 | 2025-06-08 16:47:13.171396+00 |
| 102 | sara | sara@example.com | 1 | | | | 2025-06-08 16:47:13.171396+00 |
| 103 | ali | ali@example.com | 1 | | 2024-06-03 | 2024-06-03 | 2025-06-08 16:47:13.171396+00 |
| 104 | | | 1 | | | | 2025-06-08 16:47:13.171396+00 |

In this demo, the severity was set to warn for demonstration purposes, so it does not break the pipeline. You can experiment with it to establish your own data quality rules and observe the results. Let's check the tables created by the run:

❯ docker ps 
CONTAINER ID   IMAGE      COMMAND                  CREATED        STATUS       PORTS                                         NAMES
d49fd5fbe329   postgres   "docker-entrypoint.s…"   27 hours ago   Up 5 hours   0.0.0.0:5432->5432/tcp, [::]:5432->5432/tcp   audit_on_steroid-db-1

❯ docker exec -it audit_on_steroid-db-1 psql -U postgres -d data_warehouse   
psql (17.5 (Debian 17.5-1.pgdg120+1))
Type "help" for help.

data_warehouse=# SELECT * FROM pg_catalog.pg_tables where tablename not like 'pg_%';
       schemaname        |                            tablename                            | tableowner | tablespace | hasindexes | hasrules | hastriggers | rowsecurity 
-------------------------+-----------------------------------------------------------------+------------+------------+------------+----------+-------------+-------------
 prefex__stg             | stg_orders                                                      | postgres   |            | f          | f        | f           | f
 prefex_                 | raw_customers                                                   | postgres   |            | f          | f        | f           | f
 prefex__mart            | mart_order_summary_wap                                          | postgres   |            | f          | f        | f           | f
 prefex__dbt_test__audit | audit__not_null_mart_order_summary_wap_first_order              | postgres   |            | f          | f        | f           | f
 prefex_                 | raw_orders                                                      | postgres   |            | f          | f        | f           | f
 prefex__int             | int_customers                                                   | postgres   |            | f          | f        | f           | f
 prefex__dbt_test__audit | audit__consistency_mart_order__dd46b64be0a4d8ce4b4eb322540fd7fd | postgres   |            | f          | f        | f           | f
 prefex__dbt_test__audit | audit__not_null_mart_order_summary_wap_total_spent              | postgres   |            | f          | f        | f           | f
 prefex_                 | raw_products                                                    | postgres   |            | f          | f        | f           | f
 prefex__int             | int_orders                                                      | postgres   |            | f          | f        | f           | f
 prefex__dbt_test__audit | audit__greater_than_zero_mart_order_summary_wap_total_spent     | postgres   |            | f          | f        | f           | f
 prefex__dbt_test__audit | audit__unique_mart_order_summary_wap_customer_id                | postgres   |            | f          | f        | f           | f
 prefex__mart            | mart_order_summary                                              | postgres   |            | f          | f        | f           | f
 prefex__stg             | stg_customers                                                   | postgres   |            | f          | f        | f           | f
 prefex__int             | int_order_customer_joined                                       | postgres   |            | f          | f        | f           | f
 prefex__dbt_test__audit | audit__not_null_mart_order_summary_wap_customer_id              | postgres   |            | f          | f        | f           | f
 information_schema      | sql_parts                                                       | postgres   |            | f          | f        | f           | f
 information_schema      | sql_features                                                    | postgres   |            | f          | f        | f           | f
 information_schema      | sql_implementation_info                                         | postgres   |            | f          | f        | f           | f
 information_schema      | sql_sizing                                                      | postgres   |            | f          | f        | f           | f
 audit__reports          | audit_report                                                    | postgres   |            | f          | f        | f           | f

Let's examine the "audit__reports"."audit_report" table we have created.

All three records share batch_id 2025-06-08 16:47:13.171396+00:00, model_id model.audit_on_steroid.mart_order_summary_wap, relation_name "data_warehouse"."prefex__mart"."mart_order_summary_wap", and created_at 2025-06-08 16:47:21. The scalar KPI columns:

| column_name | kpi_name | failed_rows | total_rows | failure_pct | success_pct |
| --- | --- | --- | --- | --- | --- |
| total_spent | audit__not_null | 4 | 4 | 100 | 0 |
| first_order | audit__not_null | 2 | 4 | 50 | 50 |
| email | audit__consistency | 1 | 4 | 25 | 75 |

The failed_sample column stores the failed rows as JSON. For the audit__not_null test on total_spent, all four customers appear:

[{"customer_id":"101","customer_name":"ali","email":"invalid_email.com","total_orders":1,"total_spent":null,"first_order":"2024-06-01","last_order":"2024-06-01","created_at":"2025-06-08T16:47:13.171396+00:00"},
 {"customer_id":"102","customer_name":"sara","email":"sara@example.com","total_orders":1,"total_spent":null,"first_order":null,"last_order":null,"created_at":"2025-06-08T16:47:13.171396+00:00"},
 {"customer_id":"103","customer_name":"ali","email":"ali@example.com","total_orders":1,"total_spent":null,"first_order":"2024-06-03","last_order":"2024-06-03","created_at":"2025-06-08T16:47:13.171396+00:00"},
 {"customer_id":"104","customer_name":null,"email":null,"total_orders":1,"total_spent":null,"first_order":null,"last_order":null,"created_at":"2025-06-08T16:47:13.171396+00:00"}]

And the query_text column stores the exact query that produced each record. Here it is for the total_spent row; the queries for first_order and email differ only in the test relation, column_name, and kpi_name:

SELECT 
    '2025-06-08 16:47:13.171396+00:00'::TEXT as batch_id,
    'model.audit_on_steroid.mart_order_summary_wap'::TEXT as model_id,
    '"data_warehouse"."prefex__mart"."mart_order_summary_wap"'::TEXT as relation_name,
    'total_spent'::TEXT as column_name,
    'audit__not_null'::TEXT as kpi_name,
    ft.failed_count::INT as failed_rows,
    tt.total_count::INT as total_rows,

    CASE 
        WHEN tt.total_count = 0 THEN 0 
        ELSE ROUND((ft.failed_count * 100.0) / tt.total_count, 2)
    END as failure_pct,

    CASE 
        WHEN tt.total_count = 0 THEN 100 
        ELSE ROUND(100 - (ft.failed_count * 100.0) / tt.total_count, 2)
    END as success_pct,

    fs.failed_sample,
    CURRENT_TIMESTAMP as dbt_created_at
FROM 
    -- Failed rows count for this specific batch
    (
        SELECT COUNT(*) as failed_count
        FROM "data_warehouse"."prefex__dbt_test__audit"."audit__not_null_mart_order_summary_wap_total_spent"
        WHERE created_at = '2025-06-08 16:47:13.171396+00:00'
    ) ft,

    -- Total rows count for this batch from the model
    (
        SELECT COUNT(*) as total_count 
        FROM "data_warehouse"."prefex__mart"."mart_order_summary_wap"
        WHERE created_at = '2025-06-08 16:47:13.171396+00:00'
    ) tt,

    -- Sample of failed records for this batch
    (
        SELECT JSON_AGG(sample_data) as failed_sample
        FROM (
            SELECT row_to_json(t) AS sample_data
            FROM "data_warehouse"."prefex__dbt_test__audit"."audit__not_null_mart_order_summary_wap_total_spent" t
            WHERE t.created_at = '2025-06-08 16:47:13.171396+00:00'
            LIMIT 100
        ) sample
    ) AS fs
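
With failures recorded per batch, the trend analysis promised earlier becomes a simple query. A sketch against the demo's audit table:

-- success rate per column and KPI across batches, worst first
SELECT
    model_id,
    column_name,
    kpi_name,
    COUNT(*)                   AS batches_audited,
    ROUND(AVG(success_pct), 2) AS avg_success_pct,
    MIN(success_pct)           AS worst_success_pct
FROM audit__reports.audit_report
GROUP BY model_id, column_name, kpi_name
ORDER BY avg_success_pct;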


Written by

Ahmad Muhammad

I'm Ahmad Muhammad, a data engineer applying mathematics and software principles to solve real-world problems using data. I consider myself a tool-agnostic engineer: theory and abstract science should always come first, and implementation is often the easiest part. This is especially true with many data solution companies creating tools with great user experiences to attract the community's attention.