Module 8 • 22 min read
08 - SQLAlchemy 2.0 and Async ORM
Data Mapper pattern, async Session, relationships, N+1 queries, and connection pooling for production.
#sqlalchemy
#orm
#database
#async
#postgresql
1. SQLAlchemy 2.0: The New Paradigm
SQLAlchemy 2.0 introduced fundamental changes compared to 1.x: a modern ORM-style query syntax, native async support, and full type hints. Coming from Drizzle or Prisma as a senior engineer, you will find the style familiar, but with important differences.
JavaScript/TypeScript
// Drizzle ORM (TypeScript)
import { drizzle } from 'drizzle-orm/node-postgres';
import { eq } from 'drizzle-orm';
import { users } from './schema';

const db = drizzle(pool);

// Query
const user = await db
  .select()
  .from(users)
  .where(eq(users.id, 1))
  .limit(1);

// Insert
await db.insert(users).values({
  email: 'test@test.com',
  name: 'Test User',
});

Python
# SQLAlchemy 2.0 (Python)
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def get_user(session: AsyncSession, user_id: int):
    # Query with select()
    stmt = select(User).where(User.id == user_id)
    result = await session.execute(stmt)
    return result.scalar_one_or_none()

async def create_user(session: AsyncSession, data: dict):
    # Insert with add()
    user = User(**data)
    session.add(user)
    await session.flush()  # Get the generated ID without committing
    return user

Key Differences from Node.js ORMs
| Aspect | Drizzle/Prisma | SQLAlchemy 2.0 |
|---|---|---|
| Pattern | Query Builder / Active Record | Data Mapper |
| Session | Implicit (one connection per query) | Explicit (Unit of Work) |
| Transactions | Per query or explicit | Session = transaction |
| Flush vs Commit | N/A | Flush: sends pending SQL to the DB; Commit: persists the transaction |
| Identity Map | No | Yes (objects cached in the session by primary key) |
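To make the Unit of Work and Identity Map rows concrete, here is a minimal sketch (it assumes the async_session_maker factory and the User model defined later in this module) showing the difference between flush and commit:

async def unit_of_work_demo() -> None:
    async with async_session_maker() as session:
        user = User(email="demo@example.com", hashed_password="...")
        session.add(user)  # Pending change: nothing has been sent to the DB yet

        await session.flush()  # INSERT is emitted and user.id is populated,
                               # but the transaction is still open

        same_user = await session.get(User, user.id)
        assert same_user is user  # Identity Map: same Python object, no extra query

        await session.commit()  # The transaction is persisted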
2. Async Configuration
# database.py
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase

# Async engine with a configured pool
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost:5432/mydb",
    echo=False,           # True to log SQL for debugging
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=1800,    # Recycle connections every 30 min
    pool_pre_ping=True,   # Check the connection before using it
)

# Session factory
async_session_maker = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Avoid lazy loads after commit
)

# Declarative base for models
class Base(DeclarativeBase):
    pass
FastAPI Dependency
from typing import AsyncGenerator

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
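A typical consumer of this dependency is an endpoint like the hypothetical one below (route path and response shape are illustrative; commit and rollback still happen inside get_db()):

from fastapi import APIRouter, Depends, HTTPException

router = APIRouter()

@router.get("/users/{user_id}")
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # One session per request, injected by FastAPI
    user = await db.get(User, user_id)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return {"id": user.id, "email": user.email}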
3. Defining Models
from sqlalchemy import String, ForeignKey, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime

class User(Base):
    __tablename__ = "users"

    # Typed columns
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(default=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )

    # Relationships
    posts: Mapped[list["Post"]] = relationship(
        back_populates="author",
        lazy="selectin",  # Loading strategy
    )
    profile: Mapped["Profile | None"] = relationship(
        back_populates="user",
        uselist=False,  # One-to-one
    )
class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str]
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    published_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))

    # Inverse relationship
    author: Mapped["User"] = relationship(back_populates="posts")
    tags: Mapped[list["Tag"]] = relationship(
        secondary="post_tags",  # Association table
        back_populates="posts",
    )
class Tag(Base):
    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)
    posts: Mapped[list["Post"]] = relationship(
        secondary="post_tags",
        back_populates="tags",
    )
# Many-to-many association table
from sqlalchemy import Table, Column, Integer

post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True),
)
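The User.profile relationship above and the Profile.name filter used later reference a Profile model that this module does not define; a minimal sketch consistent with those references could be:

class Profile(Base):
    __tablename__ = "profiles"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), unique=True)
    name: Mapped[str] = mapped_column(String(100))

    # Inverse side of the one-to-one relationship declared on User
    user: Mapped["User"] = relationship(back_populates="profile")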
4. Queries: Select Statement
Basics
from sqlalchemy import select, and_, or_, desc

async def get_users(session: AsyncSession) -> list[User]:
    stmt = select(User).where(User.is_active == True).order_by(desc(User.created_at))
    result = await session.execute(stmt)
    return list(result.scalars().all())

async def get_user_by_email(session: AsyncSession, email: str) -> User | None:
    stmt = select(User).where(User.email == email)
    result = await session.execute(stmt)
    return result.scalar_one_or_none()
Complex Filters
async def search_users(
    session: AsyncSession,
    search: str | None = None,
    is_active: bool | None = None,
    created_after: datetime | None = None,
) -> list[User]:
    stmt = select(User)
    conditions = []

    if search:
        conditions.append(
            or_(
                User.email.ilike(f"%{search}%"),
                User.profile.has(Profile.name.ilike(f"%{search}%")),
            )
        )
    if is_active is not None:
        conditions.append(User.is_active == is_active)
    if created_after:
        conditions.append(User.created_at >= created_after)

    if conditions:
        stmt = stmt.where(and_(*conditions))

    result = await session.execute(stmt)
    return list(result.scalars().all())
Pagination
async def list_users_paginated(
    session: AsyncSession,
    page: int = 1,
    page_size: int = 20,
) -> tuple[list[User], int]:
    # Count total rows
    count_stmt = select(func.count()).select_from(User)
    total = (await session.execute(count_stmt)).scalar() or 0

    # Fetch the requested page
    stmt = (
        select(User)
        .order_by(User.id)
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    result = await session.execute(stmt)
    users = list(result.scalars().all())
    return users, total
5. The N+1 Problem and Loading Strategies
The Anti-pattern
# ❌ N+1 queries: 1 query for users + N queries for their posts
async def get_users_with_posts_bad(session: AsyncSession):
    result = await session.execute(select(User))
    users = result.scalars().all()
    for user in users:
        # Each access to user.posts triggers a lazy load: one extra query per user
        # (on an async session without an eager strategy it raises an error instead)
        print(f"{user.email}: {len(user.posts)} posts")  # N additional queries
Solution 1: Joined Load (a single JOIN query)
from sqlalchemy.orm import joinedload

async def get_users_with_posts_joined(session: AsyncSession):
    stmt = select(User).options(joinedload(User.posts))
    result = await session.execute(stmt)
    users = result.unique().scalars().all()  # unique() is required because of the JOIN
    for user in users:
        print(f"{user.email}: {len(user.posts)} posts")  # No additional queries
Solution 2: Selectin Load (a second query with IN)
from sqlalchemy.orm import selectinload

async def get_users_with_posts_selectin(session: AsyncSession):
    # 2 queries: SELECT users + SELECT posts WHERE user_id IN (...)
    stmt = select(User).options(selectinload(User.posts))
    result = await session.execute(stmt)
    return list(result.scalars().all())
When to Use Each Strategy
| Strategy | Queries | Recommended use |
|---|---|---|
| joinedload | 1 (JOIN) | One-to-one, small relationships |
| selectinload | 2 (IN) | One-to-many, large collections |
| subqueryload | 2 (subquery) | Legacy, avoid in async |
| lazyload | N+1 | ❌ Never in async |
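If a relationship was not eagerly loaded and you still need it on an async session, an explicit refresh avoids the failing implicit lazy load; a minimal sketch:

async def load_posts_later(session: AsyncSession, user_id: int) -> list[Post]:
    user = await session.get(User, user_id)
    if user is None:
        return []
    # Explicitly reload the relationship instead of touching user.posts
    # and triggering a lazy load (which fails on an async session)
    await session.refresh(user, attribute_names=["posts"])
    return user.posts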
Nested Loading
stmt = (
    select(User)
    .options(
        selectinload(User.posts).selectinload(Post.tags),  # Posts and their tags
        joinedload(User.profile),                          # Profile (one-to-one)
    )
)
6. Inserts, Updates, and Deletes
Insert
async def create_user(session: AsyncSession, data: UserCreate) -> User:
    user = User(**data.model_dump())
    session.add(user)
    await session.flush()        # Generates the ID, but does not commit
    await session.refresh(user)  # Reload from the DB (triggers, server defaults)
    return user

# Bulk insert
async def create_users_bulk(session: AsyncSession, users_data: list[dict]):
    users = [User(**data) for data in users_data]
    session.add_all(users)
    await session.flush()
    return users
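For large batches where you do not need the ORM instances back, SQLAlchemy 2.0 also supports passing a list of dicts to an insert() statement (an executemany-style bulk insert); a sketch:

from sqlalchemy import insert

async def create_users_fast(session: AsyncSession, users_data: list[dict]) -> None:
    # Single executemany-style INSERT; no ORM objects are created or tracked
    await session.execute(insert(User), users_data)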
Update
from sqlalchemy import update

# Single-row update (with session tracking)
async def update_user(session: AsyncSession, user_id: int, data: dict) -> User:
    user = await session.get(User, user_id)
    if not user:
        raise NotFoundError(f"User {user_id} not found")
    for key, value in data.items():
        setattr(user, key, value)
    await session.flush()
    return user

# Bulk update (without loading objects)
async def deactivate_old_users(session: AsyncSession, before: datetime):
    stmt = (
        update(User)
        .where(User.last_login < before)
        .values(is_active=False)
    )
    result = await session.execute(stmt)
    return result.rowcount
Delete
from sqlalchemy import delete

# Single-row delete
async def delete_user(session: AsyncSession, user_id: int):
    user = await session.get(User, user_id)
    if user:
        await session.delete(user)
        await session.flush()

# Bulk delete
async def delete_inactive_users(session: AsyncSession):
    stmt = delete(User).where(User.is_active == False)
    result = await session.execute(stmt)
    return result.rowcount
7. Explicit Transactions
from decimal import Decimal

from sqlalchemy.ext.asyncio import AsyncSession

async def transfer_funds(
    session: AsyncSession,
    from_account_id: int,
    to_account_id: int,
    amount: Decimal,
):
    """Atomic transfer between accounts."""
    async with session.begin_nested():  # Savepoint
        from_account = await session.get(Account, from_account_id, with_for_update=True)
        to_account = await session.get(Account, to_account_id, with_for_update=True)

        if not from_account or not to_account:
            raise NotFoundError("Account not found")
        if from_account.balance < amount:
            raise ValidationError("Insufficient funds")

        from_account.balance -= amount
        to_account.balance += amount

        # Create a transaction record
        transaction = Transaction(
            from_account_id=from_account_id,
            to_account_id=to_account_id,
            amount=amount,
        )
        session.add(transaction)
    # The commit happens in the get_db() context manager
Locking (SELECT FOR UPDATE)
async def process_job(session: AsyncSession, job_id: int):
    # Lock the row to prevent concurrent processing
    stmt = (
        select(Job)
        .where(Job.id == job_id, Job.status == "pending")
        .with_for_update(skip_locked=True)  # Skip if the row is already locked
    )
    result = await session.execute(stmt)
    job = result.scalar_one_or_none()

    if not job:
        return None  # Already processed, or locked by another worker

    job.status = "processing"
    await session.flush()
    return job
8. Connection Pooling for Production
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool, AsyncAdaptedQueuePool  # AsyncAdaptedQueuePool is the async default

# Development: regular pool
dev_engine = create_async_engine(
    DATABASE_URL,
    pool_size=5,
    max_overflow=10,
)

# Production on K8s (one worker per pod): no pool
prod_engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # Every checkout opens and closes a real connection
)

# Production with Gunicorn (multiple workers): bounded pool
prod_gunicorn_engine = create_async_engine(
    DATABASE_URL,
    pool_size=2,        # Per worker
    max_overflow=3,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True,
)
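As a sizing sanity check (the worker count here is illustrative): with, say, 4 Gunicorn workers and the settings above, the app can hold up to 4 × (pool_size 2 + max_overflow 3) = 20 connections, which must stay comfortably below PostgreSQL's max_connections (100 by default) once other services and migrations are accounted for.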
Connection Health Check
from sqlalchemy import text

async def check_db_health(session: AsyncSession) -> bool:
    try:
        await session.execute(text("SELECT 1"))
        return True
    except Exception:
        return False

@app.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
    db_healthy = await check_db_health(db)
    return {
        "status": "healthy" if db_healthy else "unhealthy",
        "database": db_healthy,
    }
9. Repository Pattern
from typing import Generic, TypeVar, Type

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

ModelT = TypeVar("ModelT", bound=Base)

class BaseRepository(Generic[ModelT]):
    def __init__(self, session: AsyncSession, model: Type[ModelT]):
        self.session = session
        self.model = model

    async def get(self, id: int) -> ModelT | None:
        return await self.session.get(self.model, id)

    async def get_or_raise(self, id: int) -> ModelT:
        obj = await self.get(id)
        if not obj:
            raise NotFoundError(f"{self.model.__name__} {id} not found")
        return obj

    async def list(
        self,
        skip: int = 0,
        limit: int = 100,
    ) -> list[ModelT]:
        stmt = select(self.model).offset(skip).limit(limit)
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def count(self) -> int:
        stmt = select(func.count()).select_from(self.model)
        result = await self.session.execute(stmt)
        return result.scalar() or 0

    async def create(self, data: dict) -> ModelT:
        obj = self.model(**data)
        self.session.add(obj)
        await self.session.flush()
        await self.session.refresh(obj)
        return obj

    async def update(self, id: int, data: dict) -> ModelT:
        obj = await self.get_or_raise(id)
        for key, value in data.items():
            setattr(obj, key, value)
        await self.session.flush()
        return obj

    async def delete(self, id: int) -> None:
        obj = await self.get_or_raise(id)
        await self.session.delete(obj)
        await self.session.flush()
# Concrete repository with custom queries
class UserRepository(BaseRepository[User]):
    def __init__(self, session: AsyncSession):
        super().__init__(session, User)

    async def get_by_email(self, email: str) -> User | None:
        stmt = select(User).where(User.email == email)
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()

    async def get_with_posts(self, user_id: int) -> User | None:
        stmt = (
            select(User)
            .where(User.id == user_id)
            .options(selectinload(User.posts))
        )
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()
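Wiring a repository into FastAPI is typically done through a small dependency on top of get_db; a hypothetical sketch (route path and response shape are illustrative):

from fastapi import APIRouter, Depends, HTTPException

router = APIRouter(prefix="/users")

def get_user_repo(db: AsyncSession = Depends(get_db)) -> UserRepository:
    return UserRepository(db)

@router.get("/{user_id}/posts")
async def user_with_posts(user_id: int, repo: UserRepository = Depends(get_user_repo)):
    user = await repo.get_with_posts(user_id)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return {"email": user.email, "posts": [post.title for post in user.posts]}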
10. Comparison Table: Drizzle vs SQLAlchemy
| Operation | Drizzle | SQLAlchemy 2.0 |
|---|---|---|
| Select | db.select().from(users) | select(User) |
| Where | .where(eq(users.id, 1)) | .where(User.id == 1) |
| Insert | db.insert(users).values({}) | session.add(User()) |
| Update | db.update(users).set({}) | update(User).values({}) |
| Delete | db.delete(users).where() | delete(User).where() |
| Join | .leftJoin(posts, eq()) | .join(User.posts) |
| Transaction | db.transaction(async (tx)) | async with session.begin() |
| Relations | Manual in the query | relationship() + loading strategies |
Conclusion
Async SQLAlchemy 2.0 offers a different mental model from Drizzle:
- Session = Unit of Work: changes accumulate until commit
- Identity Map: objects are cached in the session by primary key
- Explicit loading: you configure how relationships are loaded (avoids N+1)
- Type-safe: Mapped columns give you correct types
Senior pattern: use the Repository pattern to encapsulate queries, default to selectinload for one-to-many relationships, and never use lazy="select" in async code.
In the next chapter, we will look at Alembic for database migrations.