mirror of
https://github.com/koloideal/Quizzi.git
synced 2026-06-10 18:35:28 +03:00
Initial commit
This commit is contained in:
@@ -42,3 +42,24 @@ class Option:
|
|||||||
text: str
|
text: str
|
||||||
is_correct: bool = False
|
is_correct: bool = False
|
||||||
explanation: str | None = None
|
explanation: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TestAttempt:
|
||||||
|
id: int
|
||||||
|
user_id: int
|
||||||
|
test_id: int
|
||||||
|
started_at: datetime
|
||||||
|
finished_at: datetime | None = None
|
||||||
|
score: int = 0
|
||||||
|
is_passed: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class UserAnswer:
|
||||||
|
id: int
|
||||||
|
attempt_id: int
|
||||||
|
question_id: int
|
||||||
|
selected_option_id: int | None = None
|
||||||
|
text_answer: str | None = None
|
||||||
|
is_correct: bool = False
|
||||||
|
|||||||
@@ -0,0 +1,80 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from sqlalchemy import select
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from trudex.domain.schemas import TestAttempt as DomainTestAttempt
|
||||||
|
from trudex.infrastructure.database.dto.test_attempt import TestAttemptDTO
|
||||||
|
from trudex.infrastructure.database.models import TestAttempt
|
||||||
|
|
||||||
|
|
||||||
|
class TestAttemptDAO:
|
||||||
|
def __init__(self, session: AsyncSession) -> None:
|
||||||
|
self.session: AsyncSession = session
|
||||||
|
|
||||||
|
async def get_by_id(self, attempt_id: int) -> DomainTestAttempt | None:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(TestAttempt).where(TestAttempt.id == attempt_id)
|
||||||
|
)
|
||||||
|
model = result.scalar_one_or_none()
|
||||||
|
return TestAttemptDTO(model).to_domain() if model else None
|
||||||
|
|
||||||
|
async def get_all(self) -> list[DomainTestAttempt]:
|
||||||
|
result = await self.session.execute(select(TestAttempt))
|
||||||
|
models = list(result.scalars().all())
|
||||||
|
return [TestAttemptDTO(model).to_domain() for model in models]
|
||||||
|
|
||||||
|
async def create(
|
||||||
|
self,
|
||||||
|
user_id: int,
|
||||||
|
test_id: int,
|
||||||
|
score: int = 0,
|
||||||
|
is_passed: bool = False,
|
||||||
|
) -> DomainTestAttempt:
|
||||||
|
attempt = TestAttempt(
|
||||||
|
user_id=user_id,
|
||||||
|
test_id=test_id,
|
||||||
|
score=score,
|
||||||
|
is_passed=is_passed,
|
||||||
|
)
|
||||||
|
self.session.add(attempt)
|
||||||
|
await self.session.flush()
|
||||||
|
await self.session.refresh(attempt)
|
||||||
|
return TestAttemptDTO(attempt).to_domain()
|
||||||
|
|
||||||
|
async def update(
|
||||||
|
self,
|
||||||
|
attempt_id: int,
|
||||||
|
finished_at: datetime | None = None,
|
||||||
|
score: int | None = None,
|
||||||
|
is_passed: bool | None = None,
|
||||||
|
) -> DomainTestAttempt | None:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(TestAttempt).where(TestAttempt.id == attempt_id)
|
||||||
|
)
|
||||||
|
attempt = result.scalar_one_or_none()
|
||||||
|
if not attempt:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if finished_at is not None:
|
||||||
|
attempt.finished_at = finished_at
|
||||||
|
if score is not None:
|
||||||
|
attempt.score = score
|
||||||
|
if is_passed is not None:
|
||||||
|
attempt.is_passed = is_passed
|
||||||
|
|
||||||
|
await self.session.flush()
|
||||||
|
await self.session.refresh(attempt)
|
||||||
|
return TestAttemptDTO(attempt).to_domain()
|
||||||
|
|
||||||
|
async def delete(self, attempt_id: int) -> bool:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(TestAttempt).where(TestAttempt.id == attempt_id)
|
||||||
|
)
|
||||||
|
attempt = result.scalar_one_or_none()
|
||||||
|
if not attempt:
|
||||||
|
return False
|
||||||
|
|
||||||
|
await self.session.delete(attempt)
|
||||||
|
await self.session.flush()
|
||||||
|
return True
|
||||||
@@ -0,0 +1,80 @@
|
|||||||
|
from sqlalchemy import select
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from trudex.domain.schemas import UserAnswer as DomainUserAnswer
|
||||||
|
from trudex.infrastructure.database.dto.user_answer import UserAnswerDTO
|
||||||
|
from trudex.infrastructure.database.models import UserAnswer
|
||||||
|
|
||||||
|
|
||||||
|
class UserAnswerDAO:
|
||||||
|
def __init__(self, session: AsyncSession) -> None:
|
||||||
|
self.session: AsyncSession = session
|
||||||
|
|
||||||
|
async def get_by_id(self, answer_id: int) -> DomainUserAnswer | None:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(UserAnswer).where(UserAnswer.id == answer_id)
|
||||||
|
)
|
||||||
|
model = result.scalar_one_or_none()
|
||||||
|
return UserAnswerDTO(model).to_domain() if model else None
|
||||||
|
|
||||||
|
async def get_all(self) -> list[DomainUserAnswer]:
|
||||||
|
result = await self.session.execute(select(UserAnswer))
|
||||||
|
models = list(result.scalars().all())
|
||||||
|
return [UserAnswerDTO(model).to_domain() for model in models]
|
||||||
|
|
||||||
|
async def create(
|
||||||
|
self,
|
||||||
|
attempt_id: int,
|
||||||
|
question_id: int,
|
||||||
|
selected_option_id: int | None = None,
|
||||||
|
text_answer: str | None = None,
|
||||||
|
is_correct: bool = False,
|
||||||
|
) -> DomainUserAnswer:
|
||||||
|
answer = UserAnswer(
|
||||||
|
attempt_id=attempt_id,
|
||||||
|
question_id=question_id,
|
||||||
|
selected_option_id=selected_option_id,
|
||||||
|
text_answer=text_answer,
|
||||||
|
is_correct=is_correct,
|
||||||
|
)
|
||||||
|
self.session.add(answer)
|
||||||
|
await self.session.flush()
|
||||||
|
await self.session.refresh(answer)
|
||||||
|
return UserAnswerDTO(answer).to_domain()
|
||||||
|
|
||||||
|
async def update(
|
||||||
|
self,
|
||||||
|
answer_id: int,
|
||||||
|
selected_option_id: int | None = None,
|
||||||
|
text_answer: str | None = None,
|
||||||
|
is_correct: bool | None = None,
|
||||||
|
) -> DomainUserAnswer | None:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(UserAnswer).where(UserAnswer.id == answer_id)
|
||||||
|
)
|
||||||
|
answer = result.scalar_one_or_none()
|
||||||
|
if not answer:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if selected_option_id is not None:
|
||||||
|
answer.selected_option_id = selected_option_id
|
||||||
|
if text_answer is not None:
|
||||||
|
answer.text_answer = text_answer
|
||||||
|
if is_correct is not None:
|
||||||
|
answer.is_correct = is_correct
|
||||||
|
|
||||||
|
await self.session.flush()
|
||||||
|
await self.session.refresh(answer)
|
||||||
|
return UserAnswerDTO(answer).to_domain()
|
||||||
|
|
||||||
|
async def delete(self, answer_id: int) -> bool:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(UserAnswer).where(UserAnswer.id == answer_id)
|
||||||
|
)
|
||||||
|
answer = result.scalar_one_or_none()
|
||||||
|
if not answer:
|
||||||
|
return False
|
||||||
|
|
||||||
|
await self.session.delete(answer)
|
||||||
|
await self.session.flush()
|
||||||
|
return True
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
from trudex.domain.schemas import TestAttempt as DomainTestAttempt
|
||||||
|
from trudex.infrastructure.database.models import TestAttempt as TestAttemptModel
|
||||||
|
|
||||||
|
|
||||||
|
class TestAttemptDTO:
|
||||||
|
def __init__(self, model: TestAttemptModel) -> None:
|
||||||
|
self.model: TestAttemptModel = model
|
||||||
|
|
||||||
|
def to_domain(self) -> DomainTestAttempt:
|
||||||
|
return DomainTestAttempt(
|
||||||
|
id=self.model.id,
|
||||||
|
user_id=self.model.user_id,
|
||||||
|
test_id=self.model.test_id,
|
||||||
|
started_at=self.model.started_at,
|
||||||
|
finished_at=self.model.finished_at,
|
||||||
|
score=self.model.score,
|
||||||
|
is_passed=self.model.is_passed,
|
||||||
|
)
|
||||||
@@ -0,0 +1,17 @@
|
|||||||
|
from trudex.domain.schemas import UserAnswer as DomainUserAnswer
|
||||||
|
from trudex.infrastructure.database.models import UserAnswer as UserAnswerModel
|
||||||
|
|
||||||
|
|
||||||
|
class UserAnswerDTO:
|
||||||
|
def __init__(self, model: UserAnswerModel) -> None:
|
||||||
|
self.model: UserAnswerModel = model
|
||||||
|
|
||||||
|
def to_domain(self) -> DomainUserAnswer:
|
||||||
|
return DomainUserAnswer(
|
||||||
|
id=self.model.id,
|
||||||
|
attempt_id=self.model.attempt_id,
|
||||||
|
question_id=self.model.question_id,
|
||||||
|
selected_option_id=self.model.selected_option_id,
|
||||||
|
text_answer=self.model.text_answer,
|
||||||
|
is_correct=self.model.is_correct,
|
||||||
|
)
|
||||||
@@ -78,3 +78,39 @@ class Option(Base):
|
|||||||
explanation: Mapped[str | None] = mapped_column(Text)
|
explanation: Mapped[str | None] = mapped_column(Text)
|
||||||
|
|
||||||
question: Mapped["Question"] = relationship(back_populates="options")
|
question: Mapped["Question"] = relationship(back_populates="options")
|
||||||
|
|
||||||
|
|
||||||
|
@final
|
||||||
|
class TestAttempt(Base):
|
||||||
|
__tablename__ = "test_attempts"
|
||||||
|
|
||||||
|
id: Mapped[int] = mapped_column(primary_key=True)
|
||||||
|
user_id: Mapped[int] = mapped_column(BigInteger, index=True)
|
||||||
|
test_id: Mapped[int] = mapped_column(ForeignKey("tests.id"))
|
||||||
|
started_at: Mapped[datetime] = mapped_column(server_default=func.now())
|
||||||
|
finished_at: Mapped[datetime | None] = mapped_column(default=None)
|
||||||
|
score: Mapped[int] = mapped_column(Integer, default=0)
|
||||||
|
is_passed: Mapped[bool] = mapped_column(default=False)
|
||||||
|
|
||||||
|
test: Mapped["Test"] = relationship()
|
||||||
|
answers: Mapped[list["UserAnswer"]] = relationship(
|
||||||
|
back_populates="attempt",
|
||||||
|
cascade="all, delete-orphan"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@final
|
||||||
|
class UserAnswer(Base):
|
||||||
|
__tablename__ = "user_answers"
|
||||||
|
|
||||||
|
id: Mapped[int] = mapped_column(primary_key=True)
|
||||||
|
attempt_id: Mapped[int] = mapped_column(ForeignKey("test_attempts.id"))
|
||||||
|
question_id: Mapped[int] = mapped_column(ForeignKey("questions.id"))
|
||||||
|
selected_option_id: Mapped[int | None] = mapped_column(ForeignKey("options.id"), default=None)
|
||||||
|
text_answer: Mapped[str | None] = mapped_column(Text, default=None)
|
||||||
|
is_correct: Mapped[bool] = mapped_column(default=False)
|
||||||
|
|
||||||
|
attempt: Mapped["TestAttempt"] = relationship(back_populates="answers")
|
||||||
|
question: Mapped["Question"] = relationship()
|
||||||
|
selected_option: Mapped["Option | None"] = relationship()
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
from trudex.infrastructure.database.repo.test import TestRepository
|
from trudex.infrastructure.database.repo.test import TestRepository
|
||||||
|
from trudex.infrastructure.database.repo.test_attempt import TestAttemptRepository
|
||||||
from trudex.infrastructure.database.repo.user import UserRepository
|
from trudex.infrastructure.database.repo.user import UserRepository
|
||||||
|
|
||||||
__all__ = ["TestRepository", "UserRepository"]
|
__all__ = ["TestRepository", "TestAttemptRepository", "UserRepository"]
|
||||||
|
|||||||
@@ -0,0 +1,197 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
from typing import final
|
||||||
|
|
||||||
|
from sqlalchemy import func, select
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
from sqlalchemy.orm import selectinload
|
||||||
|
|
||||||
|
from trudex.domain.schemas import TestAttempt, UserAnswer
|
||||||
|
from trudex.infrastructure.database.dao.test_attempt import TestAttemptDAO
|
||||||
|
from trudex.infrastructure.database.dao.user_answer import UserAnswerDAO
|
||||||
|
from trudex.infrastructure.database.dto.test_attempt import TestAttemptDTO
|
||||||
|
from trudex.infrastructure.database.dto.user_answer import UserAnswerDTO
|
||||||
|
from trudex.infrastructure.database.models import (
|
||||||
|
TestAttempt as TestAttemptModel,
|
||||||
|
UserAnswer as UserAnswerModel,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@final
|
||||||
|
class TestAttemptRepository:
|
||||||
|
def __init__(self, session: AsyncSession) -> None:
|
||||||
|
self.session = session
|
||||||
|
self.attempt_dao = TestAttemptDAO(session)
|
||||||
|
self.answer_dao = UserAnswerDAO(session)
|
||||||
|
|
||||||
|
async def get_user_attempts(self, user_id: int) -> list[TestAttempt]:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(TestAttemptModel)
|
||||||
|
.where(TestAttemptModel.user_id == user_id)
|
||||||
|
.order_by(TestAttemptModel.started_at.desc())
|
||||||
|
)
|
||||||
|
models = list(result.scalars().all())
|
||||||
|
return [TestAttemptDTO(model).to_domain() for model in models]
|
||||||
|
|
||||||
|
async def get_test_attempts(self, test_id: int) -> list[TestAttempt]:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(TestAttemptModel)
|
||||||
|
.where(TestAttemptModel.test_id == test_id)
|
||||||
|
.order_by(TestAttemptModel.started_at.desc())
|
||||||
|
)
|
||||||
|
models = list(result.scalars().all())
|
||||||
|
return [TestAttemptDTO(model).to_domain() for model in models]
|
||||||
|
|
||||||
|
async def get_user_test_attempts(self, user_id: int, test_id: int) -> list[TestAttempt]:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(TestAttemptModel)
|
||||||
|
.where(TestAttemptModel.user_id == user_id)
|
||||||
|
.where(TestAttemptModel.test_id == test_id)
|
||||||
|
.order_by(TestAttemptModel.started_at.desc())
|
||||||
|
)
|
||||||
|
models = list(result.scalars().all())
|
||||||
|
return [TestAttemptDTO(model).to_domain() for model in models]
|
||||||
|
|
||||||
|
async def get_active_attempt(self, user_id: int, test_id: int) -> TestAttempt | None:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(TestAttemptModel)
|
||||||
|
.where(TestAttemptModel.user_id == user_id)
|
||||||
|
.where(TestAttemptModel.test_id == test_id)
|
||||||
|
.where(TestAttemptModel.finished_at == None)
|
||||||
|
.order_by(TestAttemptModel.started_at.desc())
|
||||||
|
)
|
||||||
|
model = result.scalar_one_or_none()
|
||||||
|
return TestAttemptDTO(model).to_domain() if model else None
|
||||||
|
|
||||||
|
async def get_attempt_with_answers(self, attempt_id: int) -> tuple[TestAttempt | None, list[UserAnswer]]:
|
||||||
|
attempt = await self.attempt_dao.get_by_id(attempt_id)
|
||||||
|
if not attempt:
|
||||||
|
return None, []
|
||||||
|
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(TestAttemptModel)
|
||||||
|
.where(TestAttemptModel.id == attempt_id)
|
||||||
|
.options(selectinload(TestAttemptModel.answers))
|
||||||
|
)
|
||||||
|
attempt_model = result.scalar_one_or_none()
|
||||||
|
if not attempt_model:
|
||||||
|
return attempt, []
|
||||||
|
|
||||||
|
answers = [UserAnswerDTO(answer).to_domain() for answer in attempt_model.answers]
|
||||||
|
return attempt, answers
|
||||||
|
|
||||||
|
async def get_answers_for_attempt(self, attempt_id: int) -> list[UserAnswer]:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(UserAnswerModel)
|
||||||
|
.where(UserAnswerModel.attempt_id == attempt_id)
|
||||||
|
)
|
||||||
|
models = list(result.scalars().all())
|
||||||
|
return [UserAnswerDTO(model).to_domain() for model in models]
|
||||||
|
|
||||||
|
async def count_user_attempts(self, user_id: int, test_id: int | None = None) -> int:
|
||||||
|
query = select(func.count(TestAttemptModel.id)).where(TestAttemptModel.user_id == user_id)
|
||||||
|
if test_id is not None:
|
||||||
|
query = query.where(TestAttemptModel.test_id == test_id)
|
||||||
|
|
||||||
|
result = await self.session.execute(query)
|
||||||
|
count = result.scalar_one()
|
||||||
|
return count
|
||||||
|
|
||||||
|
async def count_passed_attempts(self, user_id: int, test_id: int | None = None) -> int:
|
||||||
|
query = (
|
||||||
|
select(func.count(TestAttemptModel.id))
|
||||||
|
.where(TestAttemptModel.user_id == user_id)
|
||||||
|
.where(TestAttemptModel.is_passed == True)
|
||||||
|
)
|
||||||
|
if test_id is not None:
|
||||||
|
query = query.where(TestAttemptModel.test_id == test_id)
|
||||||
|
|
||||||
|
result = await self.session.execute(query)
|
||||||
|
count = result.scalar_one()
|
||||||
|
return count
|
||||||
|
|
||||||
|
async def get_best_attempt(self, user_id: int, test_id: int) -> TestAttempt | None:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(TestAttemptModel)
|
||||||
|
.where(TestAttemptModel.user_id == user_id)
|
||||||
|
.where(TestAttemptModel.test_id == test_id)
|
||||||
|
.where(TestAttemptModel.finished_at != None)
|
||||||
|
.order_by(TestAttemptModel.score.desc(), TestAttemptModel.started_at.asc())
|
||||||
|
)
|
||||||
|
model = result.scalar_one_or_none()
|
||||||
|
return TestAttemptDTO(model).to_domain() if model else None
|
||||||
|
|
||||||
|
async def get_latest_attempt(self, user_id: int, test_id: int) -> TestAttempt | None:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(TestAttemptModel)
|
||||||
|
.where(TestAttemptModel.user_id == user_id)
|
||||||
|
.where(TestAttemptModel.test_id == test_id)
|
||||||
|
.order_by(TestAttemptModel.started_at.desc())
|
||||||
|
)
|
||||||
|
model = result.scalar_one_or_none()
|
||||||
|
return TestAttemptDTO(model).to_domain() if model else None
|
||||||
|
|
||||||
|
async def finish_attempt(self, attempt_id: int, score: int, is_passed: bool) -> TestAttempt | None:
|
||||||
|
return await self.attempt_dao.update(
|
||||||
|
attempt_id=attempt_id,
|
||||||
|
finished_at=datetime.utcnow(),
|
||||||
|
score=score,
|
||||||
|
is_passed=is_passed
|
||||||
|
)
|
||||||
|
|
||||||
|
async def calculate_attempt_score(self, attempt_id: int) -> int:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(func.count(UserAnswerModel.id))
|
||||||
|
.where(UserAnswerModel.attempt_id == attempt_id)
|
||||||
|
.where(UserAnswerModel.is_correct == True)
|
||||||
|
)
|
||||||
|
count = result.scalar_one()
|
||||||
|
return count
|
||||||
|
|
||||||
|
async def get_incorrect_answers(self, attempt_id: int) -> list[UserAnswer]:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(UserAnswerModel)
|
||||||
|
.where(UserAnswerModel.attempt_id == attempt_id)
|
||||||
|
.where(UserAnswerModel.is_correct == False)
|
||||||
|
)
|
||||||
|
models = list(result.scalars().all())
|
||||||
|
return [UserAnswerDTO(model).to_domain() for model in models]
|
||||||
|
|
||||||
|
async def get_question_statistics(self, question_id: int) -> dict[str, int]:
|
||||||
|
total_result = await self.session.execute(
|
||||||
|
select(func.count(UserAnswerModel.id))
|
||||||
|
.where(UserAnswerModel.question_id == question_id)
|
||||||
|
)
|
||||||
|
total = total_result.scalar_one()
|
||||||
|
|
||||||
|
correct_result = await self.session.execute(
|
||||||
|
select(func.count(UserAnswerModel.id))
|
||||||
|
.where(UserAnswerModel.question_id == question_id)
|
||||||
|
.where(UserAnswerModel.is_correct == True)
|
||||||
|
)
|
||||||
|
correct = correct_result.scalar_one()
|
||||||
|
|
||||||
|
return {
|
||||||
|
"total_answers": total,
|
||||||
|
"correct_answers": correct,
|
||||||
|
"incorrect_answers": total - correct,
|
||||||
|
}
|
||||||
|
|
||||||
|
async def get_most_difficult_questions(self, test_id: int, limit: int = 10) -> list[tuple[int, float]]:
|
||||||
|
from trudex.infrastructure.database.models import Question as QuestionModel
|
||||||
|
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(
|
||||||
|
UserAnswerModel.question_id,
|
||||||
|
func.count(UserAnswerModel.id).label("total"),
|
||||||
|
func.sum(func.cast(UserAnswerModel.is_correct, func.Integer)).label("correct")
|
||||||
|
)
|
||||||
|
.join(QuestionModel, UserAnswerModel.question_id == QuestionModel.id)
|
||||||
|
.where(QuestionModel.test_id == test_id)
|
||||||
|
.group_by(UserAnswerModel.question_id)
|
||||||
|
.having(func.count(UserAnswerModel.id) > 0)
|
||||||
|
.order_by((func.sum(func.cast(UserAnswerModel.is_correct, func.Integer)) / func.count(UserAnswerModel.id)).asc())
|
||||||
|
.limit(limit)
|
||||||
|
)
|
||||||
|
|
||||||
|
rows = result.all()
|
||||||
|
return [(row.question_id, row.correct / row.total if row.total > 0 else 0.0) for row in rows]
|
||||||
@@ -7,8 +7,11 @@ from trudex.infrastructure.database.config import new_session_maker
|
|||||||
from trudex.infrastructure.database.dao.option import OptionDAO
|
from trudex.infrastructure.database.dao.option import OptionDAO
|
||||||
from trudex.infrastructure.database.dao.question import QuestionDAO
|
from trudex.infrastructure.database.dao.question import QuestionDAO
|
||||||
from trudex.infrastructure.database.dao.test import TestDAO
|
from trudex.infrastructure.database.dao.test import TestDAO
|
||||||
|
from trudex.infrastructure.database.dao.test_attempt import TestAttemptDAO
|
||||||
from trudex.infrastructure.database.dao.user import UserDAO
|
from trudex.infrastructure.database.dao.user import UserDAO
|
||||||
|
from trudex.infrastructure.database.dao.user_answer import UserAnswerDAO
|
||||||
from trudex.infrastructure.database.repo.test import TestRepository
|
from trudex.infrastructure.database.repo.test import TestRepository
|
||||||
|
from trudex.infrastructure.database.repo.test_attempt import TestAttemptRepository
|
||||||
from trudex.infrastructure.database.repo.user import UserRepository
|
from trudex.infrastructure.database.repo.user import UserRepository
|
||||||
from trudex.infrastructure.utils.config import Config
|
from trudex.infrastructure.utils.config import Config
|
||||||
|
|
||||||
@@ -45,6 +48,14 @@ class DatabaseProvider(Provider):
|
|||||||
def get_option_dao(self, session: AsyncSession) -> OptionDAO:
|
def get_option_dao(self, session: AsyncSession) -> OptionDAO:
|
||||||
return OptionDAO(session)
|
return OptionDAO(session)
|
||||||
|
|
||||||
|
@provide(scope=Scope.REQUEST)
|
||||||
|
def get_test_attempt_dao(self, session: AsyncSession) -> TestAttemptDAO:
|
||||||
|
return TestAttemptDAO(session)
|
||||||
|
|
||||||
|
@provide(scope=Scope.REQUEST)
|
||||||
|
def get_user_answer_dao(self, session: AsyncSession) -> UserAnswerDAO:
|
||||||
|
return UserAnswerDAO(session)
|
||||||
|
|
||||||
@provide(scope=Scope.REQUEST)
|
@provide(scope=Scope.REQUEST)
|
||||||
def get_user_repository(self, session: AsyncSession) -> UserRepository:
|
def get_user_repository(self, session: AsyncSession) -> UserRepository:
|
||||||
return UserRepository(session)
|
return UserRepository(session)
|
||||||
@@ -52,3 +63,7 @@ class DatabaseProvider(Provider):
|
|||||||
@provide(scope=Scope.REQUEST)
|
@provide(scope=Scope.REQUEST)
|
||||||
def get_test_repository(self, session: AsyncSession) -> TestRepository:
|
def get_test_repository(self, session: AsyncSession) -> TestRepository:
|
||||||
return TestRepository(session)
|
return TestRepository(session)
|
||||||
|
|
||||||
|
@provide(scope=Scope.REQUEST)
|
||||||
|
def get_test_attempt_repository(self, session: AsyncSession) -> TestAttemptRepository:
|
||||||
|
return TestAttemptRepository(session)
|
||||||
|
|||||||
Reference in New Issue
Block a user