Module 10 · 20 min read

10 - Authentication and Authorization

OAuth2, JWT, RBAC, dynamic scopes, and enterprise security patterns in FastAPI.

#security #oauth2 #jwt #rbac #authentication #authorization

1. From Passport.js to FastAPI Security

JavaScript/TypeScript
// NestJS + Passport
@Injectable()
export class JwtStrategy extends PassportStrategy(Strategy) {
  constructor() {
    super({
      jwtFromRequest: ExtractJwt.fromAuthHeaderAsBearerToken(),
      secretOrKey: process.env.JWT_SECRET,
    });
  }

  async validate(payload: JwtPayload) {
    return { userId: payload.sub, roles: payload.roles };
  }
}

@UseGuards(JwtAuthGuard)
@Controller('users')
export class UsersController {}
Python
# FastAPI: security as a dependency
from typing import Annotated

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")

async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
    # decode_token returns a TokenPayload model (defined in section 2),
    # so claims are read as attributes, not dictionary keys.
    payload = decode_token(token)
    user = await db.get(User, int(payload.sub))
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    return user

@app.get("/users/me")
async def get_me(user: Annotated[User, Depends(get_current_user)]):
    return user

Key difference: FastAPI uses its native dependency system. There are no separate “guards”; authentication is just another dependency that you can compose and test.


2. Complete OAuth2 Password Flow

Models and Schemas

# schemas/auth.py
from pydantic import BaseModel, EmailStr

class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_in: int

class TokenPayload(BaseModel):
    sub: str  # Subject (user ID)
    exp: int  # Expiration timestamp
    iat: int  # Issued at
    type: str  # "access" or "refresh"
    roles: list[str] = []
    scopes: list[str] = []

class LoginRequest(BaseModel):
    email: EmailStr
    password: str

class RegisterRequest(BaseModel):
    email: EmailStr
    password: str
    full_name: str

Security Configuration

# core/security.py
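from datetime import datetime, timedelta, timezone
from typing import Any

import jwt
from fastapi import HTTPException, status  # used by decode_token below
from passlib.context import CryptContext
from pydantic import BaseModel

from app.core.config import settings
from app.schemas.auth import TokenPayload  # assumed module path for schemas/auth.py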

pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")

class TokenConfig(BaseModel):
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30
    refresh_token_expire_days: int = 7

token_config = TokenConfig(
    secret_key=settings.SECRET_KEY,
    access_token_expire_minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES,
    refresh_token_expire_days=settings.REFRESH_TOKEN_EXPIRE_DAYS,
)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def create_token(
    subject: str,
    token_type: str,
    expires_delta: timedelta,
    extra_claims: dict[str, Any] | None = None,
) -> str:
    now = datetime.now(timezone.utc)
    expire = now + expires_delta
    
    payload = {
        "sub": subject,
        "exp": expire,
        "iat": now,
        "type": token_type,
        **(extra_claims or {}),
    }
    
    return jwt.encode(payload, token_config.secret_key, algorithm=token_config.algorithm)

def create_access_token(user_id: str, roles: list[str], scopes: list[str]) -> str:
    return create_token(
        subject=user_id,
        token_type="access",
        expires_delta=timedelta(minutes=token_config.access_token_expire_minutes),
        extra_claims={"roles": roles, "scopes": scopes},
    )

def create_refresh_token(user_id: str) -> str:
    return create_token(
        subject=user_id,
        token_type="refresh",
        expires_delta=timedelta(days=token_config.refresh_token_expire_days),
    )

def decode_token(token: str) -> TokenPayload:
    try:
        payload = jwt.decode(
            token,
            token_config.secret_key,
            algorithms=[token_config.algorithm],
        )
        return TokenPayload(**payload)
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token expired",
            headers={"WWW-Authenticate": "Bearer"},
        )
    except jwt.InvalidTokenError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        )

Authentication Endpoints

# api/auth.py
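from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload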

router = APIRouter(prefix="/auth", tags=["auth"])

@router.post("/token", response_model=Token)
async def login(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> Token:
    """OAuth2 compatible token endpoint."""
    user = await authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    return Token(
        access_token=create_access_token(
            str(user.id),
            roles=[r.name for r in user.roles],
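            # NOTE: this grants every scope the client asks for; validate the
            # request against what the user may hold before issuing the token.
            scopes=form_data.scopes,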
        ),
        refresh_token=create_refresh_token(str(user.id)),
        expires_in=token_config.access_token_expire_minutes * 60,
    )

@router.post("/refresh", response_model=Token)
async def refresh_token(
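    # NOTE: a bare `str` parameter is read from the query string, which tends to
    # end up in access logs; production code would usually take it from the body.
    refresh_token: str,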
    db: Annotated[AsyncSession, Depends(get_db)],
) -> Token:
    """Refresh access token using refresh token."""
    payload = decode_token(refresh_token)
    
    if payload.type != "refresh":
        raise HTTPException(status_code=401, detail="Invalid token type")
    
    user = await db.get(User, int(payload.sub))
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="User not found or inactive")
    
    return Token(
        access_token=create_access_token(
            str(user.id),
            roles=[r.name for r in user.roles],
            scopes=[],
        ),
        refresh_token=create_refresh_token(str(user.id)),
        expires_in=token_config.access_token_expire_minutes * 60,
    )

async def authenticate_user(
    db: AsyncSession,
    email: str,
    password: str,
) -> User | None:
    stmt = select(User).where(User.email == email).options(selectinload(User.roles))
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()
    
    if not user or not verify_password(password, user.hashed_password):
        return None
    
    return user
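The login endpoint above copies `form_data.scopes` straight into the token, so a client could request `admin` and receive it. One possible policy, sketched here under the assumption that the allowed set comes from something like the `user.permissions` property of section 4, is to grant only the intersection. The helper name `grant_scopes` is hypothetical.

```python
def grant_scopes(requested: list[str], allowed: set[str]) -> list[str]:
    """Return the requested scopes the user may hold, preserving request order."""
    granted: list[str] = []
    for scope in requested:
        # Skip scopes the user is not entitled to, and drop duplicates.
        if scope in allowed and scope not in granted:
            granted.append(scope)
    return granted


user_permissions = {"users:read", "users:write"}
requested = ["users:read", "admin", "users:read"]
granted = grant_scopes(requested, user_permissions)
```

An alternative design is to reject the whole request with a 400 when any requested scope is not allowed; silently narrowing is friendlier to clients, rejecting is more explicit.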

3. Authentication Dependencies

# dependencies/auth.py
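from typing import Annotated

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.security import decode_token  # assumed module path for core/security.py
# User and get_db are assumed to come from the project's models and database modules.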

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="auth/token",
    scopes={
        "users:read": "Read user information",
        "users:write": "Modify user information",
        "admin": "Admin access",
    },
)

async def get_current_user(
    security_scopes: SecurityScopes,
    token: Annotated[str, Depends(oauth2_scheme)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
    """Dependency that validates token and returns current user."""
    # Build authenticate header
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    
    payload = decode_token(token)
    
    if payload.type != "access":
        raise credentials_exception
    
    # Validate scopes
    for scope in security_scopes.scopes:
        if scope not in payload.scopes:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing required scope: {scope}",
                headers={"WWW-Authenticate": authenticate_value},
            )
    
    user = await db.get(User, int(payload.sub))
    if not user:
        raise credentials_exception
    
    if not user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    
    return user

async def get_current_active_user(
    user: Annotated[User, Depends(get_current_user)],
) -> User:
    """Alias for get_current_user (for clarity)."""
    return user

# Type aliases for cleaner endpoints
CurrentUser = Annotated[User, Depends(get_current_user)]
CurrentActiveUser = Annotated[User, Depends(get_current_active_user)]

Usage in Endpoints

from fastapi import Security

@app.get("/users/me")
async def get_me(user: CurrentUser) -> UserResponse:
    return user

@app.get("/users", dependencies=[Security(get_current_user, scopes=["users:read"])])
async def list_users(db: DBSession) -> list[UserResponse]:
    return await user_repo.list(db)

@app.put("/users/{user_id}", dependencies=[Security(get_current_user, scopes=["users:write"])])
async def update_user(user_id: int, data: UserUpdate, db: DBSession) -> UserResponse:
    return await user_repo.update(db, user_id, data)

4. RBAC (Role-Based Access Control)

Database Models

# models/auth.py
from sqlalchemy import Table, Column, Integer, ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship

# user-role association table
user_roles = Table(
    "user_roles",
    Base.metadata,
    Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
    Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
)

# role-permission association table
role_permissions = Table(
    "role_permissions",
    Base.metadata,
    Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
    Column("permission_id", Integer, ForeignKey("permissions.id"), primary_key=True),
)

class Permission(Base):
    __tablename__ = "permissions"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True)  # e.g., "users:read"
    description: Mapped[str | None] = mapped_column(String(255))
    
    roles: Mapped[list["Role"]] = relationship(
        secondary=role_permissions,
        back_populates="permissions",
    )

class Role(Base):
    __tablename__ = "roles"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)  # e.g., "admin"
    description: Mapped[str | None] = mapped_column(String(255))
    
    permissions: Mapped[list[Permission]] = relationship(
        secondary=role_permissions,
        back_populates="roles",
        lazy="selectin",
    )
    users: Mapped[list["User"]] = relationship(
        secondary=user_roles,
        back_populates="roles",
    )

class User(Base):
    __tablename__ = "users"
    
    # ... other fields ...
    
    roles: Mapped[list[Role]] = relationship(
        secondary=user_roles,
        back_populates="users",
        lazy="selectin",
    )
    
    @property
    def permissions(self) -> set[str]:
        """Flatten all permissions from all roles."""
        perms = set()
        for role in self.roles:
            for perm in role.permissions:
                perms.add(perm.name)
        return perms
    
    def has_permission(self, permission: str) -> bool:
        return permission in self.permissions
    
    def has_role(self, role_name: str) -> bool:
        return any(r.name == role_name for r in self.roles)
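The `permissions` property is the piece that turns the two many-to-many relationships into a flat set the checkers can query. A database-free sketch with plain dataclasses (hypothetical stand-ins for the ORM models, with permissions stored as strings) shows the flattening:

```python
from dataclasses import dataclass, field


@dataclass
class Role:
    name: str
    permissions: set[str] = field(default_factory=set)


@dataclass
class User:
    roles: list[Role] = field(default_factory=list)

    @property
    def permissions(self) -> set[str]:
        # Union of the permission names granted by every role.
        return set().union(*(role.permissions for role in self.roles))

    def has_permission(self, permission: str) -> bool:
        return permission in self.permissions


editor = Role("editor", {"posts:read", "posts:edit"})
viewer = Role("viewer", {"posts:read"})
user = User(roles=[editor, viewer])
```

Overlapping permissions collapse into one entry, so `user.permissions` here contains `posts:read` and `posts:edit` exactly once. In the ORM version the property is recomputed on every access, which is cheap only because `lazy="selectin"` has already loaded the roles and their permissions.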

Authorization Dependencies

# dependencies/permissions.py
from fastapi import Depends, HTTPException, status

# CurrentUser and User are assumed to come from dependencies/auth.py and models/auth.py.

class PermissionChecker:
    """Dependency factory for permission checking."""
    
    def __init__(self, required_permissions: list[str], require_all: bool = True):
        self.required_permissions = required_permissions
        self.require_all = require_all
    
    async def __call__(self, user: CurrentUser) -> User:
        user_perms = user.permissions
        
        if self.require_all:
            # User must have ALL permissions
            missing = set(self.required_permissions) - user_perms
            if missing:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Missing permissions: {', '.join(sorted(missing))}",
                )
        else:
            # User must have AT LEAST ONE permission
            if not user_perms.intersection(self.required_permissions):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Insufficient permissions",
                )
        
        return user

class RoleChecker:
    """Dependency factory for role checking."""
    
    def __init__(self, required_roles: list[str], require_all: bool = False):
        self.required_roles = required_roles
        self.require_all = require_all
    
    async def __call__(self, user: CurrentUser) -> User:
        user_roles = {r.name for r in user.roles}
        
        if self.require_all:
            if not set(self.required_roles).issubset(user_roles):
                raise HTTPException(status_code=403, detail="Insufficient roles")
        else:
            if not user_roles.intersection(self.required_roles):
                raise HTTPException(status_code=403, detail="Insufficient roles")
        
        return user

# Convenience functions
def require_permissions(*permissions: str, require_all: bool = True):
    return Depends(PermissionChecker(list(permissions), require_all))

def require_roles(*roles: str, require_all: bool = False):
    return Depends(RoleChecker(list(roles), require_all))

Usage in Endpoints

@app.delete(
    "/users/{user_id}",
    dependencies=[require_permissions("users:delete")],
)
async def delete_user(user_id: int, db: DBSession):
    return await user_repo.delete(db, user_id)

@app.get(
    "/admin/stats",
    dependencies=[require_roles("admin", "superuser")],  # Any of these roles
)
async def admin_stats(db: DBSession):
    return await stats_service.get_admin_stats(db)

@app.post(
    "/admin/dangerous-action",
    dependencies=[require_roles("admin", "superuser", require_all=True)],  # Must have BOTH
)
async def dangerous_action():
    pass
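The difference between the "any of" and "all of" modes comes down to set intersection versus subset. This small sketch isolates that logic from FastAPI; `roles_satisfied` is a hypothetical helper that mirrors the branches of `RoleChecker.__call__`.

```python
def roles_satisfied(
    user_roles: set[str],
    required: list[str],
    require_all: bool = False,
) -> bool:
    """Return True when the user's roles meet the requirement."""
    required_set = set(required)
    if require_all:
        # Every required role must be present.
        return required_set.issubset(user_roles)
    # At least one required role must be present.
    return bool(required_set & user_roles)


admin_only = {"admin"}
both = {"admin", "superuser"}
```

One edge case worth knowing: with an empty requirement list, the "any of" mode denies everyone (the intersection is empty) while the "all of" mode allows everyone (the empty set is a subset of anything).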

5. Dynamic Per-Resource Scopes

For granular permissions such as “user X can edit post Y”:

# dependencies/resource_permissions.py
from typing import Any, Callable

from fastapi import HTTPException

class ResourcePermissionChecker:
    """Check permissions on specific resources."""
    
    def __init__(
        self,
        action: str,
        resource_type: str,
        get_resource: Callable[..., Any],
    ):
        self.action = action
        self.resource_type = resource_type
        self.get_resource = get_resource
    
    async def __call__(
        self,
        user: CurrentUser,
        resource_id: int,
        db: DBSession,
    ) -> Any:
        resource = await self.get_resource(db, resource_id)
        
        if not resource:
            raise HTTPException(status_code=404, detail="Resource not found")
        
        # Check ownership
        if hasattr(resource, 'owner_id') and resource.owner_id == user.id:
            return resource
        
        # Check explicit permission
        permission = f"{self.resource_type}:{self.action}"
        if user.has_permission(permission):
            return resource
        
        # Check resource-specific permission in DB
        has_access = await check_resource_permission(
            db, user.id, self.resource_type, resource_id, self.action
        )
        
        if not has_access:
            raise HTTPException(status_code=403, detail="Access denied")
        
        return resource

# Usage
async def get_post(db: DBSession, post_id: int) -> Post | None:
    return await db.get(Post, post_id)

can_edit_post = ResourcePermissionChecker("edit", "post", get_post)

@app.put("/posts/{resource_id}")
async def update_post(
    post: Annotated[Post, Depends(can_edit_post)],
    data: PostUpdate,
    db: DBSession,
):
    # post is already fetched and permission-checked
    return await post_repo.update(db, post.id, data)

6. API Keys for Services

# dependencies/api_key.py
from datetime import datetime, timezone
from typing import Annotated

from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader, APIKeyQuery, SecurityScopes
from sqlalchemy import or_, select

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
api_key_query = APIKeyQuery(name="api_key", auto_error=False)

async def get_api_key(
    header_key: Annotated[str | None, Security(api_key_header)],
    query_key: Annotated[str | None, Security(api_key_query)],
    # Assuming DBSession is an Annotated alias that already carries Depends(get_db),
    # adding a Depends default on top of it would be rejected by FastAPI.
    db: DBSession,
) -> APIKey:
    """Validate API key from header or query parameter."""
    api_key = header_key or query_key
    
    if not api_key:
        raise HTTPException(status_code=401, detail="API key required")
    
    # Hash the key for lookup (keys stored hashed)
    key_hash = hash_api_key(api_key)
    
    # Timezone-aware "now" (datetime.utcnow() is deprecated); assumes expires_at
    # and last_used_at are timezone-aware columns.
    now = datetime.now(timezone.utc)
    stmt = select(APIKey).where(
        APIKey.key_hash == key_hash,
        APIKey.is_active.is_(True),
        or_(APIKey.expires_at.is_(None), APIKey.expires_at > now),
    )
    result = await db.execute(stmt)
    db_key = result.scalar_one_or_none()
    
    if not db_key:
        raise HTTPException(status_code=401, detail="Invalid API key")
    
    # Update last used
    db_key.last_used_at = now
    await db.flush()
    
    return db_key

# Combined auth: JWT OR API Key.
# oauth2_scheme_optional and get_api_key_optional are not defined in this chapter;
# they are assumed to be non-raising variants that return None when no credential is sent.
async def get_current_user_or_api_key(
    token: Annotated[str | None, Depends(oauth2_scheme_optional)],
    api_key: Annotated[APIKey | None, Depends(get_api_key_optional)],
    db: DBSession,
) -> User | APIKey:
    if token:
        return await get_current_user(SecurityScopes(), token, db)
    if api_key:
        return api_key
    raise HTTPException(status_code=401, detail="Authentication required")
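`hash_api_key` is used above but not shown. A plausible minimal version, assuming keys are long random strings generated by the server, is a plain SHA-256 digest: unlike passwords, high-entropy keys do not need a slow salted hash, and a deterministic digest is what makes the equality lookup on `key_hash` possible. `generate_api_key` is a hypothetical companion helper.

```python
import hashlib
import secrets


def generate_api_key() -> str:
    """Create a new random key; show it to the client once and never store it."""
    return secrets.token_urlsafe(32)


def hash_api_key(api_key: str) -> str:
    """Deterministic digest stored in the database and used for lookups."""
    return hashlib.sha256(api_key.encode("utf-8")).hexdigest()


raw_key = generate_api_key()
stored_hash = hash_api_key(raw_key)
```

Only `stored_hash` would be persisted; if the table leaks, the attacker gets digests of 32 random bytes rather than usable keys.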

7. Per-User Rate Limiting

# dependencies/rate_limit.py
from fastapi import Depends, HTTPException, Request
import redis.asyncio as redis

class UserRateLimiter:
    def __init__(
        self,
        redis_client: redis.Redis,
        requests_per_minute: int = 60,
    ):
        self.redis = redis_client
        self.rpm = requests_per_minute
    
    async def __call__(self, request: Request, user: CurrentUser) -> None:
        key = f"rate_limit:{user.id}:{request.url.path}"
        
        current = await self.redis.incr(key)
        if current == 1:
            await self.redis.expire(key, 60)
        
        if current > self.rpm:
            raise HTTPException(
                status_code=429,
                detail="Rate limit exceeded",
                headers={"Retry-After": "60"},
            )

# Different limits per role
class TieredRateLimiter:
    LIMITS = {
        "free": 60,
        "pro": 600,
        "enterprise": 6000,
    }
    
    async def __call__(self, request: Request, user: CurrentUser) -> None:
        tier = self._get_user_tier(user)
        limit = self.LIMITS.get(tier, 60)
        
        # ... rate limiting logic with tier-specific limit

@app.get("/api/data", dependencies=[Depends(UserRateLimiter(redis_client, 100))])
async def get_data():
    pass
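The `INCR` plus `EXPIRE` pair implements a fixed-window counter. The sketch below reproduces that behaviour in memory, with an injectable clock so it can be tested without Redis or sleeping, and shows one way the tier lookup could feed the limit. `FixedWindowLimiter` and `allow_request` are hypothetical names, and this is not the elided body of `TieredRateLimiter`.

```python
import time
from typing import Callable

TIER_LIMITS = {"free": 60, "pro": 600, "enterprise": 6000}


class FixedWindowLimiter:
    """In-memory stand-in for the Redis INCR + EXPIRE pattern."""

    def __init__(
        self,
        window_seconds: int = 60,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.window = window_seconds
        self.clock = clock
        # key -> (window start time, request count in that window)
        self._counters: dict[str, tuple[float, int]] = {}

    def hit(self, key: str, limit: int) -> bool:
        """Record one request; return True if it is within the limit."""
        now = self.clock()
        start, count = self._counters.get(key, (now, 0))
        if now - start >= self.window:
            start, count = now, 0  # window expired, start a new one
        count += 1
        self._counters[key] = (start, count)
        return count <= limit


def allow_request(limiter: FixedWindowLimiter, user_id: int, path: str, tier: str) -> bool:
    # Unknown tiers fall back to the most restrictive limit.
    limit = TIER_LIMITS.get(tier, TIER_LIMITS["free"])
    return limiter.hit(f"rate_limit:{user_id}:{path}", limit)


fake_now = [0.0]
limiter = FixedWindowLimiter(window_seconds=60, clock=lambda: fake_now[0])
first_window = [limiter.hit("demo", 2) for _ in range(3)]
fake_now[0] = 60.0
after_reset = limiter.hit("demo", 2)
```

In Redis the two commands are not atomic: if the process dies between `INCR` and `EXPIRE`, the key never expires. A Lua script or a transaction that sets both together is a common way to close that gap.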

8. Additional Security

Password Policies

import re

from pydantic import BaseModel, field_validator

class PasswordPolicy(BaseModel):
    password: str
    
    @field_validator('password')
    @classmethod
    def validate_password(cls, v: str) -> str:
        if len(v) < 12:
            raise ValueError('Password must be at least 12 characters')
        if not re.search(r'[A-Z]', v):
            raise ValueError('Password must contain uppercase letter')
        if not re.search(r'[a-z]', v):
            raise ValueError('Password must contain lowercase letter')
        if not re.search(r'\d', v):
            raise ValueError('Password must contain digit')
        if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
            raise ValueError('Password must contain special character')
        return v
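The validator above stops at the first failed rule, so a user fixing a weak password may need several round trips. A variation, sketched here without Pydantic, collects every violation at once using the same rules; `password_violations` and `CHARACTER_RULES` are hypothetical names.

```python
import re

# (pattern that must match somewhere in the password, description of the rule)
CHARACTER_RULES: list[tuple[str, str]] = [
    (r"[A-Z]", "an uppercase letter"),
    (r"[a-z]", "a lowercase letter"),
    (r"\d", "a digit"),
    (r'[!@#$%^&*(),.?":{}|<>]', "a special character"),
]


def password_violations(password: str) -> list[str]:
    """Return a description of every rule the password fails (empty if valid)."""
    violations: list[str] = []
    if len(password) < 12:
        violations.append("at least 12 characters")
    violations += [
        description
        for pattern, description in CHARACTER_RULES
        if not re.search(pattern, password)
    ]
    return violations


strong = password_violations("Str0ng!Passw0rd")
weak = password_violations("short")
```

Inside a `field_validator`, the list could be joined into a single `ValueError` message so the client sees all requirements together.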

Brute Force Protection

from fastapi import HTTPException
import redis.asyncio as redis

class LoginAttemptTracker:
    MAX_ATTEMPTS = 5
    LOCKOUT_MINUTES = 15
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    async def check_and_increment(self, email: str) -> None:
        key = f"login_attempts:{email}"
        attempts = await self.redis.incr(key)
        
        if attempts == 1:
            await self.redis.expire(key, self.LOCKOUT_MINUTES * 60)
        
        if attempts > self.MAX_ATTEMPTS:
            ttl = await self.redis.ttl(key)
            raise HTTPException(
                status_code=429,
                detail=f"Too many login attempts. Try again in {ttl} seconds",
            )
    
    async def reset(self, email: str) -> None:
        await self.redis.delete(f"login_attempts:{email}")
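The tracker is only useful if the login flow calls it in the right order: count the attempt before checking the password, and reset only after a success. The following in-memory sketch shows that sequencing; `InMemoryAttemptTracker`, `login`, and the `verify` callback are hypothetical, and the lockout expiry (the Redis TTL) is omitted for brevity.

```python
from typing import Callable


class InMemoryAttemptTracker:
    """Simplified stand-in for LoginAttemptTracker without Redis or expiry."""

    MAX_ATTEMPTS = 5

    def __init__(self) -> None:
        self._attempts: dict[str, int] = {}

    def check_and_increment(self, email: str) -> bool:
        """Count this attempt; return False once the account is locked out."""
        self._attempts[email] = self._attempts.get(email, 0) + 1
        return self._attempts[email] <= self.MAX_ATTEMPTS

    def reset(self, email: str) -> None:
        self._attempts.pop(email, None)


def login(
    tracker: InMemoryAttemptTracker,
    email: str,
    password: str,
    verify: Callable[[str, str], bool],
) -> str:
    # Count first, so a locked account is refused even with the right password.
    if not tracker.check_and_increment(email):
        return "locked"
    if not verify(email, password):
        return "rejected"
    # Only a successful login clears the counter.
    tracker.reset(email)
    return "ok"


def verify(email: str, password: str) -> bool:
    # Hypothetical credential check used only for this demonstration.
    return password == "correct-horse"


tracker = InMemoryAttemptTracker()
wrong = [login(tracker, "a@example.com", "guess", verify) for _ in range(5)]
sixth = login(tracker, "a@example.com", "correct-horse", verify)
other = login(tracker, "b@example.com", "correct-horse", verify)
```

Keying the counter by email alone lets an attacker lock out a known user on purpose; combining the email with the client IP is a common mitigation, at the cost of weaker protection against distributed guessing.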

9. OpenAPI Security Schemes

FastAPI generates the security documentation automatically:

app = FastAPI(
    title="My API",
    swagger_ui_init_oauth={
        "usePkceWithAuthorizationCodeGrant": True,
        "clientId": "swagger-ui",
    },
)

# The "Authorize" button appears automatically at /docs
# with the scopes defined in OAuth2PasswordBearer

10. Comparison Table

Aspect          Passport.js/NestJS      FastAPI
Auth strategy   Classes + Decorators    Dependencies
JWT validation  Strategy pattern        Function
Guards          @UseGuards()            dependencies=[]
Scopes          Custom implementation   Native SecurityScopes
Testing         Mock guards             dependency_overrides
OpenAPI         Swagger plugin          Native integration

Conclusion

FastAPI offers a simpler but equally powerful security system:

  1. Dependencies as auth: composable and testable
  2. Native scopes: OAuth2-compliant with SecurityScopes
  3. Flexible RBAC: checkers as dependencies
  4. Integrated OpenAPI: automatic documentation

Senior pattern: Separate authentication (who you are) from authorization (what you can do). Use dependencies for auth and separate permission checkers for authz.

In the next chapter, we will implement professional testing with Pytest.