Cloud allgemein

Incremental-Processing-Patterns: Watermark, Merge, Append

Praktischer Leitfaden zu den drei Kern-Patterns für inkrementelle Verarbeitung — Watermark, Merge (Upsert) und Append-only — mit SQL- und PySpark-Beispielen.

Harbinger Team14. Mai 20267 Min. LesezeitAktualisiert 14.5.2026
  • incremental processing
  • data pipelines
  • watermark
  • upsert
  • merge
  • spark
  • delta lake
Inhaltsverzeichnis10 Abschnitte

Incremental-Processing-Patterns: Watermark, Merge, Append

Full-Table-Rewrites funktionierten bei 10 GB. Bei 10 TB sind sie ein Budget-Posten. Inkrementelles Processing ist, wie du Pipelines skalierst, ohne deine Compute-Kosten proportional mitzuskalieren.

Die Idee ist einfach: nur Zeilen verarbeiten, die sich seit dem letzten Lauf geändert haben. Die Ausführung ist, wo die Komplexität lebt — spät-eintreffende Daten, Deduplizierung, CDC-Events und Idempotenz wollen alle gehandhabt werden. Drei Patterns decken die meisten Use-Cases ab: Watermark (High-Watermark), Merge (Upsert) und Append-only.

TL;DR

  • Watermark: lädt Zeilen mit updated_at > last_run — einfach, aber verliert Late-Data
  • Merge (Upsert): handhabt Inserts und Updates per Key — robust mit Lookback-Window
  • Append-only: nur Inserts, History bleibt erhalten — natürlich für Events und Audit
  • Häufiger Fehler: Full-Table-Overwrites lange behalten, nachdem die Daten zu groß sind

Pattern-Übersicht

PatternWie es funktioniertUpdates?Late Data?Komplexität
WatermarkVerarbeitet Zeilen mit updated_at > last_run_tsJa (wenn Source Updates trackt)Nein (späte Zeilen verloren)Niedrig
Merge (Upsert)INSERT neue, UPDATE bestehende per KeyJaJa (innerhalb Lookback-Window)Mittel
Append-OnlyNur INSERT; nie Update oder DeleteNeinJa (späte Zeilen landen einfach später)Niedrig

Pattern 1: Watermark (High-Watermark)

Das Watermark-Pattern trackt den höchsten updated_at-Timestamp aus dem letzten erfolgreichen Lauf und lädt nur Zeilen, die neuer als diese Marke sind.

# Python + SQLAlchemy: Watermark-basierter inkrementeller Load
from datetime import datetime
import sqlalchemy as sa

def get_last_watermark(engine, pipeline_name: str) -> datetime:
    # Letzten erfolgreichen Watermark aus Control-Tabelle holen
    with engine.connect() as conn:
        result = conn.execute(
            sa.text(
                "SELECT last_watermark FROM pipeline_state WHERE pipeline_name = :name"
            ),
            {"name": pipeline_name}
        ).fetchone()
    return result[0] if result else datetime(2000, 1, 1)  # Default für ersten Lauf

def set_watermark(engine, pipeline_name: str, new_watermark: datetime):
    # Watermark nach erfolgreichem Load updaten
    with engine.connect() as conn:
        conn.execute(
            sa.text(
                "INSERT INTO pipeline_state (pipeline_name, last_watermark) "
                "VALUES (:name, :wm) "
                "ON CONFLICT (pipeline_name) DO UPDATE SET last_watermark = EXCLUDED.last_watermark"
            ),
            {"name": pipeline_name, "wm": new_watermark}
        )
        conn.commit()

# Main inkrementeller Load
engine = sa.create_engine("postgresql://...")
last_wm = get_last_watermark(engine, "orders_pipeline")

with engine.connect() as conn:
    new_rows = conn.execute(
        sa.text(
            "SELECT * FROM source.orders WHERE updated_at > :last_wm ORDER BY updated_at"
        ),
        {"last_wm": last_wm}
    ).fetchall()

max_ts = max(row.updated_at for row in new_rows) if new_rows else last_wm
# ... new_rows ins Target laden ...
set_watermark(engine, "orders_pipeline", max_ts)

Die kritische Annahme: die Source-Tabelle hat eine verlässliche updated_at-Spalte, die bei jeder Row-Änderung aktualisiert wird. Wenn die Source das nicht pflegt, bricht das Watermark-Pattern still — Zeilen, die ohne updated_at-Update geändert werden, werden nie aufgegriffen.

Late-Data-Problem: Wenn eine Zeile in der Source mit einem updated_at älter als dein aktueller Watermark eintrifft (z. B. ein verzögertes CDC-Event), wird sie verpasst. Für die meisten Batch-Pipelines akzeptabel; für Finanz- oder Audit-kritische Pipelines nicht.

dbt-Incremental-Model-Äquivalent:

-- dbt (Spark SQL): Incremental-Model mit Watermark-Strategie
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={'field': 'order_date', 'data_type': 'date'}
) }}

SELECT
    order_id,
    customer_id,
    amount,
    order_date,
    updated_at
FROM {{ source('raw', 'orders') }}

{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

Pattern 2: Merge (Upsert)

Das Merge-Pattern handhabt sowohl Inserts als auch Updates in einer einzigen Operation. Es vergleicht eingehende Daten mit der Target-Tabelle auf einem Business-Key und insertet entweder neue oder updated bestehende Zeilen.

-- Spark SQL: MERGE-Statement für inkrementellen Upsert
MERGE INTO target.dim_customer AS target
USING (
    SELECT
        customer_id,
        customer_name,
        email,
        city,
        updated_at
    FROM source.customers
    WHERE updated_at > '2024-01-01T00:00:00Z'  -- Lookback-Window
) AS source
ON target.customer_id = source.customer_id

WHEN MATCHED AND source.updated_at > target.updated_at THEN
    UPDATE SET
        target.customer_name = source.customer_name,
        target.email         = source.email,
        target.city          = source.city,
        target.updated_at    = source.updated_at

WHEN NOT MATCHED THEN
    INSERT (customer_id, customer_name, email, city, updated_at)
    VALUES (source.customer_id, source.customer_name, source.email, source.city, source.updated_at)

Der AND source.updated_at > target.updated_at-Guard im WHEN MATCHED-Clause verhindert, dass ältere Daten neuere überschreiben — wichtig beim Replay historischer Batches oder bei out-of-order Updates.

Merge mit Lookback-Window

Statt Watermarks zu tracken, nutzen viele Teams ein festes Lookback-Window — immer die letzten N Tage neu verarbeiten. Das handhabt spät-eintreffende Daten zum Preis, manche Zeilen unnötig neu zu verarbeiten.

# Python: Merge mit 3-Tage-Lookback-Window
from datetime import datetime, timedelta, timezone

lookback_days = 3
lookback_start = datetime.now(timezone.utc) - timedelta(days=lookback_days)

query = (
    "MERGE INTO target.fct_events AS t "
    "USING ( "
    "    SELECT event_id, user_id, event_type, event_ts "
    "    FROM source.events "
    f"    WHERE event_ts >= '{lookback_start.isoformat()}' "
    ") AS s "
    "ON t.event_id = s.event_id "
    "WHEN MATCHED THEN UPDATE SET t.event_type = s.event_type, t.event_ts = s.event_ts "
    "WHEN NOT MATCHED THEN INSERT (event_id, user_id, event_type, event_ts) "
    "VALUES (s.event_id, s.user_id, s.event_type, s.event_ts)"
)

Trade-offs von Merge:

  • Handhabt Inserts und Updates sauber
  • Lookback-Window handhabt die meisten Late-Data-Szenarien
  • Braucht verlässlichen Unique-Key im Target
  • Teurer als Append: DB muss matching Keys scannen
  • SCD Type 2 Tracking braucht zusätzliche Logik

Pattern 3: Append-Only

Das Append-only-Pattern updated oder löscht nie Zeilen. Jede Änderung aus der Source kommt als neue Zeile ins Target. Operativ das einfachste Pattern — und das mächtigste für bestimmte Use-Cases.

# Python + PySpark: Append-only inkrementeller Load
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

spark = SparkSession.builder.getOrCreate()

# Nur neue Events aus der Source lesen (Kafka, Log-Files, etc.)
new_events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "user_events")
    .load()
)

# An Delta-Lake-Tabelle anhängen
(
    new_events
    .withColumn("ingested_at", current_timestamp())
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoints/events")
    .outputMode("append")
    .start("s3://my-bucket/events/")
)

Append-only ist das natürliche Pattern für:

  • Event-Streams (Klicks, Page-Views, Sensor-Readings)
  • Audit-Logs
  • Unveränderbare Transaktions-Records
  • Alle Daten, wo die History von Änderungen wichtig ist, nicht nur der aktuelle Zustand

Aktuellen Zustand aus Append-only-Daten ableiten:

Da die Tabelle alle Versionen eines Records enthält, musst du den aktuellen Zustand zur Query-Zeit ableiten:

-- Spark SQL: aktuellen Zustand aus Append-only Event-Log holen
-- ROW_NUMBER für die neueste Version jedes Customer-Records
WITH ranked AS (
    SELECT
        customer_id,
        customer_name,
        email,
        city,
        event_ts,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY event_ts DESC
        ) AS rn
    FROM events.customer_updates
)
SELECT customer_id, customer_name, email, city
FROM ranked
WHERE rn = 1

Teurer als eine Merge-gepflegte Tabelle zu lesen, aber bewahrt die volle History.

Deduplizierung: die versteckte Herausforderung

Alle drei Patterns sehen sich Deduplizierungs-Herausforderungen gegenüber. Netzwerk-Retries, Pipeline-Restarts und CDC-Redelivery können denselben Source-Event mehrfach eintreffen lassen.

# Python + PySpark: idempotenter Append mit Deduplizierung
from pyspark.sql.functions import col

# Eingehenden Batch lesen
incoming = spark.read.parquet("s3://landing/events/2024/03/15/")

# Existierende Keys aus Target lesen (nur Key-Spalte — kein Full-Scan)
existing_keys = (
    spark.read
    .format("delta")
    .load("s3://target/events/")
    .select("event_id")
)

# Nur wirklich neue Zeilen behalten
new_only = incoming.join(existing_keys, on="event_id", how="left_anti")

# Anhängen
(
    new_only.write
    .format("delta")
    .mode("append")
    .save("s3://target/events/")
)

Für Merge-basierte Patterns wird Deduplizierung durch den MERGE-Key behandelt — Duplikat-Source-Zeilen triggern einfach einen No-op-Update, wenn die Daten nicht geändert haben.

Das richtige Pattern wählen

SituationEmpfohlenes Pattern
Source hat verlässliches updated_at, keine Late DataWatermark
Source hat Updates und Deletes, brauche aktuellen ZustandMerge
Event-Daten, unveränderbare Facts, Audit-AnforderungenAppend-only
Brauche History-Replay oder Recovery von FailuresAppend-only (einfacheres Replay)
Source-Daten haben Late-Arrivals in vorhersagbarem FensterMerge mit Lookback
Slowly Changing Dimensions (SCD Type 2)Merge mit Versioning-Logik

FAQ

Wann Append-only vs. Merge wählen? Append-only bei Events und Audit-Bedarf — du willst die History. Merge bei Stamm- und Dimensionsdaten, wo du nur den aktuellen Zustand brauchst.

Was, wenn meine Source kein verlässliches updated_at hat? CDC (Change Data Capture) auf der Source einrichten — Debezium, Fivetran oder eingebaute Provider. Oder Append-only mit Tombstones für Deletes.

Wie groß sollte das Lookback-Window sein? Konservativ: 3× die maximale beobachtete Latenz deiner Source. Wenn Late Data typisch 6h nachhinkt, plane mit 24h Lookback.

Lohnt sich Merge für sehr große Tabellen? Ab ~100M Zeilen wird MERGE teuer. Dann: Partition-spezifisches MERGE oder Append-only + Materialized Views für aktuellen Zustand.

Fazit

Watermark ist das einfachste Pattern und funktioniert gut, wenn deine Source sauber ist. Merge handhabt reale Unordnung — Late Data, Updates, Deletes — zum Preis komplexerer Logik. Append-only ist der richtige Default für Event-Daten und alles, wo History zählt.

Der häufige Fehler ist, lange auf Full-Table-Overwrites zu setzen, nachdem du sie längst überwachsen hast. Der zweithäufigste ist, Merge zu implementieren, ohne darüber nachzudenken, was passiert, wenn dieselbe Zeile zweimal kommt. Beides lohnt sich, früh richtig zu machen.

Stand: 14. Mai 2026.

H

Geschrieben von

Harbinger Team

Cloud-, Data- und AI-Engineer in DACH. Schreibt seit 2018 über infrastruktur­kritische Tech-Entscheidungen — keine Marketing- Folien, sondern echte Trade-offs aus Production-Workloads.

Hat dir das geholfen?

Jede Woche ein neuer Artikel über DACH-Cloud, Data und AI — direkt in dein Postfach. Kein Spam, kein Marketing-Sprech.

Kein Spam. 1-Klick-Abmeldung. Datenschutz bei Loops.so.