dbt Data Quality Audit, But on Steroids


Data quality auditing is the process of ensuring that data is fit for a given purpose. Poor data quality leads to wrong business decisions and can degrade corporate performance.
Some common data quality metrics are:
| Metric | Description | Formula |
| --- | --- | --- |
| Completeness | Percentage of complete (non-null) values in a column. | number of non-null values / total rows |
| Consistency | Percentage of values that follow a defined format (e.g., date, phone number). | number of consistent values / total rows |
| Timeliness | How up-to-date or fresh the data is relative to the current time. | Context-dependent; may use a timestamp difference |
| Uniqueness | How many values are unique (non-duplicate) in a column. | number of distinct values / total rows |
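To make the first and last of these concrete, here is a minimal sketch in plain SQL (PostgreSQL syntax; the customers table and its email column are hypothetical):
-- Completeness and uniqueness for a hypothetical customers.email column
SELECT
    COUNT(email) * 100.0 / COUNT(*)          AS completeness_pct, -- non-null share
    COUNT(DISTINCT email) * 100.0 / COUNT(*) AS uniqueness_pct    -- distinct share
FROM customers;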
The Problem With dbt Tests
In dbt, you get some out-of-the-box data tests in addition to user-defined tests, and results come with a severity, configured by setting the test's behavior to `error` or `warn`, plus an optional threshold.
However, this is not enough for a good auditing strategy, as we don't have the ability to store these failures historically, even when using `--store-failures`:
"A test's results will always replace previous failures for the same test." — dbt docs
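For reference, severity and thresholds are configured per test in your schema YAML. A minimal illustration (the model, column, and threshold values here are made up):
models:
  - name: orders
    columns:
      - name: status
        data_tests:
          - accepted_values:
              values: ['placed', 'shipped', 'returned']
              config:
                severity: error
                warn_if: ">0"    # warn on any failed row...
                error_if: ">100" # ...but only error past 100 failed rows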
But Why Store Failures Historically in the First Place?
- Trend Analysis: Historical failures help identify persistent issues and measure improvement over time.
- Root Cause Tracking: Recurring errors often stem from upstream source systems or processes; historical data enables root cause analysis (RCA).
- Audit & Compliance: In regulated environments, it's essential to track and prove how data quality evolved.
- Business Impact Assessment: Helps correlate poor data quality with operational or financial consequences.
Without a historical data audit, you are flying blind! To get a taste of what that history buys you, here is the kind of trend query that becomes possible once failures are stored over time.
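A hedged sketch, assuming the audit table we build later in this post (audit__reports.audit_report):
-- Weekly success rate per tested column, to spot persistent quality issues
SELECT
    DATE_TRUNC('week', created_at) AS week,
    model_id,
    column_name,
    kpi_name,
    ROUND(AVG(success_pct), 2) AS avg_success_pct
FROM audit__reports.audit_report
GROUP BY 1, 2, 3, 4
ORDER BY week, model_id, column_name;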
Proposed Solution
Additional work is needed to move `stored-failures` data to a secure location and mark each batch with a timestamp (hence the term "historical"), then calculate some KPIs (Key Performance Indicators) based on this data, with detailed information at the column level.
Here, I will define a simple core schema for the proposed table that will store our test data.
| Field Name | Data Type | Description |
| --- | --- | --- |
| batch_id | TEXT | Batch unique identifier |
| model_id | TEXT | The tested model |
| column_name | TEXT | The tested column |
| kpi_name | TEXT | The type of test performed on the column |
| failed_rows | INT | Number of rows that failed the test |
| total_rows | INT | Total number of rows (including nulls) |
| success_pct | DECIMAL(5, 2) | passed_rows * 100 / total_rows |
| failure_pct | DECIMAL(5, 2) | failed_rows * 100 / total_rows |
| created_at | TIMESTAMP | The timestamp at which stored failures were moved to this table |
A good start, but it needs additional fields, such as a sample of the failed rows and the query that loaded the data. Let's add these extra fields to our table.
| Field Name | Data Type | Description |
| --- | --- | --- |
| failed_sample | JSON | A sample (up to 100 rows) of the failed population. |
| query_text | TEXT[] | The query used to insert data into this table. |
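Putting it together, a minimal DDL sketch of this table (PostgreSQL; the schema and table names match the demo defaults used later, and relation_name is an extra column the demo also records):
-- Audit table sketch; names mirror the demo's audit__reports.audit_report
CREATE SCHEMA IF NOT EXISTS audit__reports;
CREATE TABLE IF NOT EXISTS audit__reports.audit_report (
    batch_id      TEXT,
    model_id      TEXT,
    relation_name TEXT, -- fully qualified name of the tested model
    column_name   TEXT,
    kpi_name      TEXT,
    failed_rows   INT,
    total_rows    INT,
    failure_pct   DECIMAL(5, 2),
    success_pct   DECIMAL(5, 2),
    failed_sample JSON,
    query_text    TEXT[],
    created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);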
Although failure tables are expected to follow the naming convention `<test_name>_<model_name>_<column_name>`, you might encounter names like `audit__consistency_mart_order__dd46b64be0a4d8ce4b4eb322540fd7fd` (dbt truncates long generated test names and appends a hash), which makes extracting this data challenging.
Fortunately, there is a way to extract this information directly inside dbt, from the graph context.
The `graph` context variable is a dictionary which maps node ids onto dictionary representations of those nodes. — dbt docs
Some built-in tests in dbt don't return failed rows, so new tests need to be implemented. Note that every test name starts with `audit__`, as this naming convention will be important later!
-- some test examples from my demo
-- ./macros/audit/consistency.sql
{% test audit__consistency(model, column_name, regex, case_sensitive=false) %}
{# PostgreSQL regex operators: !~ is case-sensitive, !~* is case-insensitive #}
{% set operator = '!~' if case_sensitive else '!~*' %}
SELECT * FROM {{ model }}
WHERE {{ column_name }} {{ operator }} '{{ regex }}'
{% endtest %}
-- ./macros/audit/greater_than_zero.sql
{% test audit__greater_than_zero(model, column_name) %}
SELECT * FROM {{ model }}
WHERE {{ column_name }} < 0
{% endtest %}
-- ./macros/audit/not_null.sql
{% test audit__not_null(model, column_name) %}
SELECT * FROM {{ model }}
WHERE {{ column_name }} IS NULL
{% endtest %}
-- ./macros/audit/unique.sql
{% test audit__unique(model, column_name) %}
with duplicates as (
select {{ column_name }}
from {{ model }}
group by {{ column_name }}
having count(*) > 1
)
select *
from {{ model }}
where {{ column_name }} in (select {{ column_name }} from duplicates)
{% endtest %}
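For a sense of what these tests render to, audit__consistency combined with the email regex used later in the demo YAML would compile to something like:
-- Compiled form of audit__consistency on mart_order_summary_wap.email
SELECT * FROM "data_warehouse"."prefex__mart"."mart_order_summary_wap"
WHERE email !~* '^[A-Za-z0-9._%-]+@[A-Za-z0-9.-]+[.][A-Za-z]+$'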
Exploring Graph Context
To access the graph context in dbt, all you need to do is call `{{ graph }}`. That's it! But what does it contain?
Let's begin by exploring the graph's top-level keys.
dbt compile --inline "{{ graph.keys() }}"
17:00:27 Running with dbt=1.9.6
17:00:27 Registered adapter: postgres=1.9.0
17:00:28 Found 9 models, 3 seeds, 6 data tests, 1 sql operation, 440 macros
17:00:28
17:00:28 Concurrency: 1 threads (target='dev')
17:00:28
Compiled inline node is:
dict_keys(['exposures', 'groups', 'metrics', 'nodes', 'sources', 'semantic_models', 'saved_queries'])
Now, let's go a level deeper and check the `nodes` keys. Surprise! We get all of our models, seeds, and tests.
dbt compile --inline "{{ graph.nodes.keys() }}"
17:04:28 Running with dbt=1.9.6
17:04:29 Registered adapter: postgres=1.9.0
17:04:29 Found 9 models, 3 seeds, 6 data tests, 1 sql operation, 440 macros
17:04:29
17:04:29 Concurrency: 1 threads (target='dev')
17:04:29
Compiled inline node is:
dict_keys(['model.audit_on_steroid.stg_orders', 'model.audit_on_steroid.stg_products', 'model.audit_on_steroid.stg_customers', 'model.audit_on_steroid.int_products', 'model.audit_on_steroid.int_order_customer_joined', 'model.audit_on_steroid.int_customers', 'model.audit_on_steroid.int_orders', 'model.audit_on_steroid.mart_order_summary', 'seed.audit_on_steroid.raw_orders', 'seed.audit_on_steroid.raw_products', 'seed.audit_on_steroid.raw_customers', 'model.audit_on_steroid.mart_order_summary_wap', 'test.audit_on_steroid.audit__not_null_mart_order_summary_wap_customer_id.7adbd28cc6', 'test.audit_on_steroid.audit__unique_mart_order_summary_wap_customer_id.bb8cd4e984', 'test.audit_on_steroid.audit__not_null_mart_order_summary_wap_total_spent.d0468db7f7', 'test.audit_on_steroid.audit__greater_than_zero_mart_order_summary_wap_total_spent.15fc2665f0', 'test.audit_on_steroid.audit__not_null_mart_order_summary_wap_first_order.22b1d319a6', 'test.audit_on_steroid.audit__consistency_mart_order_summary_wap_email___A_Za_z0_9___A_Za_z0_9_A_Za_z_.5e07113309'])
Let's pick a test and see what it includes:
dbt compile --inline "{{ graph.nodes.get('test.audit_on_steroid.audit__greater_than_zero_mart_order_summary_wap_total_spent.15fc2665f0') }}"
{
"database": "data_warehouse",
"schema": "prefex__dbt_test__audit",
"name": "audit__greater_than_zero_mart_order_summary_wap_total_spent",
"resource_type": "test",
"package_name": "audit_on_steroid",
"path": "audit__greater_than_zero_mart_order_summary_wap_total_spent.sql",
"original_file_path": "models/mart/_mart_order_summary.yml",
"unique_id": "test.audit_on_steroid.audit__greater_than_zero_mart_order_summary_wap_total_spent.15fc2665f0",
"fqn": [
"audit_on_steroid",
"mart",
"audit__greater_than_zero_mart_order_summary_wap_total_spent",
],
"alias": "audit__greater_than_zero_mart_order_summary_wap_total_spent",
"checksum": {"name": "none", "checksum": ""},
"config": {
"enabled": True,
"alias": None,
"schema": "dbt_test__audit",
"database": None,
"tags": [],
"meta": {},
"group": None,
"materialized": "test",
"severity": "warn",
"store_failures": True,
"store_failures_as": "table",
"where": None,
"limit": None,
"fail_calc": "count(*)",
"warn_if": "!= 0",
"error_if": "!= 0",
},
"tags": [],
"description": "",
"columns": {},
"meta": {},
"group": None,
"docs": {"show": True, "node_color": None},
"patch_path": None,
"build_path": None,
"unrendered_config": {"store_failures": True, "severity": "warn"},
"created_at": 1749301264.3145826,
"config_call_dict": {},
"unrendered_config_call_dict": {},
"relation_name": '"data_warehouse"."prefex__dbt_test__audit"."audit__greater_than_zero_mart_order_summary_wap_total_spent"',
"raw_code": "{{ test_audit__greater_than_zero(**_dbt_generic_test_kwargs) }}",
"language": "sql",
"refs": [{"name": "mart_order_summary_wap", "package": None, "version": None}],
"sources": [],
"metrics": [],
"depends_on": {
"macros": [
"macro.audit_on_steroid.test_audit__greater_than_zero",
"macro.dbt.get_where_subquery",
],
"nodes": ["model.audit_on_steroid.mart_order_summary_wap"],
},
"compiled_path": None,
"contract": {"enforced": False, "alias_types": True, "checksum": None},
"column_name": "total_spent",
"file_key_name": "models.mart_order_summary_wap",
"attached_node": "model.audit_on_steroid.mart_order_summary_wap",
"test_metadata": {
"name": "audit__greater_than_zero",
"kwargs": {
"column_name": "total_spent",
"model": "{{ get_where_subquery(ref('mart_order_summary_wap')) }}",
},
"namespace": None,
},
}
That's awesome! You have access to all the metadata related to any dbt resource.
In our case, though, the interest drifts to this section of the previous dictionary: by extracting tests from `graph`, it's possible to get the model a test is attached to, in addition to the column it tests.
"column_name": "total_spent",
"file_key_name": "models.mart_order_summary_wap",
"attached_node": "model.audit_on_steroid.mart_order_summary_wap",
Utilizing the Graph Context to Automate Data Quality Auditing
Collect Test Nodes
Since every node has a `resource_type` property, it's possible to filter nodes by type and then extract the needed metadata.
{% macro __get_test_metadata() %}
  {% set test_metadata = [] %}
  {% if execute %}
    {# Keep only test nodes, then look up the model each test is attached to #}
    {% for test_node in graph.nodes.values()
       | selectattr("resource_type", "equalto", "test") %}
      {% set model_node = graph.nodes.get(test_node.attached_node) %}
      {# Collect only our custom audit__* tests #}
      {% if model_node and
            test_node.test_metadata.name and
            test_node.test_metadata.name.startswith('audit__') %}
        {% do test_metadata.append({
          'test_name': test_node.test_metadata.name,
          'column_name': test_node.column_name | default(''),
          'test_relation': test_node.relation_name,
          'model_relation': model_node.relation_name,
          'model_id': model_node.unique_id
        }) %}
      {% endif %}
    {% endfor %}
  {% endif %}
  {{ return(test_metadata) }}
{% endmacro %}
Let's compile the `__get_test_metadata()` macro to see its result:
dbt compile --inline "{{ __get_test_metadata() }}"
[
{
"test_name": "audit__not_null",
"column_name": "customer_id",
"test_relation": '"data_warehouse"."prefex__dbt_test__audit"."audit__not_null_mart_order_summary_wap_customer_id"',
"model_relation": '"data_warehouse"."prefex__mart"."mart_order_summary_wap"',
"model_id": "model.audit_on_steroid.mart_order_summary_wap",
},
{
"test_name": "audit__unique",
"column_name": "customer_id",
"test_relation": '"data_warehouse"."prefex__dbt_test__audit"."audit__unique_mart_order_summary_wap_customer_id"',
"model_relation": '"data_warehouse"."prefex__mart"."mart_order_summary_wap"',
"model_id": "model.audit_on_steroid.mart_order_summary_wap",
},
{
"test_name": "audit__not_null",
"column_name": "total_spent",
"test_relation": '"data_warehouse"."prefex__dbt_test__audit"."audit__not_null_mart_order_summary_wap_total_spent"',
"model_relation": '"data_warehouse"."prefex__mart"."mart_order_summary_wap"',
"model_id": "model.audit_on_steroid.mart_order_summary_wap",
},
{
"test_name": "audit__greater_than_zero",
"column_name": "total_spent",
"test_relation": '"data_warehouse"."prefex__dbt_test__audit"."audit__greater_than_zero_mart_order_summary_wap_total_spent"',
"model_relation": '"data_warehouse"."prefex__mart"."mart_order_summary_wap"',
"model_id": "model.audit_on_steroid.mart_order_summary_wap",
},
{
"test_name": "audit__not_null",
"column_name": "first_order",
"test_relation": '"data_warehouse"."prefex__dbt_test__audit"."audit__not_null_mart_order_summary_wap_first_order"',
"model_relation": '"data_warehouse"."prefex__mart"."mart_order_summary_wap"',
"model_id": "model.audit_on_steroid.mart_order_summary_wap",
},
{
"test_name": "audit__consistency",
"column_name": "email",
"test_relation": '"data_warehouse"."prefex__dbt_test__audit"."audit__consistency_mart_order__dd46b64be0a4d8ce4b4eb322540fd7fd"',
"model_relation": '"data_warehouse"."prefex__mart"."mart_order_summary_wap"',
"model_id": "model.audit_on_steroid.mart_order_summary_wap",
},
]
Constructing The Audit Table
In the previous array of dictionaries, each element can represent a `SELECT` statement that will be inserted into the audit table.
SELECT
'{{ batch_id }}'::TEXT as batch_id,
'{{ test.model_id }}'::TEXT as model_id,
'{{ test.model_relation }}'::TEXT as relation_name,
'{{ test.column_name }}'::TEXT as column_name,
'{{ test.test_name }}'::TEXT as kpi_name,
ft.failed_count::INT as failed_rows,
tt.total_count::INT as total_rows,
CASE
WHEN tt.total_count = 0 THEN 0
ELSE ROUND((ft.failed_count * 100.0) / tt.total_count, 2)
END as failure_pct,
CASE
WHEN tt.total_count = 0 THEN 100
ELSE ROUND(100 - (ft.failed_count * 100.0) / tt.total_count, 2)
END as success_pct,
fs.failed_sample,
CURRENT_TIMESTAMP as created_at
FROM
-- Failed rows count for this specific batch
(
SELECT COUNT(*) as failed_count
FROM {{ test.test_relation }}
WHERE {{ batch_id_column }} = '{{ batch_id }}'
) ft,
-- Total rows count for this batch from the model
(
SELECT COUNT(*) as total_count
FROM {{ test.model_relation }}
WHERE {{ batch_id_column }} = '{{ batch_id }}'
) tt,
-- Sample of failed records for this batch
(
SELECT JSON_AGG(sample_data) as failed_sample
FROM (
SELECT row_to_json(t) AS sample_data
FROM {{ test.test_relation }} t
WHERE t.{{ batch_id_column }} = '{{ batch_id }}'
LIMIT 100
) sample
) AS fs
The next step is to extract batches based on a given column that can act as a batch identifier.
Test results are replaced every time a test runs, but if a batch passes the test, the stored-failures table is not emptied or replaced.
So, to avoid duplicate records in our historical audit table, it's crucial to verify whether a given `batch_id` for a specific test and model has already been recorded.
SELECT DISTINCT
tr.{{ batch_id_column }}
FROM
{{ test.test_relation }} tr -- where stored failures occur
WHERE NOT EXISTS (
SELECT 1
FROM {{ audit_schema }}.{{ audit_table_name }} ar
WHERE ar.batch_id = tr.{{ batch_id_column }}::TEXT
AND ar.model_id = '{{ test.model_id }}'
AND ar.kpi_name = '{{ test.test_name }}'
AND ar.column_name = '{{ test.column_name }}'
)
ORDER BY tr.{{ batch_id_column }}
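With the demo data shown later, this query would return the single batch timestamp 2025-06-08 16:47:13.171396+00:00 on the first run and nothing on re-runs, which is exactly the idempotency we want.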
The workflow will look similar to this flowchart.
flowchart TD
A[Start macro: audit__post_hook] --> B[Create audit schema and table if not exists]
B --> C[Call macro: __get_test_metadata]
C --> D[Loop through each test in metadata]
D --> E{test_relation & model_relation exist?}
E -- No --> D
E -- Yes --> F[Run query to find unprocessed batch_ids]
F --> G[Log found batch_ids]
G --> H[Get first and only batch_id]
H --> I[Build SQL to calculate KPIs and failed sample]
I --> J[Wrap SQL with query_text and add to audit_select_queries]
J --> M[Construct INSERT INTO audit_report]
M --> L{More tests}
L -- yes --> D
L -- no --> N[Run audit_insert_query]
N --> O[Log insert query and result status]
O --> P[End]
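The audit__post_hook macro itself lives in the demo repo; below is a condensed, hedged sketch of its shape following the flowchart. The helpers __unprocessed_batches_sql and __kpi_select_sql are hypothetical stand-ins for the batch query and the KPI SELECT shown above:
-- A condensed sketch of audit__post_hook (helper macro names are illustrative)
{% macro audit__post_hook(audit_schema, audit_table_name, batch_id_column) %}
  {% if execute %}
    {# 1. Create the audit schema and table if they don't exist (DDL omitted) #}
    {% set audit_select_queries = [] %}
    {% for test in __get_test_metadata() %}
      {% if test.test_relation and test.model_relation %}
        {# 2. Find batch_ids not yet recorded for this test/model/column #}
        {% set batches = run_query(__unprocessed_batches_sql(test, audit_schema, audit_table_name, batch_id_column)) %}
        {% for batch_id in batches.columns[0].values() %}
          {# 3. Build the KPI SELECT shown earlier and collect it #}
          {% do audit_select_queries.append(__kpi_select_sql(test, batch_id, batch_id_column)) %}
        {% endfor %}
      {% endif %}
    {% endfor %}
    {% if audit_select_queries %}
      {# 4. Insert all collected SELECTs in a single statement #}
      {% do run_query('INSERT INTO ' ~ audit_schema ~ '.' ~ audit_table_name ~ '\n' ~ (audit_select_queries | join('\nUNION ALL\n'))) %}
    {% endif %}
  {% endif %}
{% endmacro %}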
Demo
Set Up The Project
docker compose up -d # run the postgres container
cd audit_on_steroid
dbt seed --profiles-dir . # this loads sample data into the db
Exploring The Demo
In our models directory, you'll find the three familiar layers: stg, int, and mart. Our goal is to use the Write Audit Publish (WAP) pattern. Check out the model `mart_order_summary_wap.sql`, which will store our audited data before it's published to production.
❯ tree ./models
./models
├── int
│   ├── int_customers.sql
│   ├── int_order_customer_joined.sql
│   ├── int_orders.sql
│   └── int_products.sql
├── mart
│   ├── mart_order_summary.sql
│   ├── mart_order_summary_wap.sql
│   └── _mart_order_summary.yml
└── stg
    ├── stg_customers.sql
    ├── stg_orders.sql
    └── stg_products.sql
3 directories, 10 files
Let's check how `_mart_order_summary.yml` is written. Remember, the goal of using custom generic tests is to return the entire failed rows instead of a failure summary.
version: 2
models:
- name: mart_order_summary_wap
config:
tags: ['wap'] # Notice the tag here!
description: "Customer-level order aggregation WAP table"
columns:
- name: customer_id
data_tests: [audit__not_null, audit__unique]
- name: total_spent
data_tests: [audit__not_null, audit__greater_than_zero]
- name: first_order
data_tests:
- audit__not_null
- name: email
data_tests:
- audit__consistency:
regex: "^[A-Za-z0-9._%-]+@[A-Za-z0-9.-]+[.][A-Za-z]+$"
Now, it's time to run the workflow!
dbt run -s +tag:wap &&   # run wap models and upstream (including the wap model)
dbt test -s tag:wap &&   # run the tests step
# move test results to our audit table!
dbt run-operation audit__post_hook \
  --args '{audit_schema: "audit__reports", audit_table_name: "audit_report", batch_id_column: "created_at"}' &&
dbt run -s tag:wap+ --exclude tag:wap   # publish data to prod table
This is what should be expected in the `mart_order_summary_wap` table.
| customer_id | customer_name | email | total_orders | total_spent | first_order | last_order | created_at |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 101 | ali | invalid_email.com | 1 | | 2024-06-01 | 2024-06-01 | 2025-06-08 16:47:13.171396+00 |
| 102 | sara | sara@example.com | 1 | | | | 2025-06-08 16:47:13.171396+00 |
| 103 | ali | ali@example.com | 1 | | 2024-06-03 | 2024-06-03 | 2025-06-08 16:47:13.171396+00 |
| 104 | | | 1 | | | | 2025-06-08 16:47:13.171396+00 |
In this demo, the `severity` was set to 'warn' for demonstration purposes, ensuring that failing tests do not break the pipeline. You can experiment with it to establish data quality rules and observe the results.
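For instance, a hedged sketch of making the email check blocking, so a failure stops the && chain before the publish step (only the added config block is new):
- name: email
  data_tests:
    - audit__consistency:
        regex: "^[A-Za-z0-9._%-]+@[A-Za-z0-9.-]+[.][A-Za-z]+$"
        config:
          severity: error # fail `dbt test`, halting the publish step
Now, let's check the tables created by the run: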
❯ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
d49fd5fbe329 postgres "docker-entrypoint.sβ¦" 27 hours ago Up 5 hours 0.0.0.0:5432->5432/tcp, [::]:5432->5432/tcp audit_on_steroid-db-1
❯ docker exec -it audit_on_steroid-db-1 psql -U postgres -d data_warehouse
psql (17.5 (Debian 17.5-1.pgdg120+1))
Type "help" for help.
data_warehouse=# SELECT * FROM pg_catalog.pg_tables where tablename not like 'pg_%';
schemaname | tablename | tableowner | tablespace | hasindexes | hasrules | hastriggers | rowsecurity
-------------------------+-----------------------------------------------------------------+------------+------------+------------+----------+-------------+-------------
prefex__stg | stg_orders | postgres | | f | f | f | f
prefex_ | raw_customers | postgres | | f | f | f | f
prefex__mart | mart_order_summary_wap | postgres | | f | f | f | f
prefex__dbt_test__audit | audit__not_null_mart_order_summary_wap_first_order | postgres | | f | f | f | f
prefex_ | raw_orders | postgres | | f | f | f | f
prefex__int | int_customers | postgres | | f | f | f | f
prefex__dbt_test__audit | audit__consistency_mart_order__dd46b64be0a4d8ce4b4eb322540fd7fd | postgres | | f | f | f | f
prefex__dbt_test__audit | audit__not_null_mart_order_summary_wap_total_spent | postgres | | f | f | f | f
prefex_ | raw_products | postgres | | f | f | f | f
prefex__int | int_orders | postgres | | f | f | f | f
prefex__dbt_test__audit | audit__greater_than_zero_mart_order_summary_wap_total_spent | postgres | | f | f | f | f
prefex__dbt_test__audit | audit__unique_mart_order_summary_wap_customer_id | postgres | | f | f | f | f
prefex__mart | mart_order_summary | postgres | | f | f | f | f
prefex__stg | stg_customers | postgres | | f | f | f | f
prefex__int | int_order_customer_joined | postgres | | f | f | f | f
prefex__dbt_test__audit | audit__not_null_mart_order_summary_wap_customer_id | postgres | | f | f | f | f
information_schema | sql_parts | postgres | | f | f | f | f
information_schema | sql_features | postgres | | f | f | f | f
information_schema | sql_implementation_info | postgres | | f | f | f | f
information_schema | sql_sizing | postgres | | f | f | f | f
audit__reports | audit_report | postgres | | f | f | f | f
Let's examine the `"audit__reports"."audit_report"` table we have created.
| batch_id | model_id | column_name | kpi_name | failed_rows | total_rows | failure_pct | success_pct | created_at |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2025-06-08 16:47:13.171396+00:00 | model.audit_on_steroid.mart_order_summary_wap | total_spent | audit__not_null | 4 | 4 | 100 | 0 | 2025-06-08 16:47:21 |
| 2025-06-08 16:47:13.171396+00:00 | model.audit_on_steroid.mart_order_summary_wap | first_order | audit__not_null | 2 | 4 | 50 | 50 | 2025-06-08 16:47:21 |
| 2025-06-08 16:47:13.171396+00:00 | model.audit_on_steroid.mart_order_summary_wap | email | audit__consistency | 1 | 4 | 25 | 75 | 2025-06-08 16:47:21 |
Columns trimmed here for readability: every row also carries relation_name ("data_warehouse"."prefex__mart"."mart_order_summary_wap"), failed_sample (a JSON array of up to 100 failed rows; the email row's sample, for example, holds the single record for customer 101 with email invalid_email.com), and query_text (the full generated SELECT used to compute the row, in the shape shown in the Constructing The Audit Table section).
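Because failed_sample is stored as JSON, historical failures stay queryable. A small sketch (PostgreSQL) that expands the email-consistency sample back into rows:
-- Expand the stored failed_sample JSON back into rows
SELECT
    ar.batch_id,
    sample_row ->> 'customer_id' AS customer_id,
    sample_row ->> 'email'       AS email
FROM audit__reports.audit_report ar,
     json_array_elements(ar.failed_sample) AS sample_row
WHERE ar.kpi_name = 'audit__consistency'
  AND ar.column_name = 'email';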