
Apache Airflow for Database Workflows: Scheduling and Orchestration

Use Apache Airflow to orchestrate database ETL, backup jobs, and maintenance tasks — DAGs, sensors, and best practices

JusDB Team
January 30, 2026

Production database pipelines are rarely simple one-shot scripts — they involve retries, dependency chains, schedule windows, failure alerting, and audit trails that span dozens of interconnected tasks. Teams that manage these with cron and bash eventually hit a wall: silent failures, no visibility, and cascading breakage when a single step times out at 2 AM. Apache Airflow gives you a proper directed acyclic graph (DAG) model where every database task — ETL loads, backups, integrity checks, index rebuilds — is a first-class citizen with its own retry logic, SLA enforcement, and dependency awareness. In this post you will learn how to wire Airflow into your database operations from first principles, covering operators, sensors, the TaskFlow API, and deployment options so you can choose the right path for your team.

TL;DR
  • Airflow models database workflows as DAGs — each step is a task with retries, dependencies, and scheduling.
  • Use PostgresOperator, MySqlOperator, or the unified SQLExecuteQueryOperator (Airflow 2.5+) to run SQL inside tasks.
  • SqlSensor lets downstream tasks wait for data to arrive before proceeding — essential for ELT patterns.
  • The TaskFlow API (@task decorator) eliminates boilerplate for Python-heavy steps and handles XCom automatically.
  • Use retries, retry_delay, and sla on every production database task.
  • MWAA, Astronomer, and self-hosted each have a distinct cost/control tradeoff — pick based on team size and compliance requirements.

What Is Apache Airflow?

Apache Airflow is an open-source workflow orchestration platform originally built by Airbnb in 2014 and donated to the Apache Software Foundation in 2016. Its core idea is simple: you describe a workflow as a Python file, Airflow parses it into a directed acyclic graph, and the scheduler runs each node (task) in the correct order while respecting dependencies and schedule windows.

For database teams the key advantage over cron is observability by default. Every task execution is logged, every failure can trigger configurable alerts, and the web UI keeps a full history of every DAG run. When a nightly ETL load silently produces zero rows, a validation task can detect that condition and fail the run, paging your on-call engineer before the morning standup rather than after the business notices stale dashboards.

Airflow also has a rich provider ecosystem — database connections to PostgreSQL, MySQL, Snowflake, BigQuery, Redshift, and others ship as installable packages so you get tested, maintained operators instead of rolling your own subprocess wrappers.

DAG Concepts: schedule, start_date, and catchup

A DAG file is a Python module that Airflow's scheduler imports on a regular scan cycle. The three parameters that control when your DAG runs are schedule, start_date, and catchup.

python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

default_args = {
    "owner": "data-engineering",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["dba-alerts@example.com"],
}

with DAG(
    dag_id="nightly_etl",
    default_args=default_args,
    description="Load raw events into the warehouse",
    schedule="0 2 * * *",            # 02:00 UTC every night
    start_date=datetime(2025, 1, 1),
    catchup=False,                   # do NOT run missed historical intervals
    tags=["etl", "postgres"],
) as dag:
    pass  # tasks defined below

schedule accepts a cron expression, a timedelta, or a named preset like @daily. It replaced schedule_interval in Airflow 2.4, and Airflow 3 accepts only the new name. start_date is the earliest logical date Airflow will schedule a run for. catchup=False is almost always what you want for database operations: with catchup=True, Airflow creates one run for every missed interval back to start_date, and if you deploy a DAG weeks after its start date those runs (executed up to max_active_runs at a time) can quickly exhaust your database connection pool.

Warning

Never set catchup=True on a DAG that writes to a production table without first understanding the backfill implications. A DAG with a 5-minute interval and a start_date one week in the past has 2,016 missed intervals (7 × 24 × 12) to work through, and the scheduler will keep launching those runs as fast as max_active_runs allows. Use airflow dags backfill with explicit date ranges when you genuinely need historical reprocessing.
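The run count in this warning is plain interval arithmetic. A minimal sketch with the standard library, assuming a fixed-length interval (irregular cron schedules would need a real cron parser); missed_intervals is a hypothetical helper, not an Airflow function:

```python
from datetime import datetime, timedelta


def missed_intervals(start_date: datetime, now: datetime, interval: timedelta) -> int:
    """Count complete fixed-length intervals between start_date and now.

    With catchup=True, Airflow schedules one run per complete interval,
    so this approximates how many runs a late deploy will create.
    """
    if now <= start_date:
        return 0
    return (now - start_date) // interval  # timedelta // timedelta -> int


start = datetime(2025, 1, 1)
deployed_at = start + timedelta(days=7)
print(missed_intervals(start, deployed_at, timedelta(minutes=5)))  # 2016
print(missed_intervals(start, deployed_at, timedelta(days=1)))     # 7
```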

Database Operators: Running SQL as Tasks

Airflow's provider packages expose operator classes that wrap a SQL statement in a task. The connection details (host, port, credentials) live in Airflow's encrypted Connections store — the operator just references a conn_id string.

Setting Up Connections in the UI

Navigate to Admin → Connections → + Add. Set the Connection Type to Postgres (or MySQL, etc.), fill in host, schema, login, password, and port, then save. The Connection ID you enter (e.g. postgres_warehouse) is what you pass to your operators. You can also inject connections via environment variables in the form AIRFLOW_CONN_POSTGRES_WAREHOUSE=postgresql://user:pass@host:5432/db for infrastructure-as-code deployments.
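Credentials inside a connection URI must be percent-encoded, otherwise characters such as @, / and : in a password are parsed as URI delimiters. A minimal sketch using only the standard library; airflow_conn_env is a hypothetical helper, and the login, password, and host are made-up example values. Airflow looks up connection postgres_warehouse in the variable AIRFLOW_CONN_ followed by the upper-cased connection ID:

```python
from urllib.parse import quote


def airflow_conn_env(conn_id: str, scheme: str, login: str, password: str,
                     host: str, port: int, database: str) -> tuple[str, str]:
    """Build an (environment variable name, URI) pair for an Airflow connection."""
    name = "AIRFLOW_CONN_" + conn_id.upper()
    # safe="" also encodes "/", so every delimiter inside the credentials is escaped
    uri = (
        f"{scheme}://{quote(login, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(database, safe='')}"
    )
    return name, uri


name, uri = airflow_conn_env(
    "postgres_warehouse", "postgresql", "etl_user", "p@ss/word:1",
    "db.internal", 5432, "warehouse",
)
print(f"export {name}='{uri}'")
# export AIRFLOW_CONN_POSTGRES_WAREHOUSE='postgresql://etl_user:p%40ss%2Fword%3A1@db.internal:5432/warehouse'
```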

PostgresOperator and MySqlOperator

python
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator

load_events = PostgresOperator(
    task_id="load_raw_events",
    postgres_conn_id="postgres_warehouse",
    sql="""
        INSERT INTO warehouse.events (event_id, user_id, event_type, occurred_at)
        SELECT
            e.id,
            e.user_id,
            e.event_type,
            e.created_at
        FROM staging.raw_events e
        WHERE e.created_at >= '{{ ds }}'::date
          AND e.created_at <  '{{ next_ds }}'::date
        ON CONFLICT (event_id) DO NOTHING;
    """,
    dag=dag,
)

archive_mysql = MySqlOperator(
    task_id="archive_old_orders",
    mysql_conn_id="mysql_transactional",
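    # A list runs each statement separately on one connection, and both
    # commit together at the end. A cutoff derived from {{ ds }} (instead of
    # CURDATE()) gives both statements the same, reproducible boundary.
    sql=[
        """
        INSERT INTO orders_archive
        SELECT * FROM orders
        WHERE order_date < DATE_SUB('{{ ds }}', INTERVAL 90 DAY);
        """,
        """
        DELETE FROM orders
        WHERE order_date < DATE_SUB('{{ ds }}', INTERVAL 90 DAY);
        """,
    ],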
    dag=dag,
)

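The {{ ds }} and {{ next_ds }} placeholders are Jinja-templated macros that Airflow renders from the current run's logical date: ds is that date and next_ds is the next scheduled date, so each run loads exactly one interval without any Python string formatting. Combined with ON CONFLICT DO NOTHING, rerunning the same date is idempotent. next_ds has been deprecated since Airflow 2.2; {{ data_interval_end | ds }} is its replacement.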
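Because the predicate is created_at >= ds AND created_at < next_ds, consecutive daily runs tile time into half-open windows with no gaps and no overlaps. The sketch below mimics how those two values render for a daily schedule using only the standard library; daily_window and in_window are hypothetical helpers, not Airflow functions, and real rendering also depends on the DAG's timetable and timezone:

```python
from datetime import datetime, timedelta


def daily_window(logical_date: datetime) -> tuple[str, str]:
    """Sketch of what {{ ds }} and {{ next_ds }} render to on a daily schedule."""
    ds = logical_date.strftime("%Y-%m-%d")
    next_ds = (logical_date + timedelta(days=1)).strftime("%Y-%m-%d")
    return ds, next_ds


def in_window(ts: datetime, ds: str, next_ds: str) -> bool:
    """Mirror of: created_at >= ds::date AND created_at < next_ds::date."""
    return datetime.fromisoformat(ds) <= ts < datetime.fromisoformat(next_ds)


# Two consecutive 02:00 runs and two events straddling midnight
runs = [daily_window(datetime(2025, 1, 1, 2)), daily_window(datetime(2025, 1, 2, 2))]
events = [datetime(2025, 1, 1, 23, 59, 59), datetime(2025, 1, 2, 0, 0, 0)]

for ev in events:
    # Each event matches exactly one run's window, so nothing is skipped or loaded twice
    print(ev, [r for r in runs if in_window(ev, *r)])
```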

SQLExecuteQueryOperator (Airflow 2.5+)

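SQLExecuteQueryOperator ships in the common.sql provider package (apache-airflow-providers-common-sql) rather than in Airflow core, and it is the database-agnostic replacement for the per-database operators, which are now deprecated in its favour. The operator infers the correct hook from the connection type, so you can swap backends without changing operator class names.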

python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

rebuild_summary = SQLExecuteQueryOperator(
    task_id="rebuild_daily_summary",
    conn_id="postgres_warehouse",   # connection type determines the hook
    sql="""
        DELETE FROM reporting.daily_summary
        WHERE summary_date = '{{ ds }}'::date;

        INSERT INTO reporting.daily_summary (summary_date, revenue, order_count)
        SELECT
            '{{ ds }}'::date,
            SUM(total_amount),
            COUNT(*)
        FROM warehouse.orders
        WHERE order_date = '{{ ds }}'::date;
    """,
    split_statements=True,   # execute the DELETE and INSERT as separate statements
    dag=dag,
)
Tip

Pass split_statements=True to SQLExecuteQueryOperator when your SQL string contains multiple semicolon-delimited statements. Without it, the hook hands the whole string to the driver as a single query: psycopg2 happens to accept that, but many other drivers reject multi-statement strings or handle them inconsistently.
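The DELETE-then-INSERT shape of rebuild_daily_summary is what makes the task safe to retry: rerunning the same {{ ds }} replaces that day's row instead of adding a second one. A minimal sketch using Python's built-in sqlite3 as a stand-in for PostgreSQL; the table contents are hypothetical and ? placeholders take the place of the Jinja-rendered date:

```python
import sqlite3


def rebuild_daily_summary(conn: sqlite3.Connection, ds: str) -> None:
    """Replace one day's summary row; safe to run any number of times."""
    with conn:  # one transaction: commits on success, rolls back on error
        conn.execute("DELETE FROM daily_summary WHERE summary_date = ?", (ds,))
        conn.execute(
            """
            INSERT INTO daily_summary (summary_date, revenue, order_count)
            SELECT ?, SUM(total_amount), COUNT(*)
            FROM orders
            WHERE order_date = ?
            """,
            (ds, ds),
        )


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE orders (order_id INTEGER PRIMARY KEY, order_date TEXT, total_amount REAL);
    CREATE TABLE daily_summary (summary_date TEXT, revenue REAL, order_count INTEGER);
    INSERT INTO orders VALUES (1, '2025-01-01', 10.0), (2, '2025-01-01', 15.5), (3, '2025-01-02', 7.0);
    """
)

rebuild_daily_summary(conn, "2025-01-01")
rebuild_daily_summary(conn, "2025-01-01")  # simulated retry of the same run
print(conn.execute("SELECT * FROM daily_summary").fetchall())
# [('2025-01-01', 25.5, 2)]
```

Without the DELETE, the retry would leave two rows for 2025-01-01 and double the reported revenue.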

SqlSensor: Waiting for Data Before Proceeding

In ELT pipelines you often cannot start your transformation until an upstream system has finished loading data. SqlSensor runs a SQL query every poke_interval seconds and succeeds only when the query returns at least one row whose first cell is truthy; an empty result, or a first cell of 0 or NULL, means keep waiting.
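That success rule can be sketched in a few lines. This is an approximation written for illustration, not the provider's source code; sqlite3 stands in for PostgreSQL, and the load_log table mirrors staging.load_log from the example:

```python
import sqlite3


def sensor_condition_met(conn: sqlite3.Connection, sql: str, params: tuple = ()) -> bool:
    """Approximate SqlSensor's default rule.

    No rows -> keep waiting; otherwise succeed only if the first cell
    of the first row is truthy.
    """
    row = conn.execute(sql, params).fetchone()
    if row is None:
        return False
    return bool(row[0])


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE load_log (load_date TEXT, status TEXT)")
check = "SELECT 1 FROM load_log WHERE load_date = ? AND status = 'COMPLETE' LIMIT 1"

print(sensor_condition_met(conn, check, ("2025-01-01",)))  # False: nothing loaded yet
conn.execute("INSERT INTO load_log VALUES ('2025-01-01', 'COMPLETE')")
print(sensor_condition_met(conn, check, ("2025-01-01",)))  # True
print(sensor_condition_met(conn, "SELECT 0"))              # False: falsy first cell
```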

python
from airflow.providers.common.sql.sensors.sql import SqlSensor

wait_for_source_data = SqlSensor(
    task_id="wait_for_source_data",
    conn_id="postgres_warehouse",
    sql="""
        SELECT 1
        FROM staging.load_log
        WHERE load_date = '{{ ds }}'::date
          AND status = 'COMPLETE'
        LIMIT 1;
    """,
    poke_interval=60,      # check every 60 seconds
    timeout=3600,          # fail the task after 1 hour
    mode="reschedule",     # release the worker slot between pokes
    dag=dag,
)

# Only runs after the sensor succeeds
load_events.set_upstream(wait_for_source_data)

Use mode="reschedule" rather than the default "poke" when your timeout is longer than a few minutes. In poke mode the worker process sleeps between checks and occupies a slot in your executor pool the entire time. Reschedule mode releases the slot between pokes and re-queues the sensor task, which scales dramatically better when you have many sensors running in parallel.
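The difference is easiest to see as worker-slot time. A rough model under stated assumptions: a poke-mode sensor holds its slot for the whole wait, while a reschedule-mode sensor holds one only while each poke runs, at an assumed cost of about 2 seconds per poke including task start-up. worker_slot_seconds is a hypothetical helper, and the model ignores scheduler latency:

```python
def worker_slot_seconds(n_sensors: int, wait_s: int, poke_interval_s: int,
                        poke_cost_s: float, mode: str) -> float:
    """Approximate executor slot-seconds consumed while sensors wait for data."""
    if mode == "poke":
        # Each sensor sleeps inside its slot for the entire wait
        return n_sensors * wait_s
    if mode == "reschedule":
        # First poke at t=0, then one per interval; slot held only during each poke
        pokes = wait_s // poke_interval_s + 1
        return n_sensors * pokes * poke_cost_s
    raise ValueError(f"unknown mode: {mode!r}")


# 20 sensors each waiting a full hour, poking every 60 s, ~2 s per poke (assumed)
print(worker_slot_seconds(20, 3600, 60, 2.0, "poke"))        # 72000
print(worker_slot_seconds(20, 3600, 60, 2.0, "reschedule"))  # 2440.0
```

Under these assumptions reschedule mode uses roughly 3% of the slot time, which is why it matters most when many long-waiting sensors run at once.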

TaskFlow API and XComs for Passing Data Between Tasks

The @task decorator (introduced in Airflow 2.0) lets you write Python functions as tasks without subclassing operator classes. Return values are automatically serialised into XCom so downstream tasks can consume them as regular function arguments.

python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook

@dag(
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["backup-verification"],
)
def backup_verification():

    @task()
    def get_source_row_count(ds=None) -> int:
        """Query the production database for today's expected row count."""
        hook = PostgresHook(postgres_conn_id="postgres_production")
        result = hook.get_first(
            "SELECT COUNT(*) FROM orders WHERE order_date = %s", parameters=(ds,)
        )
        return result[0]

    @task()
    def get_backup_row_count(ds=None) -> int:
        """Query the backup replica to verify row counts match."""
        hook = PostgresHook(postgres_conn_id="postgres_backup_replica")
        result = hook.get_first(
            "SELECT COUNT(*) FROM orders WHERE order_date = %s", parameters=(ds,)
        )
        return result[0]

    @task()
    def assert_counts_match(source_count: int, backup_count: int):
        if source_count != backup_count:
            raise ValueError(
                f"Backup mismatch: source={source_count}, backup={backup_count}"
            )

    source = get_source_row_count()
    backup = get_backup_row_count()
    assert_counts_match(source, backup)

backup_verification_dag = backup_verification()

The TaskFlow API infers the dependency from the function call graph — assert_counts_match depends on both get_source_row_count and get_backup_row_count because their return values are passed as arguments. XCom values are stored in Airflow's metadata database, so keep them small — row counts, status strings, or file paths. Never push large dataframes through XCom; use S3, GCS, or a staging table instead.
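A small guard can enforce that rule before a value ever reaches XCom. The sketch below is a hypothetical helper, not an Airflow API: it uses a value's JSON-encoded size as a rough proxy for what gets stored, against an arbitrary 48 KB budget that you would tune for your own metadata database. The S3 path is a made-up example:

```python
import json

# Hypothetical budget; pick a limit that suits your metadata database.
MAX_XCOM_BYTES = 48 * 1024


def ensure_small_xcom(value, max_bytes: int = MAX_XCOM_BYTES):
    """Return value unchanged if its JSON encoding fits the budget, else raise.

    Intended as the last line of a @task function, for example
    `return ensure_small_xcom(row_count)`.
    """
    size = len(json.dumps(value).encode("utf-8"))
    if size > max_bytes:
        raise ValueError(
            f"XCom payload is {size} bytes (limit {max_bytes}); "
            "write it to a staging table or object storage and return a path instead"
        )
    return value


print(ensure_small_xcom(12345))                                           # a row count passes
print(ensure_small_xcom("s3://example-bucket/orders/2025-01-01.parquet"))  # so does a path
```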

Retry Logic and SLA Miss Callbacks

Database tasks are susceptible to transient failures: connection pool exhaustion, lock timeouts, or brief network blips to a managed RDS instance. Configure retries at the default_args level so every task in the DAG inherits sensible behaviour without repetition.

python
from datetime import datetime, timedelta

from airflow.models import DAG

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Alert the on-call channel when a task misses its SLA."""
    message = (
        f"SLA MISS in DAG '{dag.dag_id}': tasks {task_list} "
        f"exceeded their SLA window. Blocking tasks: {blocking_task_list}."
    )
    # Post to PagerDuty, Slack, or your alerting system here
    print(message)

default_args = {
    "owner": "data-engineering",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,   # delays roughly double per retry (Airflow adds jitter)
    "max_retry_delay": timedelta(hours=1),
    "email_on_failure": True,
    "email": ["dba-alerts@example.com"],
    "sla": timedelta(hours=2),           # task must complete within 2h of schedule
}

with DAG(
    dag_id="nightly_etl_with_sla",
    default_args=default_args,
    sla_miss_callback=sla_miss_callback,
    schedule="0 2 * * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    pass
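The retry_exponential_backoff setting produces a doubling schedule capped by max_retry_delay. A sketch of that nominal schedule using the settings above; nominal_backoff is a hypothetical helper, and its result is a lower bound rather than Airflow's exact computation, because Airflow also adds deterministic per-task-instance jitter before applying the cap:

```python
from datetime import timedelta


def nominal_backoff(retry_delay: timedelta, max_retry_delay: timedelta,
                    retries: int) -> list[timedelta]:
    """Minimum wait before each retry: the delay doubles, capped at max_retry_delay."""
    return [min(retry_delay * 2 ** n, max_retry_delay) for n in range(retries)]


delays = nominal_backoff(timedelta(minutes=5), timedelta(hours=1), retries=5)
print([int(d.total_seconds() // 60) for d in delays])  # [5, 10, 20, 40, 60]
```

With retries=3 as configured above, the first three values (5, 10, and 20 minutes) apply.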
Warning

SLA miss callbacks fire asynchronously from task execution. In Airflow 2 they are evaluated during DAG file processing after the SLA window expires, not at the exact moment of expiry, so alerts can arrive late. SLAs were also removed in Airflow 3.0. For hard real-time alerting, supplement SLA callbacks with external monitoring (Datadog, CloudWatch) that watches task state through Airflow's REST API.

Deployment: Kubernetes Executor vs CeleryExecutor

The executor controls how Airflow distributes task workload across compute resources. The two most common production choices are CeleryExecutor and KubernetesExecutor.

CeleryExecutor maintains a pool of long-running worker processes backed by a message broker (Redis or RabbitMQ). Workers are always on, so task startup latency is low — ideal for high-frequency DAGs or workloads that need a stable environment across many tasks. The downside is over-provisioning: you pay for idle worker capacity.

KubernetesExecutor spawns a fresh Pod for every task and tears it down when the task completes. This gives perfect isolation and pay-per-task scaling, but Pod startup adds 15–60 seconds of cold-start latency per task. For database maintenance jobs that run every few hours and take minutes to complete, that overhead is negligible. For pipelines with hundreds of short-running tasks per minute, it becomes significant.
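This trade-off is simple arithmetic: pod start-up is a fixed cost per task, so its share of wall time depends on how long the task itself runs. A minimal sketch assuming a 30-second cold start (within the 15–60 second range above) and hypothetical task durations; both helpers are illustrative:

```python
def startup_share(startup_s: float, runtime_s: float) -> float:
    """Fraction of a task's wall-clock time spent waiting for its pod to start."""
    return startup_s / (startup_s + runtime_s)


def hourly_startup_seconds(tasks_per_hour: int, startup_s: float) -> float:
    """Total pod start-up time accumulated per hour across all tasks."""
    return tasks_per_hour * startup_s


STARTUP_S = 30.0  # assumed pod cold start

print(f"{startup_share(STARTUP_S, 600):.1%}")          # 10-minute maintenance job: 4.8%
print(f"{startup_share(STARTUP_S, 5):.1%}")            # 5-second task: 85.7%
print(hourly_startup_seconds(300, STARTUP_S) / 3600)   # 300 short tasks/hour: 2.5 pod-hours
```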

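A CeleryKubernetesExecutor hybrid is available in Airflow 2.x. Tasks run on the always-on Celery workers by default, and any task whose queue matches the kubernetes_queue setting (kubernetes by default) gets its own pod instead. Route the many short, frequent tasks to Celery to avoid pod start-up latency, and send resource-hungry or isolation-sensitive jobs to Kubernetes: a pragmatic middle ground for mixed workloads.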

Managed Deployment: MWAA vs Astronomer vs Self-Hosted

Unless your team has deep Airflow operational expertise, a managed service is almost always the right first choice. Here is a concise comparison of the three main options.

Amazon MWAA (Managed Workflows for Apache Airflow) is the lowest-friction entry point for AWS-native shops. You get an Airflow environment in a VPC with IAM-based secrets integration and AWS-managed infrastructure and patching. The tradeoff is that MWAA typically lags behind upstream Airflow releases by several months and gives you limited control over executor configuration. It works well for teams that want to stay entirely within AWS with minimal ops overhead.

Astronomer (cloud or self-hosted) is a commercial platform built around Airflow. It offers first-class CI/CD integration via the Astro CLI and built-in observability through Astro Observe, and Astronomer maintains Cosmos, an open-source library for running dbt projects in Airflow (Cosmos also works on other deployments). It is the right choice when Airflow is a primary engineering surface, such as data teams that write dozens of DAGs and need tight deployment pipelines.

Self-hosted on Kubernetes via the official Helm chart gives maximum control and is free beyond infrastructure cost. It is appropriate for teams with strong Kubernetes expertise, strict data residency requirements, or unusual executor needs. Budget meaningful engineering time for upgrades, secret rotation, metadata database maintenance, and log storage management.

Tip

Regardless of deployment method, always run the Airflow metadata database (which stores DAG run and task instance state, connections, variables, and XComs) on a managed PostgreSQL instance with automated backups, not on the same host as the Airflow scheduler. Task logs are written to local disk or remote log storage rather than to this database, so they need their own retention plan. Losing the metadata database means losing your entire DAG run history and all XCom values.

Common Database Patterns: ETL DAG and Backup Verification DAG

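Two patterns cover the majority of database workflow use cases: a nightly ETL load and a backup verification job. The TaskFlow DAG in the XCom section already implements backup verification by comparing row counts on the primary and the backup replica, so this section focuses on the ETL load.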

ETL DAG Pattern

python
with DAG("etl_events_to_warehouse", ...) as dag:

    wait_for_data = SqlSensor(
        task_id="wait_for_staging_complete",
        conn_id="postgres_staging",
        sql="SELECT 1 FROM load_log WHERE load_date = '{{ ds }}'::date AND status = 'DONE'",
        poke_interval=120,
        timeout=7200,
        mode="reschedule",
    )

    truncate_staging_target = SQLExecuteQueryOperator(
        task_id="truncate_staging_target",
        conn_id="postgres_warehouse",
        sql="TRUNCATE TABLE staging.events_inbound;",
    )

    load_to_staging = SQLExecuteQueryOperator(
        task_id="load_to_staging",
        conn_id="postgres_warehouse",
        sql="CALL staging.load_events_for_date('{{ ds }}');",
    )

    transform_and_merge = SQLExecuteQueryOperator(
        task_id="transform_and_merge",
        conn_id="postgres_warehouse",
        sql="CALL warehouse.merge_events_from_staging('{{ ds }}');",
    )

    validate_row_count = SqlSensor(
        task_id="validate_row_count",
        conn_id="postgres_warehouse",
        sql="""
            SELECT 1
            FROM warehouse.events
            WHERE event_date = '{{ ds }}'::date
            HAVING COUNT(*) > 0;
        """,
        poke_interval=10,
        timeout=120,
        mode="poke",
    )

    (
        wait_for_data
        >> truncate_staging_target
        >> load_to_staging
        >> transform_and_merge
        >> validate_row_count
    )

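The bitshift operator (>>) defines task dependencies: each step must succeed before the next starts. A failure at any point halts the pipeline, retries according to your default_args, and pages your team if retries are exhausted. Each task commits its own transaction, so steps that already succeeded stay committed; that is why load_to_staging and transform_and_merge must be safe to rerun for the same {{ ds }}, and why the merge should run as a single transaction so a failure leaves the warehouse table untouched. For a one-shot data-quality assertion like validate_row_count, SQLCheckOperator from the same common.sql provider is a more direct fit than a short-timeout sensor: it fails the task when any value in the query's first row is falsy.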

Key Takeaways

  • Set catchup=False on all database DAGs by default. Only enable catchup when you explicitly need historical backfill and have verified your SQL is idempotent for repeated runs.
  • Use SQLExecuteQueryOperator (Airflow 2.5+) for new DAGs instead of PostgresOperator or MySqlOperator — it gives you a single operator class that works across database backends via the connection type.
  • Prefer mode="reschedule" on sensors with timeouts longer than 5 minutes to avoid locking executor worker slots during idle polling periods.
  • Keep XCom payloads small (counts, identifiers, file paths). Use intermediate tables or object storage for datasets that flow between tasks.
  • Apply retries, retry_delay, and sla in default_args so every task in the DAG inherits production-grade resilience without per-task repetition.
  • Run the Airflow metadata database on a dedicated managed PostgreSQL instance with automated backups — this database is the single source of truth for all your pipeline history and must be treated as production-critical infrastructure.
  • Choose MWAA for AWS-native simplicity, Astronomer for teams that treat Airflow as a primary engineering surface, and self-hosted Helm for maximum control when you have the Kubernetes expertise to maintain it.

Orchestrate Your Database Workflows with Confidence Using JusDB

Setting up Airflow correctly — connections, executors, metadata database tuning, log retention, alerting — takes real operational effort before your first production DAG ever runs. At JusDB we help data teams design and deploy Airflow architectures that are sized for their workload from day one: right executor choice, proper retry and SLA configuration, secure connection management, and CI/CD pipelines for DAG deployment.

Whether you are migrating brittle cron scripts to proper DAGs, scaling an existing Airflow environment that is struggling under production load, or evaluating MWAA versus Astronomer for a greenfield build, talk to the JusDB team. We can help you stop firefighting pipeline failures and start shipping reliable data workflows.
