Python for Data Engineering: The Practical Toolkit You Actually Need
You know Python. You've written scripts, maybe some pandas code, possibly a Flask app. But data engineering Python is a different animal — it's less about clever algorithms and more about moving data reliably at scale, handling failures gracefully, and writing code that your future self (or your teammates) won't curse at 3 AM.
This guide covers the Python toolkit, patterns, and practices that separate a data engineer's Python from a data scientist's Python. No theory dumps — just the tools and techniques you'll use daily.
Why Python Dominates Data Engineering
Python isn't the fastest language. It's not the most type-safe. But it wins on the fronts that matter enormously in data engineering:
| Strength | Why It Matters |
|---|---|
| Ecosystem depth | Libraries for every data source, format, and cloud service |
| Orchestrator support | Airflow, Dagster, Prefect — all Python-native |
| Spark integration | PySpark is the most popular Spark API by usage |
| Cloud SDK coverage | AWS boto3, Azure SDK, GCP client libraries — all first-class Python |
| Hiring pool | Most data professionals already know Python |
The real advantage isn't any single feature — it's that Python is the lingua franca connecting every layer of the modern data stack.
Essential Libraries Every Data Engineer Needs
Forget the "top 50 Python libraries" listicles. Here's what you'll actually import in production data pipelines:
Data Processing
| Library | Use Case | When to Choose |
|---|---|---|
| pandas | Small-to-medium transforms (<1GB) | Ad hoc analysis, prototyping |
| polars | Medium datasets, performance-critical | When pandas is too slow but Spark is overkill |
| PySpark | Large-scale distributed processing | Datasets >10GB, existing Spark infrastructure |
| DuckDB | Analytical SQL on local files | Parquet/CSV analysis, quick aggregations |
| pyarrow | Columnar data, Parquet I/O | Reading/writing Parquet, Arrow-based interop |
Connectivity and I/O
| Library | Purpose |
|---|---|
| sqlalchemy | Database connections, ORM, raw SQL execution |
| requests / httpx | REST API ingestion |
| boto3 | AWS S3, Glue, Redshift, everything AWS |
| azure-storage-blob | Azure Blob Storage I/O |
| psycopg2 / asyncpg | PostgreSQL drivers |
| confluent-kafka | Kafka producer/consumer |
Pipeline and Quality
| Library | Purpose |
|---|---|
| great_expectations | Data quality validation |
| pydantic | Schema validation for configs and API payloads |
| dbt-core | SQL-based transformations (Python models too) |
| delta-rs (deltalake) | Delta Lake without Spark |
Setting Up a Data Engineering Python Project
Skip requirements.txt for anything beyond a throwaway script. Here's the modern setup:
Project Structure
```
my-pipeline/
+-- pyproject.toml        # Dependencies, build config, tool settings
+-- src/
|   +-- my_pipeline/
|       +-- __init__.py
|       +-- extract.py    # Source connectors
|       +-- transform.py  # Business logic
|       +-- load.py       # Sink writers
|       +-- models.py     # Pydantic schemas
+-- tests/
|   +-- test_extract.py
|   +-- test_transform.py
+-- .env.example
+-- README.md
```
Dependency Management with uv
uv has rapidly become the default choice over pip and poetry, for good reason — it's fast and handles dependency resolution correctly:
```bash
# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh

# Init a new project
uv init my-pipeline
cd my-pipeline

# Add dependencies
uv add pandas sqlalchemy psycopg2-binary pydantic
uv add --dev pytest ruff mypy

# Run scripts in the managed environment
uv run python src/my_pipeline/extract.py

# Lock and sync
uv lock
uv sync
```
The pyproject.toml replaces requirements.txt, setup.py, and setup.cfg in one file:
```toml
[project]
name = "my-pipeline"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    "pandas>=2.2",
    "sqlalchemy>=2.0",
    "psycopg2-binary>=2.9",
    "pydantic>=2.0",
]

[tool.ruff]
target-version = "py311"

# Rule selection lives under [tool.ruff.lint] in current ruff versions
[tool.ruff.lint]
select = ["E", "F", "I", "UP"]
```
Pattern 1: Type-Safe Pipeline Configs with Pydantic
Hard-coded connection strings and magic numbers kill pipelines in production. Use Pydantic to validate configuration at startup — fail fast, fail loud:
```python
# models.py
from pydantic import Field, SecretStr
from pydantic_settings import BaseSettings


class DatabaseConfig(BaseSettings):
    model_config = {"env_file": ".env"}

    host: str = Field(..., alias="DB_HOST")
    port: int = Field(5432, alias="DB_PORT")
    database: str = Field(..., alias="DB_NAME")
    user: str = Field(..., alias="DB_USER")
    password: SecretStr = Field(..., alias="DB_PASSWORD")

    @property
    def connection_string(self) -> str:
        return (
            f"postgresql://{self.user}:"
            f"{self.password.get_secret_value()}"
            f"@{self.host}:{self.port}/{self.database}"
        )


class PipelineConfig(BaseSettings):
    db: DatabaseConfig = DatabaseConfig()
    batch_size: int = Field(10_000, alias="BATCH_SIZE")
    max_retries: int = Field(3, alias="MAX_RETRIES")
    source_api_url: str = Field(..., alias="SOURCE_API_URL")
    api_token: str = Field(..., alias="API_TOKEN")
```
Usage — the pipeline crashes immediately if required env vars are missing:
```python
# extract.py
from my_pipeline.models import PipelineConfig

config = PipelineConfig()
# If DB_HOST or SOURCE_API_URL are not set,
# Pydantic raises a clear ValidationError at startup
```
This is better than discovering a KeyError halfway through a 2-hour batch job.
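The same fail-fast idea works even without pydantic. A minimal stdlib sketch (the function and variable names here are illustrative, not from any library) that reports every missing variable at once instead of dying on the first one:

```python
import os

REQUIRED_VARS = ["DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "SOURCE_API_URL"]


def load_config(environ=os.environ) -> dict:
    """Fail at startup with one clear message listing every missing variable."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing required env vars: {', '.join(missing)}")
    return {name: environ[name] for name in REQUIRED_VARS}
```

Collecting all missing names in one error beats a fix-one-rerun-find-the-next loop when deploying to a new environment.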
Pattern 2: Robust API Ingestion with Retry and Backoff
Every data engineer writes API extractors. Most write them badly the first time. Here's a production-grade pattern using httpx with structured retries:
```python
# extract.py
import httpx
import time
import logging
from typing import Generator

logger = logging.getLogger(__name__)


def fetch_paginated(
    base_url: str,
    headers: dict,
    page_size: int = 100,
    max_retries: int = 3,
) -> Generator[list[dict], None, None]:
    offset = 0
    with httpx.Client(timeout=30.0) as client:
        while True:
            params = {"limit": page_size, "offset": offset}
            for attempt in range(1, max_retries + 1):
                try:
                    resp = client.get(base_url, headers=headers, params=params)
                    resp.raise_for_status()
                    break
                except httpx.HTTPStatusError as e:
                    if e.response.status_code == 429:
                        wait = int(e.response.headers.get("Retry-After", 2 ** attempt))
                        logger.warning("Rate limited. Waiting %ds.", wait)
                        time.sleep(wait)
                    elif attempt == max_retries:
                        raise
                    else:
                        time.sleep(2 ** attempt)
                except httpx.RequestError:
                    if attempt == max_retries:
                        raise
                    time.sleep(2 ** attempt)
            else:
                # Every attempt was rate-limited; don't fall through with a bad response
                raise RuntimeError(f"Giving up after {max_retries} rate-limited attempts")
            data = resp.json()
            records = data.get("results", [])
            if not records:
                break
            yield records
            offset += page_size
            logger.info("Fetched %d records so far.", offset)
```
Key points: generator-based (constant memory), respects Retry-After headers, exponential backoff on transient failures.
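The backoff logic generalizes beyond HTTP. Here's a hedged stdlib sketch of a reusable retry decorator with capped exponential backoff plus jitter (the decorator name and defaults are my own, not from any library — jitter spreads out retries so many workers don't hammer a recovering service in lockstep):

```python
import functools
import random
import time


def with_retries(max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 60.0):
    """Retry a function on any exception, with exponential backoff plus full jitter."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise
                    # Sleep a random amount up to the capped exponential delay
                    delay = min(max_delay, base_delay * 2 ** attempt)
                    time.sleep(random.uniform(0, delay))
        return wrapper
    return decorator
```

In production you'd narrow the `except Exception` to the transient error types of your client library; retrying a genuine bug just hides it for longer.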
Pattern 3: Efficient Batch Loading with SQLAlchemy
Loading data row-by-row is the number one performance mistake in Python ETL. Always batch:
```python
# load.py -- PostgreSQL dialect
from sqlalchemy import create_engine, text
from typing import Iterator
import logging

logger = logging.getLogger(__name__)


def batch_upsert(
    connection_string: str,
    table: str,
    records: Iterator[list[dict]],
    conflict_key: str = "id",
) -> int:
    engine = create_engine(connection_string)
    total = 0
    with engine.begin() as conn:
        for batch in records:
            if not batch:
                continue
            columns = list(batch[0].keys())
            placeholders = ", ".join(f":{col}" for col in columns)
            update_clause = ", ".join(
                f"{col} = EXCLUDED.{col}"
                for col in columns
                if col != conflict_key
            )
            # PostgreSQL INSERT ... ON CONFLICT (upsert);
            # passing a list of dicts makes SQLAlchemy use executemany
            stmt = text(
                f"INSERT INTO {table} ({', '.join(columns)}) "
                f"VALUES ({placeholders}) "
                f"ON CONFLICT ({conflict_key}) "
                f"DO UPDATE SET {update_clause}"
            )
            conn.execute(stmt, batch)
            total += len(batch)
    logger.info("Upserted %d records into %s.", total, table)
    return total
```
This pattern processes data in constant memory by consuming the generator from the extract step directly.
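You can try the same ON CONFLICT upsert locally with the stdlib sqlite3 driver, since SQLite (3.24+) supports near-identical syntax — the table and column names below are illustrative:

```python
import sqlite3


def upsert_events(conn: sqlite3.Connection, batch: list[dict]) -> None:
    """Batched upsert: insert new rows, update existing ones by primary key."""
    conn.executemany(
        "INSERT INTO events (id, value) VALUES (:id, :value) "
        "ON CONFLICT (id) DO UPDATE SET value = excluded.value",
        batch,
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id TEXT PRIMARY KEY, value REAL)")
upsert_events(conn, [{"id": "a", "value": 1.0}, {"id": "b", "value": 2.0}])
upsert_events(conn, [{"id": "a", "value": 9.0}])  # updates the existing row
```

Handy for unit-testing your upsert logic without a running PostgreSQL instance, as long as you keep the SQL within the dialect overlap.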
Pattern 4: pandas vs polars — When to Switch
Most data engineers start with pandas. That's fine. But knowing when polars is the better tool saves hours of waiting:
```python
# Pandas -- familiar, great for quick exploration
# Good for: <500MB, mixed operations, wide ecosystem
import pandas as pd

df = pd.read_parquet("events.parquet")
result = (
    df.groupby("event_type")
    .agg(count=("event_id", "count"), avg_value=("value", "mean"))
    .reset_index()
    .sort_values("count", ascending=False)
)
```

```python
# Polars -- same logic, 5-10x faster on medium datasets
# Good for: 500MB-10GB, aggregation-heavy, lazy evaluation
import polars as pl

result = (
    pl.scan_parquet("events.parquet")  # lazy -- no data loaded yet
    .group_by("event_type")
    .agg(
        pl.col("event_id").count().alias("count"),
        pl.col("value").mean().alias("avg_value"),
    )
    .sort("count", descending=True)
    .collect()  # execution happens here
)
```
| Dimension | pandas | polars |
|---|---|---|
| Memory model | Eager — loads everything into RAM | Lazy evaluation with scan_* |
| Speed (1GB aggregation) | ~12s typical | ~2s typical |
| Mutability | Mutable DataFrames | Immutable by default |
| API style | Method chaining + indexing | Expression-based, composable |
| Ecosystem | Massive (scikit-learn, plotly, etc.) | Growing, good Arrow interop |
| Learning curve | Lower (more tutorials/SO answers) | Moderate (different mental model) |
My take: start new pipelines in polars if your team is open to it. The lazy evaluation alone prevents an entire class of memory bugs. Fall back to pandas when a specific library requires a pandas DataFrame as input.
Pattern 5: Writing Testable Pipeline Code
Data pipelines that can't be tested locally are pipelines that break in production. Separate I/O from logic:
```python
# transform.py -- pure functions, no I/O
import polars as pl


def clean_events(df: pl.DataFrame) -> pl.DataFrame:
    return (
        df.filter(pl.col("event_id").is_not_null())
        .unique(subset=["event_id"])
        .with_columns(
            pl.col("timestamp").str.to_datetime().alias("event_ts"),
            pl.col("value").fill_null(0).alias("value_clean"),
        )
    )


def flag_anomalies(
    df: pl.DataFrame,
    threshold: float = 3.0,
) -> pl.DataFrame:
    mu, sigma = df.select(
        pl.col("value_clean").mean().alias("mu"),
        pl.col("value_clean").std().alias("sigma"),
    ).row(0)
    return df.with_columns(
        ((pl.col("value_clean") - mu).abs() > threshold * sigma).alias("is_anomaly")
    )
```
```python
# tests/test_transform.py
import polars as pl

from my_pipeline.transform import clean_events, flag_anomalies


def test_clean_events_removes_nulls():
    df = pl.DataFrame({
        "event_id": ["a", None, "b", "a"],
        "timestamp": ["2026-01-01", "2026-01-02", "2026-01-03", "2026-01-01"],
        "value": [10.0, 20.0, None, 10.0],
    })
    result = clean_events(df)
    assert result.shape[0] == 2  # null removed, duplicate removed
    assert result.filter(pl.col("event_id").is_null()).shape[0] == 0


def test_flag_anomalies_catches_outliers():
    df = pl.DataFrame({
        "value_clean": [10.0, 11.0, 9.0, 10.5, 100.0],
    })
    # With only five points the outlier inflates the sample std dev (~40),
    # so the threshold must be low enough for |100 - mean| (~72) to exceed it
    result = flag_anomalies(df, threshold=1.5)
    anomalies = result.filter(pl.col("is_anomaly"))
    assert anomalies.shape[0] == 1
    assert anomalies["value_clean"][0] == 100.0
```
Run tests with: uv run pytest tests/ -v
Common Mistakes and How to Avoid Them
These are the pitfalls I see most often in data engineering Python code:
1. Loading Entire Datasets Into Memory
Wrong:
```python
# This loads 50GB into RAM and crashes
df = pd.read_parquet("s3://bucket/huge-table/")
```
Right:
```python
# Process the dataset batch by batch via the pyarrow.dataset API
import pyarrow.dataset as ds
import polars as pl

dataset = ds.dataset("s3://bucket/huge-table/", format="parquet")
for batch in dataset.to_batches(batch_size=100_000):
    chunk = pl.from_arrow(batch)
    process(chunk)
```
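The same constant-memory idea applies to any source, not just Parquet. A stdlib sketch that streams a CSV through a generator in fixed-size batches (the in-memory StringIO stands in for a real file):

```python
import csv
import io
from itertools import islice
from typing import Iterator


def read_in_batches(fileobj, batch_size: int = 1_000) -> Iterator[list[dict]]:
    """Yield fixed-size batches of rows; only one batch is in memory at a time."""
    reader = csv.DictReader(fileobj)
    while True:
        batch = list(islice(reader, batch_size))
        if not batch:
            return
        yield batch


data = io.StringIO("id,value\n1,10\n2,20\n3,30\n")
batches = list(read_in_batches(data, batch_size=2))
```

The `islice` trick is the generic version of pagination: whatever the source, pull a bounded slice, process it, repeat.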
2. String Concatenation for SQL
Wrong (SQL injection risk):
```python
query = f"SELECT * FROM users WHERE name = '{user_input}'"
```
Right:
```python
# SQLAlchemy -- PostgreSQL dialect
from sqlalchemy import text

query = text("SELECT * FROM users WHERE name = :name")
result = conn.execute(query, {"name": user_input})
```
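The stdlib drivers follow the same rule — placeholder syntax differs (`?` instead of `:name`), but the principle is identical. A runnable sqlite3 demo with a classic injection payload (the data is made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")

# A name that would be catastrophic if concatenated into the SQL string
user_input = "Robert'); DROP TABLE users;--"
conn.execute("INSERT INTO users (name) VALUES (?)", (user_input,))

# The driver binds the value; it is matched literally, never executed as SQL
rows = conn.execute("SELECT name FROM users WHERE name = ?", (user_input,)).fetchall()
```

Parameter binding also lets the database cache the query plan, so it's faster as well as safer.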
3. Ignoring Logging
Wrong:
```python
print(f"Processed {count} records")  # No timestamp, level, or logger name
```
Right:
```python
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)

logger.info("Processed %d records", count)
```
4. No Type Hints
Data engineering code is often maintained by multiple people. Type hints are documentation that the IDE enforces:
```python
from datetime import datetime
from typing import Generator

# Without types -- what does this return? What's 'config'?
def extract(config, start_date):
    ...

# With types -- clear contract
def extract(
    config: PipelineConfig,
    start_date: datetime,
) -> Generator[list[dict], None, None]:
    ...
```
5. Synchronous Where Async Helps
When ingesting from multiple independent APIs, asyncio + httpx can cut wall-clock time dramatically:
```python
import asyncio
import httpx


async def fetch_all(urls: list[str]) -> list[dict]:
    async with httpx.AsyncClient(timeout=30.0) as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        return [
            r.json()
            for r in responses
            if isinstance(r, httpx.Response) and r.status_code == 200
        ]


# Usage
results = asyncio.run(fetch_all(api_endpoints))
```
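One refinement worth knowing: gather with hundreds of URLs opens hundreds of connections at once, which can trip rate limits or exhaust sockets. An `asyncio.Semaphore` caps concurrency. A stdlib-only sketch where `asyncio.sleep` stands in for the real HTTP call:

```python
import asyncio


async def fetch_all_limited(urls: list[str], max_concurrency: int = 5) -> list[str]:
    """Run fetches concurrently, but never more than max_concurrency at once."""
    sem = asyncio.Semaphore(max_concurrency)

    async def fetch_one(url: str) -> str:
        async with sem:
            await asyncio.sleep(0.01)  # stand-in for a real HTTP request
            return f"fetched:{url}"

    # gather preserves input order even though completion order varies
    return await asyncio.gather(*(fetch_one(u) for u in urls))


results = asyncio.run(fetch_all_limited([f"u{i}" for i in range(20)], max_concurrency=5))
```

With httpx you'd acquire the semaphore around `client.get(url)` instead of the sleep; the structure is otherwise identical.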
Python Performance Tips for Data Pipelines
| Technique | Impact | When to Use |
|---|---|---|
| Use generators instead of lists | Constant memory | Anything paginated or batched |
| polars lazy mode | 5-10x faster than eager pandas | Aggregations on files >100MB |
| PyArrow for I/O | 3-5x faster Parquet reads | Any Parquet read/write |
| orjson instead of json | 3-10x faster JSON parsing | High-volume API ingestion |
| Connection pooling | Eliminates connection overhead | Any database-heavy pipeline |
| multiprocessing.Pool | Utilize all CPU cores | CPU-bound transforms (not I/O) |
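A quick way to see why the first row of that table matters — a list materializes every element up front, while a generator holds only its execution frame, no matter how many items it will yield:

```python
import sys

# A list holds pointers to every element; a generator holds only its frame
as_list = [i * i for i in range(1_000_000)]
as_gen = (i * i for i in range(1_000_000))

list_bytes = sys.getsizeof(as_list)  # megabytes, before counting the ints themselves
gen_bytes = sys.getsizeof(as_gen)    # a couple hundred bytes, regardless of range

total = sum(as_gen)  # still iterates all million values, one at a time
```

Note `getsizeof` on the list doesn't even include the integer objects it points to, so the real gap is larger than these two numbers suggest.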
Putting It Together: A Mini Pipeline
Here's how the patterns connect in a real extract-transform-load flow:
```python
# main.py -- orchestration entrypoint
import logging

import polars as pl

from my_pipeline.models import PipelineConfig
from my_pipeline.extract import fetch_paginated
from my_pipeline.transform import clean_events, flag_anomalies
from my_pipeline.load import batch_upsert

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def run():
    config = PipelineConfig()  # validates env vars or crashes

    # Extract: paginated API fetch (generator)
    raw_pages = fetch_paginated(
        base_url=config.source_api_url,
        headers={"Authorization": f"Bearer {config.api_token}"},
        page_size=config.batch_size,
    )

    # Transform: process each page, yield cleaned batches
    def transform_stream(pages):
        for page in pages:
            df = pl.DataFrame(page)
            df = clean_events(df)
            df = flag_anomalies(df)
            yield df.to_dicts()

    # Load: upsert into PostgreSQL
    total = batch_upsert(
        connection_string=config.db.connection_string,
        table="events",
        records=transform_stream(raw_pages),
        conflict_key="event_id",
    )
    logger.info("Pipeline complete. %d records upserted.", total)


if __name__ == "__main__":
    run()
```
The entire pipeline runs in constant memory regardless of source size, validates config before touching any data, and each component is independently testable.
Continue Reading
- Apache Airflow Tutorial — orchestrate your Python pipelines on a schedule with dependency management
- Data Quality Testing — add Great Expectations or similar validation to your pipeline outputs
- Data Pipeline Monitoring — make sure your pipelines tell you when something breaks before your stakeholders do
If you want to prototype pipeline logic quickly — run SQL against CSVs and Parquet files, test API responses interactively — Harbinger Explorer lets you do that directly in the browser with DuckDB WASM. Useful for validating transforms before committing them to your production pipeline code.
The best Python data engineering code isn't clever. It's boring, predictable, well-typed, and handles failures before they become incidents. Write code that lets you sleep through the night.