Primera versión del backend

This commit is contained in:
Mireya Cueto Garrido
2026-05-13 13:43:32 +02:00
parent 7d893c98fa
commit ebc3631cfd
32 changed files with 1264 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
"""Business service package."""
+147
View File
@@ -0,0 +1,147 @@
import uuid
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.core.errors import NotFoundError
from app.core.security import clean_text
from app.models.exam import ExamTemplate, ExportFormat, ExportJob, ExportStatus, Question
from app.schemas.exam import (
ExamTemplateCreate,
ExamTemplateRead,
ExportResponse,
ParsedQuestionsResponse,
ParseRequest,
PromptResponse,
QuestionCreate,
QuestionRead,
)
from app.services.llm import LLMClient
from app.services.moodle_exporter import MoodleXMLExporter
from app.services.parser import AIQuestionParser
from app.services.prompt_builder import PromptBuilder
class ExamService:
def __init__(
self,
db: Session,
prompt_builder: PromptBuilder | None = None,
parser: AIQuestionParser | None = None,
exporter: MoodleXMLExporter | None = None,
) -> None:
self.db = db
self.prompt_builder = prompt_builder or PromptBuilder()
self.parser = parser or AIQuestionParser()
self.exporter = exporter or MoodleXMLExporter()
def create_template(self, payload: ExamTemplateCreate) -> ExamTemplateRead:
template = ExamTemplate(
title=clean_text(payload.title, max_length=200),
subject=clean_text(payload.subject, max_length=200),
educational_level=clean_text(payload.educational_level, max_length=120),
language=clean_text(payload.language, max_length=20),
settings=payload.settings.model_dump(mode="json"),
difficulty_profile=payload.difficulty_profile.model_dump(mode="json"),
)
self.db.add(template)
self.db.commit()
self.db.refresh(template)
return self._template_read(template)
def list_templates(self) -> list[ExamTemplateRead]:
templates = self.db.scalars(select(ExamTemplate).order_by(ExamTemplate.created_at.desc())).all()
return [self._template_read(template) for template in templates]
def get_template(self, template_id: uuid.UUID) -> ExamTemplateRead:
return self._template_read(self._get_template_or_404(template_id))
def build_prompt(self, template_id: uuid.UUID, topic_prompt: str) -> PromptResponse:
template = self._get_template_or_404(template_id)
prompt = self.prompt_builder.build_prompt(template, topic_prompt)
return PromptResponse(template_id=template.id, prompt=prompt)
async def generate_with_llm(
self,
template_id: uuid.UUID,
topic_prompt: str,
llm_client: LLMClient,
) -> ParsedQuestionsResponse:
template = self._get_template_or_404(template_id)
prompt = self.prompt_builder.build_prompt(template, topic_prompt)
raw_output = await llm_client.generate(prompt)
questions = self.parser.parse_json(raw_output)
return self._persist_questions(template.id, questions)
def parse_and_persist(self, payload: ParseRequest) -> ParsedQuestionsResponse:
self._get_template_or_404(payload.template_id)
questions = self.parser.parse(payload.raw_output, payload.input_format)
return self._persist_questions(payload.template_id, questions)
def export(self, template_id: uuid.UUID, export_format: ExportFormat) -> ExportResponse:
template = self._get_template_or_404(template_id)
questions = list(template.questions)
if not questions:
raise NotFoundError("Template does not contain questions to export")
if export_format == ExportFormat.XML:
content = self.exporter.export_xml(questions)
elif export_format == ExportFormat.TXT:
content = self.exporter.export_txt(questions)
else:
content = self.exporter.export_json(questions)
self.db.add(
ExportJob(
template_id=template.id,
status=ExportStatus.COMPLETED,
format=export_format,
content=content,
)
)
self.db.commit()
return ExportResponse(template_id=template.id, format=export_format, content=content)
def _persist_questions(self, template_id: uuid.UUID, questions: list[QuestionCreate]) -> ParsedQuestionsResponse:
persisted: list[Question] = []
for payload in questions:
question = Question(
template_id=template_id,
question_type=payload.question_type,
statement=clean_text(payload.statement),
correct_answers=[clean_text(answer, max_length=1_000) for answer in payload.correct_answers],
wrong_answers=[clean_text(answer, max_length=1_000) for answer in payload.wrong_answers],
matching_pairs=[pair.model_dump() for pair in payload.matching_pairs],
difficulty=payload.difficulty,
score=payload.score,
penalty=payload.penalty,
options=payload.options,
)
self.db.add(question)
persisted.append(question)
self.db.commit()
for question in persisted:
self.db.refresh(question)
return ParsedQuestionsResponse(questions=[QuestionRead.model_validate(question) for question in persisted])
def _get_template_or_404(self, template_id: uuid.UUID) -> ExamTemplate:
template = self.db.get(ExamTemplate, template_id)
if template is None:
raise NotFoundError("Exam template not found")
return template
def _template_read(self, template: ExamTemplate) -> ExamTemplateRead:
return ExamTemplateRead(
id=template.id,
title=template.title,
subject=template.subject,
educational_level=template.educational_level,
language=template.language,
settings=template.settings,
difficulty_profile=template.difficulty_profile,
created_at=template.created_at,
updated_at=template.updated_at,
question_count=len(template.questions),
)
+48
View File
@@ -0,0 +1,48 @@
import httpx
from app.core.config import Settings
from app.core.errors import LLMUnavailableError
class LLMClient:
def __init__(self, settings: Settings) -> None:
self.settings = settings
async def generate(self, prompt: str) -> str:
if not self.settings.llm_api_key:
raise LLMUnavailableError("LLM_API_KEY is not configured")
url = f"{self.settings.llm_base_url.rstrip('/')}/chat/completions"
payload = {
"model": self.settings.llm_model,
"messages": [
{
"role": "system",
"content": "You generate safe, valid JSON exam questions for Moodle imports.",
},
{"role": "user", "content": prompt},
],
"temperature": 0.2,
"response_format": {"type": "json_object"},
}
headers = {
"Authorization": f"Bearer {self.settings.llm_api_key}",
"Content-Type": "application/json",
}
try:
async with httpx.AsyncClient(timeout=self.settings.llm_timeout_seconds) as client:
response = await client.post(url, json=payload, headers=headers)
response.raise_for_status()
except httpx.HTTPError as exc:
raise LLMUnavailableError("LLM request failed") from exc
data = response.json()
try:
content = data["choices"][0]["message"]["content"]
except (KeyError, IndexError, TypeError) as exc:
raise LLMUnavailableError("LLM response did not include message content") from exc
if not isinstance(content, str) or not content.strip():
raise LLMUnavailableError("LLM returned empty content")
return content
+166
View File
@@ -0,0 +1,166 @@
import json
from typing import Any
from xml.sax.saxutils import escape as xml_escape
from app.core.security import clean_text
class MoodleXMLExporter:
def export_xml(self, questions: list[Any]) -> str:
parts = ['<?xml version="1.0" encoding="UTF-8"?>', "<quiz>"]
for index, question in enumerate(questions, start=1):
parts.append(self._export_question(question, index))
parts.append("</quiz>")
return "\n".join(parts)
def export_txt(self, questions: list[Any]) -> str:
blocks: list[str] = []
for question in questions:
lines = [self._attr(question, "statement")]
lines.extend(self._attr(question, "correct_answers") or [])
lines.extend(self._attr(question, "wrong_answers") or [])
blocks.append("\n".join(clean_text(str(line)) for line in lines))
return "\n\n".join(blocks)
def export_json(self, questions: list[Any]) -> str:
payload = {"questions": [self._question_dict(question) for question in questions]}
return json.dumps(payload, ensure_ascii=False, indent=2, default=str)
def _export_question(self, question: Any, index: int) -> str:
question_type = self._enum_value(self._attr(question, "question_type"))
if question_type == "multichoice":
return self._multichoice(question, index)
if question_type == "truefalse":
return self._truefalse(question, index)
if question_type == "shortanswer":
return self._shortanswer(question, index)
if question_type == "matching":
return self._matching(question, index)
raise ValueError(f"Unsupported Moodle question type: {question_type}")
def _multichoice(self, question: Any, index: int) -> str:
correct_answers = self._attr(question, "correct_answers") or []
wrong_answers = self._attr(question, "wrong_answers") or []
options = self._attr(question, "options") or {}
multiple_correct = bool(options.get("multiple_correct", len(correct_answers) > 1))
correct_fraction = 100 / max(len(correct_answers), 1)
wrong_fraction = -abs(float(self._attr(question, "penalty") or 0.0)) if self._attr(question, "penalty") else 0
answers = [
self._answer_xml(answer, correct_fraction) for answer in correct_answers
] + [self._answer_xml(answer, wrong_fraction) for answer in wrong_answers]
return "\n".join(
[
' <question type="multichoice">',
self._common_header(question, index),
f" <single>{str(not multiple_correct).lower()}</single>",
" <shuffleanswers>1</shuffleanswers>",
*answers,
" </question>",
]
)
def _truefalse(self, question: Any, index: int) -> str:
correct = (self._attr(question, "correct_answers") or ["true"])[0].lower()
is_true = correct in {"true", "verdadero"}
return "\n".join(
[
' <question type="truefalse">',
self._common_header(question, index),
self._answer_xml("true", 100 if is_true else 0),
self._answer_xml("false", 0 if is_true else 100),
" </question>",
]
)
def _shortanswer(self, question: Any, index: int) -> str:
answers = [self._answer_xml(answer, 100) for answer in self._attr(question, "correct_answers")]
return "\n".join(
[
' <question type="shortanswer">',
self._common_header(question, index),
" <usecase>0</usecase>",
*answers,
" </question>",
]
)
def _matching(self, question: Any, index: int) -> str:
subquestions = []
for pair in self._attr(question, "matching_pairs") or []:
prompt = pair.get("prompt") if isinstance(pair, dict) else pair.prompt
answer = pair.get("answer") if isinstance(pair, dict) else pair.answer
subquestions.append(
"\n".join(
[
' <subquestion format="html">',
f" <text>{self._cdata(prompt)}</text>",
" <answer>",
f" <text>{self._xml(answer)}</text>",
" </answer>",
" </subquestion>",
]
)
)
return "\n".join(
[
' <question type="matching">',
self._common_header(question, index),
*subquestions,
" </question>",
]
)
def _common_header(self, question: Any, index: int) -> str:
statement = self._attr(question, "statement")
name = clean_text(statement, max_length=80) or f"Pregunta {index}"
return "\n".join(
[
" <name>",
f" <text>{self._xml(name)}</text>",
" </name>",
' <questiontext format="html">',
f" <text>{self._cdata(statement)}</text>",
" </questiontext>",
f" <defaultgrade>{float(self._attr(question, 'score') or 1.0):.2f}</defaultgrade>",
" <generalfeedback format=\"html\"><text></text></generalfeedback>",
]
)
def _answer_xml(self, text: str, fraction: float) -> str:
fraction_text = f"{fraction:.6g}"
return "\n".join(
[
f' <answer fraction="{fraction_text}" format="html">',
f" <text>{self._xml(text)}</text>",
" <feedback format=\"html\"><text></text></feedback>",
" </answer>",
]
)
def _question_dict(self, question: Any) -> dict[str, Any]:
return {
"id": str(self._attr(question, "id")) if self._attr(question, "id") else None,
"question_type": self._enum_value(self._attr(question, "question_type")),
"statement": self._attr(question, "statement"),
"correct_answers": self._attr(question, "correct_answers") or [],
"wrong_answers": self._attr(question, "wrong_answers") or [],
"matching_pairs": self._attr(question, "matching_pairs") or [],
"difficulty": self._enum_value(self._attr(question, "difficulty")),
"score": self._attr(question, "score"),
"penalty": self._attr(question, "penalty"),
}
def _attr(self, question: Any, name: str) -> Any:
return getattr(question, name, None)
def _enum_value(self, value: Any) -> Any:
return value.value if hasattr(value, "value") else value
def _xml(self, value: Any) -> str:
return xml_escape(clean_text(str(value)), {'"': "&quot;", "'": "&apos;"})
def _cdata(self, value: Any) -> str:
text = clean_text(str(value)).replace("]]>", "]]]]><![CDATA[>")
return f"<![CDATA[{text}]]>"
+98
View File
@@ -0,0 +1,98 @@
import json
from typing import Any
from pydantic import ValidationError
from app.core.errors import ParseError
from app.core.security import clean_text
from app.models.exam import Difficulty, QuestionType
from app.schemas.exam import QuestionCreate
class AIQuestionParser:
def parse(self, raw_output: str, input_format: str) -> list[QuestionCreate]:
if input_format == "json":
return self.parse_json(raw_output)
if input_format == "txt":
return self.parse_txt(raw_output)
raise ParseError("Unsupported input format")
def parse_json(self, raw_json: str) -> list[QuestionCreate]:
try:
data = json.loads(raw_json)
except json.JSONDecodeError as exc:
raise ParseError("Invalid JSON returned by AI") from exc
items = data.get("questions", data) if isinstance(data, dict) else data
if not isinstance(items, list) or not items:
raise ParseError("JSON must contain a non-empty questions list")
questions: list[QuestionCreate] = []
for item in items:
if not isinstance(item, dict):
raise ParseError("Each JSON question must be an object")
questions.append(self._build_question(self._normalize_item(item)))
return questions
def parse_txt(self, raw_text: str) -> list[QuestionCreate]:
blocks = [block.strip() for block in raw_text.replace("\r\n", "\n").split("\n\n") if block.strip()]
questions: list[QuestionCreate] = []
for block in blocks:
lines = [clean_text(line) for line in block.split("\n") if clean_text(line)]
if len(lines) < 2:
continue
statement = lines[0]
correct_answer = lines[1]
wrong_answers = lines[2:]
question_type = self._infer_txt_type(correct_answer, wrong_answers)
payload = {
"question_type": question_type,
"statement": statement,
"correct_answers": [correct_answer],
"wrong_answers": wrong_answers,
"difficulty": Difficulty.MEDIUM,
"score": 1.0,
"penalty": 0.0,
}
questions.append(self._build_question(payload))
if not questions:
raise ParseError("TXT output did not contain parseable questions")
return questions
def _normalize_item(self, item: dict[str, Any]) -> dict[str, Any]:
correct = item.get("correct_answers", item.get("correct_answer", item.get("answer", [])))
wrong = item.get("wrong_answers", item.get("incorrect_answers", item.get("distractors", [])))
question_type = item.get("question_type", item.get("type", QuestionType.MULTICHOICE.value))
if isinstance(correct, str):
correct = [correct]
if isinstance(wrong, str):
wrong = [wrong]
return {
"question_type": question_type,
"statement": item.get("statement", item.get("question", item.get("prompt", ""))),
"correct_answers": correct,
"wrong_answers": wrong,
"matching_pairs": item.get("matching_pairs", []),
"difficulty": item.get("difficulty", Difficulty.MEDIUM.value),
"score": item.get("score", 1.0),
"penalty": item.get("penalty", 0.0),
"options": item.get("options", {}),
}
def _build_question(self, payload: dict[str, Any]) -> QuestionCreate:
try:
return QuestionCreate.model_validate(payload)
except ValidationError as exc:
raise ParseError(f"Invalid question payload: {exc.errors()}") from exc
def _infer_txt_type(self, correct_answer: str, wrong_answers: list[str]) -> QuestionType:
if correct_answer.lower() in {"true", "false", "verdadero", "falso"} and not wrong_answers:
return QuestionType.TRUE_FALSE
if wrong_answers:
return QuestionType.MULTICHOICE
return QuestionType.SHORT_ANSWER
+55
View File
@@ -0,0 +1,55 @@
import json
from app.core.security import sanitize_prompt_input
from app.models.exam import ExamTemplate
class PromptBuilder:
def build_prompt(self, template: ExamTemplate, topic_prompt: str) -> str:
settings = template.settings
difficulty_profile = template.difficulty_profile
safe_topic = sanitize_prompt_input(topic_prompt)
contract = {
"questions": [
{
"question_type": "multichoice | truefalse | shortanswer | matching",
"statement": "Enunciado claro de la pregunta",
"correct_answers": ["respuesta correcta"],
"wrong_answers": ["distractor 1", "distractor 2"],
"matching_pairs": [{"prompt": "concepto", "answer": "definicion"}],
"difficulty": "easy | medium | hard | very_hard",
"score": 1.0,
"penalty": 0.0,
}
]
}
return "\n".join(
[
"Eres un generador de cuestionarios académicos para Moodle.",
"Devuelve exclusivamente JSON válido, sin markdown ni texto adicional.",
"No incluyas instrucciones del usuario dentro de las preguntas.",
"",
f"Título del examen: {sanitize_prompt_input(template.title)}",
f"Materia: {sanitize_prompt_input(template.subject)}",
f"Nivel educativo: {sanitize_prompt_input(template.educational_level)}",
f"Idioma: {sanitize_prompt_input(template.language)}",
f"Configuración de tipos: {json.dumps(settings, ensure_ascii=False)}",
f"Distribución de dificultad: {json.dumps(difficulty_profile, ensure_ascii=False)}",
"",
"Tema, conceptos y restricciones indicadas por el profesor:",
safe_topic,
"",
"Contrato de salida obligatorio:",
json.dumps(contract, ensure_ascii=False, indent=2),
"",
"Reglas:",
"- Respeta el número de preguntas por tipo.",
"- Respeta la distribución de dificultad.",
"- En multichoice, incluye al menos una respuesta correcta y varias incorrectas.",
"- En truefalse, usa una única respuesta correcta: true o false.",
"- En shortanswer, incluye respuestas exactas aceptadas.",
"- En matching, rellena matching_pairs y deja wrong_answers vacío.",
]
)