TechLead
Lesson 20 of 22
6 min read
Data Engineering

Data Orchestration with Apache Airflow

Build and manage data pipeline workflows with Apache Airflow, including DAG design, operators, scheduling, and monitoring

What is Data Orchestration?

Data orchestration is the automated coordination, scheduling, and monitoring of data pipeline workflows. An orchestrator manages the sequence of tasks, handles dependencies between them, retries failures, and provides visibility into what is running, what has failed, and what is waiting. Apache Airflow is the most widely used open-source orchestration platform, deployed at thousands of companies from startups to Fortune 500 enterprises.

Without orchestration, data pipelines run as ad-hoc scripts or cron jobs with no dependency management, no retry logic, no alerting, and no visibility. Airflow solves these problems by providing a framework for defining workflows as code (Python), scheduling them on intervals, and monitoring them through a web UI.

Airflow Architecture

Core Components

  • DAG (Directed Acyclic Graph): A workflow defined as a Python file. Contains tasks and their dependencies. "Directed" means tasks flow in one direction; "Acyclic" means no circular dependencies.
  • Scheduler: Continuously monitors DAGs, triggers runs based on schedules, and assigns tasks to workers.
  • Executor: Determines how tasks are run — LocalExecutor (single machine), CeleryExecutor (distributed), or KubernetesExecutor (pods per task).
  • Web Server: Provides the UI for monitoring DAG runs, viewing logs, triggering manual runs, and managing connections.
  • Metadata Database: PostgreSQL or MySQL database storing DAG definitions, task states, variables, and connections.
  • Workers: Processes that execute tasks. In distributed mode, workers run on separate machines.

Writing Your First DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.common.sql.operators.sql import SQLCheckOperator

# Default arguments applied to all tasks in the DAG
default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email': ['data-team@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

# Define the DAG
with DAG(
    dag_id='daily_orders_pipeline',
    default_args=default_args,
    description='Extract, transform, and load daily order data',
    schedule='0 6 * * *',           # Run at 6 AM daily (cron syntax)
    start_date=datetime(2025, 1, 1),
    catchup=False,                  # Don't backfill missed runs
    max_active_runs=1,              # Only one run at a time
    tags=['orders', 'daily', 'production'],
) as dag:

    # Task 1: Extract data from source
    def extract_orders(**context):
        execution_date = context['ds']  # YYYY-MM-DD string
        print(f"Extracting orders for {execution_date}")
        # Extract logic here
        return {"rows_extracted": 15000}

    extract = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders,  # Airflow 2 passes context to the callable automatically
    )

    # Task 2: Run dbt models
    transform = BashOperator(
        task_id='run_dbt_models',
        bash_command='cd /opt/dbt && dbt run --select tag:daily',
        env={'DBT_TARGET': 'prod'},
    )

    # Task 3: Run dbt tests
    test = BashOperator(
        task_id='run_dbt_tests',
        bash_command='cd /opt/dbt && dbt test --select tag:daily',
    )

    # Task 4: Run quality checks (fail the run if any orders reference a missing customer)
    quality_check = SQLCheckOperator(
        task_id='run_quality_checks',
        conn_id='snowflake_prod',
        sql="""
            SELECT COUNT(*) = 0 AS no_orphaned_orders
            FROM analytics.fct_orders f
            LEFT JOIN analytics.dim_customer c ON f.customer_id = c.customer_id
            WHERE c.customer_id IS NULL
            AND f.order_date = '{{ ds }}'
        """,
    )

    # Task 5: Notify on success
    def send_notification(**context):
        print(f"Pipeline completed successfully for {context['ds']}")

    notify = PythonOperator(
        task_id='send_notification',
        python_callable=send_notification,
    )

    # Define task dependencies
    extract >> transform >> test >> quality_check >> notify
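The dictionary returned by extract_orders is pushed to XCom, Airflow's mechanism for passing small values between tasks. A downstream task inside the same with DAG block could read it like this (a minimal sketch; report_extract_stats is illustrative and not part of the pipeline above):

    def report_extract_stats(**context):
        # Pull the return value that extract_orders pushed to XCom
        stats = context['ti'].xcom_pull(task_ids='extract_orders')
        print(f"Extracted {stats['rows_extracted']} rows")

    report = PythonOperator(
        task_id='report_extract_stats',
        python_callable=report_extract_stats,
    )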

Advanced DAG Patterns

from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id='advanced_pipeline',
    schedule='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    default_args=default_args,
) as dag:

    start = EmptyOperator(task_id='start')

    # Task Groups: Organize related tasks visually
    with TaskGroup('extract_sources') as extract_group:
        extract_orders = PythonOperator(
            task_id='extract_orders',
            python_callable=lambda: print("Extracting orders"),
        )
        extract_customers = PythonOperator(
            task_id='extract_customers',
            python_callable=lambda: print("Extracting customers"),
        )
        extract_products = PythonOperator(
            task_id='extract_products',
            python_callable=lambda: print("Extracting products"),
        )

    # Branching: Choose a path based on runtime conditions
    count_rows = PythonOperator(
        task_id='count_rows',
        python_callable=lambda: 500000,  # Placeholder count; the return value is pushed to XCom
    )

    def choose_processing_path(**context):
        row_count = context['ti'].xcom_pull(task_ids='count_rows')
        if row_count and row_count > 1000000:
            return 'heavy_processing'
        return 'light_processing'

    branch = BranchPythonOperator(
        task_id='choose_path',
        python_callable=choose_processing_path,
    )

    heavy = BashOperator(
        task_id='heavy_processing',
        bash_command='spark-submit /opt/spark/heavy_job.py {{ ds }}',
    )

    light = BashOperator(
        task_id='light_processing',
        bash_command='dbt run --select tag:daily',
    )

    end = EmptyOperator(
        task_id='end',
        trigger_rule='none_failed_min_one_success',
    )

    # DAG flow
    start >> extract_group >> count_rows >> branch
    branch >> [heavy, light] >> end
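Airflow 2 also offers the TaskFlow API, which expresses Python tasks as decorated functions and handles the XCom passing automatically. A minimal sketch (the DAG and task names are illustrative):

from airflow.decorators import dag, task

@dag(schedule='@daily', start_date=datetime(2025, 1, 1), catchup=False)
def taskflow_orders_pipeline():

    @task
    def extract() -> dict:
        return {'rows_extracted': 15000}

    @task
    def load(stats: dict):
        # extract()'s return value arrives via XCom automatically
        print(f"Loading {stats['rows_extracted']} rows")

    load(extract())

taskflow_orders_pipeline()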

Airflow Best Practices

Production Best Practices

  • Idempotent Tasks: Every task should be safely re-runnable. Use partition overwrites or delete-then-insert patterns (see the sketch after this list).
  • Atomic Tasks: Each task should do one thing. Do not combine extract + transform + load in a single task.
  • No Heavy Processing in DAG Files: DAG files are parsed frequently. Keep them lightweight — move business logic to separate modules.
  • Use Templating: Use Jinja templates ({{ ds }}, {{ data_interval_start }}) for dates instead of hardcoding or calling datetime.now().
  • Set Timeouts: Use execution_timeout on tasks and dagrun_timeout on DAGs to prevent hung processes.
  • Avoid Catchup for Most DAGs: Set catchup=False unless you specifically need historical backfilling.
  • Use Connections and Variables: Store credentials in Airflow Connections, not in code. Use Variables for runtime configuration.
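
A minimal sketch of the idempotency and templating practices above. The raw.orders and analytics.stg_orders tables are illustrative, and the snowflake_prod connection is reused from the earlier DAG; because both statements target exactly one templated date partition, re-running the task for any past date is safe:

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Delete-then-insert for a single date partition, driven by the templated {{ ds }}
reload_partition = SQLExecuteQueryOperator(
    task_id='reload_orders_partition',
    conn_id='snowflake_prod',
    sql=[
        "DELETE FROM analytics.stg_orders WHERE order_date = '{{ ds }}'",
        """
        INSERT INTO analytics.stg_orders
        SELECT * FROM raw.orders
        WHERE order_date = '{{ ds }}'
        """,
    ],
)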

Monitoring and Alerting

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

def on_failure_callback(context):
    """Send Slack alert when a task fails."""
    task_instance = context['task_instance']
    dag_id = context['dag'].dag_id
    task_id = task_instance.task_id
    execution_date = context['ds']
    log_url = task_instance.log_url

    SlackWebhookOperator(
        task_id='slack_alert',
        slack_webhook_conn_id='slack_data_alerts',
        message=f"""
:red_circle: *Task Failed*
*DAG:* {dag_id}
*Task:* {task_id}
*Date:* {execution_date}
*Logs:* {log_url}
        """,
    ).execute(context)

# Apply to DAG
default_args['on_failure_callback'] = on_failure_callback

Airflow Sensors

Sensors are special operators that wait for a condition to be met before proceeding. They are essential for building pipelines that depend on external systems — waiting for a file to arrive, a partition to appear, or another DAG to complete.

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor

# Wait for a file to appear in S3 before processing
wait_for_data = S3KeySensor(
    task_id='wait_for_orders_file',
    bucket_name='data-landing',
    bucket_key='orders/{{ ds }}/orders.parquet',
    aws_conn_id='aws_default',
    timeout=3600,           # Wait up to 1 hour
    poke_interval=60,       # Check every 60 seconds
    mode='reschedule',      # Free the worker while waiting
)

# Wait for another DAG to complete before starting. By default the sensor
# matches the upstream run with the same logical date; use execution_delta
# or execution_date_fn if the two DAGs run on different schedules.
wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_ingestion',
    external_dag_id='raw_data_ingestion',
    external_task_id='load_complete',
    timeout=7200,
    mode='reschedule',
)

# Flow: both sensors must succeed before extraction begins
[wait_for_upstream, wait_for_data] >> extract
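
For conditions that no built-in sensor covers, PythonSensor wraps any callable that returns a boolean. A minimal sketch, assuming a hypothetical internal health endpoint:

import requests
from airflow.sensors.python import PythonSensor

def upstream_api_ready() -> bool:
    # Illustrative condition: True once the (hypothetical) API reports healthy
    response = requests.get('https://internal-api.example.com/health', timeout=10)
    return response.status_code == 200

wait_for_api = PythonSensor(
    task_id='wait_for_api',
    python_callable=upstream_api_ready,
    poke_interval=120,      # Check every 2 minutes
    timeout=1800,           # Give up after 30 minutes
    mode='reschedule',
)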

Airflow vs Alternatives

  • Apache Airflow: Most widely adopted, large ecosystem, Python-native. Best for teams with Python expertise and complex workflows.
  • Dagster: Software-defined assets, type-safe, excellent DX. Best for modern teams wanting asset-centric orchestration.
  • Prefect: Simple Python decorators, cloud-managed option. Best for teams wanting minimal boilerplate orchestration.
  • dbt Cloud: Built-in scheduling for dbt-only workflows. Best for teams whose orchestration needs are limited to dbt runs.

Deployment Patterns

# docker-compose.yml for Airflow with CeleryExecutor
# (run the initialization commands shown below before first start)
version: '3.8'

x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
  volumes:
    - ./dags:/opt/airflow/dags
    - ./plugins:/opt/airflow/plugins
  depends_on:
    - postgres
    - redis

services:
  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports: ["8080:8080"]

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler

  airflow-worker:
    <<: *airflow-common
    command: celery worker

  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  redis:
    image: redis:7
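
The scheduler, webserver, and worker all share the anchored configuration above, and they expect an initialized metadata database plus at least one admin user. One way to bootstrap the stack (the admin credentials are placeholders):

docker compose up -d postgres redis
docker compose run --rm airflow-webserver airflow db migrate
docker compose run --rm airflow-webserver airflow users create \
    --username admin --password admin --role Admin \
    --firstname Admin --lastname User --email admin@example.com
docker compose up -d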

Key Takeaways

  • Airflow orchestrates complex data workflows as DAGs (Directed Acyclic Graphs) defined in Python
  • Tasks represent individual steps; dependencies define execution order using the >> operator
  • Use Task Groups for visual organization and BranchPythonOperator for conditional execution paths
  • Sensors wait for external conditions (file arrival, upstream DAG completion) before proceeding
  • Every task should be idempotent and atomic — safely re-runnable and focused on a single responsibility
  • Use Jinja templates for dates, Connections for credentials, and Variables for configuration
  • Consider alternatives like Dagster or Prefect for new projects — they offer improved developer experience
  • Set up Slack or PagerDuty alerts for task failures — you should know about problems before stakeholders
