Inhaltsverzeichnis24 Abschnitte
- TL;DR
- Warum Python im Data Engineering dominiert
- Essentielle Libraries
- Daten-Processing
- Connectivity und I/O
- Pipeline und Qualität
- Data-Engineering-Python-Projekt aufsetzen
- Projektstruktur
- Dependency-Management mit uv
- Pattern 1: Type-sichere Pipeline-Configs mit Pydantic
- Pattern 2: Robuste API-Ingestion mit Retry und Backoff
- Pattern 3: Effizientes Batch-Loading mit SQLAlchemy
- Pattern 4: pandas vs polars — wann wechseln
- Pattern 5: Testbarer Pipeline-Code
- Häufige Fehler und wie du sie vermeidest
- 1. Ganze Datasets in Memory laden
- 2. String-Konkatenation für SQL
- 3. Logging ignorieren
- 4. Keine Type-Hints
- 5. Synchron, wo Async hilft
- Python-Performance-Tipps für Pipelines
- Alles zusammen: Mini-Pipeline
- FAQ
- Weiterlesen
Python für Data Engineering: Das Toolkit, das du wirklich brauchst
Du kannst Python. Du hast Skripte geschrieben, vielleicht etwas pandas, möglicherweise eine Flask-App. Aber Data-Engineering-Python ist ein anderes Tier — weniger clevere Algorithmen, mehr verlässliches Datenbewegen at Scale, sauberes Failure-Handling und Code, den dein zukünftiges Ich (oder Teamkollegen) um 3 Uhr nachts nicht verflucht.
Dieser Guide deckt das Python-Toolkit, die Patterns und die Praktiken ab, die das Python eines Data Engineers vom Python eines Data Scientists trennen. Keine Theorie-Dumps — nur Tools und Techniken, die du täglich nutzt.
TL;DR
- pandas für <500 MB ad-hoc, polars für 500 MB–10 GB, PySpark ab 10 GB+.
- pydantic für Configs, httpx für API-Ingestion, SQLAlchemy für Batch-Loading.
- uv ersetzt pip+poetry. pyproject.toml ist Standard.
- Type-Hints, Logging, Tests trennen Production von Skript.
Warum Python im Data Engineering dominiert
Python ist nicht die schnellste Sprache. Nicht die typsicherste. Aber sie gewinnt auf drei Achsen, die im Data Engineering enorm zählen:
| Stärke | Warum es zählt |
|---|---|
| Ökosystem-Tiefe | Libraries für jede Quelle, jedes Format, jeden Cloud-Service |
| Orchestrator-Support | Airflow, Dagster, Prefect — alle Python-nativ |
| Spark-Integration | PySpark ist die meistgenutzte Spark-API |
| Cloud-SDK-Abdeckung | AWS boto3, Azure SDK, GCP Client Libraries — alle First-Class Python |
| Hiring-Pool | Die meisten Data-Profis können schon Python |
Der eigentliche Vorteil ist kein Einzelfeature — sondern, dass Python die Lingua franca ist, die jede Schicht des modernen Data-Stacks verbindet.
Essentielle Libraries
Vergiss "Top 50 Python Libraries"-Listicles. Hier ist, was du in Production-Pipelines wirklich importierst:
Daten-Processing
| Library | Use-Case | Wann wählen |
|---|---|---|
| pandas | Klein-mittlere Transforms (<1 GB) | Ad-hoc, Prototyping |
| polars | Mittlere Datasets, performance-kritisch | Wenn pandas zu langsam, Spark Overkill |
| PySpark | Großskaliges verteiltes Processing | Datasets >10 GB, bestehende Spark-Infrastruktur |
| DuckDB | Analytisches SQL auf lokalen Files | Parquet/CSV-Analyse, schnelle Aggregationen |
| pyarrow | Columnar Data, Parquet-I/O | Parquet lesen/schreiben, Arrow-Interop |
Connectivity und I/O
| Library | Zweck |
|---|---|
| sqlalchemy | DB-Connections, ORM, Raw-SQL-Ausführung |
| requests / httpx | REST-API-Ingestion |
| boto3 | AWS S3, Glue, Redshift, alles AWS |
| azure-storage-blob | Azure-Blob-Storage-I/O |
| psycopg2 / asyncpg | PostgreSQL-Driver |
| confluent-kafka | Kafka-Producer/Consumer |
Pipeline und Qualität
| Library | Zweck |
|---|---|
| great_expectations | Datenqualitäts-Validierung |
| pydantic | Schema-Validierung für Configs und API-Payloads |
| dbt-core | SQL-basierte Transformationen (auch Python-Models) |
| delta-rs (deltalake) | Delta Lake ohne Spark |
Data-Engineering-Python-Projekt aufsetzen
Skipp requirements.txt für alles jenseits eines Wegwerf-Skripts. Moderne Setup:
Projektstruktur
my-pipeline/
+-- pyproject.toml # Dependencies, Build-Config, Tool-Settings
+-- src/
| +-- my_pipeline/
| +-- __init__.py
| +-- extract.py # Source-Connectoren
| +-- transform.py # Business-Logic
| +-- load.py # Sink-Writer
| +-- models.py # Pydantic-Schemas
+-- tests/
| +-- test_extract.py
| +-- test_transform.py
+-- .env.example
+-- README.md
Dependency-Management mit uv
uv hat 2025–2026 pip und poetry weitgehend ersetzt — schnell und mit korrekter Resolution:
# uv installieren
curl -LsSf https://astral.sh/uv/install.sh | sh
# Neues Projekt initialisieren
uv init my-pipeline
cd my-pipeline
# Dependencies hinzufügen
uv add pandas sqlalchemy psycopg2-binary pydantic
uv add --dev pytest ruff mypy
# Skripte in managed Environment ausführen
uv run python src/my_pipeline/extract.py
# Lock und sync
uv lock
uv sync
Die pyproject.toml ersetzt requirements.txt, setup.py und setup.cfg in einer Datei:
[project]
name = "my-pipeline"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
"pandas>=2.2",
"sqlalchemy>=2.0",
"psycopg2-binary>=2.9",
"pydantic>=2.0",
]
[tool.ruff]
target-version = "py311"
select = ["E", "F", "I", "UP"]
Pattern 1: Type-sichere Pipeline-Configs mit Pydantic
Hardcoded Connection-Strings und Magic Numbers killen Pipelines in Production. Pydantic validiert Config beim Startup — fail fast, fail loud:
# models.py
from pydantic import BaseModel, Field, SecretStr
from pydantic_settings import BaseSettings
class DatabaseConfig(BaseSettings):
host: str = Field(..., alias="DB_HOST")
port: int = Field(5432, alias="DB_PORT")
database: str = Field(..., alias="DB_NAME")
user: str = Field(..., alias="DB_USER")
password: SecretStr = Field(..., alias="DB_PASSWORD")
@property
def connection_string(self) -> str:
return (
f"postgresql://{self.user}:"
f"{self.password.get_secret_value()}"
f"@{self.host}:{self.port}/{self.database}"
)
model_config = {"env_file": ".env"}
class PipelineConfig(BaseSettings):
db: DatabaseConfig = DatabaseConfig()
batch_size: int = Field(10_000, alias="BATCH_SIZE")
max_retries: int = Field(3, alias="MAX_RETRIES")
source_api_url: str = Field(..., alias="SOURCE_API_URL")
Verwendung — die Pipeline crasht sofort, wenn nötige Env-Vars fehlen:
# extract.py
from my_pipeline.models import PipelineConfig
config = PipelineConfig()
# Wenn DB_HOST oder SOURCE_API_URL nicht gesetzt sind,
# wirft Pydantic einen klaren ValidationError beim Startup
Besser als einen KeyError mitten in einem 2-Stunden-Batch-Job zu entdecken.
Pattern 2: Robuste API-Ingestion mit Retry und Backoff
Jeder Data Engineer schreibt API-Extractor. Die meisten schreiben sie beim ersten Mal schlecht. Production-Grade-Pattern mit httpx und strukturierten Retries:
# extract.py
import httpx
import time
import logging
from typing import Generator
logger = logging.getLogger(__name__)
def fetch_paginated(
base_url: str,
headers: dict,
page_size: int = 100,
max_retries: int = 3,
) -> Generator[list[dict], None, None]:
offset = 0
with httpx.Client(timeout=30.0) as client:
while True:
params = {"limit": page_size, "offset": offset}
for attempt in range(1, max_retries + 1):
try:
resp = client.get(
base_url,
headers=headers,
params=params,
)
resp.raise_for_status()
break
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
wait = int(
e.response.headers.get("Retry-After", 2 ** attempt)
)
logger.warning(f"Rate limited. Warte {wait}s.")
time.sleep(wait)
elif attempt == max_retries:
raise
else:
time.sleep(2 ** attempt)
except httpx.RequestError:
if attempt == max_retries:
raise
time.sleep(2 ** attempt)
data = resp.json()
records = data.get("results", [])
if not records:
break
yield records
offset += page_size
logger.info(f"Bisher {offset} Records geladen.")
Kern: Generator-basiert (konstanter Speicher), respektiert Retry-After-Header, exponentielles Backoff bei transienten Fehlern.
Pattern 3: Effizientes Batch-Loading mit SQLAlchemy
Row-by-row laden ist der größte Performance-Fehler in Python-ETL. Immer batchen:
# load.py -- PostgreSQL-Dialekt
from sqlalchemy import create_engine, text
from typing import Iterator
import logging
logger = logging.getLogger(__name__)
def batch_upsert(
connection_string: str,
table: str,
records: Iterator[list[dict]],
conflict_key: str = "id",
batch_size: int = 5_000,
) -> int:
engine = create_engine(connection_string)
total = 0
with engine.begin() as conn:
for batch in records:
if not batch:
continue
columns = list(batch[0].keys())
placeholders = ", ".join(f":{col}" for col in columns)
update_clause = ", ".join(
f"{col} = EXCLUDED.{col}"
for col in columns
if col != conflict_key
)
# PostgreSQL INSERT ... ON CONFLICT (upsert)
stmt = text(
f"INSERT INTO {table} ({', '.join(columns)}) "
f"VALUES ({placeholders}) "
f"ON CONFLICT ({conflict_key}) "
f"DO UPDATE SET {update_clause}"
)
conn.execute(stmt, batch)
total += len(batch)
logger.info(f"{total} Records nach {table} upserted.")
return total
Dieses Pattern verarbeitet Daten in konstantem Speicher, indem es den Generator des Extract-Schritts direkt konsumiert.
Pattern 4: pandas vs polars — wann wechseln
Die meisten Data Engineers starten mit pandas. Das ist okay. Aber zu wissen, wann polars das bessere Tool ist, spart Wartezeit:
# Pandas -- vertraut, gut für schnelle Exploration
# Gut für: <500 MB, gemischte Operationen, breites Ökosystem
import pandas as pd
df = pd.read_parquet("events.parquet")
result = (
df.groupby("event_type")
.agg(count=("event_id", "count"), avg_value=("value", "mean"))
.reset_index()
.sort_values("count", ascending=False)
)
# Polars -- gleiche Logic, 5-10x schneller auf mittleren Datasets
# Gut für: 500 MB-10 GB, aggregation-lastig, Lazy Evaluation
import polars as pl
result = (
pl.scan_parquet("events.parquet") # lazy -- noch keine Daten geladen
.group_by("event_type")
.agg(
pl.col("event_id").count().alias("count"),
pl.col("value").mean().alias("avg_value"),
)
.sort("count", descending=True)
.collect() # Ausführung hier
)
| Dimension | pandas | polars |
|---|---|---|
| Memory-Modell | Eager — alles im RAM | Lazy Evaluation mit scan_* |
| Speed (1 GB Aggregation) | ~12 s typisch | ~2 s typisch |
| Mutability | Mutable DataFrames | Immutable by Default |
| API-Style | Method-Chaining + Indexing | Expression-basiert, komponierbar |
| Ökosystem | Riesig (scikit-learn, plotly etc.) | Wachsend, gutes Arrow-Interop |
| Lernkurve | Niedriger (mehr Tutorials/SO) | Mittel (anderes mentales Modell) |
Mein Take: neue Pipelines in polars starten, wenn das Team offen ist. Die Lazy Evaluation allein verhindert eine ganze Klasse von Memory-Bugs. Auf pandas zurückfallen, wenn eine bestimmte Library zwingend pandas-DataFrame als Input braucht.
Pattern 5: Testbarer Pipeline-Code
Daten-Pipelines, die nicht lokal testbar sind, sind Pipelines, die in Production brechen. Trenne I/O von Logic:
# transform.py -- reine Funktionen, kein I/O
import polars as pl
def clean_events(df: pl.DataFrame) -> pl.DataFrame:
return (
df.filter(pl.col("event_id").is_not_null())
.unique(subset=["event_id"])
.with_columns(
pl.col("timestamp").str.to_datetime().alias("event_ts"),
pl.col("value").fill_null(0).alias("value_clean"),
)
)
def flag_anomalies(
df: pl.DataFrame,
threshold: float = 3.0,
) -> pl.DataFrame:
stats = df.select(
pl.col("value_clean").mean().alias("mu"),
pl.col("value_clean").std().alias("sigma"),
).row(0)
mu, sigma = stats
return df.with_columns(
(
((pl.col("value_clean") - mu).abs() > threshold * sigma)
).alias("is_anomaly")
)
# tests/test_transform.py
import polars as pl
from my_pipeline.transform import clean_events, flag_anomalies
def test_clean_events_removes_nulls():
df = pl.DataFrame({
"event_id": ["a", None, "b", "a"],
"timestamp": ["2026-01-01", "2026-01-02", "2026-01-03", "2026-01-01"],
"value": [10.0, 20.0, None, 10.0],
})
result = clean_events(df)
assert result.shape[0] == 2 # null entfernt, Duplikat entfernt
assert result.filter(pl.col("event_id").is_null()).shape[0] == 0
def test_flag_anomalies_catches_outliers():
df = pl.DataFrame({
"value_clean": [10.0, 11.0, 9.0, 10.5, 100.0],
})
result = flag_anomalies(df, threshold=2.0)
anomalies = result.filter(pl.col("is_anomaly"))
assert anomalies.shape[0] == 1
assert anomalies["value_clean"][0] == 100.0
Tests laufen mit: uv run pytest tests/ -v
Häufige Fehler und wie du sie vermeidest
1. Ganze Datasets in Memory laden
Falsch:
# Lädt 50 GB in RAM und crasht
df = pd.read_parquet("s3://bucket/huge-table/")
Richtig:
# Partition für Partition verarbeiten
import pyarrow.parquet as pq
dataset = pq.ParquetDataset("s3://bucket/huge-table/")
for batch in dataset.to_batches(batch_size=100_000):
chunk = pl.from_arrow(batch)
process(chunk)
2. String-Konkatenation für SQL
Falsch (SQL-Injection-Risiko):
query = f"SELECT * FROM users WHERE name = '{user_input}'"
Richtig:
# SQLAlchemy -- PostgreSQL-Dialekt
from sqlalchemy import text
query = text("SELECT * FROM users WHERE name = :name")
result = conn.execute(query, {"name": user_input})
3. Logging ignorieren
Falsch:
print(f"Processed {count} records") # In Container-Logs verloren
Richtig:
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
logger.info("Processed %d records", count)
4. Keine Type-Hints
Data-Engineering-Code wird oft von mehreren Leuten gepflegt. Type-Hints sind Doku, die die IDE erzwingt:
# Ohne Types -- was returnt das? Was ist 'config'?
def extract(config, start_date):
...
# Mit Types -- klarer Vertrag
def extract(
config: PipelineConfig,
start_date: datetime,
) -> Generator[list[dict], None, None]:
...
5. Synchron, wo Async hilft
Beim Ingestieren aus mehreren unabhängigen APIs kann asyncio + httpx Wall-Clock-Zeit drastisch senken:
import asyncio
import httpx
async def fetch_all(urls: list[str]) -> list[dict]:
async with httpx.AsyncClient(timeout=30.0) as client:
tasks = [client.get(url) for url in urls]
responses = await asyncio.gather(*tasks, return_exceptions=True)
return [
r.json() for r in responses
if isinstance(r, httpx.Response) and r.status_code == 200
]
# Verwendung
results = asyncio.run(fetch_all(api_endpoints))
Python-Performance-Tipps für Pipelines
| Technik | Impact | Wann nutzen |
|---|---|---|
| Generators statt Listen | Konstanter Speicher | Alles Paginierte oder Batchbare |
| polars Lazy-Mode | 5-10x schneller als eager pandas | Aggregationen auf Files >100 MB |
| PyArrow für I/O | 3-5x schnellere Parquet-Reads | Jeder Parquet-Read/Write |
orjson statt json | 3-10x schnelleres JSON-Parsing | High-Volume API-Ingestion |
| Connection-Pooling | Eliminiert Verbindungs-Overhead | Jede DB-lastige Pipeline |
multiprocessing.Pool | Alle CPU-Cores nutzen | CPU-bound Transforms (nicht I/O) |
Alles zusammen: Mini-Pipeline
So verbinden sich die Patterns in einem echten ETL-Flow:
# main.py -- Orchestrierungs-Entrypoint
import logging
from my_pipeline.models import PipelineConfig
from my_pipeline.extract import fetch_paginated
from my_pipeline.transform import clean_events, flag_anomalies
from my_pipeline.load import batch_upsert
import polars as pl
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def run():
config = PipelineConfig() # validiert Env-Vars oder crasht
# Extract: paginierter API-Fetch (Generator)
raw_pages = fetch_paginated(
base_url=config.source_api_url,
headers={"Authorization": f"Bearer {config.api_token}"},
page_size=config.batch_size,
)
# Transform: jede Seite verarbeiten, gereinigte Batches yielden
def transform_stream(pages):
for page in pages:
df = pl.DataFrame(page)
df = clean_events(df)
df = flag_anomalies(df)
yield df.to_dicts()
# Load: Upsert in PostgreSQL
total = batch_upsert(
connection_string=config.db.connection_string,
table="events",
records=transform_stream(raw_pages),
conflict_key="event_id",
)
logger.info(f"Pipeline fertig. {total} Records upserted.")
if __name__ == "__main__":
run()
Die ganze Pipeline läuft in konstantem Speicher unabhängig von der Source-Größe, validiert Config vor jedem Daten-Touch, jede Komponente ist unabhängig testbar.
FAQ
pandas oder polars für neue Projekte? polars für alles >500 MB oder wenn Aggregationen die Hauptlast sind. pandas für Prototyping und wenn du Libraries brauchst, die pandas erwarten (scikit-learn etc.).
Brauche ich Spark, wenn ich polars habe? Polars läuft auf einer Maschine. Sobald Datasets >100 GB werden oder du mehrere Maschinen brauchst, ist Spark Pflicht. Bis dahin ist polars meist genug.
uv oder poetry? uv ist 2026 die Default-Empfehlung — 10–100x schneller als poetry, gleiche Funktionalität.
Python 3.11 oder 3.12? 3.12 ist stabil und schneller. 3.13 mit Free-Threading kommt 2026 voll, ist aber für DE noch experimentell.
Wie testet man Pipelines, die auf Cloud-Services hängen? Mock externe Services in Unit-Tests (moto für AWS, fakes für DB). Integration-Tests gegen Sandbox-Konten. Lokale Reproduzierbarkeit via Docker-Compose mit Postgres/Kafka.
Weiterlesen
- Apache Airflow Tutorial — orchestriere deine Python-Pipelines mit Dependency-Management
- Data Quality Testing — Great Expectations für Pipeline-Output-Validation
- Data Pipeline Monitoring — sorge dafür, dass Pipelines dir sagen, wenn etwas bricht
Wenn du Pipeline-Logic schnell prototypen willst — SQL gegen CSVs und Parquet, API-Responses interaktiv testen — lässt dich Harbinger Explorer das direkt im Browser mit DuckDB WASM machen. Nützlich, um Transforms zu validieren, bevor du sie in Production-Code committest.
Der beste Python-Data-Engineering-Code ist nicht clever. Er ist langweilig, vorhersehbar, gut typisiert und handhabt Failures, bevor sie zu Incidents werden. Schreib Code, der dich nachts schlafen lässt.
Stand: 14. Mai 2026.
Geschrieben von
Harbinger Team
Cloud-, Data- und AI-Engineer in DACH. Schreibt seit 2018 über infrastrukturkritische Tech-Entscheidungen — keine Marketing- Folien, sondern echte Trade-offs aus Production-Workloads.
Hat dir das geholfen?
Jede Woche ein neuer Artikel über DACH-Cloud, Data und AI — direkt in dein Postfach. Kein Spam, kein Marketing-Sprech.
Kein Spam. 1-Klick-Abmeldung. Datenschutz bei Loops.so.