
ETL Pipeline Monitoring: Freshness Checks, Row Reconciliation, and dbt Tests

Monitor ETL pipelines with data freshness checks, row count reconciliation, dbt data quality tests, and Slack alerting. Detect silent failures before users do.

JusDB Team
July 11, 2025

ETL pipelines fail silently more often than they crash loudly. A broken pipeline that produces stale or wrong data is worse than one that fails fast. Here is how to build observability into your data pipelines.

What to Monitor

  • Row counts: expected vs actual rows per run
  • Freshness: max updated_at in destination — is data current?
  • Null rates: unexpected nulls in critical columns
  • Duplicate rates: duplicate keys in destination table
  • Pipeline lag: time from source event to destination availability
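Three of these metrics (row count, null rate, duplicate rate) can be computed in one pass over a batch of destination rows. The sketch below assumes rows arrive as dict-like records, for example from a DB-API cursor mapped to dicts. The helper name `quality_metrics` and its parameters are hypothetical, not part of any library. In production these aggregations are usually pushed down into SQL rather than pulling rows into Python.

```python
from collections import Counter
from typing import Any, Iterable, Mapping


def quality_metrics(
    rows: Iterable[Mapping[str, Any]],
    key: str,
    critical_columns: list[str],
) -> dict[str, Any]:
    """Compute row count, per-column null rates and duplicate-key rate.

    Hypothetical helper: `rows` is any iterable of dict-like records.
    """
    rows = list(rows)
    total = len(rows)
    if total == 0:
        # An empty batch is itself a signal; report it instead of dividing by zero
        return {"row_count": 0, "null_rates": {}, "duplicate_rate": 0.0}

    # Fraction of rows where each critical column is missing or None
    null_rates = {
        col: sum(1 for r in rows if r.get(col) is None) / total
        for col in critical_columns
    }

    # Count every copy of a key beyond its first occurrence as a duplicate row
    key_counts = Counter(r.get(key) for r in rows)
    duplicate_rows = sum(n - 1 for n in key_counts.values() if n > 1)

    return {
        "row_count": total,
        "null_rates": null_rates,
        "duplicate_rate": duplicate_rows / total,
    }
```

For example, four rows with ids 1, 2, 2, 3 and one missing `amount` yield a null rate of 0.25 for `amount` and a duplicate rate of 0.25. Comparing these numbers against per-table thresholds turns them into alerts.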

Data Freshness Check (PostgreSQL)

sql
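-- Report STALE if no new rows arrived in the past 15 minutes or the table is empty
-- (max() returns NULL on an empty table, which would otherwise fall through to FRESH).
-- Use updated_at instead of created_at if the pipeline also applies updates.
SELECT
  CASE
    WHEN max(created_at) IS NULL
      OR max(created_at) < now() - INTERVAL '15 minutes'
    THEN 'STALE'
    ELSE 'FRESH'
  END AS freshness,
  max(created_at) AS latest_row,
  now() - max(created_at) AS lag
FROM orders;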
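When the check runs from the pipeline's orchestrator rather than as a scheduled SQL job, one option is to fetch only `max(created_at)` and apply the threshold in Python, so each table can have its own allowed lag. This is a minimal sketch assuming timezone-aware UTC timestamps; the function name `check_freshness` and the `max_lag` parameter are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def check_freshness(
    latest: Optional[datetime],
    max_lag: timedelta = timedelta(minutes=15),
    now: Optional[datetime] = None,
) -> tuple[str, Optional[timedelta]]:
    """Return ('FRESH' or 'STALE', lag) for the newest row timestamp.

    Same rule as the freshness query: STALE when the newest row is older
    than max_lag. `latest` is max(created_at); None means the table is empty.
    Timestamps are assumed to be timezone-aware (UTC).
    """
    now = now or datetime.now(timezone.utc)
    if latest is None:
        # No rows at all: treat as stale rather than silently passing
        return "STALE", None
    lag = now - latest
    return ("STALE" if lag > max_lag else "FRESH"), lag
```

Passing `now` explicitly makes the function deterministic in tests; in the pipeline it defaults to the current UTC time.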

Row Count Reconciliation

python
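from psycopg2 import sql

def count_rows(conn, table, day):
    # psycopg2 connections have no execute(); queries run on a cursor.
    # sql.Identifier quotes the (unqualified) table name safely instead of str.format.
    query = sql.SQL('SELECT count(*) FROM {} WHERE date = %s').format(sql.Identifier(table))
    with conn.cursor() as cur:
        cur.execute(query, (day,))
        return cur.fetchone()[0]

def check_row_counts(source_conn, dest_conn, table, day):
    src = count_rows(source_conn, table, day)
    dst = count_rows(dest_conn, table, day)
    discrepancy = abs(src - dst)
    if discrepancy > 0:
        raise ValueError(f'{table} on {day}: source={src}, dest={dst}, diff={discrepancy}')
    return True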
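A run that loads several days at once is easier to debug if every mismatching partition is reported together instead of stopping at the first one. The sketch below assumes each side has already been summarized as a mapping of date to row count (for example from a `GROUP BY date` count query). The helper `reconcile_partitions` and its `tolerance` parameter are hypothetical; a non-zero tolerance is only appropriate where late-arriving rows are expected.

```python
from datetime import date
from typing import Mapping


def reconcile_partitions(
    source_counts: Mapping[date, int],
    dest_counts: Mapping[date, int],
    tolerance: int = 0,
) -> list[tuple[date, int, int]]:
    """Compare per-partition row counts and return every mismatch.

    Hypothetical helper: each mapping is partition date -> row count.
    A partition missing on one side counts as zero rows there.
    """
    mismatches = []
    for part in sorted(set(source_counts) | set(dest_counts)):
        src = source_counts.get(part, 0)
        dst = dest_counts.get(part, 0)
        # tolerance > 0 allows for late-arriving rows; keep it 0 for strict checks
        if abs(src - dst) > tolerance:
            mismatches.append((part, src, dst))
    return mismatches
```

An empty list means the run reconciled; otherwise the tuples can be formatted straight into an alert message.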

dbt Tests for Data Quality

yaml
# models/orders.yml
models:
  - name: orders
    columns:
      - name: id
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: [pending, completed, cancelled]
      - name: amount
        tests:
          - not_null
          # accepted_range ships in the dbt_utils package (declare it in packages.yml, then run dbt deps)
          - dbt_utils.accepted_range:
              min_value: 0
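dbt generic tests such as `unique`, `not_null` and `accepted_values` compile to a query that selects the offending rows, and by default a test fails when that query returns any rows. The sketch below illustrates that failing-row pattern with Python's built-in `sqlite3` so it runs anywhere. The queries approximate the idea and are not dbt's actual compiled SQL; the check names and sample data are made up.

```python
import sqlite3

# Failing-row queries in the spirit of the dbt tests on the orders model.
# Each query returns offending rows; an empty result means the check passes.
CHECKS = {
    "unique_id": "SELECT id FROM orders GROUP BY id HAVING count(*) > 1",
    "not_null_id": "SELECT * FROM orders WHERE id IS NULL",
    "accepted_values_status": (
        "SELECT * FROM orders "
        "WHERE status NOT IN ('pending', 'completed', 'cancelled')"
    ),
    "not_null_amount": "SELECT * FROM orders WHERE amount IS NULL",
    "amount_at_least_0": "SELECT * FROM orders WHERE amount < 0",
}


def run_checks(conn: sqlite3.Connection) -> dict[str, int]:
    """Return the number of failing rows per check (0 means pass)."""
    return {name: len(conn.execute(query).fetchall()) for name, query in CHECKS.items()}


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER, status TEXT, amount REAL)")
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)",
        [(1, "pending", 10.0), (2, "shipped", 5.0), (2, "completed", -1.0)],
    )
    for name, failures in run_checks(conn).items():
        print(f"{name}: {'PASS' if failures == 0 else f'FAIL ({failures} rows)'}")
```

With the sample rows, the duplicate id 2, the unexpected status `shipped` and the negative amount each trip one check. In a real project `dbt test` does this work for you; the sketch only shows why a "test" is just a query that should come back empty.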

Alerting on Pipeline Failure

python
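import logging
import os
import requests

# Hypothetical env var name; keeps the webhook URL out of source control
SLACK_WEBHOOK = os.environ['SLACK_WEBHOOK_URL']

def send_slack_alert(message: str, webhook_url: str) -> None:
    resp = requests.post(webhook_url, json={'text': f':warning: ETL Alert: {message}'}, timeout=10)
    resp.raise_for_status()  # surface HTTP errors instead of ignoring them

# In your pipeline runner (run_pipeline is your own entry point):
try:
    run_pipeline()
except Exception as e:
    try:
        send_slack_alert(f'Pipeline failed: {e}', SLACK_WEBHOOK)
    except requests.RequestException:
        # A failed alert must not mask the original pipeline error
        logging.exception('Could not send Slack alert')
    raise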
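Repeating the same try/except around every pipeline step gets tedious. One option is a small decorator that reports the failure and re-raises, so the orchestrator still sees the step fail. This is a sketch: the decorator name `alert_on_failure`, its `step` label and the injected `send` callable are hypothetical. `send` could be, for instance, a lambda that calls `send_slack_alert` with your webhook URL.

```python
import functools
import logging
from typing import Callable


def alert_on_failure(send: Callable[[str], None], step: str):
    """Decorator: report any exception from the wrapped step via `send`, then re-raise."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                try:
                    send(f"{step} failed: {exc!r}")
                except Exception:
                    # A broken alert channel must not hide the original error
                    logging.exception("Failed to send alert for %s", step)
                raise
        return wrapper
    return decorator
```

Because the alert sender is injected, the same decorator works for Slack, email or a test double, and each step's alert carries its own label.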

Key Takeaways

  • Check data freshness (max updated_at) — silent staleness is harder to detect than crashes
  • Reconcile row counts between source and destination after every ETL run
  • Use dbt tests for automated schema validation, null checks, and referential integrity
  • Alert on pipeline failure immediately — every minute of silent failure is data debt

JusDB Can Help

Data pipeline observability is often an afterthought. JusDB can add monitoring and data quality checks to your existing ETL infrastructure.
