Inhaltsverzeichnis15 Abschnitte
- TL;DR
- Architektur-Überblick
- Schritt 1: HTTP-Session aufsetzen
- Schritt 2: Retry-Logic mit exponentiellem Backoff
- Schritt 3: Pagination handhaben
- Schritt 4: Responses validieren und flatten
- Schritt 5: Volle Pipeline zusammenbauen
- Häufige Stolperfallen
- Rate Limits
- Authentifizierung
- Schema-Drift
- Pagination-Edge-Cases
- Wann über manuelles Ingestieren hinausgehen
- FAQ
- Nächste Schritte
REST API Data Pipeline in Python bauen
Du bist gleich dabei, das erste Mal Daten aus einer REST-API zu ziehen, und es läuft — bis es nicht mehr läuft. Rate Limits greifen, das Next-Page-Token ist nicht da, wo die Docs es sagten, und plötzlich ist ein Feld, das immer ein String war, manchmal null. Eine robuste REST-API-Data-Pipeline zu bauen heißt, all das vorne zu planen — nicht um 2 Uhr nachts im On-Call zu entdecken. Dieser Guide führt durch die volle Pipeline: vom ersten HTTP-Request zu strukturiertem, schema-validiertem Output.
TL;DR
- Session statt bare
requests.get, Retry mit exponential Backoff und Jitter - Pagination als Generator (Cursor, Offset oder Page-Number)
- Schema-Validierung beim Ingestieren, nicht downstream
- Always-on Logging, Skip-Rate als Schema-Drift-Indikator
Architektur-Überblick
① Auth-Manager handhabt Token-Lifecycle — Refresh-Tokens vor Ablauf, nicht nur beim Startup. Eine Zwei-Stunden-Pipeline, die nach 1h einen 401 kriegt, verschwendet den ganzen Run. ② Rate-Limit-Handler umhüllt jeden Request mit Retry-Logic und exponentiellem Backoff. Nicht optional für Production. ③ Pagination-Loop fetched Seiten, bis kein Next-Cursor mehr da ist — kenne das Pagination-Pattern deiner API, bevor du eine Zeile Code schreibst. ④ Schema-Validator fängt Schema-Drift früh — fail loud und spezifisch, statt downstream Tables still zu korrumpieren.
Schritt 1: HTTP-Session aufsetzen
Nutze keine bare requests.get()-Calls für Production. Ein Session-Objekt nutzen — es wiederverwendet TCP-Verbindungen, lässt dich Default-Headers einmal setzen und zentralisiert Auth-Management.
import requests
import time
import logging
from typing import Optional, Generator
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def build_session(api_key: str) -> requests.Session:
"""Wiederverwendbare Session mit Auth-Headern und sinnvollen Defaults."""
session = requests.Session()
session.headers.update({
"Authorization": f"Bearer {api_key}",
"Accept": "application/json",
"Content-Type": "application/json",
})
return session
Immer timeout auf jedem Request setzen — nie auf Default lassen (ist unendlich). Ein hängender API-Call blockiert deine ganze Pipeline.
Schritt 2: Retry-Logic mit exponentiellem Backoff
Rate Limits und transiente Fehler passieren bei Scale garantiert. Bau Retry-Logic vor du sie brauchst.
import random
from requests.exceptions import ConnectionError, Timeout
def fetch_with_retry(
session: requests.Session,
url: str,
params: Optional[dict] = None,
max_retries: int = 5,
base_delay: float = 1.0,
) -> dict:
"""
URL mit exponential Backoff + Jitter fetchen.
Respektiert Retry-After-Header bei 429-Responses.
Wirft RuntimeError nach max_retries fehlgeschlagenen Versuchen.
"""
for attempt in range(max_retries):
try:
response = session.get(url, params=params, timeout=30)
# Explizites 429-Handling: Retry-After respektieren
if response.status_code == 429:
retry_after = int(
response.headers.get("Retry-After", base_delay * (2 ** attempt))
)
logger.warning(
f"Rate limited. Warte {retry_after}s "
f"(Versuch {attempt + 1}/{max_retries})"
)
time.sleep(retry_after)
continue
response.raise_for_status()
return response.json()
except (ConnectionError, Timeout) as e:
if attempt == max_retries - 1:
raise
# Jitter verhindert Thundering Herd bei parallelen Retries
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
logger.warning(f"Request fehlgeschlagen ({e}). Retry in {delay:.1f}s...")
time.sleep(delay)
raise RuntimeError(f"Konnte {url} nach {max_retries} Versuchen nicht fetchen")
Der Jitter (random.uniform(0, 1)) ist besonders wichtig, wenn mehrere Worker parallel retryen — ohne ihn retryen alle gleichzeitig und compounden die Last.
Schritt 3: Pagination handhaben
APIs nutzen verschiedene Pagination-Patterns. Die drei häufigsten: offset/limit, cursor-based und page-number. Generator für cursor-based — am häufigsten in modernen REST-APIs.
def paginate_cursor(
session: requests.Session,
base_url: str,
params: Optional[dict] = None,
cursor_key: str = "next_cursor",
data_key: str = "data",
page_size: int = 100,
) -> Generator[list, None, None]:
"""
Cursor-based API-Endpoint paginieren.
Yieldet eine Seite Records pro Mal.
Args:
cursor_key: JSON-Key für den Next-Page-Cursor in der Response
data_key: JSON-Key mit der Liste Records pro Seite
page_size: Records pro Request (an API-Maximum anpassen)
"""
params = {**(params or {}), "limit": page_size}
while True:
response = fetch_with_retry(session, base_url, params=params)
records = response.get(data_key, [])
if not records:
logger.info("Keine Records zurück — Pagination fertig.")
break
yield records
logger.info(f"Seite mit {len(records)} Records geladen")
next_cursor = response.get(cursor_key)
if not next_cursor:
break # Keine weiteren Seiten
params["cursor"] = next_cursor
# URL bleibt; nur Cursor-Param ändert sich
Wenn deine API offset/limit nutzt: offset += len(records) tracken und stoppen, wenn len(records) < page_size. Bei page-numbers: page inkrementieren und stoppen, wenn die Response die letzte Seite anzeigt.
Schritt 4: Responses validieren und flatten
APIs ändern sich ohne Vorwarnung. Felder werden hinzugefügt, entfernt oder ändern still den Typ. Schema-Drift beim Ingestieren zu fangen, ist viel billiger als es in einer Downstream-Transformation zu debuggen.
from dataclasses import dataclass, field
from typing import List
@dataclass
class ApiRecord:
"""
Validierte und geflättete Record-Struktur.
Felder an dein API-Schema anpassen.
"""
id: str
name: str
created_at: str
value: Optional[float] = None
tags: List[str] = field(default_factory=list)
def validate_record(raw: dict) -> Optional[ApiRecord]:
"""
Raw-API-Record gegen erwartetes Schema validieren.
Gibt None (mit Warning) für invalide Records zurück,
statt den ganzen Run zu crashen.
"""
required_fields = {"id", "name", "created_at"}
missing = required_fields - raw.keys()
if missing:
logger.warning(
f"Record {raw.get('id', '[no id]')} fehlt Felder: {missing}"
)
return None
return ApiRecord(
id=str(raw["id"]),
name=str(raw["name"]),
created_at=str(raw["created_at"]),
value=float(raw["value"]) if raw.get("value") is not None else None,
tags=raw.get("tags", []),
)
Skip-Rate tracken: wenn mehr als ein kleiner Prozentsatz der Records die Validierung nicht schafft, hat sich was am API-Schema geändert und du willst es wissen, bevor schlechte Daten propagieren.
Schritt 5: Volle Pipeline zusammenbauen
import json
def run_pipeline(api_key: str, base_url: str, output_path: str) -> None:
"""Volle Ingestion-Pipeline: authentifizieren → paginieren → validieren → schreiben."""
session = build_session(api_key)
valid_records = []
skipped = 0
for page in paginate_cursor(session, base_url):
for raw in page:
record = validate_record(raw)
if record:
valid_records.append(record.__dict__)
else:
skipped += 1
skip_rate = skipped / max(len(valid_records) + skipped, 1)
logger.info(
f"Pipeline fertig: {len(valid_records)} valide, "
f"{skipped} übersprungen ({skip_rate:.1%})"
)
if skip_rate > 0.05: # Alert bei >5% Validation-Fail
logger.error("Skip-Rate über 5% — Schema-Changes upstream prüfen")
with open(output_path, "w") as f:
json.dump(valid_records, f, indent=2, default=str)
if __name__ == "__main__":
run_pipeline(
api_key="your-api-key",
base_url="https://api.example.com/v1/records",
output_path="output/records.json",
)
Häufige Stolperfallen
Rate Limits
Der naive Ansatz wartet auf 429, bevor er langsamer wird. Besser: X-RateLimit-Remaining-Header proaktiv lesen und throttlen, bevor du Null triffst. Manche APIs haben gestaffelte Limits — pro Sekunde UND pro Tag — und unterschiedliche Fehlermeldungen pro Limit. Docs genau lesen; viele Rate-Limit-Bugs zeigen sich nicht in Development, weil Testvolumen niedrig sind.
Authentifizierung
OAuth-Tokens laufen ab. Wenn deine Pipeline 2+ Stunden läuft und mittendrin einen 401 kriegt, ist der ganze Run weg. Token-Refresh-Check vor jedem Request-Batch — nicht nur beim Startup. Credentials in Env-Vars oder einem Secrets-Manager speichern, nie im Code oder committeten Configs.
Schema-Drift
API-Anbieter fügen Felder ohne Warnung hinzu, ändern Typen still oder nesten Daten zwischen Versionen anders. Dein Validator sollte unerwartete Shapes loggen und überspringen, statt zu crashen — aber laut alerten. Wenn 10% deiner Records plötzlich invalide sind, ist das ein Breaking Change upstream, kein Datenqualitäts-Quirk.
Pagination-Edge-Cases
Zwei häufige Gotchas: manche APIs liefern eine leere Seite mit validem Next-Cursor auf der letzten echten Seite — prüfe beide: records == [] UND next_cursor is None. Und trau total_count nicht für Loop-Termination; manche APIs liefern Schätzwerte. Paginiere bis natürlich erschöpft, statt zum Ziel zu zählen.
Wann über manuelles Ingestieren hinausgehen
Für Einmal-Skripte oder eine Handvoll Quellen reicht dieser Code. Aber bei 20+ APIs mit Credential-Rotation, Schema-Versionierung und Failure-Monitoring werden manuelle Skripte erhebliche Pflege-Last. Tools, die Crawl-and-Catalog automatisieren, zahlen sich bei dem Scale aus.
Harbinger Explorer enthält ein API-Crawling-Feature mit geführtem Setup-Wizard: zeig auf einen Endpoint, konfiguriere Auth, und es handhabt Pagination, Scheduling und Schema-Tracking automatisch. Nützlich, wenn das Ziel Analyse ist, nicht Pipeline-Pflege.
FAQ
Wie unterscheidet sich httpx von requests? httpx ist API-kompatibler Nachfolger mit async Support und HTTP/2. Für Production empfohlen, bei einfachen Skripten reicht requests.
Brauche ich asyncio für API-Pipelines? Bei vielen unabhängigen Quellen lohnt es sich (paralleles Fetchen). Bei einer sequenziellen Pagination eher nicht.
Wie testet man eine Pipeline lokal? Responses mocken (pytest + respx oder responses-Lib). Sample-Payloads als Fixtures speichern. Tests laufen ohne echte API-Calls.
Wie monitort man Production-Pipelines? Strukturiertes Logging (JSON) in einen Log-Aggregator (Datadog, Grafana Loki). Metriken zu Records, Skip-Rate, Duration tracken. Alerts bei Skip-Rate > 5% oder Duration-Spikes.
Was, wenn die API kein Pagination-Pattern dokumentiert?
Response-Header und Body auf link-Header (RFC 5988), next-Felder oder Cursor-Indikatoren prüfen. Mit kleinem limit testen und beobachten.
Nächste Schritte
Solide API-Ingestion ist die erste Stufe. Sobald Daten verlässlich landen, sind die nächsten Probleme Transformation und Queryability für den Rest des Teams.
Weiterlesen
- dbt vs Spark SQL: Wahl der Transformations-Schicht
- Self-Service Analytics: Warum die meisten Teams es falsch machen
- AI Agents vs BI Dashboards: Was sich wirklich ändert
Stand: 14. Mai 2026.
Geschrieben von
Harbinger Team
Cloud-, Data- und AI-Engineer in DACH. Schreibt seit 2018 über infrastrukturkritische Tech-Entscheidungen — keine Marketing- Folien, sondern echte Trade-offs aus Production-Workloads.
Hat dir das geholfen?
Jede Woche ein neuer Artikel über DACH-Cloud, Data und AI — direkt in dein Postfach. Kein Spam, kein Marketing-Sprech.
Kein Spam. 1-Klick-Abmeldung. Datenschutz bei Loops.so.