Database Engineering

Data Quality for Database Teams: Validation, Profiling, and Alerting

Add data quality checks to your pipeline — row count validation, schema drift detection, and anomaly alerting with Great Expectations

JusDB Team
June 6, 2023
12 min read
145 views

A silent pipeline failure at 2 AM doesn't announce itself — your tables quietly fill with NULLs, duplicate keys, and orphaned foreign keys while dashboards show green. By the time an analyst notices the revenue numbers look off, you're already three ETL runs deep into corrupted data. Data quality isn't a nice-to-have for mature teams; it's the difference between a database you trust and one you're constantly apologizing for. This post walks you through the validation, profiling, and alerting patterns that keep production databases honest.

TL;DR
  • Data quality covers four dimensions: completeness, accuracy, consistency, and timeliness — instrument all four.
  • Row count thresholds, NULL rate queries, and FK violation checks catch 80% of issues before users do.
  • Schema drift detection using INFORMATION_SCHEMA prevents silent breaking changes from upstream sources.
  • Great Expectations and dbt tests give you a declarative quality layer that lives in version control.
  • Route alerts to Slack or PagerDuty based on severity — not all quality failures are 3 AM pages.
  • Define data SLAs with explicit freshness and error-rate thresholds so "acceptable" is never ambiguous.

Why Data Quality Matters for Database Teams

Data quality problems compound. A single bad upstream load cascades into wrong aggregates, incorrect ML training sets, and misleading executive reports. The cost isn't just engineering time — it's trust, and trust is hard to rebuild once an analyst learns to double-check every number that comes out of your pipeline.

The industry uses four core quality dimensions as a framework:

  • Completeness — Are expected rows and columns present? Are NULLs within acceptable bounds?
  • Accuracy — Do values reflect reality? Are dates in range, amounts non-negative, codes from valid enumerations?
  • Consistency — Is the same fact represented the same way across tables and systems? Do foreign keys resolve?
  • Timeliness — Did the data arrive when expected? Is the most recent record fresher than your SLA allows?

Each dimension maps to specific checks you can automate. The goal is to shift quality failures from "discovered by a user" to "caught by a monitor."

Row Count Validation with Threshold Alerts

The simplest check is often the most valuable: how many rows landed in a table after a load, and is that number reasonable? Row count validation catches truncated loads, duplicate inserts, and missed partitions without requiring deep schema knowledge.

Store baseline counts in a metadata table and compare after each load:

sql
-- Create a quality metrics tracking table
CREATE TABLE pipeline_quality_metrics (
  table_name        VARCHAR(255) NOT NULL,
  partition_date    DATE NOT NULL,
  row_count         BIGINT NOT NULL,
  load_timestamp    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (table_name, partition_date)
);

-- After each load, insert the observed count
INSERT INTO pipeline_quality_metrics (table_name, partition_date, row_count)
SELECT
  'orders' AS table_name,
  CURRENT_DATE AS partition_date,
  COUNT(*) AS row_count
FROM orders
WHERE created_at::DATE = CURRENT_DATE;

-- Alert query: flag if today's count deviates more than 20% from 7-day average
SELECT
  today.table_name,
  today.row_count AS today_count,
  AVG(hist.row_count) AS avg_7d,
  ROUND(
    100.0 * (today.row_count - AVG(hist.row_count)) / NULLIF(AVG(hist.row_count), 0),
    2
  ) AS pct_change
FROM pipeline_quality_metrics today
JOIN pipeline_quality_metrics hist
  ON hist.table_name = today.table_name
  AND hist.partition_date BETWEEN CURRENT_DATE - 7 AND CURRENT_DATE - 1
WHERE today.partition_date = CURRENT_DATE
GROUP BY today.table_name, today.row_count
HAVING ABS(
  100.0 * (today.row_count - AVG(hist.row_count)) / NULLIF(AVG(hist.row_count), 0)
) > 20;
Warning

A 20% threshold works for stable tables but will fire constantly on tables with natural weekly seasonality (e.g., order tables that dip on weekends). Segment your baseline by day-of-week for those tables, or you'll train your team to ignore the alerts.

Schema Drift Detection

Upstream systems change schemas without warning. A column gets renamed, a type changes from INT to BIGINT, or a NOT NULL constraint disappears. These changes break downstream queries silently — the pipeline keeps running, but results are wrong.

Snapshot your schema at each load using INFORMATION_SCHEMA and diff against the previous snapshot:

sql
-- Snapshot current schema to a history table
INSERT INTO schema_snapshots (table_name, column_name, data_type, is_nullable, snapshot_date)
SELECT
  table_name,
  column_name,
  data_type,
  is_nullable,
  CURRENT_DATE
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name IN ('orders', 'customers', 'products');

-- Detect columns added since the last snapshot
SELECT
  curr.table_name,
  curr.column_name,
  'COLUMN_ADDED' AS drift_type,
  curr.data_type
FROM schema_snapshots curr
LEFT JOIN schema_snapshots prev
  ON prev.table_name = curr.table_name
  AND prev.column_name = curr.column_name
  AND prev.snapshot_date = CURRENT_DATE - 1
WHERE curr.snapshot_date = CURRENT_DATE
  AND prev.column_name IS NULL

UNION ALL

-- Detect type changes
SELECT
  curr.table_name,
  curr.column_name,
  'TYPE_CHANGED' AS drift_type,
  curr.data_type || ' (was: ' || prev.data_type || ')' AS data_type
FROM schema_snapshots curr
JOIN schema_snapshots prev
  ON prev.table_name = curr.table_name
  AND prev.column_name = curr.column_name
  AND prev.snapshot_date = CURRENT_DATE - 1
WHERE curr.snapshot_date = CURRENT_DATE
  AND curr.data_type != prev.data_type;

Value Validation: NULLs, Duplicates, and Referential Integrity

Row counts and schema shape tell you structure is intact. Value validation tells you the data itself is trustworthy.

NULL Rate Monitoring

Acceptable NULL rates depend on business rules, but tracking the rate over time catches regressions:

sql
-- NULL rate for key columns across the orders table
SELECT
  'customer_id'      AS column_name,
  COUNT(*) FILTER (WHERE customer_id IS NULL) AS null_count,
  COUNT(*)           AS total_count,
  ROUND(100.0 * COUNT(*) FILTER (WHERE customer_id IS NULL) / COUNT(*), 2) AS null_pct
FROM orders WHERE created_at::DATE = CURRENT_DATE

UNION ALL

SELECT
  'shipping_address',
  COUNT(*) FILTER (WHERE shipping_address IS NULL),
  COUNT(*),
  ROUND(100.0 * COUNT(*) FILTER (WHERE shipping_address IS NULL) / COUNT(*), 2)
FROM orders WHERE created_at::DATE = CURRENT_DATE;

Duplicate Detection

sql
-- Find duplicate order IDs that should be unique
SELECT
  order_id,
  COUNT(*) AS occurrences
FROM orders
WHERE created_at::DATE = CURRENT_DATE
GROUP BY order_id
HAVING COUNT(*) > 1
ORDER BY occurrences DESC
LIMIT 50;

Foreign Key Violation Detection

sql
-- Orders referencing non-existent customers (FK violation)
SELECT
  o.order_id,
  o.customer_id,
  o.created_at
FROM orders o
LEFT JOIN customers c ON c.customer_id = o.customer_id
WHERE c.customer_id IS NULL
  AND o.created_at > NOW() - INTERVAL '24 hours';
Tip

Run FK violation checks on incremental windows (last 24 hours, last partition) rather than full-table scans. On tables with hundreds of millions of rows, a full scan for referential integrity can take minutes and lock out other queries.

Great Expectations: A Declarative Quality Layer

Ad-hoc SQL checks work but don't scale. Great Expectations gives you a Python-native framework to define an expectations suite that runs as part of your pipeline and produces documented, versioned quality reports.

python
import great_expectations as gx

context = gx.get_context()

# Connect to your datasource (PostgreSQL in this example)
datasource = context.sources.add_postgres(
    name="production_db",
    connection_string="postgresql+psycopg2://user:pass@host:5432/dbname",
)

data_asset = datasource.add_table_asset(name="orders", table_name="orders")
batch_request = data_asset.build_batch_request()

# Create an expectation suite
suite = context.add_expectation_suite("orders.daily_quality")

validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders.daily_quality",
)

# Completeness expectations
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")

# Accuracy expectations
validator.expect_column_values_to_be_between("order_total", min_value=0, max_value=100000)
validator.expect_column_values_to_be_in_set(
    "status", ["pending", "processing", "shipped", "delivered", "cancelled"]
)

# Uniqueness
validator.expect_column_values_to_be_unique("order_id")

# Freshness: expect rows from today
validator.expect_column_max_to_be_between(
    "created_at",
    min_value="2020-01-01",
    max_value="2099-12-31",
)

# Row count within expected range (adjust for your table)
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=500000)

validator.save_expectation_suite(discard_failed_expectations=False)

# Run validation and get results
results = validator.validate()
print(f"Validation passed: {results.success}")
print(f"Failed expectations: {results.statistics['unsuccessful_expectations']}")
Tip

Use Great Expectations' Data Docs feature to auto-generate an HTML quality report after each run. Hosting these docs on an internal S3 bucket or your BI tool gives stakeholders a self-service view of pipeline quality without pinging the data team.

dbt Tests as a Quality Layer

If you're already using dbt, its built-in and custom tests cover most quality dimensions with minimal boilerplate:

yaml
# models/marts/orders.yml
version: 2

models:
  - name: orders
    description: "Cleaned and validated orders fact table"
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('customers')
              field: customer_id
      - name: order_total
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'processing', 'shipped', 'delivered', 'cancelled']

Commercial Alternatives

For teams that need observability without the operational overhead of self-hosted tooling, Monte Carlo and Soda are the two most widely adopted options. Monte Carlo uses ML to establish automatic anomaly baselines — useful when your tables have complex seasonality patterns. Soda's YAML-first configuration integrates tightly into CI/CD pipelines and supports threshold-based SLA definitions out of the box. Both integrate with Slack and PagerDuty for alerting without custom webhook code.

Alerting Patterns: Slack and PagerDuty

A quality check that fails silently is no better than no check at all. Route alerts based on severity: informational issues go to a Slack channel; SLA violations that affect downstream users trigger PagerDuty.

python
import requests
import json

def send_slack_alert(webhook_url: str, table: str, check: str, details: str):
    """Send a data quality alert to a Slack channel via webhook."""
    payload = {
        "blocks": [
            {
                "type": "header",
                "text": {"type": "plain_text", "text": "Data Quality Alert"}
            },
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*Table:*\n{table}"},
                    {"type": "mrkdwn", "text": f"*Check:*\n{check}"},
                ]
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": f"*Details:*\n{details}"}
            }
        ]
    }
    response = requests.post(webhook_url, json=payload)
    response.raise_for_status()


def trigger_pagerduty(routing_key: str, summary: str, severity: str = "critical"):
    """Trigger a PagerDuty incident for SLA-breaking quality failures."""
    payload = {
        "routing_key": routing_key,
        "event_action": "trigger",
        "payload": {
            "summary": summary,
            "severity": severity,      # critical | error | warning | info
            "source": "data-quality-monitor",
            "component": "pipeline",
        }
    }
    response = requests.post(
        "https://events.pagerduty.com/v2/enqueue",
        json=payload,
        headers={"Content-Type": "application/json"},
    )
    response.raise_for_status()


# Example: alert routing based on failure severity
def route_quality_alert(check_result: dict):
    SLACK_WEBHOOK = "https://hooks.slack.com/services/..."
    PD_ROUTING_KEY = "your_pagerduty_routing_key"

    if check_result["severity"] == "critical":
        # SLA-breaking: page the on-call engineer
        trigger_pagerduty(
            routing_key=PD_ROUTING_KEY,
            summary=f"[DATA SLA BREACH] {check_result['table']}: {check_result['check']}",
        )
    else:
        # Informational: post to the #data-quality Slack channel
        send_slack_alert(
            webhook_url=SLACK_WEBHOOK,
            table=check_result["table"],
            check=check_result["check"],
            details=check_result["details"],
        )
Warning

Alert fatigue is a real risk. Start with high-severity thresholds and widen coverage incrementally. If your team is ignoring the Slack channel after two weeks, the thresholds are too sensitive. Review and tune weekly until the signal-to-noise ratio is high.

Defining Data SLAs

An SLA for a data pipeline should be as explicit as one for an API. Define three dimensions for each critical table:

  • Freshness SLA — "The orders table must contain records from within the last 4 hours during business hours."
  • Error rate SLA — "NULL rate on customer_id must not exceed 0.1%."
  • Completeness SLA — "Daily row count must not drop more than 15% below the 7-day rolling average."

Document these in your data catalog or a quality manifest file committed to your repository. When an SLA is breached, you have a clear definition of "acceptable" to fall back on rather than negotiating severity in the middle of an incident.

Key Takeaways
  • The four quality dimensions — completeness, accuracy, consistency, timeliness — map directly to concrete SQL checks you can automate today.
  • Row count validation with 7-day rolling baselines is the highest-ROI single check you can add to any pipeline.
  • Snapshot INFORMATION_SCHEMA on every load and diff against the previous snapshot to catch schema drift before it breaks downstream queries.
  • Great Expectations and dbt tests bring quality checks into version control, making them reviewable, testable, and documentable.
  • Route alerts by severity: Slack for informational warnings, PagerDuty for SLA breaches — and tune thresholds weekly to prevent alert fatigue.
  • Write explicit data SLAs for freshness, error rate, and completeness so "is the data good?" has a binary answer during incidents.

Take Your Data Quality Further with JusDB

Building and maintaining quality checks across dozens of tables and pipelines is an ongoing investment. JusDB's query optimization and database performance guides give you the deeper context you need to understand why quality failures happen — not just that they happened. Whether you're profiling slow validation queries, tuning indexes on your metrics tables, or choosing the right database for a new quality monitoring stack, JusDB covers the database engineering topics that matter to teams shipping production systems.

Explore more database engineering guides on JusDB and build pipelines you can actually trust.

Share this article