Módulo 20 • 20 min de lectura
20 - Arquitectura Enterprise
Hexagonal Architecture, DDD, Clean Architecture y patrones de diseño aplicados a FastAPI.
#architecture
#hexagonal
#ddd
#clean-architecture
#patterns
1. El Problema de la Arquitectura “por Defecto”
La mayoría de tutoriales de FastAPI muestran código así:
# ❌ Arquitectura "tutorial" - Todo acoplado
@app.post("/users")
async def create_user(data: UserCreate, db: Session = Depends(get_db)):
# Validación de negocio en el endpoint
if await db.execute(select(User).where(User.email == data.email)).first():
raise HTTPException(400, "Email exists")
# Lógica de negocio mezclada con infraestructura
user = User(**data.dict())
user.hashed_password = hash_password(data.password)
db.add(user)
await db.commit()
# Efectos secundarios acoplados
send_welcome_email(user.email)
return user
Problemas:
- Lógica de negocio en controladores
- Imposible testear sin DB real
- Acoplamiento a FastAPI/SQLAlchemy
- Difícil de mantener a escala
2. Hexagonal Architecture (Ports & Adapters)
┌─────────────────────────────────┐
│ Application Core │
│ ┌─────────────────────────┐ │
┌─────────┐ │ │ Domain Layer │ │ ┌─────────┐
│ FastAPI │◄────┼──┤ - Entities │ │────►│PostgreSQL│
│ (Port) │ │ │ - Value Objects │ │ │(Adapter) │
└─────────┘ │ │ - Domain Services │ │ └─────────┘
│ │ - Domain Events │ │
┌─────────┐ │ └─────────────────────────┘ │ ┌─────────┐
│ CLI │◄────┼──┤ │────►│ Redis │
│ (Port) │ │ │ Application Layer │ │(Adapter) │
└─────────┘ │ │ - Use Cases │ └─────────┘
│ │ - Application Services │
┌─────────┐ │ │ - DTOs │ ┌─────────┐
│ Celery │◄────┼──┤ │────►│ Email │
│ (Port) │ │ └─────────────────────────┘ │ │(Adapter) │
└─────────┘ │ │ └─────────┘
└─────────────────────────────────┘
Principio Fundamental
El dominio no conoce la infraestructura. Los adaptadores externos (DB, HTTP, Email) implementan interfaces definidas por el dominio.
3. Estructura de Proyecto Hexagonal
src/
├── domain/ # 🎯 Core - Sin dependencias externas
│ ├── entities/
│ │ ├── user.py
│ │ └── order.py
│ ├── value_objects/
│ │ ├── email.py
│ │ └── money.py
│ ├── events/
│ │ └── user_events.py
│ ├── services/
│ │ └── pricing_service.py
│ └── repositories/ # Interfaces (Ports)
│ ├── user_repository.py
│ └── order_repository.py
│
├── application/ # 📋 Use Cases
│ ├── use_cases/
│ │ ├── create_user.py
│ │ ├── place_order.py
│ │ └── process_payment.py
│ ├── services/
│ │ └── user_service.py
│ └── dtos/
│ ├── user_dto.py
│ └── order_dto.py
│
├── infrastructure/ # 🔌 Adapters
│ ├── persistence/
│ │ ├── sqlalchemy/
│ │ │ ├── models.py
│ │ │ ├── user_repository.py
│ │ │ └── order_repository.py
│ │ └── redis/
│ │ └── cache_repository.py
│ ├── external/
│ │ ├── email_service.py
│ │ └── payment_gateway.py
│ └── config/
│ └── settings.py
│
└── interfaces/ # 🌐 Entry Points (Ports)
├── api/
│ ├── main.py
│ ├── dependencies.py
│ └── routers/
│ ├── users.py
│ └── orders.py
├── cli/
│ └── commands.py
└── workers/
└── celery_tasks.py
4. Domain Layer: Entidades y Value Objects
Entidad (Identity + Behavior)
# domain/entities/user.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from domain.value_objects.email import Email
from domain.events.user_events import UserCreated, UserEmailChanged
@dataclass
class User:
"""Entidad de dominio User - Lógica de negocio pura."""
id: Optional[int] = None
email: Email = field(default_factory=lambda: Email(""))
hashed_password: str = ""
is_active: bool = True
created_at: datetime = field(default_factory=datetime.utcnow)
_events: list = field(default_factory=list, repr=False)
@classmethod
def create(cls, email: str, hashed_password: str) -> "User":
"""Factory method con validación de negocio."""
user = cls(
email=Email(email),
hashed_password=hashed_password,
)
user._events.append(UserCreated(email=email))
return user
def change_email(self, new_email: str) -> None:
"""Cambiar email con evento de dominio."""
old_email = self.email.value
self.email = Email(new_email)
self._events.append(UserEmailChanged(
user_id=self.id,
old_email=old_email,
new_email=new_email,
))
def deactivate(self) -> None:
"""Desactivar usuario con reglas de negocio."""
if not self.is_active:
raise DomainError("User is already inactive")
self.is_active = False
def collect_events(self) -> list:
"""Recolectar y limpiar eventos pendientes."""
events = self._events.copy()
self._events.clear()
return events
Value Object (Immutable + Validation)
# domain/value_objects/email.py
from dataclasses import dataclass
import re
@dataclass(frozen=True) # Inmutable
class Email:
"""Value Object para email validado."""
value: str
def __post_init__(self):
if not self._is_valid(self.value):
raise ValueError(f"Invalid email: {self.value}")
@staticmethod
def _is_valid(email: str) -> bool:
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, email))
def __str__(self) -> str:
return self.value
@property
def domain(self) -> str:
return self.value.split("@")[1]
# domain/value_objects/money.py
from dataclasses import dataclass
from decimal import Decimal
@dataclass(frozen=True)
class Money:
"""Value Object para valores monetarios."""
amount: Decimal
currency: str = "EUR"
def __post_init__(self):
if self.amount < 0:
raise ValueError("Amount cannot be negative")
# Forzar 2 decimales
object.__setattr__(self, 'amount', self.amount.quantize(Decimal('0.01')))
def __add__(self, other: "Money") -> "Money":
if self.currency != other.currency:
raise ValueError("Cannot add different currencies")
return Money(self.amount + other.amount, self.currency)
def __mul__(self, factor: int | float) -> "Money":
return Money(self.amount * Decimal(str(factor)), self.currency)
5. Repository Pattern (Port)
# domain/repositories/user_repository.py
from typing import Protocol, Optional
from domain.entities.user import User
class UserRepository(Protocol):
"""Port: Interfaz que el dominio espera."""
async def get(self, user_id: int) -> Optional[User]:
"""Obtener usuario por ID."""
...
async def get_by_email(self, email: str) -> Optional[User]:
"""Obtener usuario por email."""
...
async def save(self, user: User) -> User:
"""Guardar usuario (create o update)."""
...
async def delete(self, user_id: int) -> None:
"""Eliminar usuario."""
...
async def exists_by_email(self, email: str) -> bool:
"""Verificar si existe email."""
...
Implementación SQLAlchemy (Adapter)
# infrastructure/persistence/sqlalchemy/user_repository.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from domain.entities.user import User as UserEntity
from domain.repositories.user_repository import UserRepository
from .models import UserModel
class SQLAlchemyUserRepository(UserRepository):
"""Adapter: Implementación concreta con SQLAlchemy."""
def __init__(self, session: AsyncSession):
self.session = session
async def get(self, user_id: int) -> UserEntity | None:
model = await self.session.get(UserModel, user_id)
return self._to_entity(model) if model else None
async def get_by_email(self, email: str) -> UserEntity | None:
stmt = select(UserModel).where(UserModel.email == email)
result = await self.session.execute(stmt)
model = result.scalar_one_or_none()
return self._to_entity(model) if model else None
async def save(self, user: UserEntity) -> UserEntity:
if user.id:
# Update
model = await self.session.get(UserModel, user.id)
model.email = str(user.email)
model.hashed_password = user.hashed_password
model.is_active = user.is_active
else:
# Create
model = UserModel(
email=str(user.email),
hashed_password=user.hashed_password,
is_active=user.is_active,
)
self.session.add(model)
await self.session.flush()
await self.session.refresh(model)
return self._to_entity(model)
async def delete(self, user_id: int) -> None:
model = await self.session.get(UserModel, user_id)
if model:
await self.session.delete(model)
async def exists_by_email(self, email: str) -> bool:
stmt = select(UserModel.id).where(UserModel.email == email)
result = await self.session.execute(stmt)
return result.scalar_one_or_none() is not None
def _to_entity(self, model: UserModel) -> UserEntity:
"""Mapear modelo de DB a entidad de dominio."""
return UserEntity(
id=model.id,
email=Email(model.email),
hashed_password=model.hashed_password,
is_active=model.is_active,
created_at=model.created_at,
)
6. Application Layer: Use Cases
# application/use_cases/create_user.py
from dataclasses import dataclass
from domain.entities.user import User
from domain.repositories.user_repository import UserRepository
from domain.services.password_service import PasswordService
from application.dtos.user_dto import CreateUserDTO, UserResponseDTO
from domain.exceptions import ConflictError
@dataclass
class CreateUserUseCase:
"""Use Case: Crear nuevo usuario."""
user_repository: UserRepository
password_service: PasswordService
event_publisher: EventPublisher
async def execute(self, dto: CreateUserDTO) -> UserResponseDTO:
# 1. Validación de negocio
if await self.user_repository.exists_by_email(dto.email):
raise ConflictError(f"Email {dto.email} already registered")
# 2. Crear entidad de dominio
hashed_password = self.password_service.hash(dto.password)
user = User.create(email=dto.email, hashed_password=hashed_password)
# 3. Persistir
saved_user = await self.user_repository.save(user)
# 4. Publicar eventos de dominio
for event in saved_user.collect_events():
await self.event_publisher.publish(event)
# 5. Retornar DTO
return UserResponseDTO.from_entity(saved_user)
Application Service (Orquestador)
# application/services/user_service.py
from domain.repositories.user_repository import UserRepository
from application.use_cases.create_user import CreateUserUseCase
from application.use_cases.update_user import UpdateUserUseCase
class UserApplicationService:
"""Servicio de aplicación que orquesta use cases."""
def __init__(
self,
user_repository: UserRepository,
password_service: PasswordService,
event_publisher: EventPublisher,
):
self.user_repository = user_repository
self.password_service = password_service
self.event_publisher = event_publisher
async def create_user(self, dto: CreateUserDTO) -> UserResponseDTO:
use_case = CreateUserUseCase(
user_repository=self.user_repository,
password_service=self.password_service,
event_publisher=self.event_publisher,
)
return await use_case.execute(dto)
async def get_user(self, user_id: int) -> UserResponseDTO:
user = await self.user_repository.get(user_id)
if not user:
raise NotFoundError(f"User {user_id} not found")
return UserResponseDTO.from_entity(user)
7. Interface Layer: FastAPI Router
# interfaces/api/routers/users.py
from fastapi import APIRouter, Depends, status
from typing import Annotated
from application.services.user_service import UserApplicationService
from application.dtos.user_dto import CreateUserDTO, UserResponseDTO
from interfaces.api.dependencies import get_user_service
router = APIRouter(prefix="/users", tags=["users"])
UserService = Annotated[UserApplicationService, Depends(get_user_service)]
@router.post("", status_code=status.HTTP_201_CREATED, response_model=UserResponseDTO)
async def create_user(
dto: CreateUserDTO,
service: UserService,
) -> UserResponseDTO:
"""
El router solo:
1. Recibe HTTP request
2. Delega al Application Service
3. Retorna HTTP response
"""
return await service.create_user(dto)
@router.get("/{user_id}", response_model=UserResponseDTO)
async def get_user(user_id: int, service: UserService) -> UserResponseDTO:
return await service.get_user(user_id)
Dependency Injection
# interfaces/api/dependencies.py
from typing import Annotated, AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from infrastructure.persistence.sqlalchemy.database import async_session_maker
from infrastructure.persistence.sqlalchemy.user_repository import SQLAlchemyUserRepository
from infrastructure.external.event_publisher import RedisEventPublisher
from application.services.user_service import UserApplicationService
from domain.services.password_service import Argon2PasswordService
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
yield session
async def get_user_repository(
db: Annotated[AsyncSession, Depends(get_db)]
) -> SQLAlchemyUserRepository:
return SQLAlchemyUserRepository(db)
async def get_event_publisher() -> RedisEventPublisher:
return RedisEventPublisher(redis_client)
async def get_user_service(
user_repo: Annotated[SQLAlchemyUserRepository, Depends(get_user_repository)],
event_publisher: Annotated[RedisEventPublisher, Depends(get_event_publisher)],
) -> UserApplicationService:
return UserApplicationService(
user_repository=user_repo,
password_service=Argon2PasswordService(),
event_publisher=event_publisher,
)
8. Domain Events
# domain/events/user_events.py
from dataclasses import dataclass
from datetime import datetime
@dataclass(frozen=True)
class DomainEvent:
"""Base para eventos de dominio."""
occurred_at: datetime = datetime.utcnow()
@dataclass(frozen=True)
class UserCreated(DomainEvent):
email: str
@dataclass(frozen=True)
class UserEmailChanged(DomainEvent):
user_id: int
old_email: str
new_email: str
# infrastructure/external/event_publisher.py
from typing import Protocol
from domain.events.user_events import DomainEvent
class EventPublisher(Protocol):
async def publish(self, event: DomainEvent) -> None: ...
class RedisEventPublisher(EventPublisher):
def __init__(self, redis: Redis):
self.redis = redis
async def publish(self, event: DomainEvent) -> None:
channel = f"events:{event.__class__.__name__}"
await self.redis.publish(channel, json.dumps(asdict(event), default=str))
# Subscriber (puede ser Celery worker)
async def handle_user_created(event: UserCreated):
await send_welcome_email(event.email)
9. Testing en Arquitectura Hexagonal
# tests/unit/test_create_user_use_case.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from application.use_cases.create_user import CreateUserUseCase
from application.dtos.user_dto import CreateUserDTO
from domain.exceptions import ConflictError
@pytest.fixture
def mock_user_repository():
return AsyncMock()
@pytest.fixture
def mock_password_service():
service = MagicMock()
service.hash.return_value = "hashed_password"
return service
@pytest.fixture
def mock_event_publisher():
return AsyncMock()
@pytest.fixture
def use_case(mock_user_repository, mock_password_service, mock_event_publisher):
return CreateUserUseCase(
user_repository=mock_user_repository,
password_service=mock_password_service,
event_publisher=mock_event_publisher,
)
async def test_create_user_success(use_case, mock_user_repository):
# Arrange
mock_user_repository.exists_by_email.return_value = False
mock_user_repository.save.return_value = User(
id=1,
email=Email("test@example.com"),
hashed_password="hashed",
is_active=True,
)
dto = CreateUserDTO(email="test@example.com", password="password123")
# Act
result = await use_case.execute(dto)
# Assert
assert result.id == 1
assert result.email == "test@example.com"
mock_user_repository.save.assert_called_once()
async def test_create_user_duplicate_email(use_case, mock_user_repository):
# Arrange
mock_user_repository.exists_by_email.return_value = True
dto = CreateUserDTO(email="exists@example.com", password="password123")
# Act & Assert
with pytest.raises(ConflictError):
await use_case.execute(dto)
10. Comparativa de Arquitecturas
| Aspecto | Layered | Hexagonal | Clean |
|---|---|---|---|
| Dependencias | Top-down | Outside-in | Center-out |
| Core | Services | Domain | Entities |
| Framework | Acoplado | Desacoplado | Desacoplado |
| Testing | Difícil | Fácil | Fácil |
| Complejidad | Baja | Media | Alta |
| Curva | Suave | Moderada | Pronunciada |
11. Cuándo Usar Cada Arquitectura
Simple CRUD → Arquitectura en Capas
Router → Service → Repository → DB
Lógica de Negocio Compleja → Hexagonal
Port (API) → Application → Domain ← Adapter (DB)
Múltiples Bounded Contexts → DDD + Hexagonal
Context A ←→ Event Bus ←→ Context B
12. Checklist de Arquitectura Enterprise
- Dominio aislado — Sin imports de FastAPI/SQLAlchemy
- Entidades con comportamiento — No solo data classes
- Value Objects — Para validación y semántica
- Repository interfaces — Protocol en dominio
- Use Cases explícitos — Un caso de uso por archivo
- DTOs — Separar API contracts de entidades
- Domain Events — Comunicación desacoplada
- Tests unitarios — Sin DB ni HTTP
- Tests de integración — Con adapters reales
Conclusión
La Arquitectura Hexagonal aplicada a FastAPI proporciona:
- Dominio puro — Entidades y reglas sin dependencias externas
- Ports & Adapters — Interfaces que permiten cambiar implementaciones
- Testabilidad — Mocks fáciles gracias a Protocol
- Escalabilidad — Equipos pueden trabajar en paralelo
- Mantenibilidad — Cambios localizados
Pattern Senior: Empieza con arquitectura en capas. Cuando la lógica de negocio crezca, refactoriza a Hexagonal. No sobreingenieres desde el día 1.
Resumen del Curso
Has completado la transición de Node.js/NestJS a Python/FastAPI:
| Bloque | Capítulos | Conceptos Clave |
|---|---|---|
| Runtime | 1-3 | CPython, GIL, uv, Pydantic V2 |
| Async | 4 | asyncio, TaskGroups, coroutines |
| FastAPI | 5-7 | DI, Middleware, Validation |
| Data | 8-9 | SQLAlchemy 2.0, Alembic |
| Security | 10 | OAuth2, JWT, RBAC |
| Quality | 11-12 | Pytest, Observability |
| Production | 13-16 | ASGI, Docker, CI/CD |
| Distributed | 17-19 | Celery, Caching, WebSockets |
| Architecture | 20 | Hexagonal, DDD |
Tu stack enterprise Python está listo. 🚀