Cloud allgemein

Apache Airflow Tutorial 2026: Production-DAGs richtig bauen

Schritt-für-Schritt Apache Airflow Tutorial mit lauffähigen DAGs, TaskFlow-API, Scheduling-Patterns und den Production-Fallen, die jeden Anfänger einmal erwischen.

Harbinger Team13. April 202611 Min. LesezeitAktualisiert 14.5.2026
  • apache-airflow
  • orchestration
  • data-pipelines
  • etl
  • python
  • tutorial
  • workflow-automation
Inhaltsverzeichnis21 Abschnitte

Apache Airflow verspricht, dein Pipeline-Chaos zu zähmen — bis du merkst, dass das Framework selbst eine Lernkurve hat, die die meisten Neulinge stolpern lässt. Verpasste Schedules, Zombie-Tasks, zirkuläre Dependencies: das sind keine Edge-Cases, das sind Initiations-Riten. Dieses Apache Airflow Tutorial baut dich Schritt für Schritt durch production-ready DAGs — die Patterns, die wirklich funktionieren, und die Fehler, die dich Stunden Debugging kosten werden.

Egal ob du von Cron-Jobs migrierst oder Airflow gegen neuere Orchestratoren evaluierst — du bekommst lauffähigen Code und ehrliche Trade-offs.

TL;DR

  • Setup: pip install apache-airflow==2.9.3 mit Constraints-File, dann airflow db migrate + Webserver + Scheduler starten.
  • Production-DAG: XCom für kleine Payloads, Object-Storage für Daten, ON CONFLICT DO UPDATE für idempotente Loads.
  • Scheduling-Falle: catchup=False setzen, sonst werden 90 Runs gleichzeitig in der Queue.
  • Top-Fehler: Code im Module-Scope (Scheduler killt), XCom-Missbrauch, fehlende max_active_runs.
  • TaskFlow-API (2.0+): Decorator-basiert, kleinere Code-Base, XCom implizit.

Was Apache Airflow ist (und nicht ist)

Apache Airflow ist eine Workflow-Orchestrierungs-Plattform, mit der du Daten-Pipelines als Python-Code definierst, scheduled und monitorst. Ursprünglich 2014 bei Airbnb gebaut, wurde es ein Apache-Top-Level-Projekt und bleibt der meistdeployte Orchestrator im Data-Engineering.

Was Airflow gut macht:

  • Definiert Pipelines als gerichtete azyklische Graphen (DAGs) in Python
  • Bietet eine reichhaltige Web-UI für Monitoring, Retries, Log-Inspection
  • Unterstützt 700+ Community-gepflegte Operator-Pakete
  • Handlet Dependency-Auflösung, Scheduling und Task-Verteilung

Was Airflow nicht ist:

  • Eine Streaming-Engine — Batch-first by design
  • Ein Data-Processing-Framework — orchestriert Arbeit, macht selbst kein Compute
  • Einfach — der operationale Overhead ist real, besonders self-hosted
DimensionAirflowCron + ScriptsManaged-Alternativen (MWAA, Composer)
Dependency-ManagementNative DAG-SupportManuell / fragilNative DAG-Support
Retry-LogikEingebaut mit BackoffDIYEingebaut
Monitoring-UIVolles Web-InterfaceKeine (nur Logs)Volles Web-Interface
Setup-KomplexitätMittel–HochNiedrigNiedrig (managed)
Kosten at ScaleInfra + OpsMinimalPay-per-use, kann spike-en
LernkurveSteilFlachMittel

Airflow lokal installieren

Der schnellste Weg: constrained pip-Install. Airflow hat viele Dependencies, deshalb zählen gepinnte Versionen.

# Virtual Environment (Python 3.9–3.12)
python3 -m venv airflow-venv
source airflow-venv/bin/activate

# Airflow-Home-Verzeichnis setzen
export AIRFLOW_HOME=~/airflow

# Airflow 2.9+ mit Constraints für Reproduzierbarkeit
AIRFLOW_VERSION=2.9.3
PYTHON_VERSION=$(python3 -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Metadata-DB initialisieren (SQLite für Dev, PostgreSQL für Prod)
airflow db migrate

# Admin-User anlegen
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password admin

# Webserver starten (Default: Port 8080)
airflow webserver --port 8080 -D

# Scheduler starten (eigenes Terminal oder Background)
airflow scheduler -D

Production-Hinweis: Niemals SQLite oder den SequentialExecutor in Production. Nutze PostgreSQL + CeleryExecutor oder KubernetesExecutor. Das Default-Setup ist nur für Dev.

Dein erster DAG — Extract, Transform, Load

Hier ist ein vollständiger, lauffähiger DAG, der Daten aus einer public API extrahiert, mit Python transformiert und das Ergebnis in PostgreSQL lädt. Das ist das kanonische Airflow-Tutorial-Pattern.

# dags/weather_etl.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.sensors.http import HttpSensor
import requests
import json

# Default-Argumente für alle Tasks im DAG
default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
}

def _extract_weather(**context):
    """Wetterdaten von Open-Meteo API extrahieren (kein API-Key nötig)."""
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": 52.52,
        "longitude": 13.41,
        "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
        "timezone": "Europe/Berlin",
        "past_days": 7,
    }
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()
    data = response.json()
    # Push auf XCom für Downstream-Tasks
    context["ti"].xcom_push(key="weather_raw", value=data)

def _transform_weather(**context):
    """Rohe API-Response in Insert-Ready-Records umwandeln."""
    raw = context["ti"].xcom_pull(task_ids="extract_weather", key="weather_raw")
    daily = raw["daily"]
    records = []
    for i, date in enumerate(daily["time"]):
        records.append({
            "observation_date": date,
            "temp_max_c": daily["temperature_2m_max"][i],
            "temp_min_c": daily["temperature_2m_min"][i],
            "precipitation_mm": daily["precipitation_sum"][i],
            "city": "Berlin",
            "loaded_at": datetime.utcnow().isoformat(),
        })
    context["ti"].xcom_push(key="weather_records", value=records)

def _load_weather(**context):
    """Transformierte Records in PostgreSQL inserten."""
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    records = context["ti"].xcom_pull(
        task_ids="transform_weather", key="weather_records"
    )
    hook = PostgresHook(postgres_conn_id="postgres_warehouse")
    # SQL-Dialekt: PostgreSQL
    insert_sql = """
        INSERT INTO weather_daily
            (observation_date, temp_max_c, temp_min_c, precipitation_mm, city, loaded_at)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON CONFLICT (observation_date, city) DO UPDATE SET
            temp_max_c = EXCLUDED.temp_max_c,
            temp_min_c = EXCLUDED.temp_min_c,
            precipitation_mm = EXCLUDED.precipitation_mm,
            loaded_at = EXCLUDED.loaded_at;
    """
    for record in records:
        hook.run(insert_sql, parameters=(
            record["observation_date"],
            record["temp_max_c"],
            record["temp_min_c"],
            record["precipitation_mm"],
            record["city"],
            record["loaded_at"],
        ))

with DAG(
    dag_id="weather_etl_pipeline",
    default_args=default_args,
    description="Daily weather ETL von Open-Meteo API nach PostgreSQL",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["etl", "weather", "tutorial"],
) as dag:

    # Sensor: prüft, ob API erreichbar ist, bevor wir starten
    check_api = HttpSensor(
        task_id="check_api_available",
        http_conn_id="open_meteo_api",
        endpoint="v1/forecast?latitude=52.52&longitude=13.41&daily=temperature_2m_max",
        response_check=lambda response: response.status_code == 200,
        poke_interval=60,
        timeout=300,
    )

    # Zieltabelle anlegen wenn nicht da (SQL-Dialekt: PostgreSQL)
    create_table = PostgresOperator(
        task_id="create_table",
        postgres_conn_id="postgres_warehouse",
        sql="""
            CREATE TABLE IF NOT EXISTS weather_daily (
                observation_date DATE NOT NULL,
                temp_max_c FLOAT,
                temp_min_c FLOAT,
                precipitation_mm FLOAT,
                city VARCHAR(100) NOT NULL,
                loaded_at TIMESTAMP,
                PRIMARY KEY (observation_date, city)
            );
        """,
    )

    extract = PythonOperator(task_id="extract_weather", python_callable=_extract_weather)
    transform = PythonOperator(task_id="transform_weather", python_callable=_transform_weather)
    load = PythonOperator(task_id="load_weather", python_callable=_load_weather)

    # Task-Dependencies
    check_api >> create_table >> extract >> transform >> load

Key-Patterns in diesem DAG

  1. XCom für Inter-Task-Kommunikation — nur kleine Payloads (< 48KB in der Metadata-DB). Für große Datasets: in Object-Storage schreiben und den Pfad via XCom weitergeben.
  2. Idempotente LoadsON CONFLICT ... DO UPDATE macht Re-Runs sicher.
  3. Sensor vor ExtractHttpSensor verhindert verschwendetes Compute, wenn die Upstream-API down ist.
  4. Exponential-Backoff-Retriesretry_exponential_backoff=True verhindert, dass du einen sich erholenden Service kaputt hämmerst.

Scheduling und Catchup — der missverstandenste Teil

Airflows Scheduler verwirrt fast jeden. Was du wissen musst:

start_date ist nicht, wann der DAG zum ersten Mal läuft. Es definiert das früheste logical_date (früher execution_date). Ein DAG mit @daily und start_date=2024-01-01 läuft das erste Mal nachdem 2024-01-01 endet — er triggered am 2024-01-02 um Mitternacht und verarbeitet Daten für 2024-01-01.

catchup=True (der Default) backfilled alle verpassten Intervalle. Wenn du einen DAG mit start_date vor drei Monaten und catchup=True deployst, queued Airflow ~90 DAG-Runs gleichzeitig. Setz catchup=False, außer du willst explizit Backfill-Verhalten.

# Gängige Scheduling-Ausdrücke
schedule="@daily"          # Mitternacht UTC
schedule="@hourly"         # Volle Stunde
schedule="0 6 * * 1-5"    # 6 Uhr UTC, Werktage (Cron)
schedule="@weekly"         # Sonntag Mitternacht
schedule=None              # Nur extern getriggert (API, CLI, anderer DAG)

Timetables (Airflow 2.4+) ersetzen Cron für komplexe Schedules:

from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.datasets import Dataset

# Täglich ODER wenn ein Upstream-Dataset aktualisiert wird
dag = DAG(
    dag_id="hybrid_schedule",
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 6 * * *", timezone="UTC"),
        datasets=[Dataset("s3://bucket/raw/events/")],
    ),
)

TaskFlow-API (der moderne Weg)

Airflow 2.0+ hat die TaskFlow-API mit @task-Decorators eingeführt. Sie eliminiert Boilerplate und macht XCom implizit. Hier dieselbe ETL-Pipeline neu geschrieben:

# dags/weather_etl_taskflow.py
from datetime import datetime, timedelta
from airflow.decorators import dag, task
import requests

@dag(
    dag_id="weather_etl_taskflow",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "retry_exponential_backoff": True,
    },
    tags=["etl", "weather", "taskflow"],
)
def weather_etl_taskflow():

    @task()
    def extract() -> dict:
        """Wetterdaten holen. Return-Value wird automatisch auf XCom gepushed."""
        response = requests.get(
            "https://api.open-meteo.com/v1/forecast",
            params={
                "latitude": 52.52,
                "longitude": 13.41,
                "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
                "timezone": "Europe/Berlin",
                "past_days": 7,
            },
            timeout=30,
        )
        response.raise_for_status()
        return response.json()

    @task()
    def transform(raw: dict) -> list[dict]:
        """Rohdaten in Records transformieren."""
        daily = raw["daily"]
        return [
            {
                "observation_date": daily["time"][i],
                "temp_max_c": daily["temperature_2m_max"][i],
                "temp_min_c": daily["temperature_2m_min"][i],
                "precipitation_mm": daily["precipitation_sum"][i],
                "city": "Berlin",
            }
            for i in range(len(daily["time"]))
        ]

    @task()
    def load(records: list[dict]):
        """Records in PostgreSQL laden via Hook."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id="postgres_warehouse")
        for record in records:
            hook.run(
                """INSERT INTO weather_daily
                       (observation_date, temp_max_c, temp_min_c, precipitation_mm, city)
                   VALUES (%s, %s, %s, %s, %s)
                   ON CONFLICT (observation_date, city) DO UPDATE SET
                       temp_max_c = EXCLUDED.temp_max_c,
                       temp_min_c = EXCLUDED.temp_min_c,
                       precipitation_mm = EXCLUDED.precipitation_mm;""",
                parameters=(
                    record["observation_date"],
                    record["temp_max_c"],
                    record["temp_min_c"],
                    record["precipitation_mm"],
                    record["city"],
                ),
            )

    # Impliziter XCom: Return-Values fließen durch Funktions-Argumente
    raw_data = extract()
    transformed = transform(raw_data)
    load(transformed)

weather_etl_taskflow()

Die TaskFlow-API ist sauberer, aber der klassische Operator-Ansatz funktioniert weiter und ist besser, wenn du Operators mit Custom-Verhalten brauchst (Sensors, Bash-Commands, Provider-spezifische Operators).

Connections und Variables — Secrets-Management

Niemals Credentials hard-coded in DAG-Files. Airflow bietet zwei Mechanismen:

Connections speichern Endpoint + Auth (DB-URIs, API-Keys, Cloud-Credentials). Setzbar via UI, CLI oder Environment-Variablen:

# CLI-Ansatz
airflow connections add 'postgres_warehouse' \
    --conn-type postgres \
    --conn-host warehouse.example.com \
    --conn-port 5432 \
    --conn-schema analytics \
    --conn-login etl_user \
    --conn-password 'securepassword'

# ENV-Ansatz (überschreibt DB-stored Connections)
export AIRFLOW_CONN_POSTGRES_WAREHOUSE='postgresql://etl_user:securepassword@warehouse.example.com:5432/analytics'

Variables speichern Config-Werte (Thresholds, Feature-Flags, Pfade). Zugriff über Variable.get() — aber Achtung: jeder Call queried die Metadata-DB.

from airflow.models import Variable

# Schlecht: queried die DB bei jedem Scheduler-Parse (alle 30s by default)
threshold = Variable.get("quality_threshold")

# Gut: Zugriff innerhalb eines Tasks, läuft einmal pro Execution
@task()
def check_quality():
    threshold = float(Variable.get("quality_threshold", default_var="0.95"))
    # ... Validation-Logik

Häufige Fehler und Fallen

Das sind die Issues, die ich in Production-Airflow immer wieder sehe:

1. Top-Level-Code in DAG-Files

Airflows Scheduler parsed alle DAG-Files alle 30 Sekunden (konfigurierbar via dag_dir_list_interval). Jeder Code auf Module-Ebene läuft bei jedem Parse — inklusive API-Calls, DB-Queries, schwerer Imports.

# SCHLECHT: läuft alle 30 Sekunden während des Parsings
import pandas as pd
data = pd.read_csv("s3://bucket/large_file.csv")  # Killt deinen Scheduler

# GUT: schwere Arbeit gehört in Tasks
@task()
def process_data():
    import pandas as pd
    data = pd.read_csv("s3://bucket/large_file.csv")

2. XCom-Missbrauch

XCom serialisiert Daten in die Metadata-DB (außer du konfigurierst ein Custom-Backend). DataFrames oder große JSON-Blobs in XCom zu pushen wird deine DB aufblähen und den Scheduler crashen.

Faustregel: XCom für Metadata (Pfade, Row-Counts, Status-Flags). Object-Storage für Daten.

3. Idempotenz ignorieren

Wenn dein DAG nicht idempotent ist, produzieren Retries und Backfills Duplikate oder korrupte Daten. Jeder Load-Task sollte MERGE, INSERT ... ON CONFLICT oder Truncate-and-Reload-Patterns nutzen.

4. depends_on_past=True ohne Verständnis

Dieses Flag macht, dass jeder DAG-Run auf den gleichen Task im vorherigen Run wartet. Wenn ein Run failed, queued alle zukünftigen Runs ewig. Nur nutzen, wenn sequenzielles Processing wirklich nötig ist.

5. max_active_runs nicht setzen

Ohne Limits können Catchup oder externe Trigger hunderte gleichzeitiger DAG-Runs spawnen und deine Worker + Metadata-DB überfordern.

with DAG(
    dag_id="safe_dag",
    max_active_runs=3,         # Max 3 gleichzeitige Runs
    max_active_tasks=10,       # Max 10 Tasks über alle Runs
    # ...
) as dag:
    pass

6. Monolithische DAGs

Ein einzelner DAG mit 200 Tasks ist schwer zu debuggen, langsam in der UI und produziert Scheduling-Bottlenecks. Splitten nach Domain oder Data-Source, und data-aware Scheduling für DAG-übergreifende Koordination nutzen.

Monitoring deiner Airflow-Pipelines

Die Airflow-UI zeigt Task-Status, Logs und Gantt-Charts — aber Production-Monitoring braucht mehr. Wichtige Metriken:

MetrikWas sie aussagtAlert-Threshold
DAG-Parse-ZeitScheduler-Gesundheit> 30 Sekunden
Task-Dauer vs. SLAPipeline-Performance> 2× historischer Schnitt
Task-Failure-RateDatenqualitäts-Risiko> 5% über 24h
Scheduler-HeartbeatOrchestrator-LivenessFehlt > 60s
XCom-SizeMetadata-DB-Gesundheit> 1MB pro Entry
DAG-Run-Queue-TiefeCapacity-Planning> 50 queued Runs

Für umfassende Pipeline-Observability über die Airflow-UI hinaus: Data Pipeline Monitoring.

Wann Airflow NICHT die richtige Wahl ist

Ehrlich zu den Trade-offs sein:

  • Einfache Cron-Jobs: 3 unabhängige Scripts ohne Dependencies — Airflow ist Overkill.
  • Echtzeit / Streaming: Airflow ist Batch-orientiert. Für Sub-Sekunden-Latenz: Kafka, Flink oder Spark Structured Streaming.
  • Leichte Orchestrierung: Weniger Ops-Overhead? Managed-Alternativen wie Dagster Cloud, Prefect Cloud oder Databricks Workflows.
  • Teams ohne Python-Skills: Airflow ist Python-nativ. Wenn dein Team in SQL und Drag-and-Drop-Tools lebt, lohnt die Lernkurve evtl. nicht.

Pipeline-Daten jenseits von Airflow erkunden

Sobald deine Airflow-DAGs Daten in Warehouses laden, ist die nächste Frage immer: Wie validiere und erkunde ich schnell, was da angekommen ist? Harbinger Explorer lässt dich Datenquellen direkt im Browser via DuckDB WASM und natürlicher Sprache abfragen — nützlich für Spot-Checks von Pipeline-Outputs oder API-Exploration, bevor du einen vollen DAG drum baust. Der 7-Tage-Test reicht für die meisten Validierungs-Workflows.

FAQ

Welche Python-Version unterstützt Airflow 2.9? 3.9 bis 3.12. 3.8 ist deprecated in 2.9+.

SQLite vs PostgreSQL für die Metadata-DB? SQLite nur für lokale Dev. PostgreSQL (oder MySQL) für jede produktive Nutzung — SQLite serialisiert alle Writes, Scheduler-Performance bricht ein.

Was ist der Unterschied zwischen TaskFlow und klassischen Operators? TaskFlow ist Decorator-basiert (@task), Return-Values fließen automatisch durch XCom. Klassische Operators sind ausdrücklicher, aber Boilerplate-heavy. Für reine Python-Tasks: TaskFlow. Für vordefinierte Operators (Sensors, S3, Postgres): klassisch.

Wie viele DAGs verträgt ein Standard-Scheduler? Faustregel: 200-500 DAGs auf einem Scheduler mit anständig CPU. Darüber: HA-Scheduler + Parse-Optimierung (kein Top-Level-Code, DAG-Bundling).

Nächste Schritte

Du hast ein lauffähiges Airflow-Setup und ein production-ready DAG-Pattern. Von hier:

  1. Data-Quality-Testing aufsetzen — Outputs nach jedem Pipeline-Run validieren
  2. TaskFlow-API-Doku für dynamisches Task-Mapping und Task-Groups
  3. Containerized Data Pipelines für isolierte, reproduzierbare Task-Execution

Die größte Lektion aus Production-Airflow: in Idempotenz und Monitoring investieren bevor du 50 DAGs hast. Beides nachzurüsten ist schmerzhaft. Mit den Patterns in diesem Tutorial vermeidest du die schlimmsten Überraschungen.

Weiterlesen

Stand: 14. Mai 2026. Airflow-Versionen ändern sich — Constraints-URLs auf der offiziellen Airflow-Doku checken.

H

Geschrieben von

Harbinger Team

Cloud-, Data- und AI-Engineer in DACH. Schreibt seit 2018 über infrastruktur­kritische Tech-Entscheidungen — keine Marketing- Folien, sondern echte Trade-offs aus Production-Workloads.

Hat dir das geholfen?

Jede Woche ein neuer Artikel über DACH-Cloud, Data und AI — direkt in dein Postfach. Kein Spam, kein Marketing-Sprech.

Kein Spam. 1-Klick-Abmeldung. Datenschutz bei Loops.so.