Inhaltsverzeichnis10 Abschnitte
Incremental-Processing-Patterns: Watermark, Merge, Append
Full-Table-Rewrites funktionierten bei 10 GB. Bei 10 TB sind sie ein Budget-Posten. Inkrementelles Processing ist, wie du Pipelines skalierst, ohne deine Compute-Kosten proportional mitzuskalieren.
Die Idee ist einfach: nur Zeilen verarbeiten, die sich seit dem letzten Lauf geändert haben. Die Ausführung ist, wo die Komplexität lebt — spät-eintreffende Daten, Deduplizierung, CDC-Events und Idempotenz wollen alle gehandhabt werden. Drei Patterns decken die meisten Use-Cases ab: Watermark (High-Watermark), Merge (Upsert) und Append-only.
TL;DR
- Watermark: lädt Zeilen mit
updated_at > last_run— einfach, aber verliert Late-Data - Merge (Upsert): handhabt Inserts und Updates per Key — robust mit Lookback-Window
- Append-only: nur Inserts, History bleibt erhalten — natürlich für Events und Audit
- Häufiger Fehler: Full-Table-Overwrites lange behalten, nachdem die Daten zu groß sind
Pattern-Übersicht
| Pattern | Wie es funktioniert | Updates? | Late Data? | Komplexität |
|---|---|---|---|---|
| Watermark | Verarbeitet Zeilen mit updated_at > last_run_ts | Ja (wenn Source Updates trackt) | Nein (späte Zeilen verloren) | Niedrig |
| Merge (Upsert) | INSERT neue, UPDATE bestehende per Key | Ja | Ja (innerhalb Lookback-Window) | Mittel |
| Append-Only | Nur INSERT; nie Update oder Delete | Nein | Ja (späte Zeilen landen einfach später) | Niedrig |
Pattern 1: Watermark (High-Watermark)
Das Watermark-Pattern trackt den höchsten updated_at-Timestamp aus dem
letzten erfolgreichen Lauf und lädt nur Zeilen, die neuer als diese Marke
sind.
# Python + SQLAlchemy: Watermark-basierter inkrementeller Load
from datetime import datetime
import sqlalchemy as sa
def get_last_watermark(engine, pipeline_name: str) -> datetime:
# Letzten erfolgreichen Watermark aus Control-Tabelle holen
with engine.connect() as conn:
result = conn.execute(
sa.text(
"SELECT last_watermark FROM pipeline_state WHERE pipeline_name = :name"
),
{"name": pipeline_name}
).fetchone()
return result[0] if result else datetime(2000, 1, 1) # Default für ersten Lauf
def set_watermark(engine, pipeline_name: str, new_watermark: datetime):
# Watermark nach erfolgreichem Load updaten
with engine.connect() as conn:
conn.execute(
sa.text(
"INSERT INTO pipeline_state (pipeline_name, last_watermark) "
"VALUES (:name, :wm) "
"ON CONFLICT (pipeline_name) DO UPDATE SET last_watermark = EXCLUDED.last_watermark"
),
{"name": pipeline_name, "wm": new_watermark}
)
conn.commit()
# Main inkrementeller Load
engine = sa.create_engine("postgresql://...")
last_wm = get_last_watermark(engine, "orders_pipeline")
with engine.connect() as conn:
new_rows = conn.execute(
sa.text(
"SELECT * FROM source.orders WHERE updated_at > :last_wm ORDER BY updated_at"
),
{"last_wm": last_wm}
).fetchall()
max_ts = max(row.updated_at for row in new_rows) if new_rows else last_wm
# ... new_rows ins Target laden ...
set_watermark(engine, "orders_pipeline", max_ts)
Die kritische Annahme: die Source-Tabelle hat eine verlässliche
updated_at-Spalte, die bei jeder Row-Änderung aktualisiert wird. Wenn
die Source das nicht pflegt, bricht das Watermark-Pattern still — Zeilen,
die ohne updated_at-Update geändert werden, werden nie aufgegriffen.
Late-Data-Problem: Wenn eine Zeile in der Source mit einem updated_at
älter als dein aktueller Watermark eintrifft (z. B. ein verzögertes
CDC-Event), wird sie verpasst. Für die meisten Batch-Pipelines akzeptabel;
für Finanz- oder Audit-kritische Pipelines nicht.
dbt-Incremental-Model-Äquivalent:
-- dbt (Spark SQL): Incremental-Model mit Watermark-Strategie
{{ config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={'field': 'order_date', 'data_type': 'date'}
) }}
SELECT
order_id,
customer_id,
amount,
order_date,
updated_at
FROM {{ source('raw', 'orders') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
Pattern 2: Merge (Upsert)
Das Merge-Pattern handhabt sowohl Inserts als auch Updates in einer einzigen Operation. Es vergleicht eingehende Daten mit der Target-Tabelle auf einem Business-Key und insertet entweder neue oder updated bestehende Zeilen.
-- Spark SQL: MERGE-Statement für inkrementellen Upsert
MERGE INTO target.dim_customer AS target
USING (
SELECT
customer_id,
customer_name,
email,
city,
updated_at
FROM source.customers
WHERE updated_at > '2024-01-01T00:00:00Z' -- Lookback-Window
) AS source
ON target.customer_id = source.customer_id
WHEN MATCHED AND source.updated_at > target.updated_at THEN
UPDATE SET
target.customer_name = source.customer_name,
target.email = source.email,
target.city = source.city,
target.updated_at = source.updated_at
WHEN NOT MATCHED THEN
INSERT (customer_id, customer_name, email, city, updated_at)
VALUES (source.customer_id, source.customer_name, source.email, source.city, source.updated_at)
Der AND source.updated_at > target.updated_at-Guard im WHEN MATCHED-Clause
verhindert, dass ältere Daten neuere überschreiben — wichtig beim
Replay historischer Batches oder bei out-of-order Updates.
Merge mit Lookback-Window
Statt Watermarks zu tracken, nutzen viele Teams ein festes Lookback-Window — immer die letzten N Tage neu verarbeiten. Das handhabt spät-eintreffende Daten zum Preis, manche Zeilen unnötig neu zu verarbeiten.
# Python: Merge mit 3-Tage-Lookback-Window
from datetime import datetime, timedelta, timezone
lookback_days = 3
lookback_start = datetime.now(timezone.utc) - timedelta(days=lookback_days)
query = (
"MERGE INTO target.fct_events AS t "
"USING ( "
" SELECT event_id, user_id, event_type, event_ts "
" FROM source.events "
f" WHERE event_ts >= '{lookback_start.isoformat()}' "
") AS s "
"ON t.event_id = s.event_id "
"WHEN MATCHED THEN UPDATE SET t.event_type = s.event_type, t.event_ts = s.event_ts "
"WHEN NOT MATCHED THEN INSERT (event_id, user_id, event_type, event_ts) "
"VALUES (s.event_id, s.user_id, s.event_type, s.event_ts)"
)
Trade-offs von Merge:
- Handhabt Inserts und Updates sauber
- Lookback-Window handhabt die meisten Late-Data-Szenarien
- Braucht verlässlichen Unique-Key im Target
- Teurer als Append: DB muss matching Keys scannen
- SCD Type 2 Tracking braucht zusätzliche Logik
Pattern 3: Append-Only
Das Append-only-Pattern updated oder löscht nie Zeilen. Jede Änderung aus der Source kommt als neue Zeile ins Target. Operativ das einfachste Pattern — und das mächtigste für bestimmte Use-Cases.
# Python + PySpark: Append-only inkrementeller Load
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
spark = SparkSession.builder.getOrCreate()
# Nur neue Events aus der Source lesen (Kafka, Log-Files, etc.)
new_events = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "user_events")
.load()
)
# An Delta-Lake-Tabelle anhängen
(
new_events
.withColumn("ingested_at", current_timestamp())
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoints/events")
.outputMode("append")
.start("s3://my-bucket/events/")
)
Append-only ist das natürliche Pattern für:
- Event-Streams (Klicks, Page-Views, Sensor-Readings)
- Audit-Logs
- Unveränderbare Transaktions-Records
- Alle Daten, wo die History von Änderungen wichtig ist, nicht nur der aktuelle Zustand
Aktuellen Zustand aus Append-only-Daten ableiten:
Da die Tabelle alle Versionen eines Records enthält, musst du den aktuellen Zustand zur Query-Zeit ableiten:
-- Spark SQL: aktuellen Zustand aus Append-only Event-Log holen
-- ROW_NUMBER für die neueste Version jedes Customer-Records
WITH ranked AS (
SELECT
customer_id,
customer_name,
email,
city,
event_ts,
ROW_NUMBER() OVER (
PARTITION BY customer_id
ORDER BY event_ts DESC
) AS rn
FROM events.customer_updates
)
SELECT customer_id, customer_name, email, city
FROM ranked
WHERE rn = 1
Teurer als eine Merge-gepflegte Tabelle zu lesen, aber bewahrt die volle History.
Deduplizierung: die versteckte Herausforderung
Alle drei Patterns sehen sich Deduplizierungs-Herausforderungen gegenüber. Netzwerk-Retries, Pipeline-Restarts und CDC-Redelivery können denselben Source-Event mehrfach eintreffen lassen.
# Python + PySpark: idempotenter Append mit Deduplizierung
from pyspark.sql.functions import col
# Eingehenden Batch lesen
incoming = spark.read.parquet("s3://landing/events/2024/03/15/")
# Existierende Keys aus Target lesen (nur Key-Spalte — kein Full-Scan)
existing_keys = (
spark.read
.format("delta")
.load("s3://target/events/")
.select("event_id")
)
# Nur wirklich neue Zeilen behalten
new_only = incoming.join(existing_keys, on="event_id", how="left_anti")
# Anhängen
(
new_only.write
.format("delta")
.mode("append")
.save("s3://target/events/")
)
Für Merge-basierte Patterns wird Deduplizierung durch den MERGE-Key behandelt — Duplikat-Source-Zeilen triggern einfach einen No-op-Update, wenn die Daten nicht geändert haben.
Das richtige Pattern wählen
| Situation | Empfohlenes Pattern |
|---|---|
Source hat verlässliches updated_at, keine Late Data | Watermark |
| Source hat Updates und Deletes, brauche aktuellen Zustand | Merge |
| Event-Daten, unveränderbare Facts, Audit-Anforderungen | Append-only |
| Brauche History-Replay oder Recovery von Failures | Append-only (einfacheres Replay) |
| Source-Daten haben Late-Arrivals in vorhersagbarem Fenster | Merge mit Lookback |
| Slowly Changing Dimensions (SCD Type 2) | Merge mit Versioning-Logik |
FAQ
Wann Append-only vs. Merge wählen? Append-only bei Events und Audit-Bedarf — du willst die History. Merge bei Stamm- und Dimensionsdaten, wo du nur den aktuellen Zustand brauchst.
Was, wenn meine Source kein verlässliches updated_at hat?
CDC (Change Data Capture) auf der Source einrichten — Debezium, Fivetran
oder eingebaute Provider. Oder Append-only mit Tombstones für Deletes.
Wie groß sollte das Lookback-Window sein? Konservativ: 3× die maximale beobachtete Latenz deiner Source. Wenn Late Data typisch 6h nachhinkt, plane mit 24h Lookback.
Lohnt sich Merge für sehr große Tabellen? Ab ~100M Zeilen wird MERGE teuer. Dann: Partition-spezifisches MERGE oder Append-only + Materialized Views für aktuellen Zustand.
Fazit
Watermark ist das einfachste Pattern und funktioniert gut, wenn deine Source sauber ist. Merge handhabt reale Unordnung — Late Data, Updates, Deletes — zum Preis komplexerer Logik. Append-only ist der richtige Default für Event-Daten und alles, wo History zählt.
Der häufige Fehler ist, lange auf Full-Table-Overwrites zu setzen, nachdem du sie längst überwachsen hast. Der zweithäufigste ist, Merge zu implementieren, ohne darüber nachzudenken, was passiert, wenn dieselbe Zeile zweimal kommt. Beides lohnt sich, früh richtig zu machen.
Stand: 14. Mai 2026.
Geschrieben von
Harbinger Team
Cloud-, Data- und AI-Engineer in DACH. Schreibt seit 2018 über infrastrukturkritische Tech-Entscheidungen — keine Marketing- Folien, sondern echte Trade-offs aus Production-Workloads.
Hat dir das geholfen?
Jede Woche ein neuer Artikel über DACH-Cloud, Data und AI — direkt in dein Postfach. Kein Spam, kein Marketing-Sprech.
Kein Spam. 1-Klick-Abmeldung. Datenschutz bei Loops.so.