feat: enhance backend security and configuration

- Updated Dockerfile to improve security with a non-root user and added health checks.
- Modified docker-compose.yml to set containers as read-only, restrict ports to localhost, and implement health checks.
- Enhanced .env.example with additional environment variables for security and configuration.
- Improved FastAPI application with middleware for security headers, CORS, and body size limits.
- Refactored authentication flow in auth.py to include state validation and improved error handling.
- Added rate limiting to various endpoints to prevent abuse.
- Updated researcher and publication handling to ensure better validation and error management.
This commit is contained in:
Mireya Cueto Garrido
2026-05-08 11:19:52 +02:00
parent 96e58dbd16
commit af1b8e9956
37 changed files with 1375 additions and 282 deletions
+34 -25
View File
@@ -1,43 +1,52 @@
import os
from dotenv import load_dotenv
"""
Autenticación por API key (uso máquina-a-máquina, p. ej. el scheduler interno).
Endurecimiento:
- Comparación constante en tiempo (`hmac.compare_digest`) para evitar timing attacks.
- No se loggea el valor de la cabecera bajo ninguna circunstancia.
- Se separa este mecanismo del JWT de usuario; la API key NO debe usarse como
prueba de identidad de un investigador.
"""
from __future__ import annotations
import hmac
from fastapi import Depends, HTTPException, status
from fastapi.security import APIKeyHeader
# Load the variables declared in the .env file before reading them.
load_dotenv()

API_KEY_NAME = os.environ.get("API_KEY_NAME")
API_KEY_VALUE = os.environ.get("API_KEY_VALUE")

# Both settings are mandatory: stop at import time when either one is
# missing or empty instead of running with an unusable API key check.
if not API_KEY_NAME:
    raise RuntimeError("ERROR: La variable API_KEY_NAME no está definida en el .env")
if not API_KEY_VALUE:
    raise RuntimeError("ERROR: La variable API_KEY_VALUE no está definida en el .env")

# Header scheme consumed by the dependencies of this module through Depends().
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
from app.core.config import settings
# Header scheme for the machine-to-machine API key. The dependencies below
# receive the header value through Depends() and decide themselves how to
# react when it is absent (they handle a None value explicitly).
api_key_header = APIKeyHeader(name=settings.API_KEY_NAME, auto_error=False)


def _is_valid_key(provided: str | None) -> bool:
    """Return True when *provided* equals the configured API key.

    A missing or empty header value is never valid, and neither is any value
    when ``settings.API_KEY_VALUE`` itself is unset or empty.
    """
    if not provided or not settings.API_KEY_VALUE:
        return False
    # compare_digest instead of ==/!= so that the comparison time does not
    # depend on how many leading characters of the key are correct.
    return hmac.compare_digest(
        provided.encode("utf-8"),
        settings.API_KEY_VALUE.encode("utf-8"),
    )


def get_api_key(api_key: str | None = Depends(api_key_header)) -> str:
    """FastAPI dependency that requires a valid API key header.

    Args:
        api_key: Header value supplied by ``api_key_header`` (may be None).

    Returns:
        The API key that was received, once it has been validated.

    Raises:
        HTTPException: 401 when the header is absent or does not match.
    """
    if not _is_valid_key(api_key):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API key",
        )
    # _is_valid_key() rejects None, so api_key is a str at this point.
    return api_key  # type: ignore[return-value]
def get_api_key_optional(api_key: str | None = Depends(api_key_header)) -> str | None:
    """FastAPI dependency for endpoints where the API key is optional.

    Args:
        api_key: Header value supplied by ``api_key_header`` (may be None).

    Returns:
        None when no header value was received; the key itself when it was
        received and is valid.

    Raises:
        HTTPException: 401 when a value was received but is not valid. A wrong
            key is never downgraded to an anonymous request.
    """
    if api_key is None:
        return None
    if not _is_valid_key(api_key):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
        )
    return api_key
+94 -31
View File
@@ -1,75 +1,138 @@
import os
"""
Emisión y verificación de JWT.
Endurecimiento aplicado:
- Sin fallback de secreto débil: si la configuración no es válida, falla al arranque.
- `iss` y `aud` obligatorios.
- `nbf` (not-before) y `iat` validados.
- `typ=access` para evitar mezclar tipos de token.
- Algoritmo fijo (no se acepta "none" ni cambios por payload).
- Errores opacos: nunca se expone el motivo del fallo de verificación al cliente.
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import uuid4
from fastapi import Depends, HTTPException, status
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from dotenv import load_dotenv
from app.core.config import settings
from app.db.models import Researcher
from app.db.session import get_db
load_dotenv()
from app.utils.orcid_validator import is_valid_orcid
# Bearer-credentials extractor shared by the dependencies in this module.
# With auto_error=False the dependencies themselves handle the case where no
# credentials were sent (they check `not creds`).
_bearer = HTTPBearer(auto_error=False)
def _settings() -> tuple[str, str, int]:
# Fallback de desarrollo para evitar 500 por configuración ausente.
secret = os.getenv("JWT_SECRET") or "change_me"
algorithm = os.getenv("JWT_ALGORITHM") or "HS256"
expires_minutes = int(os.getenv("JWT_EXPIRES_MINUTES") or "720")
return secret, algorithm, expires_minutes
def create_access_token(*, subject: str, extra: dict[str, Any] | None = None) -> str:
    """Issue a signed access token for a researcher.

    Args:
        subject: ORCID iD of the researcher; stored as the ``sub`` claim.
        extra: Optional additional claims. Keys that collide with the claims
            set by this function are ignored. The mapping is not modified.

    Returns:
        The encoded JWT, signed with ``settings.JWT_SECRET`` using
        ``settings.JWT_ALGORITHM``.

    Raises:
        ValueError: If *subject* is rejected by ``is_valid_orcid``.
    """
    if not is_valid_orcid(subject):
        raise ValueError("subject must be a valid ORCID iD")

    # Claims owned by this function; callers may not override them via `extra`.
    reserved = ("iss", "aud", "sub", "iat", "nbf", "exp", "jti", "typ")

    now = datetime.now(timezone.utc)
    issued_at = int(now.timestamp())
    expires_at = now + timedelta(minutes=settings.JWT_EXPIRES_MINUTES)
    payload: dict[str, Any] = {
        "iss": settings.JWT_ISSUER,
        "aud": settings.JWT_AUDIENCE,
        "sub": subject,
        "iat": issued_at,
        "nbf": issued_at,  # usable immediately, never before issuance
        "exp": int(expires_at.timestamp()),
        "jti": uuid4().hex,
        "typ": "access",  # checked on verification to keep token kinds apart
    }
    if extra:
        # Filter into a new mapping instead of popping keys from `extra`, so
        # the caller's dictionary is left untouched.
        payload.update(
            {claim: value for claim, value in extra.items() if claim not in reserved}
        )
    return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
def _decode_token(token: str) -> dict[str, Any]:
    """Verify *token* and return its claims.

    The token is checked against the configured secret, the single configured
    algorithm, audience and issuer, and the ``iat``, ``nbf``, ``exp``, ``aud``
    and ``iss`` claims are all required.

    Raises:
        HTTPException: 401 with a generic message for any ``JWTError``; the
            original error is only kept as the exception cause.
    """
    mandatory_claims = ("iat", "nbf", "exp", "aud", "iss")
    decode_options = {f"require_{claim}": True for claim in mandatory_claims}
    try:
        claims = jwt.decode(
            token,
            settings.JWT_SECRET,
            algorithms=[settings.JWT_ALGORITHM],
            audience=settings.JWT_AUDIENCE,
            issuer=settings.JWT_ISSUER,
            options=decode_options,
        )
    except JWTError as exc:
        rejection = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        )
        raise rejection from exc
    return claims
def _unauthorized(detail: str) -> HTTPException:
    """Build the 401 error used for every bearer-authentication failure."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def get_current_researcher(
    request: Request,
    creds: HTTPAuthorizationCredentials | None = Depends(_bearer),
    db: Session = Depends(get_db),
) -> Researcher:
    """FastAPI dependency that resolves the bearer token to a researcher.

    Args:
        request: Current request; the resolved researcher is stored on
            ``request.state.researcher``.
        creds: Bearer credentials extracted by ``_bearer`` (may be None).
        db: Database session used to look the researcher up.

    Returns:
        The ``Researcher`` whose ``orcid_id`` equals the token subject.

    Raises:
        HTTPException: 401 when the token is missing, fails verification, is
            not an access token, has an invalid subject, or the researcher
            does not exist or is not flagged as authenticated.
    """
    if not creds or not creds.credentials:
        raise _unauthorized("Missing bearer token")

    payload = _decode_token(creds.credentials)

    # Only access tokens are accepted here, so other token kinds cannot be
    # replayed against this dependency.
    if payload.get("typ") != "access":
        raise _unauthorized("Invalid token type")

    orcid_id = payload.get("sub")
    if not isinstance(orcid_id, str) or not is_valid_orcid(orcid_id):
        raise _unauthorized("Invalid token subject")

    researcher = db.query(Researcher).filter(Researcher.orcid_id == orcid_id).first()
    if not researcher or not researcher.authenticated:
        raise _unauthorized("Researcher not authenticated")

    request.state.researcher = researcher
    return researcher
def get_optional_current_researcher(
    request: Request,
    creds: HTTPAuthorizationCredentials | None = Depends(_bearer),
    db: Session = Depends(get_db),
) -> Researcher | None:
    """FastAPI dependency for endpoints where authentication is optional.

    Returns:
        None when no bearer token was sent; otherwise the researcher resolved
        by ``get_current_researcher``.

    Raises:
        HTTPException: 401 when a bearer token was sent but is rejected by
            ``get_current_researcher``. An invalid token is never treated as
            an anonymous request.
    """
    if not creds or not creds.credentials:
        return None
    return get_current_researcher(request=request, creds=creds, db=db)
+76
View File
@@ -0,0 +1,76 @@
"""
OAuth state anti-CSRF para el flujo de login con ORCID.
El parámetro `state` se genera en `/auth/orcid/authorize`, se guarda en una
cookie HttpOnly + SameSite=Lax con TTL corto, y se valida en el callback.
Si el `state` falta, no coincide o ha expirado, el login se rechaza.
"""
from __future__ import annotations
import hmac
import secrets
from datetime import datetime, timezone
from fastapi import HTTPException, status
from starlette.requests import Request
from starlette.responses import Response
from app.core.config import settings
# Number of random bytes behind each OAuth `state` value.
_STATE_BYTES = 32


def generate_state() -> str:
    """Return a fresh URL-safe random `state` token built from ``_STATE_BYTES`` bytes."""
    token = secrets.token_urlsafe(nbytes=_STATE_BYTES)
    return token
def attach_state_cookie(response: Response, state: str) -> None:
    """Store *state* on *response* as the OAuth state cookie.

    The cookie is HttpOnly and SameSite=Lax, scoped to path "/", lives for
    ``settings.ORCID_OAUTH_STATE_TTL_SECONDS`` seconds, and its Secure flag
    follows ``settings.is_production``. Nothing is returned.
    """
    cookie_attributes = {
        "key": settings.ORCID_OAUTH_STATE_COOKIE,
        "value": state,
        "max_age": settings.ORCID_OAUTH_STATE_TTL_SECONDS,
        "secure": settings.is_production,
        "httponly": True,
        "samesite": "lax",
        "path": "/",
    }
    response.set_cookie(**cookie_attributes)
def clear_state_cookie(response: Response) -> None:
    """Ask the client to drop the OAuth state cookie (path "/")."""
    cookie_name = settings.ORCID_OAUTH_STATE_COOKIE
    response.delete_cookie(key=cookie_name, path="/")
def validate_state(request: Request, received_state: str | None) -> None:
    """Check the callback `state` against the one stored in the cookie.

    Does nothing when ``settings.ORCID_OAUTH_STATE_ENABLED`` is falsy.

    Raises:
        HTTPException: 400 when either value is missing or empty, or when the
            two values differ.
    """
    if not settings.ORCID_OAUTH_STATE_ENABLED:
        return

    stored_state = request.cookies.get(settings.ORCID_OAUTH_STATE_COOKIE)
    if not (stored_state and received_state):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="OAuth state missing",
        )

    # Constant-time comparison of the two values.
    states_match = hmac.compare_digest(
        stored_state.encode("utf-8"),
        received_state.encode("utf-8"),
    )
    if not states_match:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="OAuth state mismatch",
        )
def now_ts() -> int:
    """Return the current UTC time as whole seconds since the Unix epoch."""
    current = datetime.now(tz=timezone.utc)
    seconds = current.timestamp()
    return int(seconds)