Add materials, exam images, storage quota, and API guide

Upload documents for AI context, exam images for Moodle questions, per-template storage limits, embedded images in XML export, and GUIA_API_Y_FLUJO.md with full endpoint documentation.
This commit is contained in:
Mireya Cueto Garrido
2026-06-01 10:30:40 +02:00
parent ba2507918b
commit 7bc27da33a
29 changed files with 1892 additions and 59 deletions
@@ -0,0 +1,74 @@
from pathlib import Path
from app.core.errors import AppError
SUPPORTED_EXTENSIONS = {
".pdf": "application/pdf",
".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
".txt": "text/plain",
".md": "text/markdown",
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".webp": "image/webp",
}
class DocumentExtractor:
def extract(self, file_path: Path, mime_type: str) -> str:
suffix = file_path.suffix.lower()
if mime_type == "application/pdf" or suffix == ".pdf":
return self._extract_pdf(file_path)
if (
mime_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
or suffix == ".docx"
):
return self._extract_docx(file_path)
if mime_type.startswith("text/") or suffix in {".txt", ".md"}:
return self._extract_text(file_path)
if mime_type.startswith("image/") or suffix in {".png", ".jpg", ".jpeg", ".webp"}:
return self._extract_image(file_path)
raise AppError(f"Unsupported file type: {mime_type}", status_code=415, code="unsupported_media")
def _extract_pdf(self, file_path: Path) -> str:
from pypdf import PdfReader
reader = PdfReader(str(file_path))
parts = [page.extract_text() or "" for page in reader.pages]
text = "\n".join(parts).strip()
if not text:
raise AppError("PDF does not contain extractable text", status_code=422, code="empty_extraction")
return text
def _extract_docx(self, file_path: Path) -> str:
from docx import Document
document = Document(str(file_path))
parts = [paragraph.text.strip() for paragraph in document.paragraphs if paragraph.text.strip()]
text = "\n".join(parts).strip()
if not text:
raise AppError("DOCX does not contain extractable text", status_code=422, code="empty_extraction")
return text
def _extract_text(self, file_path: Path) -> str:
text = file_path.read_text(encoding="utf-8", errors="ignore").strip()
if not text:
raise AppError("Text file is empty", status_code=422, code="empty_extraction")
return text
def _extract_image(self, file_path: Path) -> str:
try:
import pytesseract
from PIL import Image
except ImportError as exc:
raise AppError(
"Image OCR is not available on this server",
status_code=503,
code="ocr_unavailable",
) from exc
image = Image.open(file_path)
text = pytesseract.image_to_string(image, lang="spa+eng").strip()
if not text:
raise AppError("Image does not contain recognizable text", status_code=422, code="empty_extraction")
return text
+77 -5
View File
@@ -17,7 +17,9 @@ from app.schemas.exam import (
QuestionCreate,
QuestionRead,
)
from app.services.image_service import ImageService
from app.services.llm import LLMClient
from app.services.material_service import MaterialService
from app.services.moodle_exporter import MoodleXMLExporter
from app.services.parser import AIQuestionParser
from app.services.prompt_builder import PromptBuilder
@@ -30,11 +32,15 @@ class ExamService:
prompt_builder: PromptBuilder | None = None,
parser: AIQuestionParser | None = None,
exporter: MoodleXMLExporter | None = None,
material_service: MaterialService | None = None,
image_service: ImageService | None = None,
) -> None:
self.db = db
self.prompt_builder = prompt_builder or PromptBuilder()
self.parser = parser or AIQuestionParser()
self.exporter = exporter or MoodleXMLExporter()
self.material_service = material_service
self.image_service = image_service
def create_template(self, user_id: uuid.UUID, payload: ExamTemplateCreate) -> ExamTemplateRead:
template = ExamTemplate(
@@ -87,9 +93,25 @@ class ExamService:
def get_template(self, user_id: uuid.UUID, template_id: uuid.UUID) -> ExamTemplateRead:
return self._template_read(self._get_user_template_or_404(user_id, template_id))
def build_prompt(self, user_id: uuid.UUID, template_id: uuid.UUID, topic_prompt: str) -> PromptResponse:
def get_owned_template(self, user_id: uuid.UUID, template_id: uuid.UUID) -> ExamTemplate:
return self._get_user_template_or_404(user_id, template_id)
def build_prompt(
self,
user_id: uuid.UUID,
template_id: uuid.UUID,
topic_prompt: str,
material_ids: list[uuid.UUID] | None = None,
) -> PromptResponse:
template = self._get_user_template_or_404(user_id, template_id)
prompt = self.prompt_builder.build_prompt(template, topic_prompt)
reference_context = self._reference_context(template_id, material_ids)
images_catalog = self._images_catalog(template_id)
prompt = self.prompt_builder.build_prompt(
template,
topic_prompt,
reference_context,
images_catalog,
)
return PromptResponse(template_id=template.id, prompt=prompt)
async def generate_with_llm(
@@ -98,9 +120,17 @@ class ExamService:
template_id: uuid.UUID,
topic_prompt: str,
llm_client: LLMClient,
material_ids: list[uuid.UUID] | None = None,
) -> ParsedQuestionsResponse:
template = self._get_user_template_or_404(user_id, template_id)
prompt = self.prompt_builder.build_prompt(template, topic_prompt)
reference_context = self._reference_context(template_id, material_ids)
images_catalog = self._images_catalog(template_id)
prompt = self.prompt_builder.build_prompt(
template,
topic_prompt,
reference_context,
images_catalog,
)
raw_output = await llm_client.generate(prompt)
questions = self.parser.parse_json(raw_output)
return self._persist_questions(template.id, questions)
@@ -116,8 +146,9 @@ class ExamService:
if not questions:
raise NotFoundError("Template does not contain questions to export")
image_map = self._image_map(template.id)
if export_format == ExportFormat.XML:
content = self.exporter.export_xml(questions)
content = self.exporter.export_xml(questions, image_map)
elif export_format == ExportFormat.TXT:
content = self.exporter.export_txt(questions)
else:
@@ -134,9 +165,30 @@ class ExamService:
self.db.commit()
return ExportResponse(template_id=template.id, format=export_format, content=content)
def get_owned_question(self, user_id: uuid.UUID, question_id: uuid.UUID) -> tuple[Question, ExamTemplate]:
question = self.db.get(Question, question_id)
if question is None:
raise NotFoundError("Question not found")
template = self._get_user_template_or_404(user_id, question.template_id)
if question.template_id != template.id:
raise NotFoundError("Question not found")
return question, template
def to_question_read(self, question: Question) -> QuestionRead:
read = QuestionRead.model_validate(question)
if question.image_id:
return read.model_copy(update={"image_url": f"/exam/images/{question.image_id}/content"})
return read
def _persist_questions(self, template_id: uuid.UUID, questions: list[QuestionCreate]) -> ParsedQuestionsResponse:
persisted: list[Question] = []
for payload in questions:
image_id = payload.image_id
if image_id is not None:
if self.image_service is None:
raise NotFoundError("Image service is not available")
self.image_service.get_image_for_template(template_id, image_id)
question = Question(
template_id=template_id,
question_type=payload.question_type,
@@ -144,6 +196,7 @@ class ExamService:
correct_answers=[clean_text(answer, max_length=1_000) for answer in payload.correct_answers],
wrong_answers=[clean_text(answer, max_length=1_000) for answer in payload.wrong_answers],
matching_pairs=[pair.model_dump() for pair in payload.matching_pairs],
image_id=image_id,
difficulty=payload.difficulty,
score=payload.score,
penalty=payload.penalty,
@@ -156,7 +209,26 @@ class ExamService:
for question in persisted:
self.db.refresh(question)
return ParsedQuestionsResponse(questions=[QuestionRead.model_validate(question) for question in persisted])
return ParsedQuestionsResponse(questions=[self.to_question_read(question) for question in persisted])
def _reference_context(
self,
template_id: uuid.UUID,
material_ids: list[uuid.UUID] | None,
) -> str:
if self.material_service is None:
return ""
return self.material_service.build_reference_context(template_id, material_ids)
def _images_catalog(self, template_id: uuid.UUID) -> str:
if self.image_service is None:
return ""
return self.image_service.images_catalog(template_id)
def _image_map(self, template_id: uuid.UUID) -> dict[uuid.UUID, object]:
if self.image_service is None:
return {}
return self.image_service.build_image_map(template_id)
def _get_user_template_or_404(self, user_id: uuid.UUID, template_id: uuid.UUID) -> ExamTemplate:
template = self.db.get(ExamTemplate, template_id)
+206
View File
@@ -0,0 +1,206 @@
import uuid
from pathlib import Path
from fastapi import UploadFile
from PIL import Image, UnidentifiedImageError
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.core.config import Settings
from app.core.errors import AppError, NotFoundError
from app.core.security import clean_text
from app.models.exam import ExamImage, ExamTemplate, Question
from app.services.storage_quota import StorageQuotaService
ALLOWED_IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".webp", ".gif"}
ALLOWED_IMAGE_MIMES = {
"image/png",
"image/jpeg",
"image/webp",
"image/gif",
}
class ImageService:
def __init__(
self,
db: Session,
settings: Settings,
storage_quota: StorageQuotaService | None = None,
) -> None:
self.db = db
self.settings = settings
self.storage_quota = storage_quota or StorageQuotaService(db, settings)
self.image_root = Path(settings.upload_dir) / "exam_images"
self.image_root.mkdir(parents=True, exist_ok=True)
def upload(
self,
template: ExamTemplate,
upload_file: UploadFile,
caption: str | None = None,
) -> ExamImage:
self._validate_upload_count(template.id)
suffix, mime_type = self._validate_image_file(upload_file)
content = upload_file.file.read()
if len(content) > self.settings.max_image_bytes:
raise AppError(
f"Image exceeds maximum size of {self.settings.max_image_bytes} bytes",
status_code=413,
code="file_too_large",
)
self.storage_quota.ensure_template_has_space(template.id, len(content))
image_id = uuid.uuid4()
stored_filename = f"{image_id}{suffix}"
target_dir = self.image_root / str(template.user_id) / str(template.id)
target_dir.mkdir(parents=True, exist_ok=True)
storage_path = target_dir / stored_filename
storage_path.write_bytes(content)
self._verify_image_integrity(storage_path)
image = ExamImage(
id=image_id,
template_id=template.id,
original_filename=clean_text(upload_file.filename or stored_filename, max_length=255),
stored_filename=stored_filename,
mime_type=mime_type,
size_bytes=len(content),
storage_path=str(storage_path),
caption=clean_text(caption, max_length=500) if caption else None,
)
self.db.add(image)
self.db.commit()
self.db.refresh(image)
return image
def list_images(self, template_id: uuid.UUID) -> list[ExamImage]:
return list(
self.db.scalars(
select(ExamImage)
.where(ExamImage.template_id == template_id)
.order_by(ExamImage.created_at.desc())
).all()
)
def get_image_for_template(self, template_id: uuid.UUID, image_id: uuid.UUID) -> ExamImage:
image = self.db.get(ExamImage, image_id)
if image is None or image.template_id != template_id:
raise NotFoundError("Image not found for this template")
return image
def get_image_for_user(self, user_id: uuid.UUID, image_id: uuid.UUID) -> ExamImage:
image = self.db.get(ExamImage, image_id)
if image is None:
raise NotFoundError("Image not found")
template = image.template
if template.user_id != user_id:
raise NotFoundError("Image not found")
return image
def delete_image(self, template: ExamTemplate, image_id: uuid.UUID) -> None:
image = self.get_image_for_template(template.id, image_id)
for question in list(image.questions):
question.image_id = None
path = Path(image.storage_path)
if path.exists():
path.unlink()
self.db.delete(image)
self.db.commit()
def attach_image_to_question(
self,
template: ExamTemplate,
question: Question,
image_id: uuid.UUID | None,
) -> Question:
if question.template_id != template.id:
raise NotFoundError("Question not found for this template")
if image_id is not None:
self.get_image_for_template(template.id, image_id)
question.image_id = image_id
self.db.commit()
self.db.refresh(question)
return question
def images_catalog(self, template_id: uuid.UUID) -> str:
images = self.list_images(template_id)
if not images:
return ""
lines = [
"Imágenes disponibles para preguntas visuales (el enunciado debe referirse a la imagen; "
"asigna el campo image_id en cada pregunta que deba mostrarla):"
]
for image in images:
caption = image.caption or "sin descripción"
lines.append(
f"- image_id: {image.id} | archivo: {image.original_filename} | descripción: {caption}"
)
return "\n".join(lines)
def build_image_map(self, template_id: uuid.UUID) -> dict[uuid.UUID, ExamImage]:
images = self.list_images(template_id)
return {image.id: image for image in images}
def to_read(self, image: ExamImage) -> dict[str, object]:
return {
"id": image.id,
"template_id": image.template_id,
"original_filename": image.original_filename,
"stored_filename": image.stored_filename,
"mime_type": image.mime_type,
"size_bytes": image.size_bytes,
"caption": image.caption,
"content_url": f"/exam/images/{image.id}/content",
"created_at": image.created_at,
}
def _validate_upload_count(self, template_id: uuid.UUID) -> None:
count = self.db.scalar(
select(func.count()).select_from(ExamImage).where(ExamImage.template_id == template_id)
)
if count is not None and count >= self.settings.max_images_per_template:
raise AppError(
f"Maximum of {self.settings.max_images_per_template} images per template reached",
status_code=409,
code="too_many_images",
)
def _validate_image_file(self, upload_file: UploadFile) -> tuple[str, str]:
if not upload_file.filename:
raise AppError("Filename is required", status_code=400, code="invalid_file")
suffix = Path(upload_file.filename).suffix.lower()
if suffix not in ALLOWED_IMAGE_EXTENSIONS:
raise AppError(
f"Unsupported image type. Allowed: {', '.join(sorted(ALLOWED_IMAGE_EXTENSIONS))}",
status_code=415,
code="unsupported_media",
)
mime_type = upload_file.content_type or ""
if mime_type and mime_type not in ALLOWED_IMAGE_MIMES:
raise AppError("Unsupported image MIME type", status_code=415, code="unsupported_media")
mime_by_suffix = {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".webp": "image/webp",
".gif": "image/gif",
}
resolved_mime = mime_type if mime_type in ALLOWED_IMAGE_MIMES else mime_by_suffix[suffix]
return suffix, resolved_mime
def _verify_image_integrity(self, storage_path: Path) -> None:
try:
with Image.open(storage_path) as img:
img.verify()
except (UnidentifiedImageError, OSError) as exc:
storage_path.unlink(missing_ok=True)
raise AppError("Invalid or corrupted image file", status_code=422, code="invalid_image") from exc
+188
View File
@@ -0,0 +1,188 @@
import uuid
from pathlib import Path
from fastapi import UploadFile
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.core.config import Settings
from app.core.errors import AppError, NotFoundError
from app.core.security import clean_text
from app.models.exam import ExamMaterial, ExamTemplate, MaterialStatus
from app.schemas.material import ExamMaterialRead
from app.services.document_extractor import SUPPORTED_EXTENSIONS, DocumentExtractor
from app.services.storage_quota import StorageQuotaService
class MaterialService:
def __init__(
self,
db: Session,
settings: Settings,
storage_quota: StorageQuotaService | None = None,
) -> None:
self.db = db
self.settings = settings
self.storage_quota = storage_quota or StorageQuotaService(db, settings)
self.extractor = DocumentExtractor()
self.upload_root = Path(settings.upload_dir)
self.upload_root.mkdir(parents=True, exist_ok=True)
def upload(
self,
template: ExamTemplate,
upload_file: UploadFile,
) -> ExamMaterialRead:
self._validate_upload(template.id, upload_file)
suffix = Path(upload_file.filename or "file").suffix.lower()
if suffix not in SUPPORTED_EXTENSIONS:
raise AppError(
f"Unsupported extension. Allowed: {', '.join(sorted(SUPPORTED_EXTENSIONS))}",
status_code=415,
code="unsupported_media",
)
content = upload_file.file.read()
if len(content) > self.settings.max_upload_bytes:
raise AppError(
f"File exceeds maximum size of {self.settings.max_upload_bytes} bytes",
status_code=413,
code="file_too_large",
)
if not content:
raise AppError("Uploaded file is empty", status_code=400, code="empty_file")
self.storage_quota.ensure_template_has_space(template.id, len(content))
material_id = uuid.uuid4()
safe_name = f"{material_id}{suffix}"
target_dir = self.upload_root / str(template.user_id) / str(template.id)
target_dir.mkdir(parents=True, exist_ok=True)
storage_path = target_dir / safe_name
storage_path.write_bytes(content)
mime_type = upload_file.content_type or SUPPORTED_EXTENSIONS[suffix]
material = ExamMaterial(
id=material_id,
template_id=template.id,
original_filename=clean_text(upload_file.filename or safe_name, max_length=255),
mime_type=mime_type,
size_bytes=len(content),
storage_path=str(storage_path),
status=MaterialStatus.PROCESSED,
)
try:
material.extracted_text = clean_text(
self.extractor.extract(storage_path, mime_type),
max_length=500_000,
)
except AppError as exc:
material.status = MaterialStatus.FAILED
material.error_message = clean_text(exc.message, max_length=500)
except Exception as exc:
material.status = MaterialStatus.FAILED
material.error_message = clean_text(str(exc), max_length=500)
self.db.add(material)
self.db.commit()
self.db.refresh(material)
return self._to_read(material)
def list_materials(self, template_id: uuid.UUID) -> list[ExamMaterialRead]:
materials = self.db.scalars(
select(ExamMaterial)
.where(ExamMaterial.template_id == template_id)
.order_by(ExamMaterial.created_at.desc())
).all()
return [self._to_read(material) for material in materials]
def delete_material(self, template: ExamTemplate, material_id: uuid.UUID) -> None:
material = self.db.get(ExamMaterial, material_id)
if material is None or material.template_id != template.id:
raise NotFoundError("Material not found")
path = Path(material.storage_path)
if path.exists():
path.unlink()
self.db.delete(material)
self.db.commit()
def build_reference_context(
self,
template_id: uuid.UUID,
material_ids: list[uuid.UUID] | None = None,
) -> str:
query = select(ExamMaterial).where(
ExamMaterial.template_id == template_id,
ExamMaterial.status == MaterialStatus.PROCESSED,
ExamMaterial.extracted_text.isnot(None),
)
if material_ids:
query = query.where(ExamMaterial.id.in_(material_ids))
materials = self.db.scalars(query.order_by(ExamMaterial.created_at.asc())).all()
if material_ids:
found_ids = {material.id for material in materials}
missing = [material_id for material_id in material_ids if material_id not in found_ids]
if missing:
raise NotFoundError("One or more material IDs were not found or are not processed")
if not materials:
return ""
sections: list[str] = []
for material in materials:
text = material.extracted_text or ""
if not text.strip():
continue
sections.append(
f"--- Archivo: {material.original_filename} ---\n{text.strip()}"
)
if not sections:
return ""
combined = "\n\n".join(sections)
max_chars = self.settings.max_reference_chars
if len(combined) <= max_chars:
return combined
truncated = combined[:max_chars].rsplit("\n", 1)[0]
return f"{truncated}\n\n[Material truncado por límite de contexto]"
def _validate_upload(self, template_id: uuid.UUID, upload_file: UploadFile) -> None:
if not upload_file.filename:
raise AppError("Filename is required", status_code=400, code="invalid_file")
count = self.db.scalar(
select(func.count())
.select_from(ExamMaterial)
.where(ExamMaterial.template_id == template_id)
)
if count is not None and count >= self.settings.max_materials_per_template:
raise AppError(
f"Maximum of {self.settings.max_materials_per_template} files per template reached",
status_code=409,
code="too_many_files",
)
def _to_read(self, material: ExamMaterial) -> ExamMaterialRead:
preview = None
if material.extracted_text:
preview = material.extracted_text[:300]
if len(material.extracted_text) > 300:
preview += "..."
return ExamMaterialRead(
id=material.id,
template_id=material.template_id,
original_filename=material.original_filename,
mime_type=material.mime_type,
size_bytes=material.size_bytes,
status=material.status,
error_message=material.error_message,
text_preview=preview,
created_at=material.created_at,
)
+74 -30
View File
@@ -1,15 +1,20 @@
import base64
import json
from html import escape as html_escape
from pathlib import Path
from typing import Any
from uuid import UUID
from xml.sax.saxutils import escape as xml_escape
from app.core.security import clean_text
class MoodleXMLExporter:
def export_xml(self, questions: list[Any]) -> str:
def export_xml(self, questions: list[Any], image_map: dict[UUID, Any] | None = None) -> str:
images = image_map or {}
parts = ['<?xml version="1.0" encoding="UTF-8"?>', "<quiz>"]
for index, question in enumerate(questions, start=1):
parts.append(self._export_question(question, index))
parts.append(self._export_question(question, index, images))
parts.append("</quiz>")
return "\n".join(parts)
@@ -17,6 +22,8 @@ class MoodleXMLExporter:
blocks: list[str] = []
for question in questions:
lines = [self._attr(question, "statement")]
if self._attr(question, "image_id"):
lines.append(f"[Imagen adjunta: {self._attr(question, 'image_id')}]")
lines.extend(self._attr(question, "correct_answers") or [])
lines.extend(self._attr(question, "wrong_answers") or [])
blocks.append("\n".join(clean_text(str(line)) for line in lines))
@@ -26,19 +33,19 @@ class MoodleXMLExporter:
payload = {"questions": [self._question_dict(question) for question in questions]}
return json.dumps(payload, ensure_ascii=False, indent=2, default=str)
def _export_question(self, question: Any, index: int) -> str:
def _export_question(self, question: Any, index: int, image_map: dict[UUID, Any]) -> str:
question_type = self._enum_value(self._attr(question, "question_type"))
if question_type == "multichoice":
return self._multichoice(question, index)
return self._multichoice(question, index, image_map)
if question_type == "truefalse":
return self._truefalse(question, index)
return self._truefalse(question, index, image_map)
if question_type == "shortanswer":
return self._shortanswer(question, index)
return self._shortanswer(question, index, image_map)
if question_type == "matching":
return self._matching(question, index)
return self._matching(question, index, image_map)
raise ValueError(f"Unsupported Moodle question type: {question_type}")
def _multichoice(self, question: Any, index: int) -> str:
def _multichoice(self, question: Any, index: int, image_map: dict[UUID, Any]) -> str:
correct_answers = self._attr(question, "correct_answers") or []
wrong_answers = self._attr(question, "wrong_answers") or []
options = self._attr(question, "options") or {}
@@ -53,7 +60,7 @@ class MoodleXMLExporter:
return "\n".join(
[
' <question type="multichoice">',
self._common_header(question, index),
*self._common_header(question, index, image_map),
f" <single>{str(not multiple_correct).lower()}</single>",
" <shuffleanswers>1</shuffleanswers>",
*answers,
@@ -61,32 +68,32 @@ class MoodleXMLExporter:
]
)
def _truefalse(self, question: Any, index: int) -> str:
def _truefalse(self, question: Any, index: int, image_map: dict[UUID, Any]) -> str:
correct = (self._attr(question, "correct_answers") or ["true"])[0].lower()
is_true = correct in {"true", "verdadero"}
return "\n".join(
[
' <question type="truefalse">',
self._common_header(question, index),
*self._common_header(question, index, image_map),
self._answer_xml("true", 100 if is_true else 0),
self._answer_xml("false", 0 if is_true else 100),
" </question>",
]
)
def _shortanswer(self, question: Any, index: int) -> str:
def _shortanswer(self, question: Any, index: int, image_map: dict[UUID, Any]) -> str:
answers = [self._answer_xml(answer, 100) for answer in self._attr(question, "correct_answers")]
return "\n".join(
[
' <question type="shortanswer">',
self._common_header(question, index),
*self._common_header(question, index, image_map),
" <usecase>0</usecase>",
*answers,
" </question>",
]
)
def _matching(self, question: Any, index: int) -> str:
def _matching(self, question: Any, index: int, image_map: dict[UUID, Any]) -> str:
subquestions = []
for pair in self._attr(question, "matching_pairs") or []:
prompt = pair.get("prompt") if isinstance(pair, dict) else pair.prompt
@@ -106,27 +113,63 @@ class MoodleXMLExporter:
return "\n".join(
[
' <question type="matching">',
self._common_header(question, index),
*self._common_header(question, index, image_map),
*subquestions,
" </question>",
]
)
def _common_header(self, question: Any, index: int) -> str:
def _common_header(self, question: Any, index: int, image_map: dict[UUID, Any]) -> list[str]:
statement = self._attr(question, "statement")
name = clean_text(statement, max_length=80) or f"Pregunta {index}"
return "\n".join(
[
" <name>",
f" <text>{self._xml(name)}</text>",
" </name>",
' <questiontext format="html">',
f" <text>{self._cdata(statement)}</text>",
" </questiontext>",
f" <defaultgrade>{float(self._attr(question, 'score') or 1.0):.2f}</defaultgrade>",
" <generalfeedback format=\"html\"><text></text></generalfeedback>",
]
)
return [
" <name>",
f" <text>{self._xml(name)}</text>",
" </name>",
' <questiontext format="html">',
f" <text>{self._question_html(question, image_map)}</text>",
" </questiontext>",
*self._embedded_files(question, image_map),
f" <defaultgrade>{float(self._attr(question, 'score') or 1.0):.2f}</defaultgrade>",
' <generalfeedback format="html"><text></text></generalfeedback>',
]
def _question_html(self, question: Any, image_map: dict[UUID, Any]) -> str:
statement = html_escape(clean_text(str(self._attr(question, "statement"))))
html_parts = [f"<p>{statement}</p>"]
image = self._resolve_image(question, image_map)
if image is not None:
alt = html_escape(clean_text(image.caption or image.original_filename, max_length=200))
html_parts.append(
f'<p><img src="@@PLUGINFILE@@/{image.stored_filename}" alt="{alt}" /></p>'
)
return self._cdata("".join(html_parts))
def _embedded_files(self, question: Any, image_map: dict[UUID, Any]) -> list[str]:
image = self._resolve_image(question, image_map)
if image is None:
return []
path = Path(image.storage_path)
if not path.exists():
return []
encoded = base64.b64encode(path.read_bytes()).decode("ascii")
return [
f' <file name="{self._xml(image.stored_filename)}" path="/" encoding="base64">',
encoded,
" </file>",
]
def _resolve_image(self, question: Any, image_map: dict[UUID, Any]) -> Any | None:
image_id = self._attr(question, "image_id")
if image_id is None:
return None
if hasattr(question, "image") and question.image is not None:
return question.image
return image_map.get(image_id)
def _answer_xml(self, text: str, fraction: float) -> str:
fraction_text = f"{fraction:.6g}"
@@ -134,7 +177,7 @@ class MoodleXMLExporter:
[
f' <answer fraction="{fraction_text}" format="html">',
f" <text>{self._xml(text)}</text>",
" <feedback format=\"html\"><text></text></feedback>",
' <feedback format="html"><text></text></feedback>',
" </answer>",
]
)
@@ -144,6 +187,7 @@ class MoodleXMLExporter:
"id": str(self._attr(question, "id")) if self._attr(question, "id") else None,
"question_type": self._enum_value(self._attr(question, "question_type")),
"statement": self._attr(question, "statement"),
"image_id": str(self._attr(question, "image_id")) if self._attr(question, "image_id") else None,
"correct_answers": self._attr(question, "correct_answers") or [],
"wrong_answers": self._attr(question, "wrong_answers") or [],
"matching_pairs": self._attr(question, "matching_pairs") or [],
@@ -162,5 +206,5 @@ class MoodleXMLExporter:
return xml_escape(clean_text(str(value)), {'"': "&quot;", "'": "&apos;"})
def _cdata(self, value: Any) -> str:
text = clean_text(str(value)).replace("]]>", "]]]]><![CDATA[>")
text = str(value).replace("]]>", "]]]]><![CDATA[>")
return f"<![CDATA[{text}]]>"
+2
View File
@@ -72,12 +72,14 @@ class AIQuestionParser:
if isinstance(wrong, str):
wrong = [wrong]
image_id = item.get("image_id")
return {
"question_type": question_type,
"statement": item.get("statement", item.get("question", item.get("prompt", ""))),
"correct_answers": correct,
"wrong_answers": wrong,
"matching_pairs": item.get("matching_pairs", []),
"image_id": image_id,
"difficulty": item.get("difficulty", Difficulty.MEDIUM.value),
"score": item.get("score", 1.0),
"penalty": item.get("penalty", 0.0),
+24 -1
View File
@@ -5,7 +5,13 @@ from app.models.exam import ExamTemplate
class PromptBuilder:
def build_prompt(self, template: ExamTemplate, topic_prompt: str) -> str:
def build_prompt(
self,
template: ExamTemplate,
topic_prompt: str,
reference_context: str = "",
images_catalog: str = "",
) -> str:
settings = template.settings
difficulty_profile = template.difficulty_profile
safe_topic = sanitize_prompt_input(topic_prompt)
@@ -18,6 +24,7 @@ class PromptBuilder:
"correct_answers": ["respuesta correcta"],
"wrong_answers": ["distractor 1", "distractor 2"],
"matching_pairs": [{"prompt": "concepto", "answer": "definicion"}],
"image_id": "uuid-opcional-de-imagen-de-la-plantilla",
"difficulty": "easy | medium | hard | very_hard",
"score": 1.0,
"penalty": 0.0,
@@ -41,6 +48,20 @@ class PromptBuilder:
"Tema, conceptos y restricciones indicadas por el profesor:",
safe_topic,
"",
*(
[
"Material de referencia (usa SOLO esta información junto con el tema para crear preguntas):",
sanitize_prompt_input(reference_context, max_length=12_000) if reference_context else "",
"",
]
if reference_context.strip()
else []
),
*(
[images_catalog, ""]
if images_catalog.strip()
else []
),
"Contrato de salida obligatorio:",
json.dumps(contract, ensure_ascii=False, indent=2),
"",
@@ -51,5 +72,7 @@ class PromptBuilder:
"- En truefalse, usa una única respuesta correcta: true o false.",
"- En shortanswer, incluye respuestas exactas aceptadas.",
"- En matching, rellena matching_pairs y deja wrong_answers vacío.",
"- Si la pregunta debe mostrar una imagen al alumno, incluye image_id del catálogo de imágenes.",
"- El enunciado debe describir qué observar en la imagen vinculada (sin inventar image_id inexistentes).",
]
)
+81
View File
@@ -0,0 +1,81 @@
import uuid
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.core.config import Settings
from app.core.errors import AppError
from app.models.exam import ExamImage, ExamMaterial
class StorageQuotaService:
def __init__(self, db: Session, settings: Settings) -> None:
self.db = db
self.settings = settings
def get_template_usage_bytes(self, template_id: uuid.UUID) -> int:
materials_bytes = self.db.scalar(
select(func.coalesce(func.sum(ExamMaterial.size_bytes), 0)).where(
ExamMaterial.template_id == template_id
)
)
images_bytes = self.db.scalar(
select(func.coalesce(func.sum(ExamImage.size_bytes), 0)).where(
ExamImage.template_id == template_id
)
)
return int(materials_bytes or 0) + int(images_bytes or 0)
def ensure_template_has_space(self, template_id: uuid.UUID, incoming_bytes: int) -> None:
if incoming_bytes <= 0:
return
limit = self.settings.max_storage_bytes_per_template
used = self.get_template_usage_bytes(template_id)
projected = used + incoming_bytes
if projected > limit:
raise AppError(
message=(
f"Template storage quota exceeded. "
f"Limit: {self._format_mb(limit)}, "
f"used: {self._format_mb(used)}, "
f"file: {self._format_mb(incoming_bytes)}"
),
status_code=413,
code="template_storage_quota_exceeded",
)
def get_usage_summary(self, template_id: uuid.UUID) -> dict[str, int | float]:
materials_bytes = int(
self.db.scalar(
select(func.coalesce(func.sum(ExamMaterial.size_bytes), 0)).where(
ExamMaterial.template_id == template_id
)
)
or 0
)
images_bytes = int(
self.db.scalar(
select(func.coalesce(func.sum(ExamImage.size_bytes), 0)).where(
ExamImage.template_id == template_id
)
)
or 0
)
used = materials_bytes + images_bytes
limit = self.settings.max_storage_bytes_per_template
return {
"template_id": template_id,
"used_bytes": used,
"limit_bytes": limit,
"remaining_bytes": max(limit - used, 0),
"materials_bytes": materials_bytes,
"images_bytes": images_bytes,
"used_mb": round(used / (1024 * 1024), 2),
"limit_mb": round(limit / (1024 * 1024), 2),
}
@staticmethod
def _format_mb(value_bytes: int) -> str:
return f"{value_bytes / (1024 * 1024):.2f} MB"