feat: implement researcher and publication management with sync functionality

This commit is contained in:
Mireya Cueto Garrido
2026-04-21 13:59:41 +02:00
parent 7717e2a5b2
commit a286c2e3ae
13 changed files with 698 additions and 23 deletions
+74
View File
@@ -0,0 +1,74 @@
class PublicationNormalizer:
@staticmethod
def safe_get_title(summary):
t = summary.get("title")
if t is None:
return None
# Caso 1: {"title": {"value": "..."}}
if isinstance(t, dict) and "title" in t and isinstance(t["title"], dict):
return t["title"].get("value")
# Caso 2: {"title": {"title": "..."}} (muy común en /works)
if isinstance(t, dict) and "title" in t and isinstance(t["title"], str):
return t["title"]
# Caso 3: {"title": "string"}
if isinstance(t, str):
return t
# Caso 4: {"value": "..."}
if isinstance(t, dict) and "value" in t:
return t["value"]
return None
@staticmethod
def normalize_work(summary: dict) -> dict:
title = PublicationNormalizer.safe_get_title(summary)
# Journal title
journal_raw = summary.get("journal-title")
if isinstance(journal_raw, dict):
journal = journal_raw.get("value") or journal_raw.get("title")
else:
journal = journal_raw
# DOI
doi = None
ext_ids = summary.get("external-ids", {}).get("external-id", [])
for ext in ext_ids:
if ext.get("external-id-type") == "doi":
doi = ext.get("external-id-value")
break
# Publication year
pub_year = (
summary.get("publication-date", {})
.get("year", {})
.get("value")
)
# Type
work_type = summary.get("type")
# put-code
put_code = summary.get("put-code")
# Fingerprint
fingerprint = f"{title}-{doi}-{pub_year}-{work_type}"
if fingerprint:
fingerprint = fingerprint.lower().replace(" ", "")
return {
"put_code": put_code,
"title": title or "Untitled",
"journal": journal,
"doi": doi,
"pub_year": pub_year,
"type": work_type,
"hash_fingerprint": fingerprint
}
+5 -10
View File
@@ -2,18 +2,13 @@ import httpx
import os
from typing import Optional
class ORCIDClient:
"""
Cliente para interactuar con la Public API de ORCID.
Permite:
- Obtener token público
- Consultar /record
- Consultar /works
"""
TOKEN_URL = "https://sandbox.orcid.org/oauth/token"
BASE_URL = "https://pub.sandbox.orcid.org/v3.0"
TOKEN_URL = "https://orcid.org/oauth/token"
BASE_URL = "https://pub.orcid.org/v3.0"
# TOKEN_URL = "https://orcid.org/oauth/token"
# BASE_URL = "https://pub.orcid.org/v3.0"
def __init__(self):
self.client_id = os.getenv("ORCID_CLIENT_ID")
+155
View File
@@ -0,0 +1,155 @@
from datetime import datetime
from xml.etree.ElementTree import Element, SubElement, tostring
from io import BytesIO
import zipfile
import json
class SWORDExporter:
ATOM_NS = "http://www.w3.org/2005/Atom"
DC_NS = "http://purl.org/dc/elements/1.1/"
# ---------------------------------------------------------
# 1) XML PRINCIPAL (sword.xml)
# ---------------------------------------------------------
@staticmethod
def export_feed_xml(researcher, publications) -> bytes:
feed = Element("feed", xmlns=SWORDExporter.ATOM_NS)
title = SubElement(feed, "title")
title.text = f"Publications for {researcher.orcid_id}"
author = SubElement(feed, "author")
name = SubElement(author, "name")
name.text = researcher.name or "Unknown"
updated = SubElement(feed, "updated")
updated.text = datetime.utcnow().isoformat() + "Z"
feed_id = SubElement(feed, "id")
feed_id.text = f"urn:uuid:{researcher.id}"
for pub in publications:
entry = SubElement(feed, "entry")
entry_id = SubElement(entry, "id")
entry_id.text = f"urn:uuid:{pub.id}"
entry_updated = SubElement(entry, "updated")
entry_updated.text = datetime.utcnow().isoformat() + "Z"
dc_title = SubElement(entry, f"{{{SWORDExporter.DC_NS}}}title")
dc_title.text = pub.title
if pub.doi:
dc_identifier = SubElement(entry, f"{{{SWORDExporter.DC_NS}}}identifier")
dc_identifier.text = f"doi:{pub.doi}"
if pub.pub_year:
dc_date = SubElement(entry, f"{{{SWORDExporter.DC_NS}}}date")
dc_date.text = str(pub.pub_year)
if pub.type:
dc_type = SubElement(entry, f"{{{SWORDExporter.DC_NS}}}type")
dc_type.text = pub.type
if pub.journal:
dc_source = SubElement(entry, f"{{{SWORDExporter.DC_NS}}}source")
dc_source.text = pub.journal
xml_bytes = tostring(feed, encoding="utf-8", xml_declaration=True)
return xml_bytes
# ---------------------------------------------------------
# 2) manifest.txt
# ---------------------------------------------------------
@staticmethod
def generate_manifest(researcher, publications) -> str:
lines = [
"SWORD Deposit Package",
"----------------------",
f"Researcher ORCID: {researcher.orcid_id}",
f"Researcher Name: {researcher.name or 'Unknown'}",
f"Total Publications: {len(publications)}",
f"Generated At: {datetime.utcnow().isoformat()}Z",
"",
"Publications:",
]
for pub in publications:
lines.append(f"- {pub.title} ({pub.pub_year}) DOI={pub.doi}")
return "\n".join(lines)
# ---------------------------------------------------------
# 3) metadata.json
# ---------------------------------------------------------
@staticmethod
def generate_metadata_json(researcher, publications) -> str:
data = {
"researcher": {
"orcid_id": researcher.orcid_id,
"name": researcher.name,
"id": str(researcher.id),
},
"generated_at": datetime.utcnow().isoformat() + "Z",
"publications": [
{
"id": str(pub.id),
"title": pub.title,
"doi": pub.doi,
"year": pub.pub_year,
"type": pub.type,
"journal": pub.journal,
}
for pub in publications
],
}
return json.dumps(data, indent=4)
# ---------------------------------------------------------
# 4) mets.xml (versión simple)
# ---------------------------------------------------------
@staticmethod
def generate_mets_xml(researcher, publications) -> bytes:
mets = Element("mets", xmlns="http://www.loc.gov/METS/")
header = SubElement(mets, "metsHdr")
agent = SubElement(header, "agent", ROLE="CREATOR", TYPE="OTHER")
name = SubElement(agent, "name")
name.text = "ORCID Exporter System"
dmd_sec = SubElement(mets, "dmdSec", ID="dmd1")
md_wrap = SubElement(dmd_sec, "mdWrap", MDTYPE="DC")
xml_data = SubElement(md_wrap, "xmlData")
for pub in publications:
dc_title = SubElement(xml_data, f"{{{SWORDExporter.DC_NS}}}title")
dc_title.text = pub.title
if pub.doi:
dc_id = SubElement(xml_data, f"{{{SWORDExporter.DC_NS}}}identifier")
dc_id.text = f"doi:{pub.doi}"
return tostring(mets, encoding="utf-8", xml_declaration=True)
# ---------------------------------------------------------
# 5) ZIP FINAL
# ---------------------------------------------------------
@staticmethod
def export_zip(researcher, publications) -> bytes:
xml_bytes = SWORDExporter.export_feed_xml(researcher, publications)
manifest = SWORDExporter.generate_manifest(researcher, publications)
metadata_json = SWORDExporter.generate_metadata_json(researcher, publications)
mets_xml = SWORDExporter.generate_mets_xml(researcher, publications)
mem_file = BytesIO()
with zipfile.ZipFile(mem_file, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
zf.writestr("sword.xml", xml_bytes)
zf.writestr("manifest.txt", manifest)
zf.writestr("metadata.json", metadata_json)
zf.writestr("mets.xml", mets_xml)
mem_file.seek(0)
return mem_file.read()
+96
View File
@@ -0,0 +1,96 @@
from sqlalchemy.orm import Session
from app.services.orcid_client import ORCIDClient
from app.services.normalizer import PublicationNormalizer
from app.repositories.researcher_repository import ResearcherRepository
from app.repositories.publication_repository import PublicationRepository
from app.repositories.syncjob_repository import SyncJobRepository
import httpx
class SyncService:
def __init__(self):
self.orcid_client = ORCIDClient()
def sync_researcher(self, db: Session, orcid_id: str):
"""
Sincroniza las publicaciones de un investigador con manejo robusto de errores.
"""
# 1. Obtener o crear investigador
try:
researcher = ResearcherRepository.get_by_orcid(db, orcid_id)
if not researcher:
record = self.orcid_client.fetch_record(orcid_id)
name = (
record.get("person", {})
.get("name", {})
.get("given-names", {})
.get("value")
)
researcher = ResearcherRepository.create(db, orcid_id, name)
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return {
"status": "error",
"message": f"El ORCID {orcid_id} no existe en Sandbox."
}
return {"status": "error", "message": str(e)}
# 2. Crear SyncJob
job = SyncJobRepository.start_job(db, researcher.id)
# 3. Obtener works
try:
works_raw = self.orcid_client.fetch_works(orcid_id)
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
SyncJobRepository.finish_job(db, job, 0, 0)
ResearcherRepository.update_last_sync(db, researcher)
return {
"status": "ok",
"message": "El ORCID existe pero no tiene publicaciones públicas.",
"new_records": 0,
"updated_records": 0,
"total": 0
}
return {"status": "error", "message": str(e)}
groups = works_raw.get("group", [])
new_records = 0
updated_records = 0
# 4. Procesar works
for group in groups:
summary = group["work-summary"][0]
normalized = PublicationNormalizer.normalize_work(summary)
# 🔥 AHORA SE DETECTAN DUPLICADOS POR put_code
existing = PublicationRepository.get_by_put_code(
db, researcher.id, normalized["put_code"]
)
if existing:
PublicationRepository.update(db, existing, normalized)
updated_records += 1
else:
PublicationRepository.create(db, researcher.id, normalized)
new_records += 1
# 5. Finalizar SyncJob
SyncJobRepository.finish_job(db, job, new_records, updated_records)
# 6. Actualizar last_sync_at
ResearcherRepository.update_last_sync(db, researcher)
return {
"status": "ok",
"message": "Sincronización completada correctamente.",
"researcher": researcher.orcid_id,
"new_records": new_records,
"updated_records": updated_records,
"total": new_records + updated_records
}