Files
GenExam-IA/backend/app/services/auth_service.py
T
Mireya Cueto Garrido 946f16a633 Add React frontend and Sinbad2IA LLM integration.
Introduce a full Vite/React UI for exams, auth, materials, images, generation, and export.
Adapt backend for Sinbad2IA chat API, bcrypt passwords, CORS on port 5173, and schema migrations.
2026-06-01 13:27:41 +02:00

141 lines
5.1 KiB
Python

import uuid
from datetime import UTC, datetime, timedelta
from typing import Annotated
from fastapi import Depends
from jose import JWTError, jwt
import bcrypt
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.core.config import Settings, get_settings
from google.auth.transport import requests as google_requests
from google.oauth2 import id_token as google_id_token
from app.core.errors import AppError, ConflictError, NotFoundError, UnauthorizedError
from app.core.security import clean_text
from app.db.session import get_db
from app.models.user import User
from app.schemas.user import UserLogin, UserRead, UserRegister
def _hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def _verify_password(password: str, password_hash: str) -> bool:
try:
return bcrypt.checkpw(password.encode("utf-8"), password_hash.encode("utf-8"))
except ValueError:
return False
class AuthService:
def __init__(self, db: Session, settings: Settings) -> None:
self.db = db
self.settings = settings
def register(self, payload: UserRegister) -> UserRead:
email = payload.email.lower().strip()
existing = self.db.scalar(select(User).where(User.email == email))
if existing is not None:
raise ConflictError("Email is already registered")
user = User(
email=email,
password_hash=_hash_password(payload.password),
full_name=clean_text(payload.full_name, max_length=200) if payload.full_name else None,
)
self.db.add(user)
self.db.commit()
self.db.refresh(user)
return UserRead.model_validate(user)
def authenticate(self, payload: UserLogin) -> User:
email = payload.email.lower().strip()
user = self.db.scalar(select(User).where(User.email == email))
if user is None or user.password_hash is None:
raise UnauthorizedError("Invalid email or password")
if not _verify_password(payload.password, user.password_hash):
raise UnauthorizedError("Invalid email or password")
return user
def login_with_google(self, id_token_value: str) -> User:
if not self.settings.google_client_id:
raise AppError(
message="Google login is not configured",
status_code=503,
code="google_not_configured",
)
try:
idinfo = google_id_token.verify_oauth2_token(
id_token_value,
google_requests.Request(),
self.settings.google_client_id,
)
except ValueError as exc:
raise UnauthorizedError("Invalid Google ID token") from exc
google_sub = idinfo.get("sub")
email = (idinfo.get("email") or "").lower().strip()
if not google_sub or not email:
raise UnauthorizedError("Google token does not include required user information")
if not idinfo.get("email_verified", False):
raise UnauthorizedError("Google email is not verified")
user = self.db.scalar(select(User).where(User.google_sub == google_sub))
if user is not None:
return user
user = self.db.scalar(select(User).where(User.email == email))
if user is not None:
if user.google_sub is not None and user.google_sub != google_sub:
raise ConflictError("Email is linked to another Google account")
user.google_sub = google_sub
if not user.full_name and idinfo.get("name"):
user.full_name = clean_text(idinfo["name"], max_length=200)
self.db.commit()
self.db.refresh(user)
return user
user = User(
email=email,
password_hash=None,
google_sub=google_sub,
full_name=clean_text(idinfo["name"], max_length=200) if idinfo.get("name") else None,
)
self.db.add(user)
self.db.commit()
self.db.refresh(user)
return user
def get_user_by_id(self, user_id: uuid.UUID) -> User:
user = self.db.get(User, user_id)
if user is None:
raise NotFoundError("User not found")
return user
def create_access_token(self, user_id: uuid.UUID) -> str:
expire = datetime.now(UTC) + timedelta(minutes=self.settings.jwt_expire_minutes)
payload = {"sub": str(user_id), "exp": expire}
return jwt.encode(payload, self.settings.jwt_secret_key, algorithm=self.settings.jwt_algorithm)
def decode_user_id(self, token: str) -> uuid.UUID:
try:
payload = jwt.decode(
token,
self.settings.jwt_secret_key,
algorithms=[self.settings.jwt_algorithm],
)
user_id = uuid.UUID(payload["sub"])
except (JWTError, KeyError, ValueError) as exc:
raise UnauthorizedError("Invalid or expired token") from exc
return user_id
def get_auth_service(
db: Annotated[Session, Depends(get_db)],
settings: Annotated[Settings, Depends(get_settings)],
) -> AuthService:
return AuthService(db, settings)