Versión 3 Backend - Endpoints finales corregidos

This commit is contained in:
Mireya Cueto Garrido
2026-04-27 13:39:32 +02:00
parent a286c2e3ae
commit 96f01c0126
4343 changed files with 1046097 additions and 465 deletions
+89 -52
View File
@@ -1,74 +1,111 @@
from typing import List
def _get(d: dict | None, *keys, default=None):
cur = d or {}
for k in keys:
if not isinstance(cur, dict):
return default
cur = cur.get(k)
if cur is None:
return default
return cur
class PublicationNormalizer:
@staticmethod
def safe_get_title(summary):
t = summary.get("title")
def normalize(summary: dict, detail: dict | None = None) -> dict:
"""
summary: work-summary de ORCID
detail: work completo (puede ser None si la llamada falla)
"""
if t is None:
return None
# --- Core desde summary ---
put_code = summary.get("put-code")
# Caso 1: {"title": {"value": "..."}}
if isinstance(t, dict) and "title" in t and isinstance(t["title"], dict):
return t["title"].get("value")
title = _get(summary, "title", "title", "value")
type_ = summary.get("type")
# Caso 2: {"title": {"title": "..."}} (muy común en /works)
if isinstance(t, dict) and "title" in t and isinstance(t["title"], str):
return t["title"]
journal = _get(summary, "journal-title", "value")
# Caso 3: {"title": "string"}
if isinstance(t, str):
return t
year = _get(summary, "publication-date", "year", "value")
month = _get(summary, "publication-date", "month", "value")
day = _get(summary, "publication-date", "day", "value")
# Caso 4: {"value": "..."}
if isinstance(t, dict) and "value" in t:
return t["value"]
url = _get(summary, "url", "value")
short_description = summary.get("short-description")
return None
@staticmethod
def normalize_work(summary: dict) -> dict:
title = PublicationNormalizer.safe_get_title(summary)
# Journal title
journal_raw = summary.get("journal-title")
if isinstance(journal_raw, dict):
journal = journal_raw.get("value") or journal_raw.get("title")
else:
journal = journal_raw
# DOI
# DOI desde summary (external-ids)
doi = None
ext_ids = summary.get("external-ids", {}).get("external-id", [])
for ext in ext_ids:
external_ids_list: List[dict] = _get(
summary, "external-ids", "external-id", default=[]
) or []
for ext in external_ids_list:
if ext.get("external-id-type") == "doi":
doi = ext.get("external-id-value")
break
# Publication year
pub_year = (
summary.get("publication-date", {})
.get("year", {})
.get("value")
)
# --- Si tenemos detail, enriquecemos ---
subtitle = None
citation_type = None
citation_value = None
language_code = None
country = None
external_ids_full: List[dict] | None = None
contributors: List[dict] | None = None
# Type
work_type = summary.get("type")
if detail:
# Subtitle
subtitle = _get(detail, "title", "subtitle", "value") or subtitle
# put-code
put_code = summary.get("put-code")
# Citation
citation_type = _get(detail, "citation", "citation-type")
citation_value = _get(detail, "citation", "citation-value")
# Fingerprint
fingerprint = f"{title}-{doi}-{pub_year}-{work_type}"
if fingerprint:
fingerprint = fingerprint.lower().replace(" ", "")
# Language
language_code = detail.get("language-code")
# Country
country = _get(detail, "country", "value")
# External IDs completos
external_ids_full = _get(
detail, "external-ids", "external-id", default=[]
) or []
# Contributors
raw_contributors = _get(
detail, "contributors", "contributor", default=[]
) or []
contributors = []
for c in raw_contributors:
contributors.append(
{
"name": _get(c, "credit-name", "value"),
"orcid": _get(c, "contributor-orcid", "path"),
"role": _get(
c, "contributor-attributes", "contributor-role"
),
}
)
return {
"put_code": put_code,
"title": title or "Untitled",
"title": title,
"subtitle": subtitle,
"type": type_,
"journal": journal,
"pub_year": int(year) if year is not None else None,
"pub_month": int(month) if month is not None else None,
"pub_day": int(day) if day is not None else None,
"doi": doi,
"pub_year": pub_year,
"type": work_type,
"hash_fingerprint": fingerprint
"url": url,
"short_description": short_description,
"citation_type": citation_type,
"citation_value": citation_value,
"language_code": language_code,
"country": country,
"external_ids": external_ids_full,
"contributors": contributors,
"hash_fingerprint": None,
}
+43 -19
View File
@@ -1,28 +1,28 @@
import httpx
import os
from typing import Optional
import httpx
TOKEN_URL_SANDBOX = "https://sandbox.orcid.org/oauth/token"
BASE_URL_SANDBOX = "https://pub.sandbox.orcid.org/v3.0"
# Si en algún momento pasas a producción, cambiarías a:
# TOKEN_URL_PROD = "https://orcid.org/oauth/token"
# BASE_URL_PROD = "https://pub.orcid.org/v3.0"
class ORCIDClient:
TOKEN_URL = "https://sandbox.orcid.org/oauth/token"
BASE_URL = "https://pub.sandbox.orcid.org/v3.0"
# TOKEN_URL = "https://orcid.org/oauth/token"
# BASE_URL = "https://pub.orcid.org/v3.0"
def __init__(self):
    """Set up a client that targets the ORCID sandbox.

    Credentials are read from the ``ORCID_CLIENT_ID`` and
    ``ORCID_CLIENT_SECRET`` environment variables (``None`` when unset).
    """
    # Endpoints: sandbox token service and sandbox public API.
    self.token_url, self.base_url = TOKEN_URL_SANDBOX, BASE_URL_SANDBOX

    # OAuth client credentials.
    self.client_id = os.getenv("ORCID_CLIENT_ID")
    self.client_secret = os.getenv("ORCID_CLIENT_SECRET")

    # Filled in by the first successful token request.
    self._token_cache: Optional[str] = None
# ---------------------------------------------------------
# 1. Obtener token público
# ---------------------------------------------------------
def get_public_token(self) -> str:
"""
Obtiene un token público de ORCID (scope: /read-public).
Se cachea en memoria para evitar pedirlo cada vez.
"""
if self._token_cache:
return self._token_cache
@@ -30,11 +30,11 @@ class ORCIDClient:
"client_id": self.client_id,
"client_secret": self.client_secret,
"grant_type": "client_credentials",
"scope": "/read-public"
"scope": "/read-public",
}
with httpx.Client(timeout=20.0) as client:
response = client.post(self.TOKEN_URL, data=data)
response = client.post(self.token_url, data=data)
response.raise_for_status()
token = response.json()["access_token"]
self._token_cache = token
@@ -43,29 +43,53 @@ class ORCIDClient:
# ---------------------------------------------------------
# Headers comunes
# ---------------------------------------------------------
def _headers(self) -> dict:
    """Build the headers shared by every ORCID API request.

    Returns:
        Dict asking for a JSON response and carrying the bearer token
        returned by ``get_public_token()``.
    """
    token = self.get_public_token()
    return {
        "Accept": "application/json",
        "Authorization": f"Bearer {token}",
    }
# ---------------------------------------------------------
# 2. Consultar /record
# ---------------------------------------------------------
def fetch_record(self, orcid_id: str) -> dict:
    """Fetch the ``/record`` resource of an ORCID iD.

    Args:
        orcid_id: The ORCID identifier placed in the request path.

    Returns:
        The decoded JSON body.

    Raises:
        httpx.HTTPStatusError: When the response status is 4xx/5xx
            (raised by ``raise_for_status``).
    """
    # The endpoint root comes from the instance attribute set in __init__.
    url = f"{self.base_url}/{orcid_id}/record"
    with httpx.Client(timeout=20.0) as client:
        response = client.get(url, headers=self._headers())
        response.raise_for_status()
        return response.json()
# ---------------------------------------------------------
# 3. Consultar /works
# 3. Consultar /works (summary)
# ---------------------------------------------------------
def fetch_works(self, orcid_id: str) -> dict:
    """Fetch the ``/works`` resource (work summaries) of an ORCID iD.

    Args:
        orcid_id: The ORCID identifier placed in the request path.

    Returns:
        The decoded JSON body.

    Raises:
        httpx.HTTPStatusError: When the response status is 4xx/5xx
            (raised by ``raise_for_status``).
    """
    # The endpoint root comes from the instance attribute set in __init__.
    url = f"{self.base_url}/{orcid_id}/works"
    with httpx.Client(timeout=20.0) as client:
        response = client.get(url, headers=self._headers())
        response.raise_for_status()
        return response.json()
# ---------------------------------------------------------
# 4. Consultar /work/{put_code} (detalle)
# ---------------------------------------------------------
def fetch_work_detail(self, orcid_id: str, put_code: int) -> dict | None:
    """Fetch the full record of one work, best effort.

    Args:
        orcid_id: The ORCID identifier placed in the request path.
        put_code: Put code of the work placed in the request path.

    Returns:
        The decoded JSON body when the status is exactly 200; ``None`` for
        any other status (no exception is raised for HTTP error statuses).
    """
    endpoint = f"{self.base_url}/{orcid_id}/work/{put_code}"
    with httpx.Client(timeout=20.0) as http:
        reply = http.get(endpoint, headers=self._headers())
    # The body is already loaded, so it can be decoded after the client closes.
    return reply.json() if reply.status_code == 200 else None
# -------------------------------------------------------------------
# Funciones de módulo usadas en researchers.py
# -------------------------------------------------------------------
# One client for the whole process. ORCIDClient keeps its access token on the
# instance, so building a fresh client per call would request a new token for
# every single lookup.
_shared_client = None


def _get_shared_client():
    """Return the process-wide ORCIDClient, creating it on first use."""
    global _shared_client
    if _shared_client is None:
        _shared_client = ORCIDClient()
    return _shared_client


def get_works_summary(orcid_id: str) -> dict:
    """Return the ``/works`` payload for ``orcid_id``.

    Thin module-level wrapper around ``ORCIDClient.fetch_works``; any
    exception raised there propagates unchanged.
    """
    return _get_shared_client().fetch_works(orcid_id)


def get_work_detail(orcid_id: str, put_code: int) -> dict | None:
    """Return the full record of one work, or ``None`` if it is unavailable.

    Thin module-level wrapper around ``ORCIDClient.fetch_work_detail``.
    """
    return _get_shared_client().fetch_work_detail(orcid_id, put_code)
-155
View File
@@ -1,155 +0,0 @@
from datetime import datetime
from xml.etree.ElementTree import Element, SubElement, tostring
from io import BytesIO
import zipfile
import json
class SWORDExporter:
    """Build the files of a SWORD deposit package and zip them in memory.

    ``researcher`` objects must expose ``orcid_id``, ``name`` and ``id``;
    publication objects must expose ``id``, ``title``, ``doi``,
    ``pub_year``, ``type`` and ``journal``.
    """

    ATOM_NS = "http://www.w3.org/2005/Atom"
    DC_NS = "http://purl.org/dc/elements/1.1/"

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as ISO 8601 text with a ``Z`` suffix."""
        # Local import: the module only imports ``datetime`` itself.
        from datetime import timezone

        # ``utcnow()`` is deprecated; read an aware clock and drop tzinfo so
        # the text keeps the "...Z" form instead of "+00:00".
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        return now.isoformat() + "Z"

    # ---------------------------------------------------------
    # 1) MAIN XML (sword.xml)
    # ---------------------------------------------------------
    @staticmethod
    def export_feed_xml(researcher, publications) -> bytes:
        """Serialize the publications as an Atom feed with Dublin Core fields.

        Returns:
            UTF-8 encoded XML including the XML declaration.
        """
        dc = SWORDExporter.DC_NS
        # One timestamp for the whole document instead of one clock read
        # per entry.
        stamp = SWORDExporter._utc_timestamp()

        # The Atom namespace is declared as a literal ``xmlns`` attribute,
        # so the unprefixed tags below serialize inside it.
        feed = Element("feed", xmlns=SWORDExporter.ATOM_NS)
        SubElement(feed, "title").text = f"Publications for {researcher.orcid_id}"
        author = SubElement(feed, "author")
        SubElement(author, "name").text = researcher.name or "Unknown"
        SubElement(feed, "updated").text = stamp
        SubElement(feed, "id").text = f"urn:uuid:{researcher.id}"

        for pub in publications:
            entry = SubElement(feed, "entry")
            SubElement(entry, "id").text = f"urn:uuid:{pub.id}"
            SubElement(entry, "updated").text = stamp
            SubElement(entry, f"{{{dc}}}title").text = pub.title

            # Optional fields are emitted only when they hold a value.
            if pub.doi:
                SubElement(entry, f"{{{dc}}}identifier").text = f"doi:{pub.doi}"
            if pub.pub_year:
                SubElement(entry, f"{{{dc}}}date").text = str(pub.pub_year)
            if pub.type:
                SubElement(entry, f"{{{dc}}}type").text = pub.type
            if pub.journal:
                SubElement(entry, f"{{{dc}}}source").text = pub.journal

        return tostring(feed, encoding="utf-8", xml_declaration=True)

    # ---------------------------------------------------------
    # 2) manifest.txt
    # ---------------------------------------------------------
    @staticmethod
    def generate_manifest(researcher, publications) -> str:
        """Return the human-readable manifest as newline-joined text."""
        lines = [
            "SWORD Deposit Package",
            "----------------------",
            f"Researcher ORCID: {researcher.orcid_id}",
            f"Researcher Name: {researcher.name or 'Unknown'}",
            f"Total Publications: {len(publications)}",
            f"Generated At: {SWORDExporter._utc_timestamp()}",
            "",
            "Publications:",
        ]
        lines.extend(
            f"- {pub.title} ({pub.pub_year}) DOI={pub.doi}"
            for pub in publications
        )
        return "\n".join(lines)

    # ---------------------------------------------------------
    # 3) metadata.json
    # ---------------------------------------------------------
    @staticmethod
    def generate_metadata_json(researcher, publications) -> str:
        """Return researcher and publication metadata as indented JSON."""
        data = {
            "researcher": {
                "orcid_id": researcher.orcid_id,
                "name": researcher.name,
                "id": str(researcher.id),
            },
            "generated_at": SWORDExporter._utc_timestamp(),
            "publications": [
                {
                    "id": str(pub.id),
                    "title": pub.title,
                    "doi": pub.doi,
                    "year": pub.pub_year,
                    "type": pub.type,
                    "journal": pub.journal,
                }
                for pub in publications
            ],
        }
        return json.dumps(data, indent=4)

    # ---------------------------------------------------------
    # 4) mets.xml (simple version)
    # ---------------------------------------------------------
    @staticmethod
    def generate_mets_xml(researcher, publications) -> bytes:
        """Return a minimal METS document holding title and DOI per work.

        ``researcher`` is accepted for signature symmetry and is not read.
        """
        dc = SWORDExporter.DC_NS

        mets = Element("mets", xmlns="http://www.loc.gov/METS/")
        header = SubElement(mets, "metsHdr")
        agent = SubElement(header, "agent", ROLE="CREATOR", TYPE="OTHER")
        SubElement(agent, "name").text = "ORCID Exporter System"

        dmd_sec = SubElement(mets, "dmdSec", ID="dmd1")
        md_wrap = SubElement(dmd_sec, "mdWrap", MDTYPE="DC")
        xml_data = SubElement(md_wrap, "xmlData")

        # Every publication writes into the same <xmlData> element.
        for pub in publications:
            SubElement(xml_data, f"{{{dc}}}title").text = pub.title
            if pub.doi:
                SubElement(xml_data, f"{{{dc}}}identifier").text = f"doi:{pub.doi}"

        return tostring(mets, encoding="utf-8", xml_declaration=True)

    # ---------------------------------------------------------
    # 5) FINAL ZIP
    # ---------------------------------------------------------
    @staticmethod
    def export_zip(researcher, publications) -> bytes:
        """Return a deflate-compressed ZIP with the four package files.

        Members: ``sword.xml``, ``manifest.txt``, ``metadata.json`` and
        ``mets.xml``.
        """
        members = {
            "sword.xml": SWORDExporter.export_feed_xml(researcher, publications),
            "manifest.txt": SWORDExporter.generate_manifest(researcher, publications),
            "metadata.json": SWORDExporter.generate_metadata_json(
                researcher, publications
            ),
            "mets.xml": SWORDExporter.generate_mets_xml(researcher, publications),
        }

        mem_file = BytesIO()
        with zipfile.ZipFile(
            mem_file, mode="w", compression=zipfile.ZIP_DEFLATED
        ) as zf:
            for member_name, payload in members.items():
                zf.writestr(member_name, payload)
        return mem_file.getvalue()
+112
View File
@@ -0,0 +1,112 @@
from datetime import datetime
from xml.etree.ElementTree import Element, SubElement, tostring
from app.db.models import Publication, Researcher
ATOM_NS = "http://www.w3.org/2005/Atom"
DC_NS = "http://purl.org/dc/elements/1.1/"
EXTRA_NS = "http://example.org/orcid-extra"  # namespace for the extended (non-DC) fields


class SWORDGenerator:
    """Build the Atom feed (sword.xml) describing a researcher's publications."""

    @staticmethod
    def generate_feed_xml(researcher: Researcher, publications: list[Publication]) -> bytes:
        """Serialize ``publications`` as an Atom feed with DC and extra fields.

        Args:
            researcher: Object exposing ``orcid_id``, ``name`` and ``id``.
            publications: Objects exposing the publication attributes read
                below (``id``, ``title``, ``subtitle``, ``doi``, ...).
                ``status`` is optional; the others must exist but may be
                empty.

        Returns:
            UTF-8 encoded, well-formed XML including the XML declaration.
        """
        # Function-scope imports: the module-level import line only brings
        # in Element/SubElement/tostring and ``datetime``.
        from datetime import timezone
        from xml.etree.ElementTree import register_namespace

        # Let the serializer emit the ``dc:``/``extra:`` declarations itself.
        # Writing ``xmlns:dc`` by hand as a root attribute made it appear
        # twice (once by hand, once auto-generated for the ``{DC_NS}`` tags),
        # which is not well-formed XML.
        register_namespace("dc", DC_NS)
        register_namespace("extra", EXTRA_NS)

        # ``utcnow()`` is deprecated; read an aware clock once and keep the
        # "...Z" text form for the feed and all of its entries.
        stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

        # The Atom default namespace is still declared as a literal
        # attribute, so the unprefixed tags below serialize inside it.
        feed = Element("feed", {"xmlns": ATOM_NS})

        SubElement(feed, "title").text = f"Publications for {researcher.orcid_id}"
        author = SubElement(feed, "author")
        SubElement(author, "name").text = researcher.name or "Unknown"
        SubElement(feed, "updated").text = stamp
        SubElement(feed, "id").text = f"urn:uuid:{researcher.id}"

        for pub in publications:
            entry = SubElement(feed, "entry")
            SubElement(entry, "id").text = f"urn:uuid:{pub.id}"
            SubElement(entry, "updated").text = stamp

            # Title
            SubElement(entry, f"{{{DC_NS}}}title").text = pub.title or "Untitled"

            # Subtitle
            if pub.subtitle:
                SubElement(entry, f"{{{EXTRA_NS}}}subtitle").text = pub.subtitle

            # DOI
            if pub.doi:
                SubElement(entry, f"{{{DC_NS}}}identifier").text = f"doi:{pub.doi}"

            # Journal
            if pub.journal:
                SubElement(entry, f"{{{DC_NS}}}source").text = pub.journal

            # URL
            if pub.url:
                SubElement(entry, f"{{{DC_NS}}}relation").text = pub.url

            # Short description
            if pub.short_description:
                SubElement(entry, f"{{{DC_NS}}}description").text = pub.short_description

            # Citation
            if pub.citation_value:
                cit = SubElement(entry, f"{{{EXTRA_NS}}}citation")
                SubElement(cit, "type").text = pub.citation_type or "unknown"
                SubElement(cit, "value").text = pub.citation_value

            # Language
            if pub.language_code:
                SubElement(entry, f"{{{DC_NS}}}language").text = pub.language_code

            # Country
            if pub.country:
                SubElement(entry, f"{{{EXTRA_NS}}}country").text = pub.country

            # External IDs: one child element per key of each id dict.
            if pub.external_ids:
                ext_ids_el = SubElement(entry, f"{{{EXTRA_NS}}}external_ids")
                for ext in pub.external_ids:
                    ext_el = SubElement(ext_ids_el, "external_id")
                    if not isinstance(ext, dict):
                        continue
                    for key, value in ext.items():
                        # Unwrap {"value": ...} wrappers.
                        if isinstance(value, dict) and "value" in value:
                            value = value["value"]
                        # Skip absent values instead of writing the literal
                        # text "None".
                        if value is None:
                            continue
                        SubElement(ext_el, key).text = str(value)

            # Contributors
            if pub.contributors:
                contribs_el = SubElement(entry, f"{{{EXTRA_NS}}}contributors")
                for c in pub.contributors:
                    c_el = SubElement(contribs_el, "contributor")
                    SubElement(c_el, "name").text = c.get("name")
                    SubElement(c_el, "orcid").text = c.get("orcid")
                    SubElement(c_el, "role").text = c.get("role")

            # Date: YYYY, YYYY-MM or YYYY-MM-DD. The day is appended only
            # after a month, so the text can never read as "YYYY-DD".
            if pub.pub_year:
                date_str = str(pub.pub_year)
                if pub.pub_month:
                    date_str += f"-{pub.pub_month:02d}"
                    if pub.pub_day:
                        date_str += f"-{pub.pub_day:02d}"
                SubElement(entry, f"{{{DC_NS}}}date").text = date_str

            # Type
            if pub.type:
                SubElement(entry, f"{{{DC_NS}}}type").text = pub.type

            # Status (new / updated / unchanged); the attribute is optional.
            status = getattr(pub, "status", None)
            if status:
                SubElement(entry, f"{{{EXTRA_NS}}}status").text = status

            # Last modified
            if pub.last_modified:
                SubElement(entry, f"{{{EXTRA_NS}}}last_modified").text = pub.last_modified.isoformat()

        return tostring(feed, encoding="utf-8", xml_declaration=True)
+57 -17
View File
@@ -1,10 +1,12 @@
from sqlalchemy.orm import Session
import httpx
from app.services.orcid_client import ORCIDClient
from app.services.normalizer import PublicationNormalizer
from app.repositories.researcher_repository import ResearcherRepository
from app.repositories.publication_repository import PublicationRepository
from app.repositories.syncjob_repository import SyncJobRepository
import httpx
from app.db.repositories.researcher_repository import ResearcherRepository
from app.db.repositories.publication_repository import PublicationRepository
from app.db.repositories.syncjob_repository import SyncJobRepository
class SyncService:
@@ -16,8 +18,6 @@ class SyncService:
"""
Sincroniza las publicaciones de un investigador con manejo robusto de errores.
"""
# 1. Obtener o crear investigador
try:
researcher = ResearcherRepository.get_by_orcid(db, orcid_id)
@@ -35,14 +35,23 @@ class SyncService:
if e.response.status_code == 404:
return {
"status": "error",
"message": f"El ORCID {orcid_id} no existe en Sandbox."
"code": 404,
"message": f"El ORCID {orcid_id} no existe en ORCID."
}
return {"status": "error", "message": str(e)}
return {
"status": "error",
"code": e.response.status_code,
"message": f"Error al consultar ORCID: {str(e)}"
}
except Exception as e:
return {
"status": "error",
"code": 500,
"message": f"Error interno durante la sincronización: {str(e)}"
}
# 2. Crear SyncJob
job = SyncJobRepository.start_job(db, researcher.id)
# 3. Obtener works
try:
works_raw = self.orcid_client.fetch_works(orcid_id)
except httpx.HTTPStatusError as e:
@@ -56,19 +65,27 @@ class SyncService:
"updated_records": 0,
"total": 0
}
return {"status": "error", "message": str(e)}
return {
"status": "error",
"code": e.response.status_code,
"message": f"Error al obtener works de ORCID: {str(e)}"
}
except Exception as e:
return {
"status": "error",
"code": 500,
"message": f"Error interno al obtener works: {str(e)}"
}
groups = works_raw.get("group", [])
new_records = 0
updated_records = 0
# 4. Procesar works
for group in groups:
summary = group["work-summary"][0]
normalized = PublicationNormalizer.normalize_work(summary)
# 🔥 AHORA SE DETECTAN DUPLICADOS POR put_code
existing = PublicationRepository.get_by_put_code(
db, researcher.id, normalized["put_code"]
)
@@ -80,17 +97,40 @@ class SyncService:
PublicationRepository.create(db, researcher.id, normalized)
new_records += 1
# 5. Finalizar SyncJob
SyncJobRepository.finish_job(db, job, new_records, updated_records)
# 6. Actualizar last_sync_at
ResearcherRepository.update_last_sync(db, researcher)
return {
"status": "ok",
"message": "Sincronización completada correctamente.",
"researcher": researcher.orcid_id,
"researcher_id": researcher.id,
"new_records": new_records,
"updated_records": updated_records,
"total": new_records + updated_records
}
def sync_and_get_full(self, db: Session, orcid_id: str):
    """Synchronize a researcher and return it with its publications.

    Intended for the search box, so that one request is enough.

    Returns:
        The error dict from ``sync_researcher`` unchanged when the sync
        fails; a 500 error dict when the researcher cannot be loaded
        afterwards; otherwise ``{"status": "ok", "researcher": ...,
        "publications": ...}``.
    """
    outcome = self.sync_researcher(db, orcid_id)
    if outcome.get("status") == "error":
        # Pass the synchronization error through untouched.
        return outcome

    found = ResearcherRepository.get_by_orcid(db, orcid_id)
    if found:
        return {
            "status": "ok",
            "researcher": found,
            "publications": PublicationRepository.list_by_researcher(db, found.id),
        }

    return {
        "status": "error",
        "code": 500,
        "message": "Error interno: investigador no encontrado tras sincronización."
    }
+165
View File
@@ -0,0 +1,165 @@
import io
import zipfile
import json
from datetime import datetime
from xml.etree.ElementTree import Element, SubElement, tostring
from app.db.models import Publication, Researcher
from app.services.sword_generator import SWORDGenerator
class ZIPGenerator:
    """Assemble the SWORD deposit package (four files) as an in-memory ZIP."""

    # Dublin Core elements namespace used by mets.xml.
    _DC_NS = "http://purl.org/dc/elements/1.1/"

    @staticmethod
    def _utc_timestamp():
        """Return the current UTC time as ISO 8601 text with a ``Z`` suffix."""
        # Local import: the module only imports ``datetime`` itself.
        from datetime import timezone

        # ``utcnow()`` is deprecated; read an aware clock and drop tzinfo so
        # the text keeps the "...Z" form instead of "+00:00".
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        return now.isoformat() + "Z"

    @staticmethod
    def _format_date(pub):
        """Return ``YYYY``, ``YYYY-MM`` or ``YYYY-MM-DD``; None without a year.

        The day is appended only after a month, so the text can never read
        as "YYYY-DD".
        """
        if not pub.pub_year:
            return None
        text = str(pub.pub_year)
        if pub.pub_month:
            text += f"-{pub.pub_month:02d}"
            if pub.pub_day:
                text += f"-{pub.pub_day:02d}"
        return text

    # ---------------------------------------------------------
    # MANIFEST.TXT
    # ---------------------------------------------------------
    @staticmethod
    def generate_manifest(researcher, publications):
        """Return the human-readable manifest as newline-joined text."""
        lines = [
            "SWORD Deposit Package",
            "----------------------",
            f"Researcher ORCID: {researcher.orcid_id}",
            # Same fallback the feed uses for its author name, so an unnamed
            # researcher is not printed as "None".
            f"Researcher Name: {researcher.name or 'Unknown'}",
            f"Researcher UUID: {researcher.id}",
            f"Total Publications: {len(publications)}",
            f"Generated At: {ZIPGenerator._utc_timestamp()}",
            "",
            "Publications:",
        ]
        for pub in publications:
            year = pub.pub_year or "Unknown"
            lines.append(
                f"- {pub.title} ({year}) | DOI={pub.doi} | TYPE={pub.type}"
            )
        return "\n".join(lines)

    # ---------------------------------------------------------
    # METADATA.JSON - every stored field
    # ---------------------------------------------------------
    @staticmethod
    def generate_metadata_json(researcher, publications):
        """Return researcher and publication metadata as indented JSON.

        Datetime attributes are written with ``isoformat()`` (``None`` when
        unset); ``status`` is read with ``getattr`` because it is optional.
        """
        last_sync = researcher.last_sync_at
        data = {
            "researcher": {
                "orcid_id": researcher.orcid_id,
                "name": researcher.name,
                "id": str(researcher.id),
                "last_sync_at": last_sync.isoformat() if last_sync else None,
            },
            "generated_at": ZIPGenerator._utc_timestamp(),
            "publications": [
                {
                    "id": str(pub.id),
                    "put_code": pub.put_code,
                    "title": pub.title,
                    "subtitle": pub.subtitle,
                    "doi": pub.doi,
                    "journal": pub.journal,
                    "type": pub.type,
                    "url": pub.url,
                    "short_description": pub.short_description,
                    "citation_type": pub.citation_type,
                    "citation_value": pub.citation_value,
                    "language_code": pub.language_code,
                    "country": pub.country,
                    "pub_year": pub.pub_year,
                    "pub_month": pub.pub_month,
                    "pub_day": pub.pub_day,
                    "external_ids": pub.external_ids,
                    "contributors": pub.contributors,
                    "hash_fingerprint": pub.hash_fingerprint,
                    "last_modified": (
                        pub.last_modified.isoformat()
                        if pub.last_modified
                        else None
                    ),
                    "status": getattr(pub, "status", None),
                }
                for pub in publications
            ],
        }
        return json.dumps(data, indent=4)

    # ---------------------------------------------------------
    # METS.XML - extended metadata
    # ---------------------------------------------------------
    @staticmethod
    def generate_mets_xml(researcher, publications):
        """Return a METS document carrying Dublin Core fields for each work.

        ``researcher`` is accepted for signature symmetry and is not read.
        Every publication writes into the same ``<xmlData>`` element.
        """
        dc = ZIPGenerator._DC_NS

        mets = Element("mets", xmlns="http://www.loc.gov/METS/")
        header = SubElement(mets, "metsHdr")
        agent = SubElement(header, "agent", ROLE="CREATOR", TYPE="OTHER")
        SubElement(agent, "name").text = "ORCID Exporter System"

        dmd_sec = SubElement(mets, "dmdSec", ID="dmd1")
        md_wrap = SubElement(dmd_sec, "mdWrap", MDTYPE="DC")
        xml_data = SubElement(md_wrap, "xmlData")

        def add(tag, text):
            # Append one DC-qualified element to the shared <xmlData>.
            SubElement(xml_data, f"{{{dc}}}{tag}").text = text

        for pub in publications:
            # The title is always written, even when it is empty.
            add("title", pub.title)
            # The subtitle has no DC element of its own; it is written as a
            # description.
            if pub.subtitle:
                add("description", pub.subtitle)
            if pub.doi:
                add("identifier", f"doi:{pub.doi}")
            if pub.journal:
                add("source", pub.journal)
            if pub.url:
                add("relation", pub.url)
            if pub.short_description:
                add("description", pub.short_description)
            # NOTE(review): bibliographicCitation is a DCMI Terms property,
            # not one of the DC 1.1 elements - confirm the target repository
            # accepts it under this namespace.
            if pub.citation_value:
                add("bibliographicCitation", pub.citation_value)
            if pub.language_code:
                add("language", pub.language_code)
            if pub.country:
                add("coverage", pub.country)
            date_str = ZIPGenerator._format_date(pub)
            if date_str:
                add("date", date_str)
            if pub.type:
                add("type", pub.type)

        return tostring(mets, encoding="utf-8", xml_declaration=True)

    # ---------------------------------------------------------
    # FINAL ZIP
    # ---------------------------------------------------------
    @staticmethod
    def generate_zip(researcher, publications):
        """Return a deflate-compressed ZIP with the four package files.

        Members: ``sword.xml`` (built by ``SWORDGenerator``),
        ``manifest.txt``, ``metadata.json`` and ``mets.xml``.
        """
        members = {
            "sword.xml": SWORDGenerator.generate_feed_xml(researcher, publications),
            "manifest.txt": ZIPGenerator.generate_manifest(researcher, publications),
            "metadata.json": ZIPGenerator.generate_metadata_json(
                researcher, publications
            ),
            "mets.xml": ZIPGenerator.generate_mets_xml(researcher, publications),
        }

        mem_file = io.BytesIO()
        with zipfile.ZipFile(mem_file, "w", zipfile.ZIP_DEFLATED) as zf:
            for member_name, payload in members.items():
                zf.writestr(member_name, payload)
        return mem_file.getvalue()