Inhaltsverzeichnis21 Abschnitte
- TL;DR
- Was Apache Airflow ist (und nicht ist)
- Airflow lokal installieren
- Dein erster DAG — Extract, Transform, Load
- Key-Patterns in diesem DAG
- Scheduling und Catchup — der missverstandenste Teil
- TaskFlow-API (der moderne Weg)
- Connections und Variables — Secrets-Management
- Häufige Fehler und Fallen
- 1. Top-Level-Code in DAG-Files
- 2. XCom-Missbrauch
- 3. Idempotenz ignorieren
- 4. dependsonpast=True ohne Verständnis
- 5. maxactiveruns nicht setzen
- 6. Monolithische DAGs
- Monitoring deiner Airflow-Pipelines
- Wann Airflow NICHT die richtige Wahl ist
- Pipeline-Daten jenseits von Airflow erkunden
- FAQ
- Nächste Schritte
- Weiterlesen
Apache Airflow verspricht, dein Pipeline-Chaos zu zähmen — bis du merkst, dass das Framework selbst eine Lernkurve hat, die die meisten Neulinge stolpern lässt. Verpasste Schedules, Zombie-Tasks, zirkuläre Dependencies: das sind keine Edge-Cases, das sind Initiations-Riten. Dieses Apache Airflow Tutorial baut dich Schritt für Schritt durch production-ready DAGs — die Patterns, die wirklich funktionieren, und die Fehler, die dich Stunden Debugging kosten werden.
Egal ob du von Cron-Jobs migrierst oder Airflow gegen neuere Orchestratoren evaluierst — du bekommst lauffähigen Code und ehrliche Trade-offs.
TL;DR
- Setup:
pip install apache-airflow==2.9.3mit Constraints-File, dannairflow db migrate+ Webserver + Scheduler starten. - Production-DAG: XCom für kleine Payloads, Object-Storage für Daten,
ON CONFLICT DO UPDATEfür idempotente Loads. - Scheduling-Falle:
catchup=Falsesetzen, sonst werden 90 Runs gleichzeitig in der Queue. - Top-Fehler: Code im Module-Scope (Scheduler killt), XCom-Missbrauch, fehlende
max_active_runs. - TaskFlow-API (2.0+): Decorator-basiert, kleinere Code-Base, XCom implizit.
Was Apache Airflow ist (und nicht ist)
Apache Airflow ist eine Workflow-Orchestrierungs-Plattform, mit der du Daten-Pipelines als Python-Code definierst, scheduled und monitorst. Ursprünglich 2014 bei Airbnb gebaut, wurde es ein Apache-Top-Level-Projekt und bleibt der meistdeployte Orchestrator im Data-Engineering.
Was Airflow gut macht:
- Definiert Pipelines als gerichtete azyklische Graphen (DAGs) in Python
- Bietet eine reichhaltige Web-UI für Monitoring, Retries, Log-Inspection
- Unterstützt 700+ Community-gepflegte Operator-Pakete
- Handlet Dependency-Auflösung, Scheduling und Task-Verteilung
Was Airflow nicht ist:
- Eine Streaming-Engine — Batch-first by design
- Ein Data-Processing-Framework — orchestriert Arbeit, macht selbst kein Compute
- Einfach — der operationale Overhead ist real, besonders self-hosted
| Dimension | Airflow | Cron + Scripts | Managed-Alternativen (MWAA, Composer) |
|---|---|---|---|
| Dependency-Management | Native DAG-Support | Manuell / fragil | Native DAG-Support |
| Retry-Logik | Eingebaut mit Backoff | DIY | Eingebaut |
| Monitoring-UI | Volles Web-Interface | Keine (nur Logs) | Volles Web-Interface |
| Setup-Komplexität | Mittel–Hoch | Niedrig | Niedrig (managed) |
| Kosten at Scale | Infra + Ops | Minimal | Pay-per-use, kann spike-en |
| Lernkurve | Steil | Flach | Mittel |
Airflow lokal installieren
Der schnellste Weg: constrained pip-Install. Airflow hat viele Dependencies, deshalb zählen gepinnte Versionen.
# Virtual Environment (Python 3.9–3.12)
python3 -m venv airflow-venv
source airflow-venv/bin/activate
# Airflow-Home-Verzeichnis setzen
export AIRFLOW_HOME=~/airflow
# Airflow 2.9+ mit Constraints für Reproduzierbarkeit
AIRFLOW_VERSION=2.9.3
PYTHON_VERSION=$(python3 -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# Metadata-DB initialisieren (SQLite für Dev, PostgreSQL für Prod)
airflow db migrate
# Admin-User anlegen
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password admin
# Webserver starten (Default: Port 8080)
airflow webserver --port 8080 -D
# Scheduler starten (eigenes Terminal oder Background)
airflow scheduler -D
Production-Hinweis: Niemals SQLite oder den
SequentialExecutorin Production. Nutze PostgreSQL +CeleryExecutoroderKubernetesExecutor. Das Default-Setup ist nur für Dev.
Dein erster DAG — Extract, Transform, Load
Hier ist ein vollständiger, lauffähiger DAG, der Daten aus einer public API extrahiert, mit Python transformiert und das Ergebnis in PostgreSQL lädt. Das ist das kanonische Airflow-Tutorial-Pattern.
# dags/weather_etl.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.sensors.http import HttpSensor
import requests
import json
# Default-Argumente für alle Tasks im DAG
default_args = {
"owner": "data-engineering",
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
"max_retry_delay": timedelta(minutes=30),
}
def _extract_weather(**context):
"""Wetterdaten von Open-Meteo API extrahieren (kein API-Key nötig)."""
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": 52.52,
"longitude": 13.41,
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
"timezone": "Europe/Berlin",
"past_days": 7,
}
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()
data = response.json()
# Push auf XCom für Downstream-Tasks
context["ti"].xcom_push(key="weather_raw", value=data)
def _transform_weather(**context):
"""Rohe API-Response in Insert-Ready-Records umwandeln."""
raw = context["ti"].xcom_pull(task_ids="extract_weather", key="weather_raw")
daily = raw["daily"]
records = []
for i, date in enumerate(daily["time"]):
records.append({
"observation_date": date,
"temp_max_c": daily["temperature_2m_max"][i],
"temp_min_c": daily["temperature_2m_min"][i],
"precipitation_mm": daily["precipitation_sum"][i],
"city": "Berlin",
"loaded_at": datetime.utcnow().isoformat(),
})
context["ti"].xcom_push(key="weather_records", value=records)
def _load_weather(**context):
"""Transformierte Records in PostgreSQL inserten."""
from airflow.providers.postgres.hooks.postgres import PostgresHook
records = context["ti"].xcom_pull(
task_ids="transform_weather", key="weather_records"
)
hook = PostgresHook(postgres_conn_id="postgres_warehouse")
# SQL-Dialekt: PostgreSQL
insert_sql = """
INSERT INTO weather_daily
(observation_date, temp_max_c, temp_min_c, precipitation_mm, city, loaded_at)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (observation_date, city) DO UPDATE SET
temp_max_c = EXCLUDED.temp_max_c,
temp_min_c = EXCLUDED.temp_min_c,
precipitation_mm = EXCLUDED.precipitation_mm,
loaded_at = EXCLUDED.loaded_at;
"""
for record in records:
hook.run(insert_sql, parameters=(
record["observation_date"],
record["temp_max_c"],
record["temp_min_c"],
record["precipitation_mm"],
record["city"],
record["loaded_at"],
))
with DAG(
dag_id="weather_etl_pipeline",
default_args=default_args,
description="Daily weather ETL von Open-Meteo API nach PostgreSQL",
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["etl", "weather", "tutorial"],
) as dag:
# Sensor: prüft, ob API erreichbar ist, bevor wir starten
check_api = HttpSensor(
task_id="check_api_available",
http_conn_id="open_meteo_api",
endpoint="v1/forecast?latitude=52.52&longitude=13.41&daily=temperature_2m_max",
response_check=lambda response: response.status_code == 200,
poke_interval=60,
timeout=300,
)
# Zieltabelle anlegen wenn nicht da (SQL-Dialekt: PostgreSQL)
create_table = PostgresOperator(
task_id="create_table",
postgres_conn_id="postgres_warehouse",
sql="""
CREATE TABLE IF NOT EXISTS weather_daily (
observation_date DATE NOT NULL,
temp_max_c FLOAT,
temp_min_c FLOAT,
precipitation_mm FLOAT,
city VARCHAR(100) NOT NULL,
loaded_at TIMESTAMP,
PRIMARY KEY (observation_date, city)
);
""",
)
extract = PythonOperator(task_id="extract_weather", python_callable=_extract_weather)
transform = PythonOperator(task_id="transform_weather", python_callable=_transform_weather)
load = PythonOperator(task_id="load_weather", python_callable=_load_weather)
# Task-Dependencies
check_api >> create_table >> extract >> transform >> load
Key-Patterns in diesem DAG
- XCom für Inter-Task-Kommunikation — nur kleine Payloads (< 48KB in der Metadata-DB). Für große Datasets: in Object-Storage schreiben und den Pfad via XCom weitergeben.
- Idempotente Loads —
ON CONFLICT ... DO UPDATEmacht Re-Runs sicher. - Sensor vor Extract —
HttpSensorverhindert verschwendetes Compute, wenn die Upstream-API down ist. - Exponential-Backoff-Retries —
retry_exponential_backoff=Trueverhindert, dass du einen sich erholenden Service kaputt hämmerst.
Scheduling und Catchup — der missverstandenste Teil
Airflows Scheduler verwirrt fast jeden. Was du wissen musst:
start_date ist nicht, wann der DAG zum ersten Mal läuft. Es definiert das früheste logical_date (früher execution_date). Ein DAG mit @daily und start_date=2024-01-01 läuft das erste Mal nachdem 2024-01-01 endet — er triggered am 2024-01-02 um Mitternacht und verarbeitet Daten für 2024-01-01.
catchup=True (der Default) backfilled alle verpassten Intervalle. Wenn du einen DAG mit start_date vor drei Monaten und catchup=True deployst, queued Airflow ~90 DAG-Runs gleichzeitig. Setz catchup=False, außer du willst explizit Backfill-Verhalten.
# Gängige Scheduling-Ausdrücke
schedule="@daily" # Mitternacht UTC
schedule="@hourly" # Volle Stunde
schedule="0 6 * * 1-5" # 6 Uhr UTC, Werktage (Cron)
schedule="@weekly" # Sonntag Mitternacht
schedule=None # Nur extern getriggert (API, CLI, anderer DAG)
Timetables (Airflow 2.4+) ersetzen Cron für komplexe Schedules:
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.datasets import Dataset
# Täglich ODER wenn ein Upstream-Dataset aktualisiert wird
dag = DAG(
dag_id="hybrid_schedule",
schedule=DatasetOrTimeSchedule(
timetable=CronTriggerTimetable("0 6 * * *", timezone="UTC"),
datasets=[Dataset("s3://bucket/raw/events/")],
),
)
TaskFlow-API (der moderne Weg)
Airflow 2.0+ hat die TaskFlow-API mit @task-Decorators eingeführt. Sie eliminiert Boilerplate und macht XCom implizit. Hier dieselbe ETL-Pipeline neu geschrieben:
# dags/weather_etl_taskflow.py
from datetime import datetime, timedelta
from airflow.decorators import dag, task
import requests
@dag(
dag_id="weather_etl_taskflow",
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
},
tags=["etl", "weather", "taskflow"],
)
def weather_etl_taskflow():
@task()
def extract() -> dict:
"""Wetterdaten holen. Return-Value wird automatisch auf XCom gepushed."""
response = requests.get(
"https://api.open-meteo.com/v1/forecast",
params={
"latitude": 52.52,
"longitude": 13.41,
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
"timezone": "Europe/Berlin",
"past_days": 7,
},
timeout=30,
)
response.raise_for_status()
return response.json()
@task()
def transform(raw: dict) -> list[dict]:
"""Rohdaten in Records transformieren."""
daily = raw["daily"]
return [
{
"observation_date": daily["time"][i],
"temp_max_c": daily["temperature_2m_max"][i],
"temp_min_c": daily["temperature_2m_min"][i],
"precipitation_mm": daily["precipitation_sum"][i],
"city": "Berlin",
}
for i in range(len(daily["time"]))
]
@task()
def load(records: list[dict]):
"""Records in PostgreSQL laden via Hook."""
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id="postgres_warehouse")
for record in records:
hook.run(
"""INSERT INTO weather_daily
(observation_date, temp_max_c, temp_min_c, precipitation_mm, city)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (observation_date, city) DO UPDATE SET
temp_max_c = EXCLUDED.temp_max_c,
temp_min_c = EXCLUDED.temp_min_c,
precipitation_mm = EXCLUDED.precipitation_mm;""",
parameters=(
record["observation_date"],
record["temp_max_c"],
record["temp_min_c"],
record["precipitation_mm"],
record["city"],
),
)
# Impliziter XCom: Return-Values fließen durch Funktions-Argumente
raw_data = extract()
transformed = transform(raw_data)
load(transformed)
weather_etl_taskflow()
Die TaskFlow-API ist sauberer, aber der klassische Operator-Ansatz funktioniert weiter und ist besser, wenn du Operators mit Custom-Verhalten brauchst (Sensors, Bash-Commands, Provider-spezifische Operators).
Connections und Variables — Secrets-Management
Niemals Credentials hard-coded in DAG-Files. Airflow bietet zwei Mechanismen:
Connections speichern Endpoint + Auth (DB-URIs, API-Keys, Cloud-Credentials). Setzbar via UI, CLI oder Environment-Variablen:
# CLI-Ansatz
airflow connections add 'postgres_warehouse' \
--conn-type postgres \
--conn-host warehouse.example.com \
--conn-port 5432 \
--conn-schema analytics \
--conn-login etl_user \
--conn-password 'securepassword'
# ENV-Ansatz (überschreibt DB-stored Connections)
export AIRFLOW_CONN_POSTGRES_WAREHOUSE='postgresql://etl_user:securepassword@warehouse.example.com:5432/analytics'
Variables speichern Config-Werte (Thresholds, Feature-Flags, Pfade). Zugriff über Variable.get() — aber Achtung: jeder Call queried die Metadata-DB.
from airflow.models import Variable
# Schlecht: queried die DB bei jedem Scheduler-Parse (alle 30s by default)
threshold = Variable.get("quality_threshold")
# Gut: Zugriff innerhalb eines Tasks, läuft einmal pro Execution
@task()
def check_quality():
threshold = float(Variable.get("quality_threshold", default_var="0.95"))
# ... Validation-Logik
Häufige Fehler und Fallen
Das sind die Issues, die ich in Production-Airflow immer wieder sehe:
1. Top-Level-Code in DAG-Files
Airflows Scheduler parsed alle DAG-Files alle 30 Sekunden (konfigurierbar via dag_dir_list_interval). Jeder Code auf Module-Ebene läuft bei jedem Parse — inklusive API-Calls, DB-Queries, schwerer Imports.
# SCHLECHT: läuft alle 30 Sekunden während des Parsings
import pandas as pd
data = pd.read_csv("s3://bucket/large_file.csv") # Killt deinen Scheduler
# GUT: schwere Arbeit gehört in Tasks
@task()
def process_data():
import pandas as pd
data = pd.read_csv("s3://bucket/large_file.csv")
2. XCom-Missbrauch
XCom serialisiert Daten in die Metadata-DB (außer du konfigurierst ein Custom-Backend). DataFrames oder große JSON-Blobs in XCom zu pushen wird deine DB aufblähen und den Scheduler crashen.
Faustregel: XCom für Metadata (Pfade, Row-Counts, Status-Flags). Object-Storage für Daten.
3. Idempotenz ignorieren
Wenn dein DAG nicht idempotent ist, produzieren Retries und Backfills Duplikate oder korrupte Daten. Jeder Load-Task sollte MERGE, INSERT ... ON CONFLICT oder Truncate-and-Reload-Patterns nutzen.
4. depends_on_past=True ohne Verständnis
Dieses Flag macht, dass jeder DAG-Run auf den gleichen Task im vorherigen Run wartet. Wenn ein Run failed, queued alle zukünftigen Runs ewig. Nur nutzen, wenn sequenzielles Processing wirklich nötig ist.
5. max_active_runs nicht setzen
Ohne Limits können Catchup oder externe Trigger hunderte gleichzeitiger DAG-Runs spawnen und deine Worker + Metadata-DB überfordern.
with DAG(
dag_id="safe_dag",
max_active_runs=3, # Max 3 gleichzeitige Runs
max_active_tasks=10, # Max 10 Tasks über alle Runs
# ...
) as dag:
pass
6. Monolithische DAGs
Ein einzelner DAG mit 200 Tasks ist schwer zu debuggen, langsam in der UI und produziert Scheduling-Bottlenecks. Splitten nach Domain oder Data-Source, und data-aware Scheduling für DAG-übergreifende Koordination nutzen.
Monitoring deiner Airflow-Pipelines
Die Airflow-UI zeigt Task-Status, Logs und Gantt-Charts — aber Production-Monitoring braucht mehr. Wichtige Metriken:
| Metrik | Was sie aussagt | Alert-Threshold |
|---|---|---|
| DAG-Parse-Zeit | Scheduler-Gesundheit | > 30 Sekunden |
| Task-Dauer vs. SLA | Pipeline-Performance | > 2× historischer Schnitt |
| Task-Failure-Rate | Datenqualitäts-Risiko | > 5% über 24h |
| Scheduler-Heartbeat | Orchestrator-Liveness | Fehlt > 60s |
| XCom-Size | Metadata-DB-Gesundheit | > 1MB pro Entry |
| DAG-Run-Queue-Tiefe | Capacity-Planning | > 50 queued Runs |
Für umfassende Pipeline-Observability über die Airflow-UI hinaus: Data Pipeline Monitoring.
Wann Airflow NICHT die richtige Wahl ist
Ehrlich zu den Trade-offs sein:
- Einfache Cron-Jobs: 3 unabhängige Scripts ohne Dependencies — Airflow ist Overkill.
- Echtzeit / Streaming: Airflow ist Batch-orientiert. Für Sub-Sekunden-Latenz: Kafka, Flink oder Spark Structured Streaming.
- Leichte Orchestrierung: Weniger Ops-Overhead? Managed-Alternativen wie Dagster Cloud, Prefect Cloud oder Databricks Workflows.
- Teams ohne Python-Skills: Airflow ist Python-nativ. Wenn dein Team in SQL und Drag-and-Drop-Tools lebt, lohnt die Lernkurve evtl. nicht.
Pipeline-Daten jenseits von Airflow erkunden
Sobald deine Airflow-DAGs Daten in Warehouses laden, ist die nächste Frage immer: Wie validiere und erkunde ich schnell, was da angekommen ist? Harbinger Explorer lässt dich Datenquellen direkt im Browser via DuckDB WASM und natürlicher Sprache abfragen — nützlich für Spot-Checks von Pipeline-Outputs oder API-Exploration, bevor du einen vollen DAG drum baust. Der 7-Tage-Test reicht für die meisten Validierungs-Workflows.
FAQ
Welche Python-Version unterstützt Airflow 2.9? 3.9 bis 3.12. 3.8 ist deprecated in 2.9+.
SQLite vs PostgreSQL für die Metadata-DB? SQLite nur für lokale Dev. PostgreSQL (oder MySQL) für jede produktive Nutzung — SQLite serialisiert alle Writes, Scheduler-Performance bricht ein.
Was ist der Unterschied zwischen TaskFlow und klassischen Operators?
TaskFlow ist Decorator-basiert (@task), Return-Values fließen automatisch durch XCom. Klassische Operators sind ausdrücklicher, aber Boilerplate-heavy. Für reine Python-Tasks: TaskFlow. Für vordefinierte Operators (Sensors, S3, Postgres): klassisch.
Wie viele DAGs verträgt ein Standard-Scheduler? Faustregel: 200-500 DAGs auf einem Scheduler mit anständig CPU. Darüber: HA-Scheduler + Parse-Optimierung (kein Top-Level-Code, DAG-Bundling).
Nächste Schritte
Du hast ein lauffähiges Airflow-Setup und ein production-ready DAG-Pattern. Von hier:
- Data-Quality-Testing aufsetzen — Outputs nach jedem Pipeline-Run validieren
- TaskFlow-API-Doku für dynamisches Task-Mapping und Task-Groups
- Containerized Data Pipelines für isolierte, reproduzierbare Task-Execution
Die größte Lektion aus Production-Airflow: in Idempotenz und Monitoring investieren bevor du 50 DAGs hast. Beides nachzurüsten ist schmerzhaft. Mit den Patterns in diesem Tutorial vermeidest du die schlimmsten Überraschungen.
Weiterlesen
- Data Pipeline Monitoring: Observability, die wirklich funktioniert
- Airflow vs Dagster vs Prefect: Welcher Orchestrator passt zu deinem Team?
- Data-Quality-Testing: Bad Data fangen, bevor sie dich fängt
Stand: 14. Mai 2026. Airflow-Versionen ändern sich — Constraints-URLs auf der offiziellen Airflow-Doku checken.
Geschrieben von
Harbinger Team
Cloud-, Data- und AI-Engineer in DACH. Schreibt seit 2018 über infrastrukturkritische Tech-Entscheidungen — keine Marketing- Folien, sondern echte Trade-offs aus Production-Workloads.
Hat dir das geholfen?
Jede Woche ein neuer Artikel über DACH-Cloud, Data und AI — direkt in dein Postfach. Kein Spam, kein Marketing-Sprech.
Kein Spam. 1-Klick-Abmeldung. Datenschutz bei Loops.so.