Apache Airflow Tutorial: Build Production DAGs
Apache Airflow promises to tame your pipeline chaos — until you realize the framework itself has a learning curve that trips up most newcomers. Missed schedules, zombie tasks, circular dependencies: these aren't edge cases, they're rites of passage. This Apache Airflow tutorial walks you through building production-ready DAGs from scratch, covering the patterns that actually work and the mistakes that'll cost you hours of debugging.
Whether you're migrating from cron jobs or evaluating Airflow against newer orchestrators, this guide gives you runnable code and honest trade-offs.
What Apache Airflow Actually Is (and Isn't)
Apache Airflow is a workflow orchestration platform that lets you define, schedule, and monitor data pipelines as Python code. Originally built at Airbnb in 2014, it became an Apache top-level project and remains the most widely deployed orchestrator in data engineering.
What Airflow does well:
- Defines pipelines as directed acyclic graphs (DAGs) in Python
- Provides a rich web UI for monitoring, retries, and log inspection
- Supports hundreds of community-maintained operators through provider packages
- Handles dependency resolution, scheduling, and task distribution
What Airflow is not:
- A streaming engine — it's batch-first by design
- A data processing framework — it orchestrates work, it doesn't do the compute
- Simple — the operational overhead is real, especially self-hosted
| Dimension | Airflow | Cron + Scripts | Managed alternatives (e.g. MWAA, Cloud Composer) |
|---|---|---|---|
| Dependency management | Native DAG support | Manual / fragile | Native DAG support |
| Retry logic | Built-in with backoff | DIY | Built-in |
| Monitoring UI | Full web interface | None (logs only) | Full web interface |
| Setup complexity | Medium–High | Low | Low (managed) |
| Cost at scale | Infrastructure + ops | Minimal | Pay-per-use, can spike |
| Learning curve | Steep | Flat | Medium |
Installing Airflow Locally
The fastest way to get started is with a constrained pip install. Airflow has many dependencies, so version pinning matters.
# Create a virtual environment (Python 3.9–3.12 supported)
python3 -m venv airflow-venv
source airflow-venv/bin/activate
# Set Airflow home directory
export AIRFLOW_HOME=~/airflow
# Install Airflow 2.9+ with constraints for reproducibility
AIRFLOW_VERSION=2.9.3
PYTHON_VERSION=$(python3 -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# Initialize the metadata database (SQLite for dev, PostgreSQL for prod)
airflow db migrate
# Create an admin user
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password admin
# Start the webserver (default: port 8080)
airflow webserver --port 8080 -D
# Start the scheduler (separate terminal or background)
airflow scheduler -D
Production note: Never use SQLite or the SequentialExecutor in production. Use PostgreSQL + CeleryExecutor or KubernetesExecutor. The default setup is for development only.
Your First DAG — Extract, Transform, Load
Here's a complete, runnable DAG that extracts data from a public API, transforms it with Python, and loads the result into a PostgreSQL table. This is the canonical Apache Airflow tutorial pattern.
# dags/weather_etl.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.sensors.http import HttpSensor
import requests

# Default arguments applied to all tasks in the DAG
default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
}


def _extract_weather(**context):
    """Extract weather data from Open-Meteo API (no API key required)."""
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": 52.52,
        "longitude": 13.41,
        "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
        "timezone": "Europe/Berlin",
        "past_days": 7,
    }
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()
    data = response.json()
    # Push to XCom for downstream tasks
    context["ti"].xcom_push(key="weather_raw", value=data)


def _transform_weather(**context):
    """Transform raw API response into insert-ready records."""
    raw = context["ti"].xcom_pull(task_ids="extract_weather", key="weather_raw")
    daily = raw["daily"]
    records = []
    for i, date in enumerate(daily["time"]):
        records.append({
            "observation_date": date,
            "temp_max_c": daily["temperature_2m_max"][i],
            "temp_min_c": daily["temperature_2m_min"][i],
            "precipitation_mm": daily["precipitation_sum"][i],
            "city": "Berlin",
            "loaded_at": datetime.utcnow().isoformat(),
        })
    context["ti"].xcom_push(key="weather_records", value=records)


def _load_weather(**context):
    """Insert transformed records into PostgreSQL."""
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    records = context["ti"].xcom_pull(
        task_ids="transform_weather", key="weather_records"
    )
    hook = PostgresHook(postgres_conn_id="postgres_warehouse")
    # SQL dialect: PostgreSQL
    insert_sql = """
        INSERT INTO weather_daily
            (observation_date, temp_max_c, temp_min_c, precipitation_mm, city, loaded_at)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON CONFLICT (observation_date, city) DO UPDATE SET
            temp_max_c = EXCLUDED.temp_max_c,
            temp_min_c = EXCLUDED.temp_min_c,
            precipitation_mm = EXCLUDED.precipitation_mm,
            loaded_at = EXCLUDED.loaded_at;
    """
    for record in records:
        hook.run(insert_sql, parameters=(
            record["observation_date"],
            record["temp_max_c"],
            record["temp_min_c"],
            record["precipitation_mm"],
            record["city"],
            record["loaded_at"],
        ))


with DAG(
    dag_id="weather_etl_pipeline",
    default_args=default_args,
    description="Daily weather ETL from Open-Meteo API to PostgreSQL",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["etl", "weather", "tutorial"],
) as dag:

    # Sensor: check if the API is reachable before starting
    check_api = HttpSensor(
        task_id="check_api_available",
        http_conn_id="open_meteo_api",
        endpoint="v1/forecast?latitude=52.52&longitude=13.41&daily=temperature_2m_max",
        response_check=lambda response: response.status_code == 200,
        poke_interval=60,
        timeout=300,
    )

    # Create target table if it doesn't exist (SQL dialect: PostgreSQL)
    create_table = PostgresOperator(
        task_id="create_table",
        postgres_conn_id="postgres_warehouse",
        sql="""
            CREATE TABLE IF NOT EXISTS weather_daily (
                observation_date DATE NOT NULL,
                temp_max_c FLOAT,
                temp_min_c FLOAT,
                precipitation_mm FLOAT,
                city VARCHAR(100) NOT NULL,
                loaded_at TIMESTAMP,
                PRIMARY KEY (observation_date, city)
            );
        """,
    )

    extract = PythonOperator(task_id="extract_weather", python_callable=_extract_weather)
    transform = PythonOperator(task_id="transform_weather", python_callable=_transform_weather)
    load = PythonOperator(task_id="load_weather", python_callable=_load_weather)

    # Define task dependencies
    check_api >> create_table >> extract >> transform >> load
Key Patterns in This DAG
- XCom for inter-task communication — small payloads only (< 48KB in the metadata DB). For large datasets, write to object storage and pass the path via XCom (see the sketch after this list).
- Idempotent loads — the ON CONFLICT ... DO UPDATE clause makes reruns safe. See our guide on idempotent data pipelines for deeper patterns.
- Sensor before extract — HttpSensor prevents wasted compute when the upstream API is down.
- Exponential backoff retries — retry_exponential_backoff=True prevents hammering a recovering service.
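The DAG above keeps its XCom payloads small. When an extract genuinely produces large data, the usual workaround is to stage it in object storage and pass only the key through XCom. Here is a minimal sketch of that pattern, assuming the Amazon provider is installed; the aws_default connection ID and my-pipeline-bucket bucket are placeholders, not part of the DAG above.

# dags/xcom_path_pattern.py (sketch)
import json
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def xcom_path_pattern():

    @task()
    def extract() -> str:
        """Stage the raw payload in object storage; return only its key."""
        ds = get_current_context()["ds"]  # logical date, e.g. "2024-01-01"
        payload = {"rows": [{"id": 1}, {"id": 2}]}  # stand-in for a large extract
        key = f"raw/events/{ds}.json"
        S3Hook(aws_conn_id="aws_default").load_string(
            json.dumps(payload), key=key, bucket_name="my-pipeline-bucket", replace=True
        )
        return key  # a short string is a safe XCom

    @task()
    def transform(key: str) -> int:
        """Downstream task reads the data back from object storage."""
        raw = S3Hook(aws_conn_id="aws_default").read_key(key, bucket_name="my-pipeline-bucket")
        return len(json.loads(raw)["rows"])

    transform(extract())


xcom_path_pattern()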
Scheduling and Catchup — The Most Misunderstood Part
Airflow's scheduler trips up almost everyone. Here's what you need to know:
start_date is not when the DAG first runs. It defines the earliest logical_date (formerly execution_date). A DAG scheduled @daily with start_date=2024-01-01 will have its first run after 2024-01-01 ends — so it triggers on 2024-01-02 at midnight, processing data for 2024-01-01.
catchup=True (the default) backfills all missed intervals. If you deploy a DAG with start_date three months ago and catchup=True, Airflow will queue ~90 DAG runs simultaneously. Set catchup=False unless you explicitly want backfill behavior.
# Common scheduling expressions
schedule="@daily" # Midnight UTC
schedule="@hourly" # Top of every hour
schedule="0 6 * * 1-5" # 6 AM UTC, weekdays only (cron syntax)
schedule="@weekly" # Sunday midnight
schedule=None # Triggered externally only (API, CLI, or another DAG)
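To make each run (and each backfill) process only its own slice of data, read the data interval from the task context instead of calling datetime.now(). A minimal sketch, assuming a hypothetical events table with a created_at column:

from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def interval_aware_dag():

    @task()
    def build_query(**context) -> str:
        # Airflow passes the full context when the callable accepts **kwargs
        start = context["data_interval_start"]  # e.g. 2024-01-01T00:00:00+00:00
        end = context["data_interval_end"]      # e.g. 2024-01-02T00:00:00+00:00
        return (
            "SELECT * FROM events "
            f"WHERE created_at >= '{start}' AND created_at < '{end}'"
        )

    build_query()


interval_aware_dag()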
Timetables (Airflow 2.2+) replace cron for complex schedules; the dataset-aware DatasetOrTimeSchedule combination shown below requires Airflow 2.9+:
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

# Run daily OR when an upstream dataset is updated
dag = DAG(
    dag_id="hybrid_schedule",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 6 * * *", timezone="UTC"),
        datasets=[Dataset("s3://bucket/raw/events/")],
    ),
)
Apache Airflow Tutorial: TaskFlow API (The Modern Way)
Airflow 2.0+ introduced the TaskFlow API with @task decorators. It eliminates boilerplate and makes XCom implicit. Here's the same ETL pipeline rewritten:
# dags/weather_etl_taskflow.py
from datetime import datetime, timedelta

from airflow.decorators import dag, task
import requests


@dag(
    dag_id="weather_etl_taskflow",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "retry_exponential_backoff": True,
    },
    tags=["etl", "weather", "taskflow"],
)
def weather_etl_taskflow():

    @task()
    def extract() -> dict:
        """Fetch weather data. Return value auto-pushed to XCom."""
        response = requests.get(
            "https://api.open-meteo.com/v1/forecast",
            params={
                "latitude": 52.52,
                "longitude": 13.41,
                "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
                "timezone": "Europe/Berlin",
                "past_days": 7,
            },
            timeout=30,
        )
        response.raise_for_status()
        return response.json()

    @task()
    def transform(raw: dict) -> list[dict]:
        """Transform raw data into records."""
        daily = raw["daily"]
        return [
            {
                "observation_date": daily["time"][i],
                "temp_max_c": daily["temperature_2m_max"][i],
                "temp_min_c": daily["temperature_2m_min"][i],
                "precipitation_mm": daily["precipitation_sum"][i],
                "city": "Berlin",
            }
            for i in range(len(daily["time"]))
        ]

    @task()
    def load(records: list[dict]):
        """Load records into PostgreSQL using a hook."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id="postgres_warehouse")
        # SQL dialect: PostgreSQL
        for record in records:
            hook.run(
                """INSERT INTO weather_daily
                       (observation_date, temp_max_c, temp_min_c, precipitation_mm, city)
                   VALUES (%s, %s, %s, %s, %s)
                   ON CONFLICT (observation_date, city) DO UPDATE SET
                       temp_max_c = EXCLUDED.temp_max_c,
                       temp_min_c = EXCLUDED.temp_min_c,
                       precipitation_mm = EXCLUDED.precipitation_mm;""",
                parameters=(
                    record["observation_date"],
                    record["temp_max_c"],
                    record["temp_min_c"],
                    record["precipitation_mm"],
                    record["city"],
                ),
            )

    # Implicit XCom: return values flow through function arguments
    raw_data = extract()
    transformed = transform(raw_data)
    load(transformed)


# Instantiate the DAG
weather_etl_taskflow()
The TaskFlow API is cleaner, but the traditional operator approach still works and is better when you need operators with custom behavior (sensors, bash commands, provider-specific operators).
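As a rough illustration of mixing the two styles, the sketch below chains a traditional BashOperator with a TaskFlow task; the DAG ID and bash command are invented for the example.

from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator


@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def mixed_style_dag():
    # Traditional operator: handy when an existing operator already does the job
    cleanup = BashOperator(task_id="cleanup_staging", bash_command="rm -rf /tmp/staging/*")

    @task()
    def report() -> str:
        return "staging cleaned"

    # The two styles chain with the normal dependency operator
    cleanup >> report()


mixed_style_dag()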
Connections and Variables — Managing Secrets
Never hardcode credentials in DAG files. Airflow provides two mechanisms:
Connections store endpoint + auth info (database URIs, API keys, cloud credentials). Set them via the UI, CLI, or environment variables:
# CLI approach
airflow connections add 'postgres_warehouse' \
--conn-type postgres \
--conn-host warehouse.example.com \
--conn-port 5432 \
--conn-schema analytics \
--conn-login etl_user \
--conn-password 'securepassword'
# Environment variable approach (overrides DB-stored connections)
export AIRFLOW_CONN_POSTGRES_WAREHOUSE='postgresql://etl_user:securepassword@warehouse.example.com:5432/analytics'
Variables store configuration values (thresholds, feature flags, file paths). Access them in DAGs via Variable.get() — but beware: each call queries the metadata database.
from airflow.decorators import task
from airflow.models import Variable

# Bad: queries DB on every scheduler parse (every 30s by default)
threshold = Variable.get("quality_threshold")


# Good: access inside a task, where it runs once per execution
@task()
def check_quality():
    threshold = float(Variable.get("quality_threshold", default_var="0.95"))
    # ... validation logic
Common Mistakes and Pitfalls
These are the issues I see repeatedly in production Airflow deployments:
1. Top-Level Code in DAG Files
Airflow's scheduler parses all DAG files every 30 seconds (configurable via min_file_process_interval). Any code at the module level runs on every parse — including API calls, database queries, and heavy imports.
# BAD: this runs every 30 seconds during parsing
import pandas as pd

data = pd.read_csv("s3://bucket/large_file.csv")  # Kills your scheduler


# GOOD: heavy work belongs inside tasks
@task()
def process_data():
    import pandas as pd

    data = pd.read_csv("s3://bucket/large_file.csv")
2. XCom Abuse
XCom serializes data into the metadata database (unless you configure a custom backend). Pushing DataFrames or large JSON blobs into XCom will bloat your database and eventually crash the scheduler.
Rule of thumb: XCom for metadata (paths, row counts, status flags). Object storage for data.
3. Ignoring Idempotency
If your DAG isn't idempotent, retries and backfills will produce duplicate or corrupt data. Every load task should use MERGE, INSERT ... ON CONFLICT, or truncate-and-reload patterns.
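As one concrete flavor, a delete-and-reload task wipes exactly the slice a run owns before re-inserting it, so retries and backfills converge to the same result. This sketch reuses the weather_daily table and postgres_warehouse connection from earlier; treat it as a pattern, not a drop-in.

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task()
def load_partition(records: list[dict], **context):
    """Delete-and-reload the single date this run owns (SQL dialect: PostgreSQL)."""
    hook = PostgresHook(postgres_conn_id="postgres_warehouse")
    ds = context["ds"]  # the run's logical date, e.g. "2024-01-01"
    # Remove this run's slice, then rebuild it; rerunning gives the same end state
    hook.run("DELETE FROM weather_daily WHERE observation_date = %s;", parameters=(ds,))
    for r in records:
        hook.run(
            "INSERT INTO weather_daily "
            "(observation_date, temp_max_c, temp_min_c, precipitation_mm, city) "
            "VALUES (%s, %s, %s, %s, %s);",
            parameters=(
                r["observation_date"],
                r["temp_max_c"],
                r["temp_min_c"],
                r["precipitation_mm"],
                r["city"],
            ),
        )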
4. depends_on_past=True Without Understanding It
This flag makes each DAG run wait for the same task in the previous run to succeed. If one run fails, all future runs queue indefinitely. Use it only when sequential processing is genuinely required.
5. Not Setting max_active_runs
Without limits, catchup or external triggers can spawn hundreds of concurrent DAG runs, overwhelming your workers and metadata database.
with DAG(
    dag_id="safe_dag",
    max_active_runs=3,    # Max 3 concurrent runs
    max_active_tasks=10,  # Max 10 tasks across all runs
    # ...
) as dag:
    pass
6. Monolithic DAGs
A single DAG with 200 tasks is hard to debug, slow to render in the UI, and creates scheduling bottlenecks. Split by domain or data source, and use data-aware scheduling to coordinate across DAGs.
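A rough sketch of that cross-DAG coordination with datasets (Airflow 2.4+): the producer DAG declares an outlet, and the consumer DAG uses it as its schedule. The S3 URI here is just an identifier, not a real bucket.

from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag, task

events_dataset = Dataset("s3://bucket/raw/events/")


@dag(schedule="@hourly", start_date=datetime(2024, 1, 1), catchup=False)
def producer():
    @task(outlets=[events_dataset])
    def publish_events():
        ...  # write the data; Airflow marks the dataset as updated on success

    publish_events()


@dag(schedule=[events_dataset], start_date=datetime(2024, 1, 1), catchup=False)
def consumer():
    @task()
    def process_events():
        ...

    process_events()


producer()
consumer()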
Monitoring Your Airflow Pipelines
The Airflow UI shows task status, logs, and Gantt charts — but production monitoring requires more. Key metrics to track:
| Metric | What it tells you | Alert threshold |
|---|---|---|
| DAG parse time | Scheduler health | > 30 seconds |
| Task duration vs. SLA | Pipeline performance | > 2x historical average |
| Task failure rate | Data quality risk | > 5% over 24h |
| Scheduler heartbeat | Orchestrator liveness | Missing > 60s |
| XCom size | Metadata DB health | > 1MB per entry |
| DAG run queue depth | Capacity planning | > 50 queued runs |
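One common building block for alerting is a DAG-level on_failure_callback that forwards failures to whatever channel you already use. A minimal sketch follows; the webhook URL is a placeholder, so swap in your Slack, PagerDuty, or email integration.

from datetime import datetime

import requests
from airflow.decorators import dag, task


def notify_failure(context):
    """Called by Airflow with the failed task's context."""
    ti = context["task_instance"]
    requests.post(
        "https://hooks.example.com/airflow-alerts",  # placeholder endpoint
        json={
            "dag": ti.dag_id,
            "task": ti.task_id,
            "logical_date": str(context["logical_date"]),
            "log_url": ti.log_url,
        },
        timeout=10,
    )


@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={"on_failure_callback": notify_failure},
)
def monitored_dag():
    @task()
    def might_fail():
        ...

    might_fail()


monitored_dag()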
For comprehensive pipeline observability beyond Airflow's built-in UI, check our guide on data pipeline monitoring, which covers alerting strategies across multiple tools.
When Airflow Is NOT the Right Choice
Be honest about the trade-offs:
- Simple cron jobs: If you have 3 independent scripts with no dependencies, Airflow is overkill.
- Real-time / streaming: Airflow is batch-oriented. For sub-second latency, use Kafka, Flink, or Spark Structured Streaming. See our streaming vs batch comparison for guidance.
- Lightweight orchestration: If you want less operational overhead, consider managed alternatives like Dagster Cloud, Prefect Cloud, or even Databricks Workflows. They trade flexibility for simplicity.
- Teams without Python skills: Airflow is Python-native. If your team lives in SQL and drag-and-drop tools, the learning curve may not be worth it.
Exploring Pipeline Data Beyond Airflow
Once your Airflow DAGs are loading data into warehouses, the next question is always: how do I quickly validate and explore what landed? Harbinger Explorer lets you query data sources directly in the browser using DuckDB WASM and natural language — useful for spot-checking pipeline outputs or exploring API data before building a full DAG around it. The 7-day free trial is enough to prototype most validation workflows.
Next Steps
You've got a working Airflow setup and a production-ready DAG pattern. From here:
- Set up data quality testing — validate outputs after every pipeline run
- Explore the TaskFlow API documentation for dynamic task mapping and task groups (a minimal mapping sketch follows this list)
- Consider containerized data pipelines for isolated, reproducible task execution
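For a quick taste of dynamic task mapping, the sketch below fans out one task instance per element returned at runtime; the file names are invented for the example.

from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def mapped_example():

    @task()
    def list_files() -> list[str]:
        # In practice this might list object-storage keys for the run's interval
        return ["events_a.json", "events_b.json", "events_c.json"]

    @task()
    def process(filename: str) -> int:
        print("processing", filename)
        return 1

    # expand() fans out: one "process" task instance per filename
    process.expand(filename=list_files())


mapped_example()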
The biggest lesson from running Airflow in production: invest in idempotency and monitoring before you have 50 DAGs. Retrofitting both is painful. Start with the patterns in this tutorial, and you'll avoid the worst surprises.