Harbinger Explorer


10 min read · Tags: python, data engineering, pandas, polars, pydantic, etl, sqlalchemy, tutorial

Python for Data Engineering: The Practical Toolkit You Actually Need

You know Python. You've written scripts, maybe some pandas code, possibly a Flask app. But data engineering Python is a different animal — it's less about clever algorithms and more about moving data reliably at scale, handling failures gracefully, and writing code that your future self (or your teammates) won't curse at 3 AM.

This guide covers the Python toolkit, patterns, and practices that separate a data engineer's Python from a data scientist's Python. No theory dumps — just the tools and techniques you'll use daily.

Why Python Dominates Data Engineering

Python isn't the fastest language. It's not the most type-safe. But it wins on three fronts that matter enormously in data engineering:

| Strength | Why It Matters |
| --- | --- |
| Ecosystem depth | Libraries for every data source, format, and cloud service |
| Orchestrator support | Airflow, Dagster, Prefect — all Python-native |
| Spark integration | PySpark is the most popular Spark API by usage |
| Cloud SDK coverage | AWS boto3, Azure SDK, GCP client libraries — all first-class Python |
| Hiring pool | Most data professionals already know Python |

The real advantage isn't any single feature — it's that Python is the lingua franca connecting every layer of the modern data stack.

Essential Libraries Every Data Engineer Needs

Forget the "top 50 Python libraries" listicles. Here's what you'll actually import in production data pipelines:

Data Processing

| Library | Use Case | When to Choose |
| --- | --- | --- |
| pandas | Small-to-medium transforms (<1GB) | Ad hoc analysis, prototyping |
| polars | Medium datasets, performance-critical | When pandas is too slow but Spark is overkill |
| PySpark | Large-scale distributed processing | Datasets >10GB, existing Spark infrastructure |
| DuckDB | Analytical SQL on local files | Parquet/CSV analysis, quick aggregations |
| pyarrow | Columnar data, Parquet I/O | Reading/writing Parquet, Arrow-based interop |

Connectivity and I/O

| Library | Purpose |
| --- | --- |
| sqlalchemy | Database connections, ORM, raw SQL execution |
| requests / httpx | REST API ingestion |
| boto3 | AWS S3, Glue, Redshift, everything AWS |
| azure-storage-blob | Azure Blob Storage I/O |
| psycopg2 / asyncpg | PostgreSQL drivers |
| confluent-kafka | Kafka producer/consumer |

Pipeline and Quality

| Library | Purpose |
| --- | --- |
| great_expectations | Data quality validation |
| pydantic | Schema validation for configs and API payloads |
| dbt-core | SQL-based transformations (Python models too) |
| delta-rs (deltalake) | Delta Lake without Spark |
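If a full framework like great_expectations is overkill for a small pipeline, the core idea, declarative checks that fail loudly before bad data lands, can be hand-rolled with the standard library. A sketch (the check names and sample records are illustrative):

```python
def check_not_null(records: list[dict], column: str) -> None:
    """Raise if any record is missing a value for `column`."""
    nulls = sum(1 for r in records if r.get(column) is None)
    if nulls:
        raise ValueError(f"{nulls} record(s) have null '{column}'")

def check_unique(records: list[dict], column: str) -> None:
    """Raise if `column` contains duplicate values."""
    values = [r[column] for r in records]
    if len(values) != len(set(values)):
        raise ValueError(f"duplicate values in '{column}'")

records = [
    {"event_id": "a", "value": 10},
    {"event_id": "b", "value": 20},
]
check_not_null(records, "event_id")  # passes silently
check_unique(records, "event_id")    # passes silently
```

The frameworks add reporting, profiling, and suite management on top, but the contract is the same: run the checks between extract and load, and stop the pipeline on failure.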

Setting Up a Data Engineering Python Project

Skip requirements.txt for anything beyond a throwaway script. Here's the modern setup:

Project Structure

my-pipeline/
+-- pyproject.toml          # Dependencies, build config, tool settings
+-- src/
|   +-- my_pipeline/
|       +-- __init__.py
|       +-- extract.py      # Source connectors
|       +-- transform.py    # Business logic
|       +-- load.py         # Sink writers
|       +-- models.py       # Pydantic schemas
+-- tests/
|   +-- test_extract.py
|   +-- test_transform.py
+-- .env.example
+-- README.md

Dependency Management with uv

uv has largely replaced pip and poetry for new projects, and for good reason — it's fast and handles dependency resolution correctly:

# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh

# Init a new project
uv init my-pipeline
cd my-pipeline

# Add dependencies
uv add pandas sqlalchemy psycopg2-binary pydantic
uv add --dev pytest ruff mypy

# Run scripts in the managed environment
uv run python src/my_pipeline/extract.py

# Lock and sync
uv lock
uv sync

The pyproject.toml replaces requirements.txt, setup.py, and setup.cfg in one file:

[project]
name = "my-pipeline"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    "pandas>=2.2",
    "sqlalchemy>=2.0",
    "psycopg2-binary>=2.9",
    "pydantic>=2.0",
]

[tool.ruff]
target-version = "py311"

[tool.ruff.lint]
select = ["E", "F", "I", "UP"]

Pattern 1: Type-Safe Pipeline Configs with Pydantic

Hard-coded connection strings and magic numbers kill pipelines in production. Use Pydantic to validate configuration at startup — fail fast, fail loud:

# models.py
from pydantic import BaseModel, Field, SecretStr
from pydantic_settings import BaseSettings

class DatabaseConfig(BaseSettings):
    host: str = Field(..., alias="DB_HOST")
    port: int = Field(5432, alias="DB_PORT")
    database: str = Field(..., alias="DB_NAME")
    user: str = Field(..., alias="DB_USER")
    password: SecretStr = Field(..., alias="DB_PASSWORD")

    @property
    def connection_string(self) -> str:
        return (
            f"postgresql://{self.user}:"
            f"{self.password.get_secret_value()}"
            f"@{self.host}:{self.port}/{self.database}"
        )

    model_config = {"env_file": ".env"}


class PipelineConfig(BaseSettings):
    # default_factory defers env reading until PipelineConfig() is called
    db: DatabaseConfig = Field(default_factory=DatabaseConfig)
    api_token: str = Field(..., alias="API_TOKEN")
    batch_size: int = Field(10_000, alias="BATCH_SIZE")
    max_retries: int = Field(3, alias="MAX_RETRIES")
    source_api_url: str = Field(..., alias="SOURCE_API_URL")

Usage — the pipeline crashes immediately if required env vars are missing:

# extract.py
from my_pipeline.models import PipelineConfig

config = PipelineConfig()
# If DB_HOST or SOURCE_API_URL are not set,
# Pydantic raises a clear ValidationError at startup

This is better than discovering a KeyError halfway through a 2-hour batch job.
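To see what fail-fast looks like in isolation, here is the same idea reduced to a plain BaseModel (the field names are illustrative): a missing required field raises a ValidationError that reports every problem at once, instead of a KeyError deep into the run.

```python
from pydantic import BaseModel, ValidationError

class SourceConfig(BaseModel):
    api_url: str              # required -- no default
    batch_size: int = 1_000   # optional, with a sane default

# A valid config constructs normally
cfg = SourceConfig(api_url="https://api.example.com", batch_size=500)

# A missing required field fails immediately with a readable report
try:
    SourceConfig()
except ValidationError as exc:
    print(exc.error_count(), "config error(s)")
```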

Pattern 2: Robust API Ingestion with Retry and Backoff

Every data engineer writes API extractors. Most write them badly the first time. Here's a production-grade pattern using httpx with structured retries:

# extract.py
import httpx
import time
import logging
from typing import Generator

logger = logging.getLogger(__name__)

def fetch_paginated(
    base_url: str,
    headers: dict,
    page_size: int = 100,
    max_retries: int = 3,
) -> Generator[list[dict], None, None]:
    offset = 0

    with httpx.Client(timeout=30.0) as client:
        while True:
            params = {"limit": page_size, "offset": offset}

            for attempt in range(1, max_retries + 1):
                try:
                    resp = client.get(
                        base_url,
                        headers=headers,
                        params=params,
                    )
                    resp.raise_for_status()
                    break
                except httpx.HTTPStatusError as e:
                    # Check retries first -- otherwise a 429 on the final
                    # attempt falls through to resp.json() on an error body
                    if attempt == max_retries:
                        raise
                    if e.response.status_code == 429:
                        wait = int(
                            e.response.headers.get("Retry-After", 2 ** attempt)
                        )
                        logger.warning("Rate limited. Waiting %ss.", wait)
                        time.sleep(wait)
                    else:
                        time.sleep(2 ** attempt)
                except httpx.RequestError:
                    if attempt == max_retries:
                        raise
                    time.sleep(2 ** attempt)

            data = resp.json()
            records = data.get("results", [])

            if not records:
                break

            yield records
            offset += page_size

            logger.info("Fetched %d records so far.", offset)

Key points: generator-based (constant memory), respects Retry-After headers, exponential backoff on transient failures.
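The same backoff logic tends to get copy-pasted into every extractor. Factoring it into a decorator keeps the extractors focused on pagination. A stdlib-only sketch (the exception type to retry on is a parameter, since httpx errors are only one case):

```python
import functools
import time

def with_backoff(max_retries: int = 3, base_delay: float = 1.0,
                 retry_on: type[Exception] = Exception):
    """Retry the wrapped function with exponential backoff."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except retry_on:
                    if attempt == max_retries:
                        raise
                    time.sleep(base_delay * 2 ** (attempt - 1))
        return wrapper
    return decorator

@with_backoff(max_retries=3, base_delay=0.01, retry_on=ConnectionError)
def flaky_fetch(state: dict) -> str:
    # Hypothetical extractor that fails twice, then succeeds
    state["calls"] += 1
    if state["calls"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(flaky_fetch({"calls": 0}))  # succeeds on the third attempt
```

For production use, libraries like tenacity offer the same pattern with jitter and async support built in.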

Pattern 3: Efficient Batch Loading with SQLAlchemy

Loading data row-by-row is the number one performance mistake in Python ETL. Always batch:

# load.py -- PostgreSQL dialect
from sqlalchemy import create_engine, text
from typing import Iterator
import logging

logger = logging.getLogger(__name__)

def batch_upsert(
    connection_string: str,
    table: str,
    records: Iterator[list[dict]],
    conflict_key: str = "id",
    batch_size: int = 5_000,
) -> int:
    engine = create_engine(connection_string)
    total = 0

    with engine.begin() as conn:
        for batch in records:
            if not batch:
                continue

            columns = list(batch[0].keys())
            placeholders = ", ".join(f":{col}" for col in columns)
            update_clause = ", ".join(
                f"{col} = EXCLUDED.{col}"
                for col in columns
                if col != conflict_key
            )

            # PostgreSQL INSERT ... ON CONFLICT (upsert). Identifiers
            # (table/column names) are interpolated, so they must come
            # from trusted code, never user input; values are bound.
            stmt = text(
                f"INSERT INTO {table} ({', '.join(columns)}) "
                f"VALUES ({placeholders}) "
                f"ON CONFLICT ({conflict_key}) "
                f"DO UPDATE SET {update_clause}"
            )

            conn.execute(stmt, batch)
            total += len(batch)
            logger.info("Upserted %d records into %s.", total, table)

    return total

This pattern processes data in constant memory by consuming the generator from the extract step directly.

Pattern 4: pandas vs polars — When to Switch

Most data engineers start with pandas. That's fine. But knowing when polars is the better tool saves hours of waiting:

# Pandas -- familiar, great for quick exploration
# Good for: <500MB, mixed operations, wide ecosystem
import pandas as pd

df = pd.read_parquet("events.parquet")
result = (
    df.groupby("event_type")
    .agg(count=("event_id", "count"), avg_value=("value", "mean"))
    .reset_index()
    .sort_values("count", ascending=False)
)

# Polars -- same logic, 5-10x faster on medium datasets
# Good for: 500MB-10GB, aggregation-heavy, lazy evaluation
import polars as pl

result = (
    pl.scan_parquet("events.parquet")  # lazy -- no data loaded yet
    .group_by("event_type")
    .agg(
        pl.col("event_id").count().alias("count"),
        pl.col("value").mean().alias("avg_value"),
    )
    .sort("count", descending=True)
    .collect()  # execution happens here
)

| Dimension | pandas | polars |
| --- | --- | --- |
| Memory model | Eager — loads everything into RAM | Lazy evaluation with scan_* |
| Speed (1GB aggregation) | ~12s typical | ~2s typical |
| Mutability | Mutable DataFrames | Immutable by default |
| API style | Method chaining + indexing | Expression-based, composable |
| Ecosystem | Massive (scikit-learn, plotly, etc.) | Growing, good Arrow interop |
| Learning curve | Lower (more tutorials/SO answers) | Moderate (different mental model) |

My take: start new pipelines in polars if your team is open to it. The lazy evaluation alone prevents an entire class of memory bugs. Fall back to pandas when a specific library requires a pandas DataFrame as input.

Pattern 5: Writing Testable Pipeline Code

Data pipelines that can't be tested locally are pipelines that break in production. Separate I/O from logic:

# transform.py -- pure functions, no I/O
import polars as pl

def clean_events(df: pl.DataFrame) -> pl.DataFrame:
    return (
        df.filter(pl.col("event_id").is_not_null())
        .unique(subset=["event_id"])
        .with_columns(
            pl.col("timestamp").str.to_datetime().alias("event_ts"),
            pl.col("value").fill_null(0).alias("value_clean"),
        )
    )

def flag_anomalies(
    df: pl.DataFrame,
    threshold: float = 3.0,
) -> pl.DataFrame:
    stats = df.select(
        pl.col("value_clean").mean().alias("mu"),
        pl.col("value_clean").std().alias("sigma"),
    ).row(0)
    mu, sigma = stats

    return df.with_columns(
        (
            ((pl.col("value_clean") - mu).abs() > threshold * sigma)
        ).alias("is_anomaly")
    )

# tests/test_transform.py
import polars as pl
from my_pipeline.transform import clean_events, flag_anomalies

def test_clean_events_removes_nulls():
    df = pl.DataFrame({
        "event_id": ["a", None, "b", "a"],
        "timestamp": ["2026-01-01", "2026-01-02", "2026-01-03", "2026-01-01"],
        "value": [10.0, 20.0, None, 10.0],
    })
    result = clean_events(df)

    assert result.shape[0] == 2  # null removed, duplicate removed
    assert result.filter(pl.col("event_id").is_null()).shape[0] == 0

def test_flag_anomalies_catches_outliers():
    df = pl.DataFrame({
        "value_clean": [10.0, 11.0, 9.0, 10.5, 100.0],
    })
    # With sample std (~40.2), the outlier sits only ~1.8 sigma from
    # the mean, so a threshold of 2.0 would miss it
    result = flag_anomalies(df, threshold=1.5)

    anomalies = result.filter(pl.col("is_anomaly"))
    assert anomalies.shape[0] == 1
    assert anomalies["value_clean"][0] == 100.0

Run tests with: uv run pytest tests/ -v

Common Mistakes and How to Avoid Them

These are the pitfalls I see most often in data engineering Python code:

1. Loading Entire Datasets Into Memory

Wrong:

# This loads 50GB into RAM and crashes
df = pd.read_parquet("s3://bucket/huge-table/")

Right:

# Process the dataset in streaming batches
import pyarrow.dataset as ds
import polars as pl

dataset = ds.dataset("s3://bucket/huge-table/", format="parquet")
for batch in dataset.to_batches(batch_size=100_000):
    chunk = pl.from_arrow(batch)
    process(chunk)

2. String Concatenation for SQL

Wrong (SQL injection risk):

query = f"SELECT * FROM users WHERE name = '{user_input}'"

Right:

# SQLAlchemy -- PostgreSQL dialect
from sqlalchemy import text
query = text("SELECT * FROM users WHERE name = :name")
result = conn.execute(query, {"name": user_input})

3. Ignoring Logging

Wrong:

print(f"Processed {count} records")  # Lost in container logs

Right:

import logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
logger.info("Processed %d records", count)
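One step further: in containerized deployments, log aggregators handle structured output much better than free-form lines. A minimal JSON formatter, stdlib only (the field names are a common convention, not a standard):

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON line."""
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("pipeline")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Processed %d records", 1250)
```

Each line is now machine-parseable, so queries like "all ERROR lines from logger X" become trivial in any log backend.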

4. No Type Hints

Data engineering code is often maintained by multiple people. Type hints are documentation that the IDE enforces:

# Without types -- what does this return? What's 'config'?
def extract(config, start_date):
    ...

# With types -- clear contract
def extract(
    config: PipelineConfig,
    start_date: datetime,
) -> Generator[list[dict], None, None]:
    ...

5. Synchronous Where Async Helps

When ingesting from multiple independent APIs, asyncio + httpx can cut wall-clock time dramatically:

import asyncio
import httpx

async def fetch_all(urls: list[str]) -> list[dict]:
    async with httpx.AsyncClient(timeout=30.0) as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        return [
            r.json() for r in responses
            if isinstance(r, httpx.Response) and r.status_code == 200
        ]

# Usage
results = asyncio.run(fetch_all(api_endpoints))
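One caveat: gather with hundreds of tasks fires every request at once, which is a reliable way to get rate-limited. A semaphore caps how many are in flight. The sketch below uses a stand-in coroutine instead of real HTTP so the pattern stays self-contained:

```python
import asyncio

async def fetch_one(url: str) -> dict:
    # Stand-in for an httpx call; simulates network latency
    await asyncio.sleep(0.01)
    return {"url": url, "status": 200}

async def fetch_all_bounded(
    urls: list[str],
    max_concurrent: int = 10,
) -> list[dict]:
    sem = asyncio.Semaphore(max_concurrent)

    async def bounded(url: str) -> dict:
        async with sem:  # at most max_concurrent requests in flight
            return await fetch_one(url)

    return await asyncio.gather(*(bounded(u) for u in urls))

urls = [f"https://api.example.com/{i}" for i in range(25)]
results = asyncio.run(fetch_all_bounded(urls, max_concurrent=5))
print(len(results))
```

Swap `fetch_one` for a real `client.get` inside a shared `httpx.AsyncClient` and the structure carries over unchanged.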

Python Performance Tips for Data Pipelines

| Technique | Impact | When to Use |
| --- | --- | --- |
| Use generators instead of lists | Constant memory | Anything paginated or batched |
| polars lazy mode | 5-10x faster than eager pandas | Aggregations on files >100MB |
| PyArrow for I/O | 3-5x faster Parquet reads | Any Parquet read/write |
| orjson instead of json | 3-10x faster JSON parsing | High-volume API ingestion |
| Connection pooling | Eliminates connection overhead | Any database-heavy pipeline |
| multiprocessing.Pool | Utilize all CPU cores | CPU-bound transforms (not I/O) |
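The first row of that table is worth a concrete sketch: a rebatching helper decouples the source's page size from the sink's ideal batch size while keeping memory flat. Stdlib only (Python 3.12 ships `itertools.batched` for the same job; this version works on older interpreters):

```python
from itertools import islice
from typing import Iterable, Iterator

def rebatch(records: Iterable[dict], size: int) -> Iterator[list[dict]]:
    """Regroup any record stream into fixed-size batches, lazily."""
    it = iter(records)
    while batch := list(islice(it, size)):
        yield batch

# A generator source: a million records never exist in memory at once
source = ({"id": i} for i in range(1_000_000))

batches = 0
for batch in rebatch(source, 50_000):
    batches += 1  # each batch is a plain list of 50k dicts

print(batches)  # 20 batches of 50,000
```

Only one batch is materialized at a time, so peak memory is set by `size`, not by the total record count.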

Putting It Together: A Mini Pipeline

Here's how the patterns connect in a real extract-transform-load flow:

# main.py -- orchestration entrypoint
import logging
from my_pipeline.models import PipelineConfig
from my_pipeline.extract import fetch_paginated
from my_pipeline.transform import clean_events, flag_anomalies
from my_pipeline.load import batch_upsert
import polars as pl

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def run():
    config = PipelineConfig()  # validates env vars or crashes

    # Extract: paginated API fetch (generator)
    raw_pages = fetch_paginated(
        base_url=config.source_api_url,
        headers={"Authorization": f"Bearer {config.api_token}"},
        page_size=config.batch_size,
    )

    # Transform: process each page, yield cleaned batches
    def transform_stream(pages):
        for page in pages:
            df = pl.DataFrame(page)
            df = clean_events(df)
            df = flag_anomalies(df)
            yield df.to_dicts()

    # Load: upsert into PostgreSQL
    total = batch_upsert(
        connection_string=config.db.connection_string,
        table="events",
        records=transform_stream(raw_pages),
        conflict_key="event_id",
    )

    logger.info("Pipeline complete. %d records upserted.", total)

if __name__ == "__main__":
    run()

The entire pipeline runs in constant memory regardless of source size, validates config before touching any data, and each component is independently testable.


If you want to prototype pipeline logic quickly — run SQL against CSVs and Parquet files, test API responses interactively — Harbinger Explorer lets you do that directly in the browser with DuckDB WASM. Useful for validating transforms before committing them to your production pipeline code.

The best Python data engineering code isn't clever. It's boring, predictable, well-typed, and handles failures before they become incidents. Write code that lets you sleep through the night.

