Files
ORCID2SWORD/backend/app/api/auth.py
T
Mireya Cueto Garrido fec26089ed feat: enhance authentication and publication download tracking
- Added JWT authentication support with configurable secret and expiration.
- Introduced optional API key validation for endpoints.
- Implemented tracking of publication downloads by researchers, storing records in a new PublicationDownload model.
- Updated export endpoints to conditionally register downloads based on user authentication.
- Enhanced researcher search response to indicate if publications were downloaded by the current user.
- Updated environment configuration to include new JWT settings.
2026-04-29 10:27:17 +02:00

98 lines
3.6 KiB
Python

import httpx
import os
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import RedirectResponse
from sqlalchemy.orm import Session
from app.db.models import Researcher
from app.db.session import get_db
from app.schema.auth import OrcidLoginResponseSchema
from app.security.jwt import create_access_token
from app.services.orcid_client import ORCIDClient
from app.utils.orcid_validator import is_valid_orcid
# Router for the authentication endpoints defined in this module; every route
# registered on it is served under the "/auth" prefix and tagged "auth".
router = APIRouter(prefix="/auth", tags=["auth"])
def _extract_display_name(record: dict) -> str | None:
person = (record or {}).get("person") or {}
name = person.get("name") or {}
given = ((name.get("given-names") or {}).get("value")) if isinstance(name.get("given-names"), dict) else None
family = ((name.get("family-name") or {}).get("value")) if isinstance(name.get("family-name"), dict) else None
full = " ".join([p for p in [given, family] if p])
return full or None
def _orcid_redirect_uri() -> str:
    """Return the OAuth redirect URI used for the ORCID flow.

    The value of the ``ORCID_REDIRECT_URI`` environment variable is used when
    it is set and non-empty; otherwise a localhost development URL is returned.
    """
    # Must match the `redirect_uri` registered in your ORCID integration.
    configured = os.environ.get("ORCID_REDIRECT_URI")
    if configured:
        return configured
    return "http://localhost:8000/api/auth/orcid/callback"
@router.get("/orcid/authorize")
def authorize_orcid():
    """Start the three-legged OAuth (authorization code) flow against ORCID.

    Asks ``ORCIDClient`` for the authorization URL, using the configured
    redirect URI and the ``/authenticate`` scope, and redirects the caller
    to that URL.
    """
    # The "/authenticate" scope is enough: only the user's authenticated iD
    # is needed.
    # NOTE(review): no OAuth `state` value is passed from here; confirm
    # whether `build_authorize_url` adds one or CSRF is handled elsewhere.
    target_url = ORCIDClient().build_authorize_url(
        redirect_uri=_orcid_redirect_uri(),
        scope="/authenticate",
    )
    return RedirectResponse(target_url)
def _exchange_code_for_token(client, code: str, redirect_uri: str):
    """Exchange an ORCID authorization code for the token payload.

    Failures of the upstream call are translated into gateway-style
    ``HTTPException``s, chained to the original error so the cause stays
    visible in tracebacks and logs.
    """
    try:
        return client.exchange_authorization_code(code=code, redirect_uri=redirect_uri)
    except httpx.HTTPStatusError as exc:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"ORCID token error ({exc.response.status_code})",
        ) from exc
    except httpx.TimeoutException as exc:
        raise HTTPException(
            status_code=status.HTTP_504_GATEWAY_TIMEOUT,
            detail="ORCID timeout",
        ) from exc
    except Exception as exc:
        # Boundary handler: any other failure of the exchange is reported as
        # an upstream problem rather than leaking an internal error.
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="ORCID unavailable",
        ) from exc


def _resolve_display_name(client, token_data, orcid_id: str):
    """Return the user's display name from the token payload or the ORCID record."""
    display_name = token_data.get("name")
    if display_name:
        return display_name
    # Fallback when the token response carries no `name`: read the record.
    # This is deliberately best-effort; a missing name must not block login.
    try:
        return _extract_display_name(client.fetch_record(orcid_id))
    except Exception:
        return None


def _upsert_authenticated_researcher(db: Session, orcid_id: str, display_name):
    """Create or update the ``Researcher`` row for ``orcid_id`` and mark it authenticated."""
    researcher = db.query(Researcher).filter(Researcher.orcid_id == orcid_id).first()
    if not researcher:
        researcher = Researcher(orcid_id=orcid_id, name=display_name, authenticated=True)
        db.add(researcher)
    else:
        researcher.authenticated = True
        # Never overwrite a name that is already stored.
        if display_name and not researcher.name:
            researcher.name = display_name
    db.commit()
    db.refresh(researcher)
    return researcher


@router.get("/orcid/callback", response_model=OrcidLoginResponseSchema)
def orcid_callback(code: str, db: Session = Depends(get_db)):
    """Handle the ORCID OAuth callback and issue this application's JWT.

    Receives the ``code`` returned by ORCID, exchanges it server-side for
    the token payload, and issues a JWT only for the ORCID iD found in that
    payload. The matching ``Researcher`` row is created or updated and
    flagged as authenticated.

    Args:
        code: Authorization code sent by ORCID as a query parameter.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        ``OrcidLoginResponseSchema`` carrying the access token. The token's
        subject is the ORCID iD and its ``rid`` claim is the researcher id.

    Raises:
        HTTPException: 400 if ``code`` is empty; 502 if the token exchange
            fails; 504 if it times out; 401 if the payload's ``orcid``
            value fails ``is_valid_orcid``.
    """
    # NOTE(review): only `code` is read here; no OAuth `state` value is
    # checked in this handler. Confirm CSRF protection is handled elsewhere.
    if not code:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Missing ORCID authorization code")

    client = ORCIDClient()
    token_data = _exchange_code_for_token(client, code, _orcid_redirect_uri())

    orcid_id = (token_data.get("orcid") or "").strip()
    if not is_valid_orcid(orcid_id):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid ORCID returned by OAuth")

    display_name = _resolve_display_name(client, token_data, orcid_id)
    researcher = _upsert_authenticated_researcher(db, orcid_id, display_name)

    token = create_access_token(subject=orcid_id, extra={"rid": str(researcher.id)})
    return OrcidLoginResponseSchema(access_token=token)