Incremental Processing Patterns: Watermark, Merge, Append


Full table rewrites worked fine at 10 GB. At 10 TB, they're a budget line item. Incremental processing is how you scale pipelines without scaling your compute costs proportionally.

The idea is simple: only process rows that have changed since the last run. The execution is where the complexity lives — late-arriving data, deduplication, CDC events, and idempotency all need handling. Three patterns cover most use cases: watermark (high-watermark), merge (upsert), and append-only.

Pattern Overview

| Pattern | How it works | Handles updates? | Handles late data? | Complexity |
|---|---|---|---|---|
| Watermark | Process rows where `updated_at > last_run_ts` | ✅ (if source tracks updates) | ❌ (late rows are missed) | Low |
| Merge (Upsert) | INSERT new rows, UPDATE existing rows by key | ✅ | ✅ (if within lookback window) | Medium |
| Append-Only | Only INSERT; never update or delete | ❌ (every change lands as a new row) | ✅ (late rows just land later) | Low |

Pattern 1: Watermark (High-Watermark)

The watermark pattern tracks the highest updated_at timestamp seen in the last successful run and loads only rows newer than that mark.

# Python + SQLAlchemy: watermark-based incremental load
from datetime import datetime
import sqlalchemy as sa

def get_last_watermark(engine, pipeline_name: str) -> datetime:
    # Retrieve the last successful watermark from a control table
    with engine.connect() as conn:
        result = conn.execute(
            sa.text(
                "SELECT last_watermark FROM pipeline_state WHERE pipeline_name = :name"
            ),
            {"name": pipeline_name}
        ).fetchone()
    return result[0] if result else datetime(2000, 1, 1)  # default for first run

def set_watermark(engine, pipeline_name: str, new_watermark: datetime):
    # Update watermark after successful load
    with engine.connect() as conn:
        conn.execute(
            sa.text(
                "INSERT INTO pipeline_state (pipeline_name, last_watermark) "
                "VALUES (:name, :wm) "
                "ON CONFLICT (pipeline_name) DO UPDATE SET last_watermark = EXCLUDED.last_watermark"
            ),
            {"name": pipeline_name, "wm": new_watermark}
        )
        conn.commit()

# Main incremental load
engine = sa.create_engine("postgresql://...")
last_wm = get_last_watermark(engine, "orders_pipeline")

with engine.connect() as conn:
    new_rows = conn.execute(
        sa.text(
            "SELECT * FROM source.orders WHERE updated_at > :last_wm ORDER BY updated_at"
        ),
        {"last_wm": last_wm}
    ).fetchall()

max_ts = max(row.updated_at for row in new_rows) if new_rows else last_wm
# ... load new_rows into target ...
set_watermark(engine, "orders_pipeline", max_ts)

The critical assumption: the source table has a reliable updated_at column that is updated on every row change. If the source doesn't maintain this, the watermark pattern breaks silently — rows that are updated without touching updated_at will never be picked up.

Late data problem: If a row arrives in the source with an updated_at older than your current watermark (e.g., a delayed CDC event), it will be missed. For most batch pipelines this is acceptable; for financial or audit-critical pipelines, it's not.
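One common mitigation is to subtract a small safety lag from the stored watermark, so rows that arrive slightly late are re-queried on the next run. The sketch below is plain Python; the `effective_watermark` helper and the 15-minute lag are illustrative choices, not part of the pipeline above, and the load must tolerate the resulting overlap idempotently.

```python
from datetime import datetime, timedelta

# Hypothetical helper: widen the query window by a safety lag so rows
# arriving up to `lag` late are still picked up. The cost is re-reading
# a small overlap, which the downstream load must handle idempotently.
def effective_watermark(last_watermark: datetime, lag: timedelta) -> datetime:
    return last_watermark - lag

last_wm = datetime(2024, 3, 15, 12, 0, 0)
query_from = effective_watermark(last_wm, timedelta(minutes=15))
# The incremental query then filters on updated_at > query_from,
# i.e. from 11:45 instead of 12:00.
```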

dbt incremental model equivalent:

-- dbt (Spark SQL): incremental model with watermark strategy
{{ config(
    materialized='incremental',
    incremental_strategy='append',
    partition_by=['order_date']
) }}

SELECT
    order_id,
    customer_id,
    amount,
    order_date,
    updated_at
FROM {{ source('raw', 'orders') }}

{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
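A variant of the same filter tolerates late arrivals by rescanning a fixed buffer behind the current maximum. The 3-day interval below is an arbitrary choice, the date arithmetic assumes Spark SQL syntax, and because rescanned rows would otherwise be inserted twice, this variant should be paired with a merge-style incremental strategy or downstream deduplication:

```sql
{% if is_incremental() %}
WHERE updated_at > (
    SELECT MAX(updated_at) - INTERVAL 3 DAYS FROM {{ this }}
)
{% endif %}
```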

Pattern 2: Merge (Upsert)

The merge pattern handles both inserts and updates in a single operation. It compares incoming data against the target table on a business key and either inserts new rows or updates existing ones.

-- Spark SQL: MERGE statement for incremental upsert
MERGE INTO target.dim_customer AS target
USING (
    SELECT
        customer_id,
        customer_name,
        email,
        city,
        updated_at
    FROM source.customers
    WHERE updated_at > '2024-01-01T00:00:00Z'  -- lookback window
) AS source
ON target.customer_id = source.customer_id

WHEN MATCHED AND source.updated_at > target.updated_at THEN
    UPDATE SET
        target.customer_name = source.customer_name,
        target.email         = source.email,
        target.city          = source.city,
        target.updated_at    = source.updated_at

WHEN NOT MATCHED THEN
    INSERT (customer_id, customer_name, email, city, updated_at)
    VALUES (source.customer_id, source.customer_name, source.email, source.city, source.updated_at)

The AND source.updated_at > target.updated_at guard in the WHEN MATCHED clause prevents older data from overwriting newer records — important when replaying historical batches or handling out-of-order updates.

Merge with a lookback window

Instead of tracking watermarks, many teams use a fixed lookback window — always reprocess the last N days. This handles late-arriving data at the cost of reprocessing some rows unnecessarily.

# Python: merge with 3-day lookback window
from datetime import datetime, timedelta, timezone

lookback_days = 3
lookback_start = datetime.now(timezone.utc) - timedelta(days=lookback_days)

query = (
    "MERGE INTO target.fct_events AS t "
    "USING ( "
    "    SELECT event_id, user_id, event_type, event_ts "
    "    FROM source.events "
    f"    WHERE event_ts >= '{lookback_start.isoformat()}' "
    ") AS s "
    "ON t.event_id = s.event_id "
    "WHEN MATCHED THEN UPDATE SET t.event_type = s.event_type, t.event_ts = s.event_ts "
    "WHEN NOT MATCHED THEN INSERT (event_id, user_id, event_type, event_ts) "
    "VALUES (s.event_id, s.user_id, s.event_type, s.event_ts)"
)

# Execute via the active SparkSession, e.g. spark.sql(query)

Trade-offs of merge:

  • ✅ Handles both inserts and updates cleanly
  • ✅ Lookback window handles most late-data scenarios
  • ❌ Requires a reliable unique key on the target
  • ❌ More expensive than append: the database must scan for matching keys
  • ❌ SCD Type 2 tracking requires additional logic (merge alone only keeps the latest version)
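As a sketch of the extra logic SCD Type 2 needs, the plain-Python function below versions a dimension row instead of overwriting it. The table is modeled as a list of dicts with hypothetical `valid_from`/`valid_to`/`is_current` bookkeeping columns, so it illustrates the versioning mechanics, not a production merge.

```python
from datetime import datetime

# Hypothetical in-memory dimension table: each row carries SCD2
# bookkeeping columns (valid_from, valid_to, is_current) alongside
# the business data.
def scd2_upsert(table: list, key: str, incoming: dict, ts: datetime) -> None:
    for row in table:
        if row[key] == incoming[key] and row["is_current"]:
            if all(row.get(k) == v for k, v in incoming.items()):
                return  # no change: nothing to version
            # Close out the current version...
            row["is_current"] = False
            row["valid_to"] = ts
            break
    # ...and insert the new version as current.
    table.append({**incoming, "valid_from": ts, "valid_to": None, "is_current": True})

dim = []
scd2_upsert(dim, "customer_id", {"customer_id": 1, "city": "Oslo"}, datetime(2024, 1, 1))
scd2_upsert(dim, "customer_id", {"customer_id": 1, "city": "Bergen"}, datetime(2024, 3, 1))
# dim now holds two versions: the Oslo row closed at 2024-03-01,
# the Bergen row current.
```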

Pattern 3: Append-Only

The append-only pattern never updates or deletes rows. Every change from the source arrives as a new row in the target. This is the simplest pattern operationally — and the most powerful for certain use cases.

# Python + PySpark: append-only incremental load
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

spark = SparkSession.builder.getOrCreate()

# Read only new events from the source (Kafka, log files, etc.)
new_events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "user_events")
    .load()
)

# Append to Delta Lake table
(
    new_events
    .withColumn("ingested_at", current_timestamp())
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoints/events")
    .outputMode("append")
    .start("s3://my-bucket/events/")
)

Append-only is the natural pattern for:

  • Event streams (clicks, page views, sensor readings)
  • Audit logs
  • Immutable transaction records
  • Any data where the history of changes matters, not just the current state

Handling "current state" from append-only data:

Since the table contains all versions of a record, you need to derive the current state at query time:

-- Spark SQL: get current state from an append-only event log
-- Using ROW_NUMBER to find the latest version of each customer record
WITH ranked AS (
    SELECT
        customer_id,
        customer_name,
        email,
        city,
        event_ts,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY event_ts DESC
        ) AS rn
    FROM events.customer_updates
)
SELECT customer_id, customer_name, email, city
FROM ranked
WHERE rn = 1

This is more expensive than reading a merge-maintained table, but it preserves the full history.


Deduplication: The Hidden Challenge

All three patterns face deduplication challenges. Network retries, pipeline restarts, and CDC redelivery can all cause the same source event to arrive multiple times.

# Python + PySpark: idempotent append with deduplication
from pyspark.sql.functions import col

# Read incoming batch
incoming = spark.read.parquet("s3://landing/events/2024/03/15/")

# Read existing keys from target (column pruning keeps this cheap,
# though it still reads the key for every row)
existing_keys = (
    spark.read
    .format("delta")
    .load("s3://target/events/")
    .select("event_id")
)

# Keep only genuinely new rows
new_only = incoming.join(existing_keys, on="event_id", how="left_anti")

# Append
(
    new_only.write
    .format("delta")
    .mode("append")
    .save("s3://target/events/")
)

For merge-based patterns, duplicates that arrive across runs are absorbed by the MERGE key: a replayed row simply triggers a no-op update if the data hasn't changed. Duplicates within a single batch are a different story. Most engines, Delta Lake included, reject a MERGE in which multiple source rows match the same target row, so collapse the batch to one row per key (for example, the latest by timestamp) before merging.
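When a single batch can contain several versions of the same key, a common pre-merge step is to collapse it to the latest row per key. A minimal plain-Python sketch, using the `event_id`/`event_ts` field names from the merge example above; a real pipeline would do the same with a window function or Spark's `dropDuplicates`:

```python
# Collapse a batch to the latest row per key before merging.
# Rows are modeled as dicts; timestamps here are ISO-8601 strings,
# which compare correctly lexicographically.
def latest_per_key(batch: list, key: str, ts: str) -> list:
    latest = {}
    for row in batch:
        current = latest.get(row[key])
        if current is None or row[ts] > current[ts]:
            latest[row[key]] = row
    return list(latest.values())

batch = [
    {"event_id": "a", "event_type": "click", "event_ts": "2024-03-15T10:00:00Z"},
    {"event_id": "a", "event_type": "view",  "event_ts": "2024-03-15T09:00:00Z"},
    {"event_id": "b", "event_type": "click", "event_ts": "2024-03-15T11:00:00Z"},
]
deduped = latest_per_key(batch, "event_id", "event_ts")
# Two rows survive: the 10:00 version of "a" and the only version of "b".
```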


Choosing the Right Pattern

| Situation | Recommended pattern |
|---|---|
| Source has reliable `updated_at`, no late data | Watermark |
| Source has updates and deletes, need current state | Merge |
| Event data, immutable facts, audit requirements | Append-only |
| Need to replay history or recover from failures | Append-only (easier replay) |
| Late arrivals within a predictable window | Merge with lookback |
| Slowly changing dimensions (SCD Type 2) | Merge with versioning logic |

Wrapping Up

Watermark is the simplest pattern and works well when your source is clean. Merge handles real-world messiness — late data, updates, deletes — at the cost of more complex logic. Append-only is the right default for event data and anything where history matters.

The common mistake is defaulting to full table overwrites long after you've outgrown them. The second most common mistake is implementing merge without thinking about what happens when the same row arrives twice. Both are worth getting right early.
