Cloud allgemein

REST API Data Pipeline in Python: Production-Guide (2026)

Step-by-step Production-Grade REST-API-Daten-Pipeline in Python: Auth, Pagination, Rate Limits, Schema-Validierung — mit echtem ausführbarem Code.

Harbinger Team8. März 20267 Min. LesezeitAktualisiert 14.5.2026
  • rest api
  • data pipeline
  • python
  • api ingestion
  • etl
  • pagination
  • data engineering
  • dach
Inhaltsverzeichnis15 Abschnitte

REST API Data Pipeline in Python bauen

Du bist gleich dabei, das erste Mal Daten aus einer REST-API zu ziehen, und es läuft — bis es nicht mehr läuft. Rate Limits greifen, das Next-Page-Token ist nicht da, wo die Docs es sagten, und plötzlich ist ein Feld, das immer ein String war, manchmal null. Eine robuste REST-API-Data-Pipeline zu bauen heißt, all das vorne zu planen — nicht um 2 Uhr nachts im On-Call zu entdecken. Dieser Guide führt durch die volle Pipeline: vom ersten HTTP-Request zu strukturiertem, schema-validiertem Output.

TL;DR

  • Session statt bare requests.get, Retry mit exponential Backoff und Jitter
  • Pagination als Generator (Cursor, Offset oder Page-Number)
  • Schema-Validierung beim Ingestieren, nicht downstream
  • Always-on Logging, Skip-Rate als Schema-Drift-Indikator

Architektur-Überblick

Auth-Manager handhabt Token-Lifecycle — Refresh-Tokens vor Ablauf, nicht nur beim Startup. Eine Zwei-Stunden-Pipeline, die nach 1h einen 401 kriegt, verschwendet den ganzen Run. ② Rate-Limit-Handler umhüllt jeden Request mit Retry-Logic und exponentiellem Backoff. Nicht optional für Production. ③ Pagination-Loop fetched Seiten, bis kein Next-Cursor mehr da ist — kenne das Pagination-Pattern deiner API, bevor du eine Zeile Code schreibst. ④ Schema-Validator fängt Schema-Drift früh — fail loud und spezifisch, statt downstream Tables still zu korrumpieren.

Schritt 1: HTTP-Session aufsetzen

Nutze keine bare requests.get()-Calls für Production. Ein Session-Objekt nutzen — es wiederverwendet TCP-Verbindungen, lässt dich Default-Headers einmal setzen und zentralisiert Auth-Management.

import requests
import time
import logging
from typing import Optional, Generator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def build_session(api_key: str) -> requests.Session:
    """Wiederverwendbare Session mit Auth-Headern und sinnvollen Defaults."""
    session = requests.Session()
    session.headers.update({
        "Authorization": f"Bearer {api_key}",
        "Accept": "application/json",
        "Content-Type": "application/json",
    })
    return session

Immer timeout auf jedem Request setzen — nie auf Default lassen (ist unendlich). Ein hängender API-Call blockiert deine ganze Pipeline.

Schritt 2: Retry-Logic mit exponentiellem Backoff

Rate Limits und transiente Fehler passieren bei Scale garantiert. Bau Retry-Logic vor du sie brauchst.

import random
from requests.exceptions import ConnectionError, Timeout


def fetch_with_retry(
    session: requests.Session,
    url: str,
    params: Optional[dict] = None,
    max_retries: int = 5,
    base_delay: float = 1.0,
) -> dict:
    """
    URL mit exponential Backoff + Jitter fetchen.
    Respektiert Retry-After-Header bei 429-Responses.
    Wirft RuntimeError nach max_retries fehlgeschlagenen Versuchen.
    """
    for attempt in range(max_retries):
        try:
            response = session.get(url, params=params, timeout=30)

            # Explizites 429-Handling: Retry-After respektieren
            if response.status_code == 429:
                retry_after = int(
                    response.headers.get("Retry-After", base_delay * (2 ** attempt))
                )
                logger.warning(
                    f"Rate limited. Warte {retry_after}s "
                    f"(Versuch {attempt + 1}/{max_retries})"
                )
                time.sleep(retry_after)
                continue

            response.raise_for_status()
            return response.json()

        except (ConnectionError, Timeout) as e:
            if attempt == max_retries - 1:
                raise
            # Jitter verhindert Thundering Herd bei parallelen Retries
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            logger.warning(f"Request fehlgeschlagen ({e}). Retry in {delay:.1f}s...")
            time.sleep(delay)

    raise RuntimeError(f"Konnte {url} nach {max_retries} Versuchen nicht fetchen")

Der Jitter (random.uniform(0, 1)) ist besonders wichtig, wenn mehrere Worker parallel retryen — ohne ihn retryen alle gleichzeitig und compounden die Last.

Schritt 3: Pagination handhaben

APIs nutzen verschiedene Pagination-Patterns. Die drei häufigsten: offset/limit, cursor-based und page-number. Generator für cursor-based — am häufigsten in modernen REST-APIs.

def paginate_cursor(
    session: requests.Session,
    base_url: str,
    params: Optional[dict] = None,
    cursor_key: str = "next_cursor",
    data_key: str = "data",
    page_size: int = 100,
) -> Generator[list, None, None]:
    """
    Cursor-based API-Endpoint paginieren.
    Yieldet eine Seite Records pro Mal.

    Args:
        cursor_key: JSON-Key für den Next-Page-Cursor in der Response
        data_key:   JSON-Key mit der Liste Records pro Seite
        page_size:  Records pro Request (an API-Maximum anpassen)
    """
    params = {**(params or {}), "limit": page_size}

    while True:
        response = fetch_with_retry(session, base_url, params=params)

        records = response.get(data_key, [])
        if not records:
            logger.info("Keine Records zurück — Pagination fertig.")
            break

        yield records
        logger.info(f"Seite mit {len(records)} Records geladen")

        next_cursor = response.get(cursor_key)
        if not next_cursor:
            break  # Keine weiteren Seiten

        params["cursor"] = next_cursor
        # URL bleibt; nur Cursor-Param ändert sich

Wenn deine API offset/limit nutzt: offset += len(records) tracken und stoppen, wenn len(records) < page_size. Bei page-numbers: page inkrementieren und stoppen, wenn die Response die letzte Seite anzeigt.

Schritt 4: Responses validieren und flatten

APIs ändern sich ohne Vorwarnung. Felder werden hinzugefügt, entfernt oder ändern still den Typ. Schema-Drift beim Ingestieren zu fangen, ist viel billiger als es in einer Downstream-Transformation zu debuggen.

from dataclasses import dataclass, field
from typing import List


@dataclass
class ApiRecord:
    """
    Validierte und geflättete Record-Struktur.
    Felder an dein API-Schema anpassen.
    """
    id: str
    name: str
    created_at: str
    value: Optional[float] = None
    tags: List[str] = field(default_factory=list)


def validate_record(raw: dict) -> Optional[ApiRecord]:
    """
    Raw-API-Record gegen erwartetes Schema validieren.
    Gibt None (mit Warning) für invalide Records zurück,
    statt den ganzen Run zu crashen.
    """
    required_fields = {"id", "name", "created_at"}
    missing = required_fields - raw.keys()

    if missing:
        logger.warning(
            f"Record {raw.get('id', '[no id]')} fehlt Felder: {missing}"
        )
        return None

    return ApiRecord(
        id=str(raw["id"]),
        name=str(raw["name"]),
        created_at=str(raw["created_at"]),
        value=float(raw["value"]) if raw.get("value") is not None else None,
        tags=raw.get("tags", []),
    )

Skip-Rate tracken: wenn mehr als ein kleiner Prozentsatz der Records die Validierung nicht schafft, hat sich was am API-Schema geändert und du willst es wissen, bevor schlechte Daten propagieren.

Schritt 5: Volle Pipeline zusammenbauen

import json


def run_pipeline(api_key: str, base_url: str, output_path: str) -> None:
    """Volle Ingestion-Pipeline: authentifizieren → paginieren → validieren → schreiben."""
    session = build_session(api_key)
    valid_records = []
    skipped = 0

    for page in paginate_cursor(session, base_url):
        for raw in page:
            record = validate_record(raw)
            if record:
                valid_records.append(record.__dict__)
            else:
                skipped += 1

    skip_rate = skipped / max(len(valid_records) + skipped, 1)
    logger.info(
        f"Pipeline fertig: {len(valid_records)} valide, "
        f"{skipped} übersprungen ({skip_rate:.1%})"
    )

    if skip_rate > 0.05:  # Alert bei >5% Validation-Fail
        logger.error("Skip-Rate über 5% — Schema-Changes upstream prüfen")

    with open(output_path, "w") as f:
        json.dump(valid_records, f, indent=2, default=str)


if __name__ == "__main__":
    run_pipeline(
        api_key="your-api-key",
        base_url="https://api.example.com/v1/records",
        output_path="output/records.json",
    )

Häufige Stolperfallen

Rate Limits

Der naive Ansatz wartet auf 429, bevor er langsamer wird. Besser: X-RateLimit-Remaining-Header proaktiv lesen und throttlen, bevor du Null triffst. Manche APIs haben gestaffelte Limits — pro Sekunde UND pro Tag — und unterschiedliche Fehlermeldungen pro Limit. Docs genau lesen; viele Rate-Limit-Bugs zeigen sich nicht in Development, weil Testvolumen niedrig sind.

Authentifizierung

OAuth-Tokens laufen ab. Wenn deine Pipeline 2+ Stunden läuft und mittendrin einen 401 kriegt, ist der ganze Run weg. Token-Refresh-Check vor jedem Request-Batch — nicht nur beim Startup. Credentials in Env-Vars oder einem Secrets-Manager speichern, nie im Code oder committeten Configs.

Schema-Drift

API-Anbieter fügen Felder ohne Warnung hinzu, ändern Typen still oder nesten Daten zwischen Versionen anders. Dein Validator sollte unerwartete Shapes loggen und überspringen, statt zu crashen — aber laut alerten. Wenn 10% deiner Records plötzlich invalide sind, ist das ein Breaking Change upstream, kein Datenqualitäts-Quirk.

Pagination-Edge-Cases

Zwei häufige Gotchas: manche APIs liefern eine leere Seite mit validem Next-Cursor auf der letzten echten Seite — prüfe beide: records == [] UND next_cursor is None. Und trau total_count nicht für Loop-Termination; manche APIs liefern Schätzwerte. Paginiere bis natürlich erschöpft, statt zum Ziel zu zählen.

Wann über manuelles Ingestieren hinausgehen

Für Einmal-Skripte oder eine Handvoll Quellen reicht dieser Code. Aber bei 20+ APIs mit Credential-Rotation, Schema-Versionierung und Failure-Monitoring werden manuelle Skripte erhebliche Pflege-Last. Tools, die Crawl-and-Catalog automatisieren, zahlen sich bei dem Scale aus.

Harbinger Explorer enthält ein API-Crawling-Feature mit geführtem Setup-Wizard: zeig auf einen Endpoint, konfiguriere Auth, und es handhabt Pagination, Scheduling und Schema-Tracking automatisch. Nützlich, wenn das Ziel Analyse ist, nicht Pipeline-Pflege.

FAQ

Wie unterscheidet sich httpx von requests? httpx ist API-kompatibler Nachfolger mit async Support und HTTP/2. Für Production empfohlen, bei einfachen Skripten reicht requests.

Brauche ich asyncio für API-Pipelines? Bei vielen unabhängigen Quellen lohnt es sich (paralleles Fetchen). Bei einer sequenziellen Pagination eher nicht.

Wie testet man eine Pipeline lokal? Responses mocken (pytest + respx oder responses-Lib). Sample-Payloads als Fixtures speichern. Tests laufen ohne echte API-Calls.

Wie monitort man Production-Pipelines? Strukturiertes Logging (JSON) in einen Log-Aggregator (Datadog, Grafana Loki). Metriken zu Records, Skip-Rate, Duration tracken. Alerts bei Skip-Rate > 5% oder Duration-Spikes.

Was, wenn die API kein Pagination-Pattern dokumentiert? Response-Header und Body auf link-Header (RFC 5988), next-Felder oder Cursor-Indikatoren prüfen. Mit kleinem limit testen und beobachten.

Nächste Schritte

Solide API-Ingestion ist die erste Stufe. Sobald Daten verlässlich landen, sind die nächsten Probleme Transformation und Queryability für den Rest des Teams.

Weiterlesen

Stand: 14. Mai 2026.

H

Geschrieben von

Harbinger Team

Cloud-, Data- und AI-Engineer in DACH. Schreibt seit 2018 über infrastruktur­kritische Tech-Entscheidungen — keine Marketing- Folien, sondern echte Trade-offs aus Production-Workloads.

Hat dir das geholfen?

Jede Woche ein neuer Artikel über DACH-Cloud, Data und AI — direkt in dein Postfach. Kein Spam, kein Marketing-Sprech.

Kein Spam. 1-Klick-Abmeldung. Datenschutz bei Loops.so.