Module 8 · 22 min read

08 - SQLAlchemy 2.0 y Async ORM

Data Mapper pattern, async Session, relationships, N+1 queries, and connection pooling for production.

#sqlalchemy #orm #database #async #postgresql

1. SQLAlchemy 2.0: The New Paradigm

SQLAlchemy 2.0 introduced fundamental changes compared to 1.x: a modern ORM query syntax, native async support, and complete type hints. If you come from Drizzle/Prisma as a senior developer, the style will feel familiar, but there are important differences.

JavaScript/TypeScript
// Drizzle ORM (TypeScript)
import { drizzle } from 'drizzle-orm/node-postgres';
import { eq } from 'drizzle-orm';
import { users } from './schema';

const db = drizzle(pool);

// Query
const [user] = await db
  .select()
  .from(users)
  .where(eq(users.id, 1))
  .limit(1);

// Insert
await db.insert(users).values({
  email: 'test@test.com',
  name: 'Test User',
});
Python
# SQLAlchemy 2.0 (Python)
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def get_user(session: AsyncSession, user_id: int):
    # Query with select()
    stmt = select(User).where(User.id == user_id)
    result = await session.execute(stmt)
    return result.scalar_one_or_none()

async def create_user(session: AsyncSession, data: dict):
    # Insert with add()
    user = User(**data)
    session.add(user)
    await session.flush()  # Get the generated ID without committing
    return user

Key Differences from Node.js ORMs

Aspect | Drizzle/Prisma | SQLAlchemy 2.0
Pattern | Query Builder / Active Record | Data Mapper
Session | Implicit (a connection per query) | Explicit (Unit of Work)
Transactions | Per query or explicit | Session = transaction
Flush vs Commit | N/A | Flush sends SQL to the DB; commit persists it
Identity Map | No | Yes (objects cached in the session)
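
The flush vs. commit split and the identity map are usually what surprises developers coming from query builders. A minimal sketch, assuming the User model and the async_session_maker factory defined later in this module:

async def demo_unit_of_work() -> None:
    async with async_session_maker() as session:
        user = User(email="demo@example.com", hashed_password="x")
        session.add(user)       # Pending in the session; nothing sent to the DB yet
        await session.flush()   # SQL is emitted and user.id is populated, still uncommitted
        same = await session.get(User, user.id)
        assert same is user     # Identity map: same Python object, no extra query
        await session.commit()  # The transaction is persisted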

2. Async Configuration

# database.py
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase

# Async engine with a configured connection pool
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost:5432/mydb",
    echo=False,  # True to log emitted SQL for debugging
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=1800,  # Recycle connections every 30 minutes
    pool_pre_ping=True,  # Check that a connection is alive before using it
)

# Session factory
async_session_maker = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Avoid lazy loads after commit
)

# Base class for all models
class Base(DeclarativeBase):
    pass

FastAPI Dependency

from typing import AsyncGenerator

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
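
Wired into a route, the dependency looks like this; a minimal sketch assuming a FastAPI app instance and the User model defined in the next section:

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy import select

app = FastAPI()

@app.get("/users/{user_id}")
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    # get_db() commits on success and rolls back if this handler raises
    return {"id": user.id, "email": user.email}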

3. Model Definitions

from sqlalchemy import String, ForeignKey, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime

class User(Base):
    __tablename__ = "users"

    # Typed columns
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(default=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )

    # Relationships
    posts: Mapped[list["Post"]] = relationship(
        back_populates="author",
        lazy="selectin",  # Estrategia de carga
    )
    profile: Mapped["Profile | None"] = relationship(
        back_populates="user",
        uselist=False,  # One-to-one
    )

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str]
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    published_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))

    # Inverse side of the relationship
    author: Mapped["User"] = relationship(back_populates="posts")
    tags: Mapped[list["Tag"]] = relationship(
        secondary="post_tags",  # Tabla intermedia
        back_populates="posts",
    )

class Tag(Base):
    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)

    posts: Mapped[list["Post"]] = relationship(
        secondary="post_tags",
        back_populates="tags",
    )

# Many-to-many association table
from sqlalchemy import Table, Column, Integer

post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True),
)
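
The Profile model is referenced above (and again in search_users below) but not shown in the listing; a minimal sketch consistent with that usage, assuming only the one-to-one link to User and a name column:

class Profile(Base):
    __tablename__ = "profiles"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), unique=True)

    # Inverse side of the one-to-one relationship declared on User
    user: Mapped["User"] = relationship(back_populates="profile")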

4. Queries: Select Statement

Basics

from sqlalchemy import select, and_, or_, desc, func

async def get_users(session: AsyncSession) -> list[User]:
    stmt = select(User).where(User.is_active == True).order_by(desc(User.created_at))
    result = await session.execute(stmt)
    return list(result.scalars().all())

async def get_user_by_email(session: AsyncSession, email: str) -> User | None:
    stmt = select(User).where(User.email == email)
    result = await session.execute(stmt)
    return result.scalar_one_or_none()

Complex Filters

async def search_users(
    session: AsyncSession,
    search: str | None = None,
    is_active: bool | None = None,
    created_after: datetime | None = None,
) -> list[User]:
    stmt = select(User)
    
    conditions = []
    if search:
        conditions.append(
            or_(
                User.email.ilike(f"%{search}%"),
                User.profile.has(Profile.name.ilike(f"%{search}%")),
            )
        )
    if is_active is not None:
        conditions.append(User.is_active == is_active)
    if created_after:
        conditions.append(User.created_at >= created_after)
    
    if conditions:
        stmt = stmt.where(and_(*conditions))
    
    result = await session.execute(stmt)
    return list(result.scalars().all())

Pagination

async def list_users_paginated(
    session: AsyncSession,
    page: int = 1,
    page_size: int = 20,
) -> tuple[list[User], int]:
    # Count total
    count_stmt = select(func.count()).select_from(User)
    total = (await session.execute(count_stmt)).scalar() or 0
    
    # Fetch page
    stmt = (
        select(User)
        .order_by(User.id)
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    result = await session.execute(stmt)
    users = list(result.scalars().all())
    
    return users, total

5. The N+1 Problem and Loading Strategies

The Anti-pattern

# ❌ N+1 queries: 1 query for the users + N queries for their posts
async def get_users_with_posts_bad(session: AsyncSession):
    result = await session.execute(select(User))
    users = result.scalars().all()
    
    for user in users:
        # Each access to user.posts triggers an extra query
        # (with the default lazy="select" under async it raises MissingGreenlet instead)
        print(f"{user.email}: {len(user.posts)} posts")  # N additional queries

Solution 1: Joined Load (single JOIN query)

from sqlalchemy.orm import joinedload

async def get_users_with_posts_joined(session: AsyncSession):
    stmt = select(User).options(joinedload(User.posts))
    result = await session.execute(stmt)
    users = result.unique().scalars().all()  # unique() is required because the JOIN duplicates parent rows
    
    for user in users:
        print(f"{user.email}: {len(user.posts)} posts")  # Sin queries adicionales

Solution 2: Selectin Load (second query with IN)

from sqlalchemy.orm import selectinload

async def get_users_with_posts_selectin(session: AsyncSession):
    # 2 queries: SELECT users + SELECT posts WHERE user_id IN (...)
    stmt = select(User).options(selectinload(User.posts))
    result = await session.execute(stmt)
    return list(result.scalars().all())

When to Use Each Strategy

Strategy | Queries | Recommended use
joinedload | 1 (JOIN) | One-to-one, small relationships
selectinload | 2 (IN) | One-to-many, large collections
subqueryload | 2 (subquery) | Legacy, avoid in async
lazyload | N+1 | ❌ Never in async
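
To make any accidental lazy load fail loudly instead of slipping into N+1 (or surfacing as a confusing MissingGreenlet under async), you can combine an explicit strategy with raiseload; a short sketch using the models above:

from sqlalchemy.orm import raiseload, selectinload

async def get_users_strict(session: AsyncSession) -> list[User]:
    stmt = select(User).options(
        selectinload(User.posts),  # The relationship we actually need, loaded eagerly
        raiseload("*"),            # Accessing any other relationship raises immediately
    )
    result = await session.execute(stmt)
    return list(result.scalars().all())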

Nested Loading

stmt = (
    select(User)
    .options(
        selectinload(User.posts).selectinload(Post.tags),  # Posts and their tags
        joinedload(User.profile),  # Profile (one-to-one)
    )
)

6. Inserts, Updates, and Deletes

Insert

async def create_user(session: AsyncSession, data: UserCreate) -> User:
    user = User(**data.model_dump())
    session.add(user)
    await session.flush()  # Generates the ID but does not commit
    await session.refresh(user)  # Reload from the DB (server defaults, triggers)
    return user

# Bulk insert
async def create_users_bulk(session: AsyncSession, users_data: list[dict]):
    users = [User(**data) for data in users_data]
    session.add_all(users)
    await session.flush()
    return users

Update

from sqlalchemy import update

# Single-row update (with session change tracking)
async def update_user(session: AsyncSession, user_id: int, data: dict) -> User:
    user = await session.get(User, user_id)
    if not user:
        raise NotFoundError(f"User {user_id} not found")
    
    for key, value in data.items():
        setattr(user, key, value)
    
    await session.flush()
    return user

# Bulk update (without loading objects into the session)
async def deactivate_old_users(session: AsyncSession, before: datetime):
    stmt = (
        update(User)
        .where(User.last_login < before)
        .values(is_active=False)
    )
    result = await session.execute(stmt)
    return result.rowcount

Delete

from sqlalchemy import delete

# Single-row delete
async def delete_user(session: AsyncSession, user_id: int):
    user = await session.get(User, user_id)
    if user:
        await session.delete(user)
        await session.flush()

# Bulk delete
async def delete_inactive_users(session: AsyncSession):
    stmt = delete(User).where(User.is_active == False)
    result = await session.execute(stmt)
    return result.rowcount

7. Explicit Transactions

from decimal import Decimal

from sqlalchemy.ext.asyncio import AsyncSession

async def transfer_funds(
    session: AsyncSession,
    from_account_id: int,
    to_account_id: int,
    amount: Decimal,
):
    """Transferencia atómica entre cuentas."""
    async with session.begin_nested():  # Savepoint
        from_account = await session.get(Account, from_account_id, with_for_update=True)
        to_account = await session.get(Account, to_account_id, with_for_update=True)
        
        if not from_account or not to_account:
            raise NotFoundError("Account not found")
        
        if from_account.balance < amount:
            raise ValidationError("Insufficient funds")
        
        from_account.balance -= amount
        to_account.balance += amount
        
        # Create the transaction record
        transaction = Transaction(
            from_account_id=from_account_id,
            to_account_id=to_account_id,
            amount=amount,
        )
        session.add(transaction)
    
    # The commit happens in the get_db() context manager

Locking (SELECT FOR UPDATE)

async def process_job(session: AsyncSession, job_id: int):
    # Lock the row to prevent concurrent processing
    stmt = (
        select(Job)
        .where(Job.id == job_id, Job.status == "pending")
        .with_for_update(skip_locked=True)  # Skip rows already locked by another transaction
    )
    result = await session.execute(stmt)
    job = result.scalar_one_or_none()
    
    if not job:
        return None  # Already processed or locked by another worker
    
    job.status = "processing"
    await session.flush()
    
    return job
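
On top of this, a consumer typically polls in a loop with one transaction per iteration, so the row lock is released as soon as the work is done; a rough sketch, assuming async_session_maker from section 2 and with a status change standing in for the real processing:

import asyncio

async def worker_loop(poll_interval: float = 1.0) -> None:
    while True:
        async with async_session_maker() as session:
            async with session.begin():  # One transaction per iteration
                stmt = (
                    select(Job)
                    .where(Job.status == "pending")
                    .order_by(Job.id)
                    .limit(1)
                    .with_for_update(skip_locked=True)  # Other workers skip this row
                )
                job = (await session.execute(stmt)).scalar_one_or_none()
                if job is not None:
                    job.status = "done"  # Placeholder for the real processing step
        if job is None:
            await asyncio.sleep(poll_interval)  # Nothing pending: back off before retrying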

8. Connection Pooling for Production

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool, AsyncAdaptedQueuePool  # AsyncAdaptedQueuePool is the default for async engines

# Development: standard pool
dev_engine = create_async_engine(
    DATABASE_URL,
    pool_size=5,
    max_overflow=10,
)

# Production on K8s (one worker per pod): no in-process pool
prod_engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # Each checkout opens and closes a new connection
)

# Production with Gunicorn (multiple workers): bounded pool per worker
prod_gunicorn_engine = create_async_engine(
    DATABASE_URL,
    pool_size=2,  # Per worker process
    max_overflow=3,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True,
)

Connection Health Check

from fastapi import Depends
from sqlalchemy import text

async def check_db_health(session: AsyncSession) -> bool:
    try:
        await session.execute(text("SELECT 1"))
        return True
    except Exception:
        return False

@app.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
    db_healthy = await check_db_health(db)
    return {
        "status": "healthy" if db_healthy else "unhealthy",
        "database": db_healthy,
    }

9. Repository Pattern

from typing import Generic, TypeVar, Type
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

ModelT = TypeVar("ModelT", bound=Base)

class BaseRepository(Generic[ModelT]):
    def __init__(self, session: AsyncSession, model: Type[ModelT]):
        self.session = session
        self.model = model

    async def get(self, id: int) -> ModelT | None:
        return await self.session.get(self.model, id)

    async def get_or_raise(self, id: int) -> ModelT:
        obj = await self.get(id)
        if not obj:
            raise NotFoundError(f"{self.model.__name__} {id} not found")
        return obj

    async def list(
        self,
        skip: int = 0,
        limit: int = 100,
    ) -> list[ModelT]:
        stmt = select(self.model).offset(skip).limit(limit)
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def count(self) -> int:
        stmt = select(func.count()).select_from(self.model)
        result = await self.session.execute(stmt)
        return result.scalar() or 0

    async def create(self, data: dict) -> ModelT:
        obj = self.model(**data)
        self.session.add(obj)
        await self.session.flush()
        await self.session.refresh(obj)
        return obj

    async def update(self, id: int, data: dict) -> ModelT:
        obj = await self.get_or_raise(id)
        for key, value in data.items():
            setattr(obj, key, value)
        await self.session.flush()
        return obj

    async def delete(self, id: int) -> None:
        obj = await self.get_or_raise(id)
        await self.session.delete(obj)
        await self.session.flush()

# Model-specific repository with custom queries
class UserRepository(BaseRepository[User]):
    def __init__(self, session: AsyncSession):
        super().__init__(session, User)

    async def get_by_email(self, email: str) -> User | None:
        stmt = select(User).where(User.email == email)
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()

    async def get_with_posts(self, user_id: int) -> User | None:
        stmt = (
            select(User)
            .where(User.id == user_id)
            .options(selectinload(User.posts))
        )
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()
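
Wired into an endpoint, the repository keeps query logic out of the route handler; a minimal sketch (the route path and response shape are illustrative, not from the original):

from fastapi import Depends, HTTPException

@app.get("/users/{user_id}/posts")
async def user_posts(user_id: int, db: AsyncSession = Depends(get_db)):
    repo = UserRepository(db)
    user = await repo.get_with_posts(user_id)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return {
        "email": user.email,
        "posts": [{"id": p.id, "title": p.title} for p in user.posts],
    }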

10. Comparison Table: Drizzle vs SQLAlchemy

Operation | Drizzle | SQLAlchemy 2.0
Select | db.select().from(users) | select(User)
Where | .where(eq(users.id, 1)) | .where(User.id == 1)
Insert | db.insert(users).values({}) | session.add(User())
Update | db.update(users).set({}) | update(User).values({})
Delete | db.delete(users).where() | delete(User).where()
Join | .leftJoin(posts, eq()) | .join(User.posts)
Transaction | db.transaction(async (tx)) | async with session.begin()
Relations | Manual in the query | relationship() + loading strategies
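
The transaction row is worth seeing in full; a short sketch of the async with session.begin() form outside the get_db() flow (archive_user is a hypothetical helper):

async def archive_user(user_id: int) -> None:
    async with async_session_maker() as session:
        async with session.begin():  # Commits on exit, rolls back on exception
            user = await session.get(User, user_id)
            if user is not None:
                user.is_active = False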

Conclusion

SQLAlchemy 2.0 with async offers a different mental model from Drizzle:

  1. Session = Unit of Work: changes accumulate until commit
  2. Identity Map: objects are cached by primary key inside the session
  3. Explicit loading: you configure how relationships are loaded (avoiding N+1)
  4. Type-safe: Mapped columns give you correct types

Senior pattern: use the Repository pattern to encapsulate queries, default to selectinload for one-to-many relationships, and never use lazy="select" in async.

In the next chapter, we will look at Alembic for database migrations.