Module 20 · 20 min read

20 - Enterprise Architecture

Hexagonal Architecture, DDD, Clean Architecture, and design patterns applied to FastAPI.

#architecture #hexagonal #ddd #clean-architecture #patterns

1. The Problem with "Default" Architecture

Most FastAPI tutorials show code like this:

# ❌ "Tutorial" architecture - everything coupled
@app.post("/users")
async def create_user(data: UserCreate, db: AsyncSession = Depends(get_db)):
    # Business validation in the endpoint
    existing = await db.execute(select(User).where(User.email == data.email))
    if existing.first():
        raise HTTPException(400, "Email exists")
    
    # Business logic mixed with infrastructure
    # (assumes the ORM model has no plain `password` column)
    user = User(**data.model_dump(exclude={"password"}))
    user.hashed_password = hash_password(data.password)
    db.add(user)
    await db.commit()
    
    # Side effects coupled to the request
    send_welcome_email(user.email)
    
    return user

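Problems, as flagged in the comments above: business validation lives in the endpoint, business logic is mixed with infrastructure, and side effects are coupled to the request handler.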


2. Hexagonal Architecture (Ports & Adapters)

                    ┌─────────────────────────────────┐
                    │         Application Core        │
                    │  ┌─────────────────────────┐   │
    ┌─────────┐     │  │      Domain Layer       │   │     ┌─────────┐
    │ FastAPI │◄────┼──┤  - Entities             │   │────►│PostgreSQL│
    │ (Port)  │     │  │  - Value Objects        │   │     │(Adapter) │
    └─────────┘     │  │  - Domain Services      │   │     └─────────┘
                    │  │  - Domain Events        │   │
    ┌─────────┐     │  └─────────────────────────┘   │     ┌─────────┐
    │   CLI   │◄────┼──┤                             │────►│  Redis  │
    │ (Port)  │     │  │      Application Layer     │     │(Adapter) │
    └─────────┘     │  │  - Use Cases              │     └─────────┘
                    │  │  - Application Services   │
    ┌─────────┐     │  │  - DTOs                   │     ┌─────────┐
    │  Celery │◄────┼──┤                             │────►│  Email  │
    │ (Port)  │     │  └─────────────────────────┘   │     │(Adapter) │
    └─────────┘     │                                 │     └─────────┘
                    └─────────────────────────────────┘

Fundamental Principle

The domain knows nothing about infrastructure. Driven adapters (DB, email, cache) implement interfaces defined by the domain, while driving adapters (HTTP, CLI, workers) call into the application layer.
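A minimal sketch of the principle, using only the standard library. The names `WelcomeNotifier`, `register_user`, and `InMemoryNotifier` are hypothetical and exist only for this illustration: the core function depends on a `Protocol`, and any object with a matching method can be plugged in.

```python
from dataclasses import dataclass, field
from typing import Protocol


class WelcomeNotifier(Protocol):
    """Port (hypothetical name): what the core needs, stated in domain terms."""

    def send_welcome(self, email: str) -> None: ...


def register_user(email: str, notifier: WelcomeNotifier) -> str:
    """Core logic: depends only on the port, never on SMTP, HTTP, or a DB driver."""
    normalized = email.strip().lower()
    notifier.send_welcome(normalized)
    return normalized


@dataclass
class InMemoryNotifier:
    """Adapter for tests: satisfies the port structurally, no inheritance needed."""

    sent: list[str] = field(default_factory=list)

    def send_welcome(self, email: str) -> None:
        self.sent.append(email)


notifier = InMemoryNotifier()
registered = register_user("  Ada@Example.com ", notifier)
```

Swapping `InMemoryNotifier` for an SMTP-backed adapter would not require touching `register_user`, which is the property the rest of this module builds on.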


3. Hexagonal Project Structure

src/
├── domain/                    # 🎯 Core - no external dependencies
│   ├── entities/
│   │   ├── user.py
│   │   └── order.py
│   ├── value_objects/
│   │   ├── email.py
│   │   └── money.py
│   ├── events/
│   │   └── user_events.py
│   ├── services/
│   │   └── pricing_service.py
│   └── repositories/          # Interfaces (Ports)
│       ├── user_repository.py
│       └── order_repository.py

├── application/               # 📋 Use Cases
│   ├── use_cases/
│   │   ├── create_user.py
│   │   ├── place_order.py
│   │   └── process_payment.py
│   ├── services/
│   │   └── user_service.py
│   └── dtos/
│       ├── user_dto.py
│       └── order_dto.py

├── infrastructure/            # 🔌 Adapters
│   ├── persistence/
│   │   ├── sqlalchemy/
│   │   │   ├── models.py
│   │   │   ├── user_repository.py
│   │   │   └── order_repository.py
│   │   └── redis/
│   │       └── cache_repository.py
│   ├── external/
│   │   ├── email_service.py
│   │   └── payment_gateway.py
│   └── config/
│       └── settings.py

└── interfaces/                # 🌐 Entry Points (Ports)
    ├── api/
    │   ├── main.py
    │   ├── dependencies.py
    │   └── routers/
    │       ├── users.py
    │       └── orders.py
    ├── cli/
    │   └── commands.py
    └── workers/
        └── celery_tasks.py

4. Domain Layer: Entities and Value Objects

Entity (Identity + Behavior)

# domain/entities/user.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from domain.value_objects.email import Email
from domain.events.user_events import UserCreated, UserEmailChanged
from domain.exceptions import DomainError  # assumed to sit next to ConflictError

@dataclass
class User:
    """User domain entity - pure business logic."""
    
    id: Optional[int] = None
    email: Email = field(default_factory=lambda: Email(""))
    hashed_password: str = ""
    is_active: bool = True
    created_at: datetime = field(default_factory=datetime.utcnow)
    
    _events: list = field(default_factory=list, repr=False)
    
    @classmethod
    def create(cls, email: str, hashed_password: str) -> "User":
        """Factory method with business validation."""
        user = cls(
            email=Email(email),
            hashed_password=hashed_password,
        )
        user._events.append(UserCreated(email=email))
        return user
    
    def change_email(self, new_email: str) -> None:
        """Change the email and record a domain event."""
        old_email = self.email.value
        self.email = Email(new_email)
        self._events.append(UserEmailChanged(
            user_id=self.id,
            old_email=old_email,
            new_email=new_email,
        ))
    
    def deactivate(self) -> None:
        """Deactivate the user, enforcing business rules."""
        if not self.is_active:
            raise DomainError("User is already inactive")
        self.is_active = False
    
    def collect_events(self) -> list:
        """Collect and clear pending events."""
        events = self._events.copy()
        self._events.clear()
        return events

Value Object (Immutable + Validation)

# domain/value_objects/email.py
from dataclasses import dataclass
import re

@dataclass(frozen=True)  # Immutable
class Email:
    """Value object for a validated email address."""
    
    value: str
    
    def __post_init__(self):
        if not self._is_valid(self.value):
            raise ValueError(f"Invalid email: {self.value}")
    
    @staticmethod
    def _is_valid(email: str) -> bool:
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return bool(re.match(pattern, email))
    
    def __str__(self) -> str:
        return self.value
    
    @property
    def domain(self) -> str:
        return self.value.split("@")[1]


# domain/value_objects/money.py
from dataclasses import dataclass
from decimal import Decimal

@dataclass(frozen=True)
class Money:
    """Value object for monetary amounts."""
    
    amount: Decimal
    currency: str = "EUR"
    
    def __post_init__(self):
        if self.amount < 0:
            raise ValueError("Amount cannot be negative")
        # Force 2 decimal places
        object.__setattr__(self, 'amount', self.amount.quantize(Decimal('0.01')))
    
    def __add__(self, other: "Money") -> "Money":
        if self.currency != other.currency:
            raise ValueError("Cannot add different currencies")
        return Money(self.amount + other.amount, self.currency)
    
    def __mul__(self, factor: int | float) -> "Money":
        return Money(self.amount * Decimal(str(factor)), self.currency)
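To make the two defining properties of a value object concrete (equality by value and immutability), here is a small sketch. It repeats a trimmed copy of the `Money` class above so that it runs on its own; the variable names are illustrative only.

```python
from dataclasses import FrozenInstanceError, dataclass
from decimal import Decimal


@dataclass(frozen=True)
class Money:
    """Trimmed copy of the Money value object, repeated so the sketch runs alone."""

    amount: Decimal
    currency: str = "EUR"

    def __post_init__(self) -> None:
        if self.amount < 0:
            raise ValueError("Amount cannot be negative")
        # frozen=True blocks normal assignment, so normalize via object.__setattr__
        object.__setattr__(self, "amount", self.amount.quantize(Decimal("0.01")))

    def __add__(self, other: "Money") -> "Money":
        if self.currency != other.currency:
            raise ValueError("Cannot add different currencies")
        return Money(self.amount + other.amount, self.currency)


price = Money(Decimal("19.999"))   # quantized to two decimal places on creation
shipping = Money(Decimal("4.5"))
total = price + shipping           # a new object; neither operand changes

# Equality is by value, not identity
same_value = Money(Decimal("20")) == Money(Decimal("20.00"))

# Mutation attempts are rejected by the frozen dataclass
try:
    total.amount = Decimal("0")
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Because every operation returns a new instance, a `Money` can be shared freely between entities without defensive copies.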

5. Repository Pattern (Port)

# domain/repositories/user_repository.py
from typing import Protocol, Optional
from domain.entities.user import User

class UserRepository(Protocol):
    """Port: the interface the domain expects."""
    
    async def get(self, user_id: int) -> Optional[User]:
        """Get a user by ID."""
        ...
    
    async def get_by_email(self, email: str) -> Optional[User]:
        """Get a user by email."""
        ...
    
    async def save(self, user: User) -> User:
        """Save a user (create or update)."""
        ...
    
    async def delete(self, user_id: int) -> None:
        """Delete a user."""
        ...
    
    async def exists_by_email(self, email: str) -> bool:
        """Check whether the email is already registered."""
        ...

SQLAlchemy Implementation (Adapter)

# infrastructure/persistence/sqlalchemy/user_repository.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from domain.entities.user import User as UserEntity
from domain.repositories.user_repository import UserRepository
from domain.value_objects.email import Email  # used by _to_entity
from .models import UserModel

class SQLAlchemyUserRepository(UserRepository):
    """Adapter: concrete implementation backed by SQLAlchemy."""
    
    def __init__(self, session: AsyncSession):
        self.session = session
    
    async def get(self, user_id: int) -> UserEntity | None:
        model = await self.session.get(UserModel, user_id)
        return self._to_entity(model) if model else None
    
    async def get_by_email(self, email: str) -> UserEntity | None:
        stmt = select(UserModel).where(UserModel.email == email)
        result = await self.session.execute(stmt)
        model = result.scalar_one_or_none()
        return self._to_entity(model) if model else None
    
    async def save(self, user: UserEntity) -> UserEntity:
        if user.id:
            # Update
            model = await self.session.get(UserModel, user.id)
            model.email = str(user.email)
            model.hashed_password = user.hashed_password
            model.is_active = user.is_active
        else:
            # Create
            model = UserModel(
                email=str(user.email),
                hashed_password=user.hashed_password,
                is_active=user.is_active,
            )
            self.session.add(model)
        
        await self.session.flush()
        await self.session.refresh(model)
        
        return self._to_entity(model)
    
    async def delete(self, user_id: int) -> None:
        model = await self.session.get(UserModel, user_id)
        if model:
            await self.session.delete(model)
    
    async def exists_by_email(self, email: str) -> bool:
        stmt = select(UserModel.id).where(UserModel.email == email)
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none() is not None
    
    def _to_entity(self, model: UserModel) -> UserEntity:
        """Map the DB model to a domain entity."""
        return UserEntity(
            id=model.id,
            email=Email(model.email),
            hashed_password=model.hashed_password,
            is_active=model.is_active,
            created_at=model.created_at,
        )
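Because the port is just a set of method signatures, a second adapter can be written without touching the domain. The sketch below is a hypothetical in-memory implementation, useful for fast tests. To stay self-contained it uses a reduced stand-in `User` with a plain `str` email rather than the real entity.

```python
import asyncio
from dataclasses import dataclass, replace
from typing import Optional


@dataclass
class User:
    """Stand-in for the domain entity, reduced to what the sketch needs."""

    email: str
    hashed_password: str
    id: Optional[int] = None
    is_active: bool = True


class InMemoryUserRepository:
    """Second adapter for the same port: a dict instead of a database."""

    def __init__(self) -> None:
        self._users: dict[int, User] = {}
        self._next_id = 1

    async def get(self, user_id: int) -> Optional[User]:
        return self._users.get(user_id)

    async def get_by_email(self, email: str) -> Optional[User]:
        return next((u for u in self._users.values() if u.email == email), None)

    async def save(self, user: User) -> User:
        if user.id is None:
            # Simulate the auto-increment primary key a database would assign
            user = replace(user, id=self._next_id)
            self._next_id += 1
        self._users[user.id] = user
        return user

    async def delete(self, user_id: int) -> None:
        self._users.pop(user_id, None)

    async def exists_by_email(self, email: str) -> bool:
        return await self.get_by_email(email) is not None


async def demo() -> tuple[int, bool, bool]:
    repo = InMemoryUserRepository()
    saved = await repo.save(User(email="ada@example.com", hashed_password="x"))
    existed = await repo.exists_by_email("ada@example.com")
    await repo.delete(saved.id)
    still_there = await repo.exists_by_email("ada@example.com")
    return saved.id, existed, still_there


result = asyncio.run(demo())
```

A fake like this tends to catch more mistakes than a bare `AsyncMock`, since it actually stores and returns what was saved.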

6. Application Layer: Use Cases

# application/use_cases/create_user.py
from dataclasses import dataclass
from domain.entities.user import User
from domain.repositories.user_repository import UserRepository
from domain.services.password_service import PasswordService
from application.dtos.user_dto import CreateUserDTO, UserResponseDTO
from domain.exceptions import ConflictError
# EventPublisher is imported from the module where section 8 defines it.
from infrastructure.external.event_publisher import EventPublisher

@dataclass
class CreateUserUseCase:
    """Use case: create a new user."""
    
    user_repository: UserRepository
    password_service: PasswordService
    event_publisher: EventPublisher
    
    async def execute(self, dto: CreateUserDTO) -> UserResponseDTO:
        # 1. Business validation
        if await self.user_repository.exists_by_email(dto.email):
            raise ConflictError(f"Email {dto.email} already registered")
        
        # 2. Create the domain entity
        hashed_password = self.password_service.hash(dto.password)
        user = User.create(email=dto.email, hashed_password=hashed_password)
        
        # 3. Persist
        saved_user = await self.user_repository.save(user)
        
        # 4. Publish domain events. They were recorded on `user`; the entity
        #    returned by the repository is rebuilt from the DB row and carries none.
        for event in user.collect_events():
            await self.event_publisher.publish(event)
        
        # 5. Return the DTO
        return UserResponseDTO.from_entity(saved_user)

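Application Service (Orchestrator)

# application/services/user_service.py
from domain.repositories.user_repository import UserRepository
from domain.services.password_service import PasswordService
from domain.exceptions import NotFoundError  # assumed to sit next to ConflictError
from application.dtos.user_dto import CreateUserDTO, UserResponseDTO
from application.use_cases.create_user import CreateUserUseCase
from infrastructure.external.event_publisher import EventPublisher

class UserApplicationService:
    """Application service that orchestrates use cases."""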
    
    def __init__(
        self,
        user_repository: UserRepository,
        password_service: PasswordService,
        event_publisher: EventPublisher,
    ):
        self.user_repository = user_repository
        self.password_service = password_service
        self.event_publisher = event_publisher
    
    async def create_user(self, dto: CreateUserDTO) -> UserResponseDTO:
        use_case = CreateUserUseCase(
            user_repository=self.user_repository,
            password_service=self.password_service,
            event_publisher=self.event_publisher,
        )
        return await use_case.execute(dto)
    
    async def get_user(self, user_id: int) -> UserResponseDTO:
        user = await self.user_repository.get(user_id)
        if not user:
            raise NotFoundError(f"User {user_id} not found")
        return UserResponseDTO.from_entity(user)

7. Interface Layer: FastAPI Router

# interfaces/api/routers/users.py
from fastapi import APIRouter, Depends, status
from typing import Annotated
from application.services.user_service import UserApplicationService
from application.dtos.user_dto import CreateUserDTO, UserResponseDTO
from interfaces.api.dependencies import get_user_service

router = APIRouter(prefix="/users", tags=["users"])

UserService = Annotated[UserApplicationService, Depends(get_user_service)]

@router.post("", status_code=status.HTTP_201_CREATED, response_model=UserResponseDTO)
async def create_user(
    dto: CreateUserDTO,
    service: UserService,
) -> UserResponseDTO:
    """
    The router only:
    1. Receives the HTTP request
    2. Delegates to the application service
    3. Returns the HTTP response
    """
    return await service.create_user(dto)

@router.get("/{user_id}", response_model=UserResponseDTO)
async def get_user(user_id: int, service: UserService) -> UserResponseDTO:
    return await service.get_user(user_id)
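The router above lets `ConflictError` and `NotFoundError` propagate, so something at the HTTP edge has to translate them into status codes. One possible approach, sketched here with the standard library only, is a lookup table. The exception classes are redeclared as stand-ins, and the status codes and the names `STATUS_BY_ERROR`, `http_status_for`, and `error_body` are assumptions for illustration.

```python
class DomainError(Exception):
    """Stand-in base class; the real one would live in domain/exceptions.py."""


class ConflictError(DomainError):
    pass


class NotFoundError(DomainError):
    pass


# Assumed mapping; adjust the codes to your own API conventions.
STATUS_BY_ERROR: dict[type[DomainError], int] = {
    ConflictError: 409,
    NotFoundError: 404,
}


def http_status_for(exc: DomainError) -> int:
    """Walk the MRO so subclasses inherit their parent's status; default to 400."""
    for cls in type(exc).__mro__:
        if cls in STATUS_BY_ERROR:
            return STATUS_BY_ERROR[cls]
    return 400


def error_body(exc: DomainError) -> dict[str, str]:
    """JSON-serializable payload in the same {"detail": ...} shape HTTPException uses."""
    return {"detail": str(exc)}
```

In the API package this could be wired with a single handler registered via `@app.exception_handler(DomainError)` that returns `JSONResponse(status_code=http_status_for(exc), content=error_body(exc))`, which keeps HTTP concerns out of the use cases.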

Dependency Injection

# interfaces/api/dependencies.py
from typing import Annotated, AsyncGenerator
from fastapi import Depends
from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncSession

from infrastructure.persistence.sqlalchemy.database import async_session_maker
from infrastructure.persistence.sqlalchemy.user_repository import SQLAlchemyUserRepository
from infrastructure.external.event_publisher import RedisEventPublisher
from application.services.user_service import UserApplicationService
from domain.services.password_service import Argon2PasswordService

# Assumed connection URL; in a real app read it from infrastructure/config/settings.py.
redis_client = Redis.from_url("redis://localhost:6379/0")

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_maker() as session:
        try:
            yield session
            # Repositories only flush(); commit once per request here.
            await session.commit()
        except Exception:
            await session.rollback()
            raise

async def get_user_repository(
    db: Annotated[AsyncSession, Depends(get_db)]
) -> SQLAlchemyUserRepository:
    return SQLAlchemyUserRepository(db)

async def get_event_publisher() -> RedisEventPublisher:
    return RedisEventPublisher(redis_client)

async def get_user_service(
    user_repo: Annotated[SQLAlchemyUserRepository, Depends(get_user_repository)],
    event_publisher: Annotated[RedisEventPublisher, Depends(get_event_publisher)],
) -> UserApplicationService:
    return UserApplicationService(
        user_repository=user_repo,
        password_service=Argon2PasswordService(),
        event_publisher=event_publisher,
    )

8. Domain Events

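# domain/events/user_events.py
from dataclasses import dataclass, field
from datetime import datetime

# kw_only=True keeps the defaulted base field from preceding the subclasses'
# required fields in __init__ (requires Python 3.10+).
@dataclass(frozen=True, kw_only=True)
class DomainEvent:
    """Base class for domain events."""
    # default_factory gives each event its own timestamp; a plain default
    # would be evaluated only once, when the class is defined.
    occurred_at: datetime = field(default_factory=datetime.utcnow)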

@dataclass(frozen=True)
class UserCreated(DomainEvent):
    email: str

@dataclass(frozen=True)
class UserEmailChanged(DomainEvent):
    user_id: int
    old_email: str
    new_email: str

# infrastructure/external/event_publisher.py
import json
from dataclasses import asdict
from typing import Protocol
from redis.asyncio import Redis
from domain.events.user_events import DomainEvent, UserCreated

class EventPublisher(Protocol):
    async def publish(self, event: DomainEvent) -> None: ...

class RedisEventPublisher(EventPublisher):
    def __init__(self, redis: Redis):
        self.redis = redis
    
    async def publish(self, event: DomainEvent) -> None:
        channel = f"events:{event.__class__.__name__}"
        await self.redis.publish(channel, json.dumps(asdict(event), default=str))

# Subscriber (could be a Celery worker)
async def handle_user_created(event: UserCreated):
    await send_welcome_email(event.email)
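For unit tests or a single-process deployment, the same `publish` contract can be satisfied without Redis. The following is a sketch of a hypothetical in-memory publisher; `InMemoryEventPublisher`, its `subscribe` method, and the `welcomed` list are illustrative names, and `UserCreated` is redeclared as a stand-in so the example runs alone.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass(frozen=True)
class UserCreated:
    """Stand-in for the domain event defined above."""

    email: str


Handler = Callable[[object], Awaitable[None]]


class InMemoryEventPublisher:
    """Hypothetical adapter: dispatches events to handlers in the same process."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, event: object) -> None:
        # Handlers run sequentially; events with no subscribers are ignored.
        for handler in self._handlers.get(type(event), []):
            await handler(event)


welcomed: list[str] = []


async def handle_user_created(event: UserCreated) -> None:
    # A real handler would send the welcome email; here we only record it.
    welcomed.append(event.email)


async def main() -> None:
    publisher = InMemoryEventPublisher()
    publisher.subscribe(UserCreated, handle_user_created)
    await publisher.publish(UserCreated(email="ada@example.com"))


asyncio.run(main())
```

Since the use case only calls `publish`, it cannot tell this adapter apart from the Redis one, so the choice can be made in the dependency wiring.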

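9. Testing in Hexagonal Architecture

# tests/unit/test_create_user_use_case.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from application.use_cases.create_user import CreateUserUseCase
from application.dtos.user_dto import CreateUserDTO
from domain.entities.user import User
from domain.exceptions import ConflictError
from domain.value_objects.email import Email

# Async tests need an asyncio plugin; this assumes pytest-asyncio is installed.
pytestmark = pytest.mark.asyncio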

@pytest.fixture
def mock_user_repository():
    return AsyncMock()

@pytest.fixture
def mock_password_service():
    service = MagicMock()
    service.hash.return_value = "hashed_password"
    return service

@pytest.fixture
def mock_event_publisher():
    return AsyncMock()

@pytest.fixture
def use_case(mock_user_repository, mock_password_service, mock_event_publisher):
    return CreateUserUseCase(
        user_repository=mock_user_repository,
        password_service=mock_password_service,
        event_publisher=mock_event_publisher,
    )

async def test_create_user_success(use_case, mock_user_repository):
    # Arrange
    mock_user_repository.exists_by_email.return_value = False
    mock_user_repository.save.return_value = User(
        id=1,
        email=Email("test@example.com"),
        hashed_password="hashed",
        is_active=True,
    )
    
    dto = CreateUserDTO(email="test@example.com", password="password123")
    
    # Act
    result = await use_case.execute(dto)
    
    # Assert
    assert result.id == 1
    assert result.email == "test@example.com"
    mock_user_repository.save.assert_called_once()

async def test_create_user_duplicate_email(use_case, mock_user_repository):
    # Arrange
    mock_user_repository.exists_by_email.return_value = True
    dto = CreateUserDTO(email="exists@example.com", password="password123")
    
    # Act & Assert
    with pytest.raises(ConflictError):
        await use_case.execute(dto)

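10. Architecture Comparison

| Aspect         | Layered  | Hexagonal  | Clean      |
|----------------|----------|------------|------------|
| Dependencies   | Top-down | Outside-in | Center-out |
| Core           | Services | Domain     | Entities   |
| Framework      | Coupled  | Decoupled  | Decoupled  |
| Testing        | Hard     | Easy       | Easy       |
| Complexity     | Low      | Medium     | High       |
| Learning curve | Gentle   | Moderate   | Steep      |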

11. When to Use Each Architecture

Simple CRUD → Layered Architecture

Router → Service → Repository → DB

Complex Business Logic → Hexagonal

Port (API) → Application → Domain ← Adapter (DB)

Multiple Bounded Contexts → DDD + Hexagonal

Context A ←→ Event Bus ←→ Context B

12. Enterprise Architecture Checklist


Conclusion

Hexagonal Architecture applied to FastAPI provides:

  1. Pure domain: entities and rules with no external dependencies
  2. Ports & Adapters: interfaces that let you swap implementations
  3. Testability: easy mocking thanks to Protocol
  4. Scalability: teams can work in parallel
  5. Maintainability: changes stay localized

Senior pattern: Start with a layered architecture. When the business logic grows, refactor to Hexagonal. Don't over-engineer from day one.


Course Summary

You have completed the transition from Node.js/NestJS to Python/FastAPI:

| Block        | Chapters | Key concepts                    |
|--------------|----------|---------------------------------|
| Runtime      | 1-3      | CPython, GIL, uv, Pydantic V2   |
| Async        | 4        | asyncio, TaskGroups, coroutines |
| FastAPI      | 5-7      | DI, Middleware, Validation      |
| Data         | 8-9      | SQLAlchemy 2.0, Alembic         |
| Security     | 10       | OAuth2, JWT, RBAC               |
| Quality      | 11-12    | Pytest, Observability           |
| Production   | 13-16    | ASGI, Docker, CI/CD             |
| Distributed  | 17-19    | Celery, Caching, WebSockets     |
| Architecture | 20       | Hexagonal, DDD                  |

Your enterprise Python stack is ready. 🚀