Idempotent Data Pipelines: Patterns for Safe Retries
Your Airflow job failed at 3 AM, halfway through loading 6 hours of transaction data. You click "Clear and Re-run." Now you have duplicates in production. A data analyst notices revenue is inflated by 40% for the night. This is a non-idempotent pipeline, and it's the most common source of silent data corruption in production systems.
Idempotent pipelines can be run any number of times for the same input and always produce the same output. Failures become non-events — you retry, and nothing breaks.
What Idempotency Means in Data Engineering
In mathematics, a function f is idempotent if f(f(x)) = f(x). In data pipelines: running the same pipeline twice for the same time range produces the same result as running it once.
The critical insight: idempotency is about the effect on the target, not the logic of the pipeline. A pipeline can be computationally complex as long as its writes are idempotent.
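The same distinction shows up in miniature in plain Python: an append is not idempotent, a keyed write is. A toy illustration (the function and variable names here are ours, not from any library):

```python
def append_order(store: list, order: dict) -> None:
    store.append(order)  # every run adds a row -> duplicates on retry

def upsert_order(store: dict, order: dict) -> None:
    store[order["order_id"]] = order  # keyed write: re-running changes nothing

orders_list, orders_by_id = [], {}
for _ in range(2):  # simulate a retry
    append_order(orders_list, {"order_id": 1, "amount": 10})
    upsert_order(orders_by_id, {"order_id": 1, "amount": 10})

print(len(orders_list))   # 2 — the append duplicated
print(len(orders_by_id))  # 1 — the keyed write is idempotent
```

Every pattern below is a scaled-up version of the keyed write: make the target's state a function of the input, not of how many times the pipeline ran.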
Why Pipelines Aren't Idempotent by Default
Most naive pipelines fail idempotency in one of three ways:
```python
# WRONG — non-idempotent append
def load_daily_orders(date: str):
    df = extract_orders(date)
    df.to_sql("orders", engine, if_exists="append")  # Retrying = duplicates
```

```python
# WRONG — non-idempotent aggregation
def aggregate_daily_sales(date: str):
    rows = db.execute(f"SELECT SUM(amount) FROM orders WHERE date = '{date}'").fetchone()
    db.execute(f"INSERT INTO daily_sales (date, revenue) VALUES ('{date}', {rows[0]})")
    # Retrying = duplicate rows in daily_sales
```

```python
# WRONG — side-effectful writes without deduplication
def send_events_to_kafka(date: str):
    events = db.execute(f"SELECT * FROM events WHERE date = '{date}'")
    for event in events:
        producer.send("events_topic", event)  # Retrying = duplicate messages downstream
```
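The aggregation example has a direct fix that previews the patterns below: delete the target rows for the processing date, then insert, inside one transaction. A sketch using SQLite in place of the warehouse (table names match the example above; parameterized queries also remove the injection risk the f-strings carry):

```python
import sqlite3

def aggregate_daily_sales_idempotent(conn: sqlite3.Connection, date: str) -> None:
    # Idempotent: DELETE clears any rows a previous run wrote for this date,
    # so re-running always leaves exactly one row per date.
    total = conn.execute(
        "SELECT COALESCE(SUM(amount), 0) FROM orders WHERE date = ?", (date,)
    ).fetchone()[0]
    with conn:  # transaction: DELETE and INSERT commit together or not at all
        conn.execute("DELETE FROM daily_sales WHERE date = ?", (date,))
        conn.execute(
            "INSERT INTO daily_sales (date, revenue) VALUES (?, ?)", (date, total)
        )
```

Delete-then-insert scoped to the processing unit is the row-level cousin of the partition overwrite pattern described next.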
Core Idempotency Patterns
Pattern 1: Partition Overwrite
The most reliable pattern for batch pipelines on partitioned storage. Write to a temporary location, then atomically replace the entire target partition.
```python
# PySpark — idempotent partition overwrite (Delta Lake / Spark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.appName("idempotent_load").getOrCreate()

# Set partition overwrite mode to dynamic — only replaces written partitions
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

def load_orders_for_date(processing_date: str):
    # Idempotent: re-running for the same date always produces the same result.
    df = spark.read.parquet(f"s3://raw/orders/date={processing_date}/")
    transformed = (
        df.select("order_id", "customer_id", "amount")
          # Reading a leaf partition path drops the partition column,
          # so set it explicitly from the function argument
          .withColumn("processing_date", lit(processing_date))
    )
    # Dynamic overwrite: replaces only the partition being written
    (
        transformed.write
        .format("delta")
        .mode("overwrite")
        .partitionBy("processing_date")
        .save("s3://datalake/orders/")
    )

# Running this twice for the same date -> same result, no duplicates
```
Why it works: The second run overwrites the same partition the first run wrote. The result is identical to a single run.
Requirement: Your data must be partitionable by the processing unit (date, hour, batch ID). This is almost always possible for batch pipelines.
Pattern 2: UPSERT / MERGE
For pipelines that update existing rows (slowly changing dimensions, CDC), use MERGE instead of INSERT:
```python
# PySpark — idempotent MERGE using Delta Lake
from delta.tables import DeltaTable

def upsert_customers(new_data_df):
    # Idempotent: running twice with the same DataFrame produces the same target state.
    delta_table = DeltaTable.forPath(spark, "s3://datalake/customers/")
    (
        delta_table.alias("target")
        .merge(
            new_data_df.alias("source"),
            "target.customer_id = source.customer_id"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# Second run with same data: MATCH fires, target row is overwritten with same values.
# Net effect: no change. Idempotent.
```
```sql
-- Spark SQL / Databricks SQL — MERGE statement
MERGE INTO customers AS target
USING new_customer_data AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Safe to run multiple times for the same source data
```
Pattern 3: Truncate-and-Reload for Small Tables
For dimension tables or small reference tables where full reloads are practical:
```python
# Python / SQLAlchemy — idempotent truncate-then-insert
from sqlalchemy import create_engine, text

def reload_product_catalog(products: list):
    # Idempotent: truncate + reload produces the same state regardless of run count.
    # Use only for small reference tables where full reload is acceptable.
    engine = create_engine("postgresql://user:pass@host/db")
    with engine.begin() as conn:  # begin() = automatic commit/rollback
        conn.execute(text("TRUNCATE TABLE product_catalog"))
        conn.execute(
            text("INSERT INTO product_catalog (id, name, category) "
                 "VALUES (:id, :name, :category)"),
            products
        )
    # Wrapped in a transaction: either both TRUNCATE and INSERT succeed, or neither does
```
Important: Wrap truncate + insert in a single transaction. If the INSERT fails after TRUNCATE without a rollback, you'll have an empty table.
Pattern 4: Natural Key Deduplication
When you can't control write semantics (e.g., raw data landing in object storage from multiple producers), deduplicate at read time using a natural key:
```python
# PySpark — deduplication on natural key before writing
from pyspark.sql.functions import row_number, desc
from pyspark.sql.window import Window

def deduplicate_and_load(raw_df, natural_keys: list, event_time_col: str):
    # Keep only the most recent record per natural key.
    # Idempotent: same raw data -> same deduplicated output.
    window = Window.partitionBy(*natural_keys).orderBy(desc(event_time_col))
    deduped = (
        raw_df
        .withColumn("_row_num", row_number().over(window))
        .filter("_row_num = 1")
        .drop("_row_num")
    )
    return deduped

# Usage
raw = spark.read.parquet("s3://raw/events/date=2024-03-15/")
clean = deduplicate_and_load(raw, natural_keys=["event_id"], event_time_col="ingested_at")
clean.write.format("delta").mode("overwrite").save("s3://datalake/events/")
```
Pattern 5: Idempotency Keys for External APIs
When pipelines write to external systems (REST APIs, messaging queues), use idempotency keys:
```python
# Python — idempotency key for external API write
import hashlib
import requests

def send_notification(user_id: str, message: str, event_date: str):
    # Idempotent: the same (user_id, event_date, message) always produces the
    # same idempotency key, even if called multiple times.

    # Deterministic idempotency key from stable inputs
    idempotency_key = hashlib.sha256(
        f"{user_id}:{event_date}:{message}".encode()
    ).hexdigest()
    response = requests.post(
        "https://api.notifications.example.com/send",
        json={"user_id": user_id, "message": message},
        headers={
            "Idempotency-Key": idempotency_key,
            "Authorization": "Bearer TOKEN",
        },
        timeout=10,  # never let a retry hang forever
    )
    if response.status_code == 200:
        return "sent"
    elif response.status_code == 409:
        return "already_sent"  # Duplicate detected by the API — safe to ignore
    else:
        response.raise_for_status()
```
Not all external APIs support idempotency keys — check the API docs. Stripe, Twilio, and most modern payment APIs support this pattern.
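When the API offers no idempotency-key support, you can approximate the guarantee client-side with a sent-log: claim each key in durable storage before sending, and skip keys already claimed. A sketch using SQLite as the log (the `do_send` callable stands in for the real API call and is an assumption of this example):

```python
import hashlib
import sqlite3
from typing import Callable

def send_once(conn: sqlite3.Connection, user_id: str, event_date: str,
              do_send: Callable[[], None]) -> str:
    # Deterministic key from stable inputs — same idea as the Idempotency-Key header
    key = hashlib.sha256(f"{user_id}:{event_date}".encode()).hexdigest()
    conn.execute("CREATE TABLE IF NOT EXISTS sent_log (key TEXT PRIMARY KEY)")
    try:
        with conn:  # the INSERT commits only if the key is new
            conn.execute("INSERT INTO sent_log (key) VALUES (?)", (key,))
    except sqlite3.IntegrityError:
        return "already_sent"  # a previous run already claimed this key
    try:
        do_send()
    except Exception:
        with conn:  # release the claim so a retry can attempt the send again
            conn.execute("DELETE FROM sent_log WHERE key = ?", (key,))
        raise
    return "sent"
```

Note the limit of this approach: if the process dies between claiming the key and completing the send, a retry will skip the message, so this is at-most-once, not exactly-once. True exactly-once delivery requires cooperation from the receiving side.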
Exactly-Once in Streaming Pipelines
For streaming pipelines, idempotency manifests as exactly-once semantics — each event is processed and written exactly once, even in failure scenarios.
| Guarantee | Description | Achieved By |
|---|---|---|
| At-most-once | Events may be lost, never duplicated | Fire-and-forget (no retries) |
| At-least-once | Events may be duplicated, never lost | Retries + consumer commits after processing |
| Exactly-once | Events processed and written exactly once | Transactional writes + idempotent producers |
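On the producer side, Kafka's idempotent producer makes the broker deduplicate retried sends, and transactions extend the guarantee across a consume-process-produce loop. A configuration sketch using confluent-kafka (the broker address is a placeholder; verify setting names against your client version):

```python
from confluent_kafka import Producer

# enable.idempotence makes the broker de-duplicate retried sends from this
# producer (it implies acks=all and bounds in-flight requests).
producer = Producer({
    "bootstrap.servers": "broker:9092",  # placeholder
    "enable.idempotence": True,
    "acks": "all",
})

# For exactly-once across a consume-process-produce loop, add:
#   "transactional.id": "orders-pipeline-1"
# and wrap sends in init_transactions() / begin_transaction() / commit_transaction().
```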
```python
# PySpark Structured Streaming — exactly-once to Delta Lake
query = (
    stream_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/orders_stream")
    # Delta Lake + Spark Structured Streaming = exactly-once by default.
    # The checkpoint tracks Kafka offsets; Delta Lake's transaction log
    # makes writes atomic. Together: exactly-once end-to-end.
    .start("s3://datalake/orders_stream/")
)
```
Spark Structured Streaming + Delta Lake achieves exactly-once end-to-end by combining Kafka offset checkpointing (tracks what was read) with Delta Lake's transactional writes (ensures atomic, idempotent writes).
Pipeline Idempotency Checklist
| Scenario | Pattern |
|---|---|
| Batch write to partitioned storage | Dynamic partition overwrite |
| Updating/inserting rows | MERGE / UPSERT |
| Small dimension/reference tables | Truncate + reload in transaction |
| Raw data with possible duplicates | Natural key deduplication at read time |
| External API writes | Idempotency keys |
| Streaming pipeline | Checkpointing + transactional writes |
| Orchestration retries | Make all tasks idempotent; enable automatic retries safely |
Testing Idempotency
Idempotency is easy to test — run the pipeline twice and compare the results:
```python
# pytest — idempotency test
def test_pipeline_is_idempotent(spark, test_data):
    # Running the pipeline twice produces the same result as running it once.

    # First run
    load_orders_for_date("2024-03-15")
    result_after_first_run = spark.read.format("delta").load("s3://test/orders/").count()

    # Second run — same input
    load_orders_for_date("2024-03-15")
    result_after_second_run = spark.read.format("delta").load("s3://test/orders/").count()

    assert result_after_first_run == result_after_second_run, \
        "Pipeline is not idempotent: row count changed on second run"
```
Add this test to your pipeline CI. A failing idempotency test is much cheaper than finding duplicates in production.
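Row counts can match while contents differ (a retry that overwrote rows with stale values, for example). In Spark you can compare full snapshots with `first.exceptAll(second).count() == 0`; an engine-agnostic alternative is an order-independent fingerprint of the rows, sketched here:

```python
import hashlib
import json

def table_fingerprint(rows: list) -> str:
    # Order-independent digest: canonical JSON per row, sorted, then hashed.
    # Two runs are content-identical iff their fingerprints match.
    canonical = sorted(json.dumps(row, sort_keys=True) for row in rows)
    return hashlib.sha256("\n".join(canonical).encode()).hexdigest()

run_1 = [{"order_id": 1, "amount": 10.0}, {"order_id": 2, "amount": 5.0}]
run_2 = [{"order_id": 2, "amount": 5.0}, {"order_id": 1, "amount": 10.0}]
assert table_fingerprint(run_1) == table_fingerprint(run_2)  # same content, any order
```

Asserting on the fingerprint instead of the count turns the idempotency test into a full content comparison at little extra cost.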
Common Mistakes
1. Assuming partition overwrite is atomic without checking. In plain Parquet on S3, partition overwrites are NOT atomic — a reader can see a half-written partition during the write window. Use Delta Lake or Iceberg for atomic partition replacements.
2. Using database sequences as primary keys in idempotent loads. Auto-increment keys generate new values on every insert. If you truncate and reload, the keys change. Use natural keys (order_id, customer_id) or deterministic hash keys as primary keys for idempotent tables.
3. Idempotent writes, non-idempotent side effects. Your write to the warehouse is idempotent. But your pipeline also sends a Slack notification and triggers a downstream API call on success. Those side effects need their own idempotency handling.
4. Forgetting that "overwrite" isn't always safe. Overwriting a partition is safe if you own that partition entirely. If multiple pipelines write to the same partition, overwriting will delete other pipelines' data.
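For mistake 2 above, a deterministic surrogate key replaces the sequence: hash the natural-key columns so every reload regenerates identical keys. A minimal sketch (in Spark SQL the equivalent expression is `sha2(concat_ws('||', order_id, customer_id), 256)`; the separator choice is ours):

```python
import hashlib

def surrogate_key(*natural_key_parts: str, sep: str = "||") -> str:
    # Deterministic: the same natural-key values always hash to the same key,
    # so truncate-and-reload leaves primary keys unchanged.
    return hashlib.sha256(sep.join(natural_key_parts).encode()).hexdigest()

surrogate_key("ORD-1001", "CUST-42")  # stable across reloads
```

The separator matters: without one, `("ab", "c")` and `("a", "bc")` would collide; pick a string that cannot appear inside a key part.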
If you're running ad-hoc validation queries against your pipeline outputs — checking for duplicates, verifying row counts after a retry, or exploring the output schema — Harbinger Explorer lets you query data files directly in the browser via DuckDB WASM. It's useful for quick idempotency spot-checks without spinning up a full Spark session: just point it at your output files and ask "how many distinct order_ids are there?"
The Bottom Line
Idempotent pipelines are not a nice-to-have — they're the difference between a system that recovers gracefully from failures and one that requires manual cleanup after every incident.
The implementation is almost always simpler than teams expect: dynamic partition overwrite covers most batch cases. MERGE handles incremental updates. Natural key deduplication catches the rest. Build these patterns in from the start, and retrying a failed job becomes a non-event.
Next step: Combine idempotency with good monitoring. Read Data Pipeline Monitoring: What to Track to build the observability layer that tells you when to retry.
Continue Reading
Data Deduplication Strategies: Hash, Fuzzy, and Record Linkage
Airflow vs Dagster vs Prefect: An Honest Comparison
An unbiased comparison of Airflow, Dagster, and Prefect — covering architecture, DX, observability, and real trade-offs to help you pick the right orchestrator.
Change Data Capture Explained
A practical guide to CDC patterns — log-based, trigger-based, and polling — with Debezium configuration examples and Kafka Connect integration.