Módulo 10 • 20 min de lectura
10 - Autenticación y Autorización
OAuth2, JWT, RBAC, scopes dinámicos y patrones de seguridad enterprise en FastAPI.
#security
#oauth2
#jwt
#rbac
#authentication
#authorization
1. De Passport.js a FastAPI Security
JavaScript/TypeScript
// NestJS + Passport
@Injectable()
export class JwtStrategy extends PassportStrategy(Strategy) {
constructor() {
super({
jwtFromRequest: ExtractJwt.fromAuthHeaderAsBearerToken(),
secretOrKey: process.env.JWT_SECRET,
});
}
async validate(payload: JwtPayload) {
return { userId: payload.sub, roles: payload.roles };
}
}
@UseGuards(JwtAuthGuard)
@Controller('users')
export class UsersController {} Python
# FastAPI: Security como dependencia
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
payload = decode_token(token)
user = await db.get(User, payload["sub"])
if not user:
raise HTTPException(status_code=401)
return user
@app.get("/users/me")
async def get_me(user: Annotated[User, Depends(get_current_user)]):
return user Diferencia clave: FastAPI usa el sistema de dependencias nativo. No hay “guards” separados—la autenticación es una dependencia más que se puede componer y testear.
2. OAuth2 Password Flow Completo
Modelos y Schemas
# schemas/auth.py
from pydantic import BaseModel, EmailStr
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
expires_in: int
class TokenPayload(BaseModel):
sub: str # Subject (user ID)
exp: int # Expiration timestamp
iat: int # Issued at
type: str # "access" or "refresh"
roles: list[str] = []
scopes: list[str] = []
class LoginRequest(BaseModel):
email: EmailStr
password: str
class RegisterRequest(BaseModel):
email: EmailStr
password: str
full_name: str
Configuración de Seguridad
# core/security.py
from datetime import datetime, timedelta, timezone
from typing import Any
import jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from app.core.config import settings
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
class TokenConfig(BaseModel):
secret_key: str
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
refresh_token_expire_days: int = 7
token_config = TokenConfig(
secret_key=settings.SECRET_KEY,
access_token_expire_minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES,
refresh_token_expire_days=settings.REFRESH_TOKEN_EXPIRE_DAYS,
)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def create_token(
subject: str,
token_type: str,
expires_delta: timedelta,
extra_claims: dict[str, Any] | None = None,
) -> str:
now = datetime.now(timezone.utc)
expire = now + expires_delta
payload = {
"sub": subject,
"exp": expire,
"iat": now,
"type": token_type,
**(extra_claims or {}),
}
return jwt.encode(payload, token_config.secret_key, algorithm=token_config.algorithm)
def create_access_token(user_id: str, roles: list[str], scopes: list[str]) -> str:
return create_token(
subject=user_id,
token_type="access",
expires_delta=timedelta(minutes=token_config.access_token_expire_minutes),
extra_claims={"roles": roles, "scopes": scopes},
)
def create_refresh_token(user_id: str) -> str:
return create_token(
subject=user_id,
token_type="refresh",
expires_delta=timedelta(days=token_config.refresh_token_expire_days),
)
def decode_token(token: str) -> TokenPayload:
try:
payload = jwt.decode(
token,
token_config.secret_key,
algorithms=[token_config.algorithm],
)
return TokenPayload(**payload)
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token expired",
headers={"WWW-Authenticate": "Bearer"},
)
except jwt.InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"},
)
Endpoints de Autenticación
# api/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/token", response_model=Token)
async def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: Annotated[AsyncSession, Depends(get_db)],
) -> Token:
"""OAuth2 compatible token endpoint."""
user = await authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
return Token(
access_token=create_access_token(
str(user.id),
roles=[r.name for r in user.roles],
scopes=form_data.scopes,
),
refresh_token=create_refresh_token(str(user.id)),
expires_in=token_config.access_token_expire_minutes * 60,
)
@router.post("/refresh", response_model=Token)
async def refresh_token(
refresh_token: str,
db: Annotated[AsyncSession, Depends(get_db)],
) -> Token:
"""Refresh access token using refresh token."""
payload = decode_token(refresh_token)
if payload.type != "refresh":
raise HTTPException(status_code=401, detail="Invalid token type")
user = await db.get(User, int(payload.sub))
if not user or not user.is_active:
raise HTTPException(status_code=401, detail="User not found or inactive")
return Token(
access_token=create_access_token(
str(user.id),
roles=[r.name for r in user.roles],
scopes=[],
),
refresh_token=create_refresh_token(str(user.id)),
expires_in=token_config.access_token_expire_minutes * 60,
)
async def authenticate_user(
db: AsyncSession,
email: str,
password: str,
) -> User | None:
stmt = select(User).where(User.email == email).options(selectinload(User.roles))
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if not user or not verify_password(password, user.hashed_password):
return None
return user
3. Dependencias de Autenticación
# dependencies/auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from typing import Annotated
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="auth/token",
scopes={
"users:read": "Read user information",
"users:write": "Modify user information",
"admin": "Admin access",
},
)
async def get_current_user(
security_scopes: SecurityScopes,
token: Annotated[str, Depends(oauth2_scheme)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
"""Dependency that validates token and returns current user."""
# Build authenticate header
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
payload = decode_token(token)
if payload.type != "access":
raise credentials_exception
# Validate scopes
for scope in security_scopes.scopes:
if scope not in payload.scopes:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing required scope: {scope}",
headers={"WWW-Authenticate": authenticate_value},
)
user = await db.get(User, int(payload.sub))
if not user:
raise credentials_exception
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return user
async def get_current_active_user(
user: Annotated[User, Depends(get_current_user)],
) -> User:
"""Alias for get_current_user (for clarity)."""
return user
# Type aliases for cleaner endpoints
CurrentUser = Annotated[User, Depends(get_current_user)]
CurrentActiveUser = Annotated[User, Depends(get_current_active_user)]
Uso en Endpoints
from fastapi import Security
@app.get("/users/me")
async def get_me(user: CurrentUser) -> UserResponse:
return user
@app.get("/users", dependencies=[Security(get_current_user, scopes=["users:read"])])
async def list_users(db: DBSession) -> list[UserResponse]:
return await user_repo.list(db)
@app.put("/users/{user_id}", dependencies=[Security(get_current_user, scopes=["users:write"])])
async def update_user(user_id: int, data: UserUpdate, db: DBSession) -> UserResponse:
return await user_repo.update(db, user_id, data)
4. RBAC (Role-Based Access Control)
Modelos de Base de Datos
# models/auth.py
from sqlalchemy import Table, Column, Integer, ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
# Tabla de asociación user-role
user_roles = Table(
"user_roles",
Base.metadata,
Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
)
# Tabla de asociación role-permission
role_permissions = Table(
"role_permissions",
Base.metadata,
Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
Column("permission_id", Integer, ForeignKey("permissions.id"), primary_key=True),
)
class Permission(Base):
__tablename__ = "permissions"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100), unique=True) # e.g., "users:read"
description: Mapped[str | None] = mapped_column(String(255))
roles: Mapped[list["Role"]] = relationship(
secondary=role_permissions,
back_populates="permissions",
)
class Role(Base):
__tablename__ = "roles"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True) # e.g., "admin"
description: Mapped[str | None] = mapped_column(String(255))
permissions: Mapped[list[Permission]] = relationship(
secondary=role_permissions,
back_populates="roles",
lazy="selectin",
)
users: Mapped[list["User"]] = relationship(
secondary=user_roles,
back_populates="roles",
)
class User(Base):
__tablename__ = "users"
# ... otros campos ...
roles: Mapped[list[Role]] = relationship(
secondary=user_roles,
back_populates="users",
lazy="selectin",
)
@property
def permissions(self) -> set[str]:
"""Flatten all permissions from all roles."""
perms = set()
for role in self.roles:
for perm in role.permissions:
perms.add(perm.name)
return perms
def has_permission(self, permission: str) -> bool:
return permission in self.permissions
def has_role(self, role_name: str) -> bool:
return any(r.name == role_name for r in self.roles)
Dependencias de Autorización
# dependencies/permissions.py
from functools import wraps
from typing import Callable
class PermissionChecker:
"""Dependency factory for permission checking."""
def __init__(self, required_permissions: list[str], require_all: bool = True):
self.required_permissions = required_permissions
self.require_all = require_all
async def __call__(self, user: CurrentUser) -> User:
user_perms = user.permissions
if self.require_all:
# User must have ALL permissions
missing = set(self.required_permissions) - user_perms
if missing:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing permissions: {', '.join(missing)}",
)
else:
# User must have AT LEAST ONE permission
if not user_perms.intersection(self.required_permissions):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions",
)
return user
class RoleChecker:
"""Dependency factory for role checking."""
def __init__(self, required_roles: list[str], require_all: bool = False):
self.required_roles = required_roles
self.require_all = require_all
async def __call__(self, user: CurrentUser) -> User:
user_roles = {r.name for r in user.roles}
if self.require_all:
if not set(self.required_roles).issubset(user_roles):
raise HTTPException(status_code=403, detail="Insufficient roles")
else:
if not user_roles.intersection(self.required_roles):
raise HTTPException(status_code=403, detail="Insufficient roles")
return user
# Convenience functions
def require_permissions(*permissions: str, require_all: bool = True):
return Depends(PermissionChecker(list(permissions), require_all))
def require_roles(*roles: str, require_all: bool = False):
return Depends(RoleChecker(list(roles), require_all))
Uso en Endpoints
@app.delete(
"/users/{user_id}",
dependencies=[require_permissions("users:delete")],
)
async def delete_user(user_id: int, db: DBSession):
return await user_repo.delete(db, user_id)
@app.get(
"/admin/stats",
dependencies=[require_roles("admin", "superuser")], # Any of these roles
)
async def admin_stats(db: DBSession):
return await stats_service.get_admin_stats(db)
@app.post(
"/admin/dangerous-action",
dependencies=[require_roles("admin", "superuser", require_all=True)], # Must have BOTH
)
async def dangerous_action():
pass
5. Scopes Dinámicos por Recurso
Para permisos granulares tipo “user X can edit post Y”:
# dependencies/resource_permissions.py
from typing import Callable, Any
class ResourcePermissionChecker:
"""Check permissions on specific resources."""
def __init__(
self,
action: str,
resource_type: str,
get_resource: Callable[..., Any],
):
self.action = action
self.resource_type = resource_type
self.get_resource = get_resource
async def __call__(
self,
user: CurrentUser,
resource_id: int,
db: DBSession,
) -> Any:
resource = await self.get_resource(db, resource_id)
if not resource:
raise HTTPException(status_code=404, detail="Resource not found")
# Check ownership
if hasattr(resource, 'owner_id') and resource.owner_id == user.id:
return resource
# Check explicit permission
permission = f"{self.resource_type}:{self.action}"
if user.has_permission(permission):
return resource
# Check resource-specific permission in DB
has_access = await check_resource_permission(
db, user.id, self.resource_type, resource_id, self.action
)
if not has_access:
raise HTTPException(status_code=403, detail="Access denied")
return resource
# Usage
async def get_post(db: DBSession, post_id: int) -> Post | None:
return await db.get(Post, post_id)
can_edit_post = ResourcePermissionChecker("edit", "post", get_post)
@app.put("/posts/{resource_id}")
async def update_post(
post: Annotated[Post, Depends(can_edit_post)],
data: PostUpdate,
db: DBSession,
):
# post is already fetched and permission-checked
return await post_repo.update(db, post.id, data)
6. API Keys para Servicios
# dependencies/api_key.py
from fastapi import Security
from fastapi.security import APIKeyHeader, APIKeyQuery
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
api_key_query = APIKeyQuery(name="api_key", auto_error=False)
async def get_api_key(
api_key_header: str | None = Security(api_key_header),
api_key_query: str | None = Security(api_key_query),
db: DBSession = Depends(get_db),
) -> APIKey:
"""Validate API key from header or query parameter."""
api_key = api_key_header or api_key_query
if not api_key:
raise HTTPException(status_code=401, detail="API key required")
# Hash the key for lookup (keys stored hashed)
key_hash = hash_api_key(api_key)
stmt = select(APIKey).where(
APIKey.key_hash == key_hash,
APIKey.is_active == True,
or_(APIKey.expires_at.is_(None), APIKey.expires_at > datetime.utcnow()),
)
result = await db.execute(stmt)
db_key = result.scalar_one_or_none()
if not db_key:
raise HTTPException(status_code=401, detail="Invalid API key")
# Update last used
db_key.last_used_at = datetime.utcnow()
await db.flush()
return db_key
# Combined auth: JWT OR API Key
async def get_current_user_or_api_key(
token: str | None = Depends(oauth2_scheme_optional),
api_key: APIKey | None = Depends(get_api_key_optional),
db: DBSession = Depends(get_db),
) -> User | APIKey:
if token:
return await get_current_user(SecurityScopes(), token, db)
if api_key:
return api_key
raise HTTPException(status_code=401, detail="Authentication required")
7. Rate Limiting por Usuario
# dependencies/rate_limit.py
from fastapi import Request
import redis.asyncio as redis
class UserRateLimiter:
def __init__(
self,
redis_client: redis.Redis,
requests_per_minute: int = 60,
):
self.redis = redis_client
self.rpm = requests_per_minute
async def __call__(self, request: Request, user: CurrentUser) -> None:
key = f"rate_limit:{user.id}:{request.url.path}"
current = await self.redis.incr(key)
if current == 1:
await self.redis.expire(key, 60)
if current > self.rpm:
raise HTTPException(
status_code=429,
detail="Rate limit exceeded",
headers={"Retry-After": "60"},
)
# Different limits per role
class TieredRateLimiter:
LIMITS = {
"free": 60,
"pro": 600,
"enterprise": 6000,
}
async def __call__(self, request: Request, user: CurrentUser) -> None:
tier = self._get_user_tier(user)
limit = self.LIMITS.get(tier, 60)
# ... rate limiting logic with tier-specific limit
@app.get("/api/data", dependencies=[Depends(UserRateLimiter(redis_client, 100))])
async def get_data():
pass
8. Seguridad Adicional
Password Policies
from pydantic import field_validator
import re
class PasswordPolicy(BaseModel):
password: str
@field_validator('password')
@classmethod
def validate_password(cls, v: str) -> str:
if len(v) < 12:
raise ValueError('Password must be at least 12 characters')
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain lowercase letter')
if not re.search(r'\d', v):
raise ValueError('Password must contain digit')
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
raise ValueError('Password must contain special character')
return v
Brute Force Protection
class LoginAttemptTracker:
MAX_ATTEMPTS = 5
LOCKOUT_MINUTES = 15
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def check_and_increment(self, email: str) -> None:
key = f"login_attempts:{email}"
attempts = await self.redis.incr(key)
if attempts == 1:
await self.redis.expire(key, self.LOCKOUT_MINUTES * 60)
if attempts > self.MAX_ATTEMPTS:
ttl = await self.redis.ttl(key)
raise HTTPException(
status_code=429,
detail=f"Too many login attempts. Try again in {ttl} seconds",
)
async def reset(self, email: str) -> None:
await self.redis.delete(f"login_attempts:{email}")
9. OpenAPI Security Schemes
FastAPI genera automáticamente la documentación de seguridad:
app = FastAPI(
title="My API",
swagger_ui_init_oauth={
"usePkceWithAuthorizationCodeGrant": True,
"clientId": "swagger-ui",
},
)
# El botón "Authorize" aparece automáticamente en /docs
# con los scopes definidos en OAuth2PasswordBearer
10. Tabla Comparativa
| Aspecto | Passport.js/NestJS | FastAPI |
|---|---|---|
| Auth strategy | Classes + Decorators | Dependencies |
| JWT validation | Strategy pattern | Function |
| Guards | @UseGuards() | dependencies=[] |
| Scopes | Custom implementation | Native SecurityScopes |
| Testing | Mock guards | dependency_overrides |
| OpenAPI | Swagger plugin | Native integration |
Conclusión
FastAPI ofrece un sistema de seguridad más simple pero igualmente potente:
- Dependencies como auth — Composable y testeable
- Scopes nativos — OAuth2 compliant con
SecurityScopes - RBAC flexible — Checkers como dependencies
- OpenAPI integrado — Documentación automática
Pattern Senior: Separa autenticación (quién eres) de autorización (qué puedes hacer). Usa dependencies para auth y permission checkers separados para authz.
En el siguiente capítulo, implementaremos testing profesional con Pytest.