refactor: update researcher API endpoints to include batch search and statistics functionality

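- Moved the inline search-and-sync logic out of the search endpoint into reusable helpers (`build_search_response`, `_upsert_researcher_publications`) and added `build_researcher_stats` to compute publication statistics.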
- Introduced new schemas for batch search requests and responses.
- Enhanced the search endpoint to return publication statistics alongside researcher data.
- Updated docker-compose file to remove unnecessary versioning.
This commit is contained in:
Mireya Cueto Garrido
2026-04-28 09:41:45 +02:00
parent 60cb036f3e
commit c0eb0d3916
3 changed files with 116 additions and 26 deletions
+92 -21
View File
@@ -1,12 +1,19 @@
from datetime import datetime
from typing import List
import httpx
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app.db.models import Publication, Researcher
from app.db.session import get_db
from app.schema.researcher import ResearcherWithPublicationsSchema
from app.schema.researcher import (
ResearcherBatchSearchRequestSchema,
ResearcherBatchSearchResponseSchema,
ResearcherSearchErrorSchema,
ResearcherStatsSchema,
ResearcherWithPublicationsSchema,
)
from app.services.normalizer import PublicationNormalizer
from app.services.orcid_client import get_works_summary, get_work_detail
@@ -32,24 +39,24 @@ def publication_changed(existing: Publication, data: dict) -> bool:
return False
# ---------------------------------------------------------
# ENDPOINT 1: SEARCH + SYNC (sin contadores)
# ---------------------------------------------------------
@router.get("/search/{orcid_id}", response_model=ResearcherWithPublicationsSchema)
def search_and_sync_researcher(orcid_id: str, db: Session = Depends(get_db)):
# Buscar o crear Researcher
researcher = db.query(Researcher).filter(Researcher.orcid_id == orcid_id).first()
if not researcher:
researcher = Researcher(
orcid_id=orcid_id,
name=None,
authenticated=False,
last_sync_at=None,
)
db.add(researcher)
db.flush()
def build_researcher_stats(publications: List[Publication]) -> ResearcherStatsSchema:
    """Summarise a list of publications.

    Args:
        publications: Publications to aggregate.

    Returns:
        A ResearcherStatsSchema holding the total number of publications and
        a per-type tally. Publications whose ``type`` is falsy (``None`` or
        empty) are counted under ``"unknown"``.
    """
    tally: dict[str, int] = {}

    for item in publications:
        label = item.type or "unknown"
        # First sighting of a type starts its counter at zero.
        if label not in tally:
            tally[label] = 0
        tally[label] += 1

    total = len(publications)
    return ResearcherStatsSchema(total_publications=total, publication_types=tally)
def _upsert_researcher_publications(
researcher: Researcher,
orcid_id: str,
db: Session,
) -> List[Publication]:
works = get_works_summary(orcid_id)
groups = works.get("group", [])
@@ -65,16 +72,13 @@ def search_and_sync_researcher(orcid_id: str, db: Session = Depends(get_db)):
if put_code is None:
continue
# Obtener detalle del work
try:
detail = get_work_detail(orcid_id, put_code)
except Exception:
detail = None
# Normalizar datos
data = PublicationNormalizer.normalize(summary, detail)
# Ver si ya existe la publicación
existing = (
db.query(Publication)
.filter(
@@ -111,9 +115,28 @@ def search_and_sync_researcher(orcid_id: str, db: Session = Depends(get_db)):
db.commit()
db.refresh(researcher)
return publications
def build_search_response(orcid_id: str, db: Session) -> ResearcherWithPublicationsSchema:
researcher = db.query(Researcher).filter(Researcher.orcid_id == orcid_id).first()
if not researcher:
researcher = Researcher(
orcid_id=orcid_id,
name=None,
authenticated=False,
last_sync_at=None,
)
db.add(researcher)
db.flush()
publications = _upsert_researcher_publications(researcher, orcid_id, db)
stats = build_researcher_stats(publications)
return ResearcherWithPublicationsSchema(
researcher=researcher,
publications=publications,
stats=stats,
new_records=0,
updated_records=0,
unchanged_records=0,
@@ -121,6 +144,53 @@ def search_and_sync_researcher(orcid_id: str, db: Session = Depends(get_db)):
)
# ---------------------------------------------------------
# ENDPOINT 1: SEARCH + SYNC (no counters)
# ---------------------------------------------------------
@router.get("/search/{orcid_id}", response_model=ResearcherWithPublicationsSchema)
def search_and_sync_researcher(orcid_id: str, db: Session = Depends(get_db)):
    """Serve the single-researcher search route.

    All the work is delegated to ``build_search_response``; this handler only
    binds the path parameter and the injected database session.
    """
    response = build_search_response(orcid_id=orcid_id, db=db)
    return response
@router.post("/search", response_model=ResearcherBatchSearchResponseSchema)
def search_and_sync_researchers(
    payload: ResearcherBatchSearchRequestSchema,
    db: Session = Depends(get_db),
):
    """Search and sync several researchers in one request.

    Every distinct ORCID iD in ``payload.orcid_ids`` is processed with
    ``build_search_response``. When processing one iD fails, the session is
    rolled back, the failure is logged and recorded in ``errors``, and the
    remaining iDs are still processed.

    Args:
        payload: Request body carrying the ORCID iDs to process.
        db: Database session provided through ``Depends(get_db)``.

    Returns:
        ResearcherBatchSearchResponseSchema with the successful results, the
        per-iD errors, the number of distinct iDs requested and the number
        that were processed successfully.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    logger = logging.getLogger(__name__)

    results: List[ResearcherWithPublicationsSchema] = []
    errors: List[ResearcherSearchErrorSchema] = []

    # Avoid duplicate ORCID calls while preserving the input order.
    unique_orcid_ids = list(dict.fromkeys(payload.orcid_ids))

    for orcid_id in unique_orcid_ids:
        try:
            results.append(build_search_response(orcid_id, db))
        except httpx.HTTPStatusError as exc:
            # Discard any partial work for this iD before moving on.
            db.rollback()
            logger.warning(
                "ORCID returned HTTP %s for %s",
                exc.response.status_code,
                orcid_id,
            )
            errors.append(
                ResearcherSearchErrorSchema(
                    orcid_id=orcid_id,
                    detail=f"ORCID devolvió {exc.response.status_code} para {orcid_id}.",
                )
            )
        except Exception as exc:
            # Batch boundary: one bad iD must not abort the whole request, but
            # the traceback has to be kept server-side instead of vanishing.
            db.rollback()
            logger.exception("Unexpected error while processing ORCID iD %s", orcid_id)
            errors.append(
                ResearcherSearchErrorSchema(
                    orcid_id=orcid_id,
                    detail=str(exc),
                )
            )

    return ResearcherBatchSearchResponseSchema(
        results=results,
        errors=errors,
        total_requested=len(unique_orcid_ids),
        total_processed=len(results),
    )
# ---------------------------------------------------------
# ENDPOINT 2: SYNC COMPLETO (con contadores + status)
# ---------------------------------------------------------
@@ -201,6 +271,7 @@ def sync_researcher(orcid_id: str, db: Session = Depends(get_db)):
return ResearcherWithPublicationsSchema(
researcher=researcher,
publications=publications_output,
stats=build_researcher_stats(publications_output),
new_records=new_count,
updated_records=updated_count,
unchanged_records=unchanged_count,
+24 -3
View File
@@ -1,6 +1,6 @@
from pydantic import BaseModel
from pydantic import BaseModel, Field
from uuid import UUID
from typing import Optional, List
from typing import Optional, List, Dict
from datetime import datetime
from app.schema.publication import PublicationSchema
@@ -14,14 +14,35 @@ class ResearcherSchema(BaseModel):
model_config = {"from_attributes": True}
# Aggregate publication figures returned alongside a researcher.
class ResearcherStatsSchema(BaseModel):
    # Number of publications the statistics cover.
    total_publications: int
    # Publication count keyed by publication type.
    publication_types: Dict[str, int]
# Response payload bundling a researcher, their publications, publication
# statistics and record counters.
class ResearcherWithPublicationsSchema(BaseModel):
    researcher: ResearcherSchema
    publications: List[PublicationSchema]
    stats: ResearcherStatsSchema
    # NEW FIELDS
    # Record counters; presumably how many publications a sync created,
    # updated and left untouched, plus the overall total — confirm against
    # the endpoints that fill them in.
    new_records: int
    updated_records: int
    unchanged_records: int
    total_records: int
    # Lets the model be populated from object attributes, not only from dicts.
    model_config = {"from_attributes": True}
# Request body for the batch search endpoint.
class ResearcherBatchSearchRequestSchema(BaseModel):
    # ORCID iDs to process; min_length=1 means an empty list fails validation.
    orcid_ids: List[str] = Field(min_length=1)
# One failed entry of a batch search.
class ResearcherSearchErrorSchema(BaseModel):
    # ORCID iD whose processing failed.
    orcid_id: str
    # Human-readable description of the failure.
    detail: str
# Response body of the batch search endpoint.
class ResearcherBatchSearchResponseSchema(BaseModel):
    # Successful per-researcher responses.
    results: List[ResearcherWithPublicationsSchema]
    # Entries describing the ORCID iDs that could not be processed.
    errors: List[ResearcherSearchErrorSchema]
    # Number of ORCID iDs requested.
    total_requested: int
    # Number of ORCID iDs processed.
    total_processed: int