Files
ORCID2SWORD/backend/app/security/jwt.py
T
Mireya Cueto Garrido af1b8e9956 feat: enhance backend security and configuration
- Updated Dockerfile to improve security with a non-root user and added health checks.
- Modified docker-compose.yml to set containers as read-only, restrict ports to localhost, and implement health checks.
- Enhanced .env.example with additional environment variables for security and configuration.
- Improved FastAPI application with middleware for security headers, CORS, and body size limits.
- Refactored authentication flow in auth.py to include state validation and improved error handling.
- Added rate limiting to various endpoints to prevent abuse.
- Updated researcher and publication handling to ensure better validation and error management.
2026-05-08 11:19:52 +02:00

139 lines
4.4 KiB
Python

"""
Emisión y verificación de JWT.
Endurecimiento aplicado:
- Sin fallback de secreto débil: si la configuración no es válida, falla al arranque.
- `iss` y `aud` obligatorios.
- `nbf` (not-before) y `iat` validados.
- `typ=access` para evitar mezclar tipos de token.
- Algoritmo fijo (no se acepta "none" ni cambios por payload).
- Errores opacos: nunca se expone el motivo del fallo de verificación al cliente.
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import uuid4
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from app.core.config import settings
from app.db.models import Researcher
from app.db.session import get_db
from app.utils.orcid_validator import is_valid_orcid
# Bearer-credentials extractor shared by the dependencies in this module.
# auto_error=False lets a request without credentials reach them as None, so
# each dependency decides for itself whether that means a 401 or an anonymous
# caller.
_bearer = HTTPBearer(auto_error=False)
def create_access_token(*, subject: str, extra: dict[str, Any] | None = None) -> str:
    """
    Issue a signed access token.

    The token carries the claims ``iss``, ``aud``, ``sub``, ``iat``, ``nbf``,
    ``exp``, ``jti`` and ``typ="access"``. It is signed with
    ``settings.JWT_SECRET`` using the algorithm in ``settings.JWT_ALGORITHM``
    and expires ``settings.JWT_EXPIRES_MINUTES`` minutes after issuance.

    Args:
        subject: The researcher's verified ORCID iD; stored as ``sub``.
        extra: Optional additional claims. Keys that collide with the
            claims listed above are ignored, so callers cannot override
            them. The mapping passed in is never modified.

    Returns:
        The encoded JWT.

    Raises:
        ValueError: If ``subject`` is not a valid ORCID iD.
    """
    if not is_valid_orcid(subject):
        raise ValueError("subject must be a valid ORCID iD")

    now = datetime.now(timezone.utc)
    issued_at = int(now.timestamp())
    expires_at = int((now + timedelta(minutes=settings.JWT_EXPIRES_MINUTES)).timestamp())

    payload: dict[str, Any] = {
        "iss": settings.JWT_ISSUER,
        "aud": settings.JWT_AUDIENCE,
        "sub": subject,
        "iat": issued_at,
        "nbf": issued_at,
        "exp": expires_at,
        "jti": uuid4().hex,
        "typ": "access",
    }

    if extra:
        # Reserved claims always win. Filter into a new mapping instead of
        # popping keys from `extra`, which would silently alter the caller's
        # dict as a side effect.
        reserved = frozenset(payload)
        payload.update(
            {claim: value for claim, value in extra.items() if claim not in reserved}
        )

    return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
def _decode_token(token: str) -> dict[str, Any]:
    """
    Verify a JWT and return its claims.

    The token is checked against the configured secret, algorithm, audience
    and issuer, and the ``iat``, ``nbf``, ``exp``, ``aud`` and ``iss`` claims
    are all required.

    Raises:
        HTTPException: 401 with a generic message when decoding raises
            ``JWTError``. The original error is chained as the cause and is
            not included in the response detail.
    """
    required_claims = ("iat", "nbf", "exp", "aud", "iss")
    decode_options = {f"require_{claim}": True for claim in required_claims}

    try:
        claims = jwt.decode(
            token,
            settings.JWT_SECRET,
            algorithms=[settings.JWT_ALGORITHM],
            audience=settings.JWT_AUDIENCE,
            issuer=settings.JWT_ISSUER,
            options=decode_options,
        )
    except JWTError as exc:
        rejection = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        )
        raise rejection from exc
    return claims
def get_current_researcher(
    request: Request,
    creds: HTTPAuthorizationCredentials | None = Depends(_bearer),
    db: Session = Depends(get_db),
) -> Researcher:
    """
    Resolve the researcher identified by the request's bearer token.

    Checks run in this order: credentials are present, the token decodes,
    ``typ`` equals ``"access"``, ``sub`` is a valid ORCID iD string, and a
    matching researcher row exists with a truthy ``authenticated`` attribute.
    On success the researcher is also stored on ``request.state.researcher``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            any of the checks above fails.
    """

    def unauthorized(detail: str) -> HTTPException:
        # Every rejection differs only in its detail message.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not (creds and creds.credentials):
        raise unauthorized("Missing bearer token")

    claims = _decode_token(creds.credentials)

    token_type = claims.get("typ")
    if token_type != "access":
        raise unauthorized("Invalid token type")

    subject = claims.get("sub")
    if not (isinstance(subject, str) and is_valid_orcid(subject)):
        raise unauthorized("Invalid token subject")

    lookup = db.query(Researcher).filter(Researcher.orcid_id == subject)
    found = lookup.first()
    if not (found and found.authenticated):
        raise unauthorized("Researcher not authenticated")

    request.state.researcher = found
    return found
def get_optional_current_researcher(
    request: Request,
    creds: HTTPAuthorizationCredentials | None = Depends(_bearer),
    db: Session = Depends(get_db),
) -> Researcher | None:
    """
    Return the authenticated researcher when bearer credentials are supplied.

    Without bearer credentials the result is ``None``. When credentials are
    present they are validated by ``get_current_researcher``, so an invalid
    token raises 401 rather than being treated as an anonymous caller.
    """
    has_token = creds and creds.credentials
    if has_token:
        return get_current_researcher(request=request, creds=creds, db=db)
    return None