diff --git a/src/trudex/domain/schemas.py b/src/trudex/domain/schemas.py index c269624..ad55ab9 100644 --- a/src/trudex/domain/schemas.py +++ b/src/trudex/domain/schemas.py @@ -42,3 +42,24 @@ class Option: text: str is_correct: bool = False explanation: str | None = None + + +@dataclass +class TestAttempt: + id: int + user_id: int + test_id: int + started_at: datetime + finished_at: datetime | None = None + score: int = 0 + is_passed: bool = False + + +@dataclass +class UserAnswer: + id: int + attempt_id: int + question_id: int + selected_option_id: int | None = None + text_answer: str | None = None + is_correct: bool = False diff --git a/src/trudex/infrastructure/database/dao/test_attempt.py b/src/trudex/infrastructure/database/dao/test_attempt.py new file mode 100644 index 0000000..551c283 --- /dev/null +++ b/src/trudex/infrastructure/database/dao/test_attempt.py @@ -0,0 +1,80 @@ +from datetime import datetime + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from trudex.domain.schemas import TestAttempt as DomainTestAttempt +from trudex.infrastructure.database.dto.test_attempt import TestAttemptDTO +from trudex.infrastructure.database.models import TestAttempt + + +class TestAttemptDAO: + def __init__(self, session: AsyncSession) -> None: + self.session: AsyncSession = session + + async def get_by_id(self, attempt_id: int) -> DomainTestAttempt | None: + result = await self.session.execute( + select(TestAttempt).where(TestAttempt.id == attempt_id) + ) + model = result.scalar_one_or_none() + return TestAttemptDTO(model).to_domain() if model else None + + async def get_all(self) -> list[DomainTestAttempt]: + result = await self.session.execute(select(TestAttempt)) + models = list(result.scalars().all()) + return [TestAttemptDTO(model).to_domain() for model in models] + + async def create( + self, + user_id: int, + test_id: int, + score: int = 0, + is_passed: bool = False, + ) -> DomainTestAttempt: + attempt = TestAttempt( + user_id=user_id, + test_id=test_id, + score=score, + is_passed=is_passed, + ) + self.session.add(attempt) + await self.session.flush() + await self.session.refresh(attempt) + return TestAttemptDTO(attempt).to_domain() + + async def update( + self, + attempt_id: int, + finished_at: datetime | None = None, + score: int | None = None, + is_passed: bool | None = None, + ) -> DomainTestAttempt | None: + result = await self.session.execute( + select(TestAttempt).where(TestAttempt.id == attempt_id) + ) + attempt = result.scalar_one_or_none() + if not attempt: + return None + + if finished_at is not None: + attempt.finished_at = finished_at + if score is not None: + attempt.score = score + if is_passed is not None: + attempt.is_passed = is_passed + + await self.session.flush() + await self.session.refresh(attempt) + return TestAttemptDTO(attempt).to_domain() + + async def delete(self, attempt_id: int) -> bool: + result = await self.session.execute( + select(TestAttempt).where(TestAttempt.id == attempt_id) + ) + attempt = result.scalar_one_or_none() + if not attempt: + return False + + await self.session.delete(attempt) + await self.session.flush() + return True diff --git a/src/trudex/infrastructure/database/dao/user_answer.py b/src/trudex/infrastructure/database/dao/user_answer.py new file mode 100644 index 0000000..57be605 --- /dev/null +++ b/src/trudex/infrastructure/database/dao/user_answer.py @@ -0,0 +1,80 @@ +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from trudex.domain.schemas import UserAnswer as DomainUserAnswer +from trudex.infrastructure.database.dto.user_answer import UserAnswerDTO +from trudex.infrastructure.database.models import UserAnswer + + +class UserAnswerDAO: + def __init__(self, session: AsyncSession) -> None: + self.session: AsyncSession = session + + async def get_by_id(self, answer_id: int) -> DomainUserAnswer | None: + result = await self.session.execute( + select(UserAnswer).where(UserAnswer.id == answer_id) + ) + model = result.scalar_one_or_none() + return UserAnswerDTO(model).to_domain() if model else None + + async def get_all(self) -> list[DomainUserAnswer]: + result = await self.session.execute(select(UserAnswer)) + models = list(result.scalars().all()) + return [UserAnswerDTO(model).to_domain() for model in models] + + async def create( + self, + attempt_id: int, + question_id: int, + selected_option_id: int | None = None, + text_answer: str | None = None, + is_correct: bool = False, + ) -> DomainUserAnswer: + answer = UserAnswer( + attempt_id=attempt_id, + question_id=question_id, + selected_option_id=selected_option_id, + text_answer=text_answer, + is_correct=is_correct, + ) + self.session.add(answer) + await self.session.flush() + await self.session.refresh(answer) + return UserAnswerDTO(answer).to_domain() + + async def update( + self, + answer_id: int, + selected_option_id: int | None = None, + text_answer: str | None = None, + is_correct: bool | None = None, + ) -> DomainUserAnswer | None: + result = await self.session.execute( + select(UserAnswer).where(UserAnswer.id == answer_id) + ) + answer = result.scalar_one_or_none() + if not answer: + return None + + if selected_option_id is not None: + answer.selected_option_id = selected_option_id + if text_answer is not None: + answer.text_answer = text_answer + if is_correct is not None: + answer.is_correct = is_correct + + await self.session.flush() + await self.session.refresh(answer) + return UserAnswerDTO(answer).to_domain() + + async def delete(self, answer_id: int) -> bool: + result = await self.session.execute( + select(UserAnswer).where(UserAnswer.id == answer_id) + ) + answer = result.scalar_one_or_none() + if not answer: + return False + + await self.session.delete(answer) + await self.session.flush() + return True diff --git a/src/trudex/infrastructure/database/dto/test_attempt.py b/src/trudex/infrastructure/database/dto/test_attempt.py new file mode 100644 index 0000000..786eb38 --- /dev/null +++ b/src/trudex/infrastructure/database/dto/test_attempt.py @@ -0,0 +1,18 @@ +from trudex.domain.schemas import TestAttempt as DomainTestAttempt +from trudex.infrastructure.database.models import TestAttempt as TestAttemptModel + + +class TestAttemptDTO: + def __init__(self, model: TestAttemptModel) -> None: + self.model: TestAttemptModel = model + + def to_domain(self) -> DomainTestAttempt: + return DomainTestAttempt( + id=self.model.id, + user_id=self.model.user_id, + test_id=self.model.test_id, + started_at=self.model.started_at, + finished_at=self.model.finished_at, + score=self.model.score, + is_passed=self.model.is_passed, + ) diff --git a/src/trudex/infrastructure/database/dto/user_answer.py b/src/trudex/infrastructure/database/dto/user_answer.py new file mode 100644 index 0000000..58f0ddc --- /dev/null +++ b/src/trudex/infrastructure/database/dto/user_answer.py @@ -0,0 +1,17 @@ +from trudex.domain.schemas import UserAnswer as DomainUserAnswer +from trudex.infrastructure.database.models import UserAnswer as UserAnswerModel + + +class UserAnswerDTO: + def __init__(self, model: UserAnswerModel) -> None: + self.model: UserAnswerModel = model + + def to_domain(self) -> DomainUserAnswer: + return DomainUserAnswer( + id=self.model.id, + attempt_id=self.model.attempt_id, + question_id=self.model.question_id, + selected_option_id=self.model.selected_option_id, + text_answer=self.model.text_answer, + is_correct=self.model.is_correct, + ) diff --git a/src/trudex/infrastructure/database/models.py b/src/trudex/infrastructure/database/models.py index b4e1f29..561c657 100644 --- a/src/trudex/infrastructure/database/models.py +++ b/src/trudex/infrastructure/database/models.py @@ -78,3 +78,39 @@ class Option(Base): explanation: Mapped[str | None] = mapped_column(Text) question: Mapped["Question"] = relationship(back_populates="options") + + +@final +class TestAttempt(Base): + __tablename__ = "test_attempts" + + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(BigInteger, index=True) + test_id: Mapped[int] = mapped_column(ForeignKey("tests.id")) + started_at: Mapped[datetime] = mapped_column(server_default=func.now()) + finished_at: Mapped[datetime | None] = mapped_column(default=None) + score: Mapped[int] = mapped_column(Integer, default=0) + is_passed: Mapped[bool] = mapped_column(default=False) + + test: Mapped["Test"] = relationship() + answers: Mapped[list["UserAnswer"]] = relationship( + back_populates="attempt", + cascade="all, delete-orphan" + ) + + +@final +class UserAnswer(Base): + __tablename__ = "user_answers" + + id: Mapped[int] = mapped_column(primary_key=True) + attempt_id: Mapped[int] = mapped_column(ForeignKey("test_attempts.id")) + question_id: Mapped[int] = mapped_column(ForeignKey("questions.id")) + selected_option_id: Mapped[int | None] = mapped_column(ForeignKey("options.id"), default=None) + text_answer: Mapped[str | None] = mapped_column(Text, default=None) + is_correct: Mapped[bool] = mapped_column(default=False) + + attempt: Mapped["TestAttempt"] = relationship(back_populates="answers") + question: Mapped["Question"] = relationship() + selected_option: Mapped["Option | None"] = relationship() + diff --git a/src/trudex/infrastructure/database/repo/__init__.py b/src/trudex/infrastructure/database/repo/__init__.py index 8a2838f..3d25dad 100644 --- a/src/trudex/infrastructure/database/repo/__init__.py +++ b/src/trudex/infrastructure/database/repo/__init__.py @@ -1,4 +1,5 @@ from trudex.infrastructure.database.repo.test import TestRepository +from trudex.infrastructure.database.repo.test_attempt import TestAttemptRepository from trudex.infrastructure.database.repo.user import UserRepository -__all__ = ["TestRepository", "UserRepository"] +__all__ = ["TestRepository", "TestAttemptRepository", "UserRepository"] diff --git a/src/trudex/infrastructure/database/repo/test_attempt.py b/src/trudex/infrastructure/database/repo/test_attempt.py new file mode 100644 index 0000000..d6043cb --- /dev/null +++ b/src/trudex/infrastructure/database/repo/test_attempt.py @@ -0,0 +1,197 @@ +from datetime import datetime +from typing import final + +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload + +from trudex.domain.schemas import TestAttempt, UserAnswer +from trudex.infrastructure.database.dao.test_attempt import TestAttemptDAO +from trudex.infrastructure.database.dao.user_answer import UserAnswerDAO +from trudex.infrastructure.database.dto.test_attempt import TestAttemptDTO +from trudex.infrastructure.database.dto.user_answer import UserAnswerDTO +from trudex.infrastructure.database.models import ( + TestAttempt as TestAttemptModel, + UserAnswer as UserAnswerModel, +) + + +@final +class TestAttemptRepository: + def __init__(self, session: AsyncSession) -> None: + self.session = session + self.attempt_dao = TestAttemptDAO(session) + self.answer_dao = UserAnswerDAO(session) + + async def get_user_attempts(self, user_id: int) -> list[TestAttempt]: + result = await self.session.execute( + select(TestAttemptModel) + .where(TestAttemptModel.user_id == user_id) + .order_by(TestAttemptModel.started_at.desc()) + ) + models = list(result.scalars().all()) + return [TestAttemptDTO(model).to_domain() for model in models] + + async def get_test_attempts(self, test_id: int) -> list[TestAttempt]: + result = await self.session.execute( + select(TestAttemptModel) + .where(TestAttemptModel.test_id == test_id) + .order_by(TestAttemptModel.started_at.desc()) + ) + models = list(result.scalars().all()) + return [TestAttemptDTO(model).to_domain() for model in models] + + async def get_user_test_attempts(self, user_id: int, test_id: int) -> list[TestAttempt]: + result = await self.session.execute( + select(TestAttemptModel) + .where(TestAttemptModel.user_id == user_id) + .where(TestAttemptModel.test_id == test_id) + .order_by(TestAttemptModel.started_at.desc()) + ) + models = list(result.scalars().all()) + return [TestAttemptDTO(model).to_domain() for model in models] + + async def get_active_attempt(self, user_id: int, test_id: int) -> TestAttempt | None: + result = await self.session.execute( + select(TestAttemptModel) + .where(TestAttemptModel.user_id == user_id) + .where(TestAttemptModel.test_id == test_id) + .where(TestAttemptModel.finished_at == None) + .order_by(TestAttemptModel.started_at.desc()) + ) + model = result.scalar_one_or_none() + return TestAttemptDTO(model).to_domain() if model else None + + async def get_attempt_with_answers(self, attempt_id: int) -> tuple[TestAttempt | None, list[UserAnswer]]: + attempt = await self.attempt_dao.get_by_id(attempt_id) + if not attempt: + return None, [] + + result = await self.session.execute( + select(TestAttemptModel) + .where(TestAttemptModel.id == attempt_id) + .options(selectinload(TestAttemptModel.answers)) + ) + attempt_model = result.scalar_one_or_none() + if not attempt_model: + return attempt, [] + + answers = [UserAnswerDTO(answer).to_domain() for answer in attempt_model.answers] + return attempt, answers + + async def get_answers_for_attempt(self, attempt_id: int) -> list[UserAnswer]: + result = await self.session.execute( + select(UserAnswerModel) + .where(UserAnswerModel.attempt_id == attempt_id) + ) + models = list(result.scalars().all()) + return [UserAnswerDTO(model).to_domain() for model in models] + + async def count_user_attempts(self, user_id: int, test_id: int | None = None) -> int: + query = select(func.count(TestAttemptModel.id)).where(TestAttemptModel.user_id == user_id) + if test_id is not None: + query = query.where(TestAttemptModel.test_id == test_id) + + result = await self.session.execute(query) + count = result.scalar_one() + return count + + async def count_passed_attempts(self, user_id: int, test_id: int | None = None) -> int: + query = ( + select(func.count(TestAttemptModel.id)) + .where(TestAttemptModel.user_id == user_id) + .where(TestAttemptModel.is_passed == True) + ) + if test_id is not None: + query = query.where(TestAttemptModel.test_id == test_id) + + result = await self.session.execute(query) + count = result.scalar_one() + return count + + async def get_best_attempt(self, user_id: int, test_id: int) -> TestAttempt | None: + result = await self.session.execute( + select(TestAttemptModel) + .where(TestAttemptModel.user_id == user_id) + .where(TestAttemptModel.test_id == test_id) + .where(TestAttemptModel.finished_at != None) + .order_by(TestAttemptModel.score.desc(), TestAttemptModel.started_at.asc()) + ) + model = result.scalar_one_or_none() + return TestAttemptDTO(model).to_domain() if model else None + + async def get_latest_attempt(self, user_id: int, test_id: int) -> TestAttempt | None: + result = await self.session.execute( + select(TestAttemptModel) + .where(TestAttemptModel.user_id == user_id) + .where(TestAttemptModel.test_id == test_id) + .order_by(TestAttemptModel.started_at.desc()) + ) + model = result.scalar_one_or_none() + return TestAttemptDTO(model).to_domain() if model else None + + async def finish_attempt(self, attempt_id: int, score: int, is_passed: bool) -> TestAttempt | None: + return await self.attempt_dao.update( + attempt_id=attempt_id, + finished_at=datetime.utcnow(), + score=score, + is_passed=is_passed + ) + + async def calculate_attempt_score(self, attempt_id: int) -> int: + result = await self.session.execute( + select(func.count(UserAnswerModel.id)) + .where(UserAnswerModel.attempt_id == attempt_id) + .where(UserAnswerModel.is_correct == True) + ) + count = result.scalar_one() + return count + + async def get_incorrect_answers(self, attempt_id: int) -> list[UserAnswer]: + result = await self.session.execute( + select(UserAnswerModel) + .where(UserAnswerModel.attempt_id == attempt_id) + .where(UserAnswerModel.is_correct == False) + ) + models = list(result.scalars().all()) + return [UserAnswerDTO(model).to_domain() for model in models] + + async def get_question_statistics(self, question_id: int) -> dict[str, int]: + total_result = await self.session.execute( + select(func.count(UserAnswerModel.id)) + .where(UserAnswerModel.question_id == question_id) + ) + total = total_result.scalar_one() + + correct_result = await self.session.execute( + select(func.count(UserAnswerModel.id)) + .where(UserAnswerModel.question_id == question_id) + .where(UserAnswerModel.is_correct == True) + ) + correct = correct_result.scalar_one() + + return { + "total_answers": total, + "correct_answers": correct, + "incorrect_answers": total - correct, + } + + async def get_most_difficult_questions(self, test_id: int, limit: int = 10) -> list[tuple[int, float]]: + from trudex.infrastructure.database.models import Question as QuestionModel + + result = await self.session.execute( + select( + UserAnswerModel.question_id, + func.count(UserAnswerModel.id).label("total"), + func.sum(func.cast(UserAnswerModel.is_correct, func.Integer)).label("correct") + ) + .join(QuestionModel, UserAnswerModel.question_id == QuestionModel.id) + .where(QuestionModel.test_id == test_id) + .group_by(UserAnswerModel.question_id) + .having(func.count(UserAnswerModel.id) > 0) + .order_by((func.sum(func.cast(UserAnswerModel.is_correct, func.Integer)) / func.count(UserAnswerModel.id)).asc()) + .limit(limit) + ) + + rows = result.all() + return [(row.question_id, row.correct / row.total if row.total > 0 else 0.0) for row in rows] diff --git a/src/trudex/infrastructure/di.py b/src/trudex/infrastructure/di.py index 0536352..a6eb442 100644 --- a/src/trudex/infrastructure/di.py +++ b/src/trudex/infrastructure/di.py @@ -7,8 +7,11 @@ from trudex.infrastructure.database.config import new_session_maker from trudex.infrastructure.database.dao.option import OptionDAO from trudex.infrastructure.database.dao.question import QuestionDAO from trudex.infrastructure.database.dao.test import TestDAO +from trudex.infrastructure.database.dao.test_attempt import TestAttemptDAO from trudex.infrastructure.database.dao.user import UserDAO +from trudex.infrastructure.database.dao.user_answer import UserAnswerDAO from trudex.infrastructure.database.repo.test import TestRepository +from trudex.infrastructure.database.repo.test_attempt import TestAttemptRepository from trudex.infrastructure.database.repo.user import UserRepository from trudex.infrastructure.utils.config import Config @@ -45,6 +48,14 @@ class DatabaseProvider(Provider): def get_option_dao(self, session: AsyncSession) -> OptionDAO: return OptionDAO(session) + @provide(scope=Scope.REQUEST) + def get_test_attempt_dao(self, session: AsyncSession) -> TestAttemptDAO: + return TestAttemptDAO(session) + + @provide(scope=Scope.REQUEST) + def get_user_answer_dao(self, session: AsyncSession) -> UserAnswerDAO: + return UserAnswerDAO(session) + @provide(scope=Scope.REQUEST) def get_user_repository(self, session: AsyncSession) -> UserRepository: return UserRepository(session) @@ -52,3 +63,7 @@ class DatabaseProvider(Provider): @provide(scope=Scope.REQUEST) def get_test_repository(self, session: AsyncSession) -> TestRepository: return TestRepository(session) + + @provide(scope=Scope.REQUEST) + def get_test_attempt_repository(self, session: AsyncSession) -> TestAttemptRepository: + return TestAttemptRepository(session)