Cloud allgemein

Python für Data Engineering: Praxis-Toolkit 2026

Die Python-Libraries, Patterns und Praktiken, die Production-Data-Engineering von Skripten trennen — mit ausführbarem Code für ETL, API-Ingestion und Tests.

Harbinger Team10. April 202611 Min. LesezeitAktualisiert 14.5.2026
  • python
  • data engineering
  • pandas
  • polars
  • pydantic
  • etl
  • sqlalchemy
  • tutorial
  • dach
Inhaltsverzeichnis24 Abschnitte

Python für Data Engineering: Das Toolkit, das du wirklich brauchst

Du kannst Python. Du hast Skripte geschrieben, vielleicht etwas pandas, möglicherweise eine Flask-App. Aber Data-Engineering-Python ist ein anderes Tier — weniger clevere Algorithmen, mehr verlässliches Datenbewegen at Scale, sauberes Failure-Handling und Code, den dein zukünftiges Ich (oder Teamkollegen) um 3 Uhr nachts nicht verflucht.

Dieser Guide deckt das Python-Toolkit, die Patterns und die Praktiken ab, die das Python eines Data Engineers vom Python eines Data Scientists trennen. Keine Theorie-Dumps — nur Tools und Techniken, die du täglich nutzt.

TL;DR

  • pandas für <500 MB ad-hoc, polars für 500 MB–10 GB, PySpark ab 10 GB+.
  • pydantic für Configs, httpx für API-Ingestion, SQLAlchemy für Batch-Loading.
  • uv ersetzt pip+poetry. pyproject.toml ist Standard.
  • Type-Hints, Logging, Tests trennen Production von Skript.

Warum Python im Data Engineering dominiert

Python ist nicht die schnellste Sprache. Nicht die typsicherste. Aber sie gewinnt auf drei Achsen, die im Data Engineering enorm zählen:

StärkeWarum es zählt
Ökosystem-TiefeLibraries für jede Quelle, jedes Format, jeden Cloud-Service
Orchestrator-SupportAirflow, Dagster, Prefect — alle Python-nativ
Spark-IntegrationPySpark ist die meistgenutzte Spark-API
Cloud-SDK-AbdeckungAWS boto3, Azure SDK, GCP Client Libraries — alle First-Class Python
Hiring-PoolDie meisten Data-Profis können schon Python

Der eigentliche Vorteil ist kein Einzelfeature — sondern, dass Python die Lingua franca ist, die jede Schicht des modernen Data-Stacks verbindet.

Essentielle Libraries

Vergiss "Top 50 Python Libraries"-Listicles. Hier ist, was du in Production-Pipelines wirklich importierst:

Daten-Processing

LibraryUse-CaseWann wählen
pandasKlein-mittlere Transforms (<1 GB)Ad-hoc, Prototyping
polarsMittlere Datasets, performance-kritischWenn pandas zu langsam, Spark Overkill
PySparkGroßskaliges verteiltes ProcessingDatasets >10 GB, bestehende Spark-Infrastruktur
DuckDBAnalytisches SQL auf lokalen FilesParquet/CSV-Analyse, schnelle Aggregationen
pyarrowColumnar Data, Parquet-I/OParquet lesen/schreiben, Arrow-Interop

Connectivity und I/O

LibraryZweck
sqlalchemyDB-Connections, ORM, Raw-SQL-Ausführung
requests / httpxREST-API-Ingestion
boto3AWS S3, Glue, Redshift, alles AWS
azure-storage-blobAzure-Blob-Storage-I/O
psycopg2 / asyncpgPostgreSQL-Driver
confluent-kafkaKafka-Producer/Consumer

Pipeline und Qualität

LibraryZweck
great_expectationsDatenqualitäts-Validierung
pydanticSchema-Validierung für Configs und API-Payloads
dbt-coreSQL-basierte Transformationen (auch Python-Models)
delta-rs (deltalake)Delta Lake ohne Spark

Data-Engineering-Python-Projekt aufsetzen

Skipp requirements.txt für alles jenseits eines Wegwerf-Skripts. Moderne Setup:

Projektstruktur

my-pipeline/
+-- pyproject.toml          # Dependencies, Build-Config, Tool-Settings
+-- src/
|   +-- my_pipeline/
|       +-- __init__.py
|       +-- extract.py      # Source-Connectoren
|       +-- transform.py    # Business-Logic
|       +-- load.py         # Sink-Writer
|       +-- models.py       # Pydantic-Schemas
+-- tests/
|   +-- test_extract.py
|   +-- test_transform.py
+-- .env.example
+-- README.md

Dependency-Management mit uv

uv hat 2025–2026 pip und poetry weitgehend ersetzt — schnell und mit korrekter Resolution:

# uv installieren
curl -LsSf https://astral.sh/uv/install.sh | sh

# Neues Projekt initialisieren
uv init my-pipeline
cd my-pipeline

# Dependencies hinzufügen
uv add pandas sqlalchemy psycopg2-binary pydantic
uv add --dev pytest ruff mypy

# Skripte in managed Environment ausführen
uv run python src/my_pipeline/extract.py

# Lock und sync
uv lock
uv sync

Die pyproject.toml ersetzt requirements.txt, setup.py und setup.cfg in einer Datei:

[project]
name = "my-pipeline"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    "pandas>=2.2",
    "sqlalchemy>=2.0",
    "psycopg2-binary>=2.9",
    "pydantic>=2.0",
]

[tool.ruff]
target-version = "py311"
select = ["E", "F", "I", "UP"]

Pattern 1: Type-sichere Pipeline-Configs mit Pydantic

Hardcoded Connection-Strings und Magic Numbers killen Pipelines in Production. Pydantic validiert Config beim Startup — fail fast, fail loud:

# models.py
from pydantic import BaseModel, Field, SecretStr
from pydantic_settings import BaseSettings

class DatabaseConfig(BaseSettings):
    host: str = Field(..., alias="DB_HOST")
    port: int = Field(5432, alias="DB_PORT")
    database: str = Field(..., alias="DB_NAME")
    user: str = Field(..., alias="DB_USER")
    password: SecretStr = Field(..., alias="DB_PASSWORD")

    @property
    def connection_string(self) -> str:
        return (
            f"postgresql://{self.user}:"
            f"{self.password.get_secret_value()}"
            f"@{self.host}:{self.port}/{self.database}"
        )

    model_config = {"env_file": ".env"}


class PipelineConfig(BaseSettings):
    db: DatabaseConfig = DatabaseConfig()
    batch_size: int = Field(10_000, alias="BATCH_SIZE")
    max_retries: int = Field(3, alias="MAX_RETRIES")
    source_api_url: str = Field(..., alias="SOURCE_API_URL")

Verwendung — die Pipeline crasht sofort, wenn nötige Env-Vars fehlen:

# extract.py
from my_pipeline.models import PipelineConfig

config = PipelineConfig()
# Wenn DB_HOST oder SOURCE_API_URL nicht gesetzt sind,
# wirft Pydantic einen klaren ValidationError beim Startup

Besser als einen KeyError mitten in einem 2-Stunden-Batch-Job zu entdecken.

Pattern 2: Robuste API-Ingestion mit Retry und Backoff

Jeder Data Engineer schreibt API-Extractor. Die meisten schreiben sie beim ersten Mal schlecht. Production-Grade-Pattern mit httpx und strukturierten Retries:

# extract.py
import httpx
import time
import logging
from typing import Generator

logger = logging.getLogger(__name__)

def fetch_paginated(
    base_url: str,
    headers: dict,
    page_size: int = 100,
    max_retries: int = 3,
) -> Generator[list[dict], None, None]:
    offset = 0

    with httpx.Client(timeout=30.0) as client:
        while True:
            params = {"limit": page_size, "offset": offset}

            for attempt in range(1, max_retries + 1):
                try:
                    resp = client.get(
                        base_url,
                        headers=headers,
                        params=params,
                    )
                    resp.raise_for_status()
                    break
                except httpx.HTTPStatusError as e:
                    if e.response.status_code == 429:
                        wait = int(
                            e.response.headers.get("Retry-After", 2 ** attempt)
                        )
                        logger.warning(f"Rate limited. Warte {wait}s.")
                        time.sleep(wait)
                    elif attempt == max_retries:
                        raise
                    else:
                        time.sleep(2 ** attempt)
                except httpx.RequestError:
                    if attempt == max_retries:
                        raise
                    time.sleep(2 ** attempt)

            data = resp.json()
            records = data.get("results", [])

            if not records:
                break

            yield records
            offset += page_size

            logger.info(f"Bisher {offset} Records geladen.")

Kern: Generator-basiert (konstanter Speicher), respektiert Retry-After-Header, exponentielles Backoff bei transienten Fehlern.

Pattern 3: Effizientes Batch-Loading mit SQLAlchemy

Row-by-row laden ist der größte Performance-Fehler in Python-ETL. Immer batchen:

# load.py -- PostgreSQL-Dialekt
from sqlalchemy import create_engine, text
from typing import Iterator
import logging

logger = logging.getLogger(__name__)

def batch_upsert(
    connection_string: str,
    table: str,
    records: Iterator[list[dict]],
    conflict_key: str = "id",
    batch_size: int = 5_000,
) -> int:
    engine = create_engine(connection_string)
    total = 0

    with engine.begin() as conn:
        for batch in records:
            if not batch:
                continue

            columns = list(batch[0].keys())
            placeholders = ", ".join(f":{col}" for col in columns)
            update_clause = ", ".join(
                f"{col} = EXCLUDED.{col}"
                for col in columns
                if col != conflict_key
            )

            # PostgreSQL INSERT ... ON CONFLICT (upsert)
            stmt = text(
                f"INSERT INTO {table} ({', '.join(columns)}) "
                f"VALUES ({placeholders}) "
                f"ON CONFLICT ({conflict_key}) "
                f"DO UPDATE SET {update_clause}"
            )

            conn.execute(stmt, batch)
            total += len(batch)
            logger.info(f"{total} Records nach {table} upserted.")

    return total

Dieses Pattern verarbeitet Daten in konstantem Speicher, indem es den Generator des Extract-Schritts direkt konsumiert.

Pattern 4: pandas vs polars — wann wechseln

Die meisten Data Engineers starten mit pandas. Das ist okay. Aber zu wissen, wann polars das bessere Tool ist, spart Wartezeit:

# Pandas -- vertraut, gut für schnelle Exploration
# Gut für: <500 MB, gemischte Operationen, breites Ökosystem
import pandas as pd

df = pd.read_parquet("events.parquet")
result = (
    df.groupby("event_type")
    .agg(count=("event_id", "count"), avg_value=("value", "mean"))
    .reset_index()
    .sort_values("count", ascending=False)
)
# Polars -- gleiche Logic, 5-10x schneller auf mittleren Datasets
# Gut für: 500 MB-10 GB, aggregation-lastig, Lazy Evaluation
import polars as pl

result = (
    pl.scan_parquet("events.parquet")  # lazy -- noch keine Daten geladen
    .group_by("event_type")
    .agg(
        pl.col("event_id").count().alias("count"),
        pl.col("value").mean().alias("avg_value"),
    )
    .sort("count", descending=True)
    .collect()  # Ausführung hier
)
Dimensionpandaspolars
Memory-ModellEager — alles im RAMLazy Evaluation mit scan_*
Speed (1 GB Aggregation)~12 s typisch~2 s typisch
MutabilityMutable DataFramesImmutable by Default
API-StyleMethod-Chaining + IndexingExpression-basiert, komponierbar
ÖkosystemRiesig (scikit-learn, plotly etc.)Wachsend, gutes Arrow-Interop
LernkurveNiedriger (mehr Tutorials/SO)Mittel (anderes mentales Modell)

Mein Take: neue Pipelines in polars starten, wenn das Team offen ist. Die Lazy Evaluation allein verhindert eine ganze Klasse von Memory-Bugs. Auf pandas zurückfallen, wenn eine bestimmte Library zwingend pandas-DataFrame als Input braucht.

Pattern 5: Testbarer Pipeline-Code

Daten-Pipelines, die nicht lokal testbar sind, sind Pipelines, die in Production brechen. Trenne I/O von Logic:

# transform.py -- reine Funktionen, kein I/O
import polars as pl

def clean_events(df: pl.DataFrame) -> pl.DataFrame:
    return (
        df.filter(pl.col("event_id").is_not_null())
        .unique(subset=["event_id"])
        .with_columns(
            pl.col("timestamp").str.to_datetime().alias("event_ts"),
            pl.col("value").fill_null(0).alias("value_clean"),
        )
    )

def flag_anomalies(
    df: pl.DataFrame,
    threshold: float = 3.0,
) -> pl.DataFrame:
    stats = df.select(
        pl.col("value_clean").mean().alias("mu"),
        pl.col("value_clean").std().alias("sigma"),
    ).row(0)
    mu, sigma = stats

    return df.with_columns(
        (
            ((pl.col("value_clean") - mu).abs() > threshold * sigma)
        ).alias("is_anomaly")
    )
# tests/test_transform.py
import polars as pl
from my_pipeline.transform import clean_events, flag_anomalies

def test_clean_events_removes_nulls():
    df = pl.DataFrame({
        "event_id": ["a", None, "b", "a"],
        "timestamp": ["2026-01-01", "2026-01-02", "2026-01-03", "2026-01-01"],
        "value": [10.0, 20.0, None, 10.0],
    })
    result = clean_events(df)

    assert result.shape[0] == 2  # null entfernt, Duplikat entfernt
    assert result.filter(pl.col("event_id").is_null()).shape[0] == 0

def test_flag_anomalies_catches_outliers():
    df = pl.DataFrame({
        "value_clean": [10.0, 11.0, 9.0, 10.5, 100.0],
    })
    result = flag_anomalies(df, threshold=2.0)

    anomalies = result.filter(pl.col("is_anomaly"))
    assert anomalies.shape[0] == 1
    assert anomalies["value_clean"][0] == 100.0

Tests laufen mit: uv run pytest tests/ -v

Häufige Fehler und wie du sie vermeidest

1. Ganze Datasets in Memory laden

Falsch:

# Lädt 50 GB in RAM und crasht
df = pd.read_parquet("s3://bucket/huge-table/")

Richtig:

# Partition für Partition verarbeiten
import pyarrow.parquet as pq

dataset = pq.ParquetDataset("s3://bucket/huge-table/")
for batch in dataset.to_batches(batch_size=100_000):
    chunk = pl.from_arrow(batch)
    process(chunk)

2. String-Konkatenation für SQL

Falsch (SQL-Injection-Risiko):

query = f"SELECT * FROM users WHERE name = '{user_input}'"

Richtig:

# SQLAlchemy -- PostgreSQL-Dialekt
from sqlalchemy import text
query = text("SELECT * FROM users WHERE name = :name")
result = conn.execute(query, {"name": user_input})

3. Logging ignorieren

Falsch:

print(f"Processed {count} records")  # In Container-Logs verloren

Richtig:

import logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
logger.info("Processed %d records", count)

4. Keine Type-Hints

Data-Engineering-Code wird oft von mehreren Leuten gepflegt. Type-Hints sind Doku, die die IDE erzwingt:

# Ohne Types -- was returnt das? Was ist 'config'?
def extract(config, start_date):
    ...

# Mit Types -- klarer Vertrag
def extract(
    config: PipelineConfig,
    start_date: datetime,
) -> Generator[list[dict], None, None]:
    ...

5. Synchron, wo Async hilft

Beim Ingestieren aus mehreren unabhängigen APIs kann asyncio + httpx Wall-Clock-Zeit drastisch senken:

import asyncio
import httpx

async def fetch_all(urls: list[str]) -> list[dict]:
    async with httpx.AsyncClient(timeout=30.0) as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        return [
            r.json() for r in responses
            if isinstance(r, httpx.Response) and r.status_code == 200
        ]

# Verwendung
results = asyncio.run(fetch_all(api_endpoints))

Python-Performance-Tipps für Pipelines

TechnikImpactWann nutzen
Generators statt ListenKonstanter SpeicherAlles Paginierte oder Batchbare
polars Lazy-Mode5-10x schneller als eager pandasAggregationen auf Files >100 MB
PyArrow für I/O3-5x schnellere Parquet-ReadsJeder Parquet-Read/Write
orjson statt json3-10x schnelleres JSON-ParsingHigh-Volume API-Ingestion
Connection-PoolingEliminiert Verbindungs-OverheadJede DB-lastige Pipeline
multiprocessing.PoolAlle CPU-Cores nutzenCPU-bound Transforms (nicht I/O)

Alles zusammen: Mini-Pipeline

So verbinden sich die Patterns in einem echten ETL-Flow:

# main.py -- Orchestrierungs-Entrypoint
import logging
from my_pipeline.models import PipelineConfig
from my_pipeline.extract import fetch_paginated
from my_pipeline.transform import clean_events, flag_anomalies
from my_pipeline.load import batch_upsert
import polars as pl

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def run():
    config = PipelineConfig()  # validiert Env-Vars oder crasht

    # Extract: paginierter API-Fetch (Generator)
    raw_pages = fetch_paginated(
        base_url=config.source_api_url,
        headers={"Authorization": f"Bearer {config.api_token}"},
        page_size=config.batch_size,
    )

    # Transform: jede Seite verarbeiten, gereinigte Batches yielden
    def transform_stream(pages):
        for page in pages:
            df = pl.DataFrame(page)
            df = clean_events(df)
            df = flag_anomalies(df)
            yield df.to_dicts()

    # Load: Upsert in PostgreSQL
    total = batch_upsert(
        connection_string=config.db.connection_string,
        table="events",
        records=transform_stream(raw_pages),
        conflict_key="event_id",
    )

    logger.info(f"Pipeline fertig. {total} Records upserted.")

if __name__ == "__main__":
    run()

Die ganze Pipeline läuft in konstantem Speicher unabhängig von der Source-Größe, validiert Config vor jedem Daten-Touch, jede Komponente ist unabhängig testbar.

FAQ

pandas oder polars für neue Projekte? polars für alles >500 MB oder wenn Aggregationen die Hauptlast sind. pandas für Prototyping und wenn du Libraries brauchst, die pandas erwarten (scikit-learn etc.).

Brauche ich Spark, wenn ich polars habe? Polars läuft auf einer Maschine. Sobald Datasets >100 GB werden oder du mehrere Maschinen brauchst, ist Spark Pflicht. Bis dahin ist polars meist genug.

uv oder poetry? uv ist 2026 die Default-Empfehlung — 10–100x schneller als poetry, gleiche Funktionalität.

Python 3.11 oder 3.12? 3.12 ist stabil und schneller. 3.13 mit Free-Threading kommt 2026 voll, ist aber für DE noch experimentell.

Wie testet man Pipelines, die auf Cloud-Services hängen? Mock externe Services in Unit-Tests (moto für AWS, fakes für DB). Integration-Tests gegen Sandbox-Konten. Lokale Reproduzierbarkeit via Docker-Compose mit Postgres/Kafka.

Weiterlesen

Wenn du Pipeline-Logic schnell prototypen willst — SQL gegen CSVs und Parquet, API-Responses interaktiv testen — lässt dich Harbinger Explorer das direkt im Browser mit DuckDB WASM machen. Nützlich, um Transforms zu validieren, bevor du sie in Production-Code committest.

Der beste Python-Data-Engineering-Code ist nicht clever. Er ist langweilig, vorhersehbar, gut typisiert und handhabt Failures, bevor sie zu Incidents werden. Schreib Code, der dich nachts schlafen lässt.

Stand: 14. Mai 2026.

H

Geschrieben von

Harbinger Team

Cloud-, Data- und AI-Engineer in DACH. Schreibt seit 2018 über infrastruktur­kritische Tech-Entscheidungen — keine Marketing- Folien, sondern echte Trade-offs aus Production-Workloads.

Hat dir das geholfen?

Jede Woche ein neuer Artikel über DACH-Cloud, Data und AI — direkt in dein Postfach. Kein Spam, kein Marketing-Sprech.

Kein Spam. 1-Klick-Abmeldung. Datenschutz bei Loops.so.