Cloud allgemein

Data Pipeline Monitoring: Failures fangen, bevor Stakeholder:innen es tun

Slack um 7:42: 'Dashboard zeigt gestrige Zahlen.' Wie du Execution, Quality, Performance und Metadata so monitorest, dass C-Level keine veralteten Daten mehr sieht.

Harbinger Team30. März 20266 Min. LesezeitAktualisiert 14.5.2026
  • pipeline-monitoring
  • data-quality
  • observability
  • airflow
  • data-engineering
  • sla-monitoring
Inhaltsverzeichnis14 Abschnitte

Dein Slack leuchtet um 7:42: "Das Umsatz-Dashboard zeigt gestrige Zahlen." Du prüfst Airflow — ein Task ist um 3 Uhr still gefailed, kein Alert, und jetzt schaut das ganze C-Level auf veraltete Daten.

TL;DR

  • Pipeline-Failures sind oft silent — Pipeline meldet "complete", liefert 0 Rows.
  • Vier Säulen: Execution, Quality, Performance, Metadata/Lineage.
  • SLA-Monitoring schlägt Failure-Monitoring (Erfolg um 11 statt 6 Uhr ist auch Failure).
  • Alert-Fatigue ist Killer #1 — wenige, hochsignal Alerts.
  • Monitoring beim Bau dazu, nicht als Afterthought.

Warum Pipeline-Monitoring mehr zählt, als du denkst

Pipelines sind unsichtbare Infrastruktur. Funktioniert: niemand merkt. Bricht: alle merken — Stunden zu spät.

Das Kernproblem: Pipelines failen silent. Anders als eine Web-App, die 500 wirft, produziert eine kaputte Pipeline einfach... keine frischen Daten. Dashboard lädt weiter, ML-Model liefert weiter — nur falsch.

Failure-TypBeispielErkennungszeit ohne Monitoring
Hard FailureTask wirft Exception, DAG stopptStunden (nächste manuelle Prüfung)
Soft FailurePipeline complete, 0 RowsTage (Stakeholder-Beschwerde)
Data-DriftUpstream-Schema-Change bricht JOINTage bis Wochen
Latency-CreepPipeline wird graduell 4× langsamerWochen (SLA-Breach)
Silent CorruptionFalsche Werte, DuplikateWochen bis Monate

Hard Failures sind die einfachen. Soft Failures — die "erfolgreichen" Pipelines, die Müll produzieren — verursachen den echten Schaden.

Die vier Säulen

1. Execution-Monitoring (Lief es?)

Baseline: lief, wann gestartet, wann fertig, ob erfolgreich. Jeder Orchestrator gibt das gratis — Airflow, Dagster, Prefect. Fehler: nur auf Orchestrator-UI verlassen, nicht in Alerting-System routen.

import requests
from airflow.models import Variable

def slack_failure_alert(context):
    webhook_url = Variable.get("slack_webhook_url")
    task_instance = context.get("task_instance")
    payload = {
        "text": (
            f":rotating_light: *Pipeline-Failure*\n"
            f"*DAG:* `{task_instance.dag_id}`\n"
            f"*Task:* `{task_instance.task_id}`\n"
            f"*Execution Date:* {context.get('execution_date').isoformat()}\n"
            f"*Logs:* <{task_instance.log_url}|View Logs>"
        )
    }
    requests.post(webhook_url, json=payload, timeout=10)


def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    webhook_url = Variable.get("slack_webhook_url")
    task_names = ", ".join([t.task_id for t in task_list])
    payload = {
        "text": (
            f":warning: *SLA-Miss*\n"
            f"*DAG:* `{dag.dag_id}`\n"
            f"*Tasks:* `{task_names}`\n"
            f"*Erwartet bis:* {slas[0].execution_date.isoformat()}"
        )
    }
    requests.post(webhook_url, json=payload, timeout=10)

Lektion: SLA-Monitoring ist nützlicher als Failure-Monitoring. Ein Task, der 5× retried und um 11 statt 6 Uhr endlich grün ist, ist technisch "Success" — Stakeholder:innen sehen das anders.

2. Quality-Monitoring (Ist der Output korrekt?)

Hier haben die meisten Teams die größte Lücke. Pipeline lief erfolgreich — produzierte sie auch gute Daten?

Minimum nach jedem Run:

  • Row-Counts: Sinnvolle Zahl? (Nicht 0, nicht 10× normal)
  • Null-Raten: Kritische Spalten unerwartet null?
  • Uniqueness: Primary Keys tatsächlich unique?
  • Freshness: Neuester Timestamp in erwartetem Range?
  • Value-Ranges: Numerische Werte plausibel?
WITH row_check AS (
    SELECT COUNT(*) AS row_count,
        CASE
            WHEN COUNT(*) = 0 THEN 'CRITICAL: Zero rows'
            WHEN COUNT(*) < 1000 THEN 'WARNING: Low'
            WHEN COUNT(*) > 1000000 THEN 'WARNING: High'
            ELSE 'OK'
        END AS status
    FROM analytics.daily_revenue WHERE load_date = CURRENT_DATE
),
null_check AS (
    SELECT
        ROUND(100.0 * COUNT(*) FILTER (WHERE revenue IS NULL) / NULLIF(COUNT(*), 0), 2) AS revenue_null_pct,
        ROUND(100.0 * COUNT(*) FILTER (WHERE customer_id IS NULL) / NULLIF(COUNT(*), 0), 2) AS customer_id_null_pct
    FROM analytics.daily_revenue WHERE load_date = CURRENT_DATE
),
freshness_check AS (
    SELECT MAX(event_timestamp) AS latest_event,
        CASE
            WHEN MAX(event_timestamp) < NOW() - INTERVAL '24 hours' THEN 'CRITICAL: > 24h'
            WHEN MAX(event_timestamp) < NOW() - INTERVAL '6 hours' THEN 'WARNING: > 6h'
            ELSE 'OK'
        END AS freshness_status
    FROM analytics.daily_revenue WHERE load_date = CURRENT_DATE
),
pk_check AS (
    SELECT
        CASE WHEN COUNT(*) != COUNT(DISTINCT transaction_id)
        THEN 'CRITICAL: Duplicate PKs' ELSE 'OK' END AS uniqueness_status
    FROM analytics.daily_revenue WHERE load_date = CURRENT_DATE
)
SELECT r.row_count, r.status, n.revenue_null_pct, n.customer_id_null_pct,
       f.freshness_status, p.uniqueness_status
FROM row_check r, null_check n, freshness_check f, pk_check p;

Statische Thresholds für v1 — Anomaly-Detection (Standard-Deviation vom Rolling-Average) später.

3. Performance-Monitoring (Schnell genug?)

MetrikWarumAlert-Threshold (Beispiel)
Total DAG-DurationSLA-Compliance> 2× Median letzte 30 Runs
Individual Task-DurationBottleneck isolieren> 3× Median für Task
Verarbeitetes VolumenCapacity-Planning> 2× Average (Quelle prüfen)
Memory / CPUInfra-Kosten> 80 % sustained
Queue-WaitOrchestrator-Kapazität> 15 Min (Worker skalieren)

4. Metadata- und Lineage-Monitoring (Was änderte sich?)

Wenn was bricht, brauchst du: was änderte sich? Schema-Änderungen, Volume-Spikes, neue Quellsysteme.

from sqlalchemy import create_engine, inspect
import json
from pathlib import Path

def get_current_schema(engine, table_name: str, schema: str = "public") -> dict:
    inspector = inspect(engine)
    columns = inspector.get_columns(table_name, schema=schema)
    return { col["name"]: str(col["type"]) for col in columns }

def detect_schema_changes(engine, table_name: str,
                          snapshot_dir: str = "/tmp/schema_snapshots",
                          schema: str = "public") -> dict:
    snapshot_path = Path(snapshot_dir) / f"{schema}_{table_name}.json"
    current = get_current_schema(engine, table_name, schema)
    changes = {"added": [], "removed": [], "type_changed": {}}

    if snapshot_path.exists():
        previous = json.loads(snapshot_path.read_text())
        current_cols = set(current.keys())
        previous_cols = set(previous.keys())
        changes["added"] = sorted(current_cols - previous_cols)
        changes["removed"] = sorted(previous_cols - current_cols)
        for col in current_cols & previous_cols:
            if current[col] != previous[col]:
                changes["type_changed"][col] = {"old": previous[col], "new": current[col]}

    snapshot_path.parent.mkdir(parents=True, exist_ok=True)
    snapshot_path.write_text(json.dumps(current, indent=2))
    return changes

Monitoring-Stack-Roadmap

Phase 1: Foundation (Woche 1)

  • Failure-Callbacks im Orchestrator
  • SLA-Monitoring für Top-5-Pipelines
  • Alle Alerts in Slack/PagerDuty (nicht nur UI)
  • Row-Count-Checks als Post-Pipeline-Tasks

Phase 2: Quality (Monat 1)

  • SQL-basierte Quality-Checks (Null, Uniqueness, Freshness)
  • Schema-Change-Detection auf kritischen Sources
  • Dashboard mit Pipeline-Durations über Zeit
  • On-Call-Rotation

Phase 3: Advanced (Quartal 1)

  • Great Expectations oder dbt-Tests für systematische Checks
  • Anomaly-Detection (Rolling Avg ± 2σ)
  • Lineage-Tracking
  • Runbooks pro Failure-Szenario

Häufige Fehler

  • Alert-Fatigue ist Killer #1. 50 Alerts/Tag = alle ignoriert. Wenige, hochsignal-Alerts.
  • Nur Happy-Path monitoren. Rows produzieren ist nötig, nicht hinreichend. Duplikate, Nulls, technisch valide aber logisch falsche Werte (negativer Umsatz, Zukunfts-Timestamps).
  • "Erfolgreich aber bad Data" skippen. Gefährlichster Failure-Mode.
  • Monitoring als Afterthought. Beste Zeit zum Bau = beim Pipeline-Bau. Zweitbeste: jetzt.
  • Alert auf Nicht-Actionable. Jeder Alert braucht Owner und dokumentierten Response.

Checkliste

CheckImplementiert?Alert-ChannelRunbook
Task-Failure
SLA-Miss
Row-Count-Validierung
Null-Rate-Check
Primary-Key-Uniqueness
Freshness-Check
Schema-Change
Duration-Tracking
Volume-Anomaly

FAQ

Was monitor ich zuerst? Execution + Row-Counts an kritischen Targets. Das fängt 70 % der häufigen Failures.

Brauche ich Great Expectations oder reicht dbt? dbt-Tests reichen für die meisten Teams. GE ist sinnvoll bei komplexen Quality-Regeln über mehrere Systeme.

Was kostet ein Failure typischerweise? Stundenlohn × Investigation-Stunden + opportunity Cost für falsche Entscheidung. Mittelständische DACH-Teams berichten 5–20k € pro Major-Incident.

Wie viele Alerts pro Tag sind ok? Faustregel: < 5 echte Alerts pro Tag. Mehr = Threshold zu eng oder Pipelines zu zerbrechlich.

Stand: 14. Mai 2026.

H

Geschrieben von

Harbinger Team

Cloud-, Data- und AI-Engineer in DACH. Schreibt seit 2018 über infrastruktur­kritische Tech-Entscheidungen — keine Marketing- Folien, sondern echte Trade-offs aus Production-Workloads.

Hat dir das geholfen?

Jede Woche ein neuer Artikel über DACH-Cloud, Data und AI — direkt in dein Postfach. Kein Spam, kein Marketing-Sprech.

Kein Spam. 1-Klick-Abmeldung. Datenschutz bei Loops.so.