Initial commit

This commit is contained in:
2026-01-01 02:56:55 +03:00
parent f84efea30f
commit 59a4baabd4
11 changed files with 484 additions and 16 deletions
@@ -0,0 +1,78 @@
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from trudex.domain.schemas import Option as DomainOption
from trudex.infrastructure.database.dto.option import OptionDTO
from trudex.infrastructure.database.models import Option
class OptionDAO:
def __init__(self, session: AsyncSession) -> None:
self.session: AsyncSession = session
async def get_by_id(self, option_id: int) -> DomainOption | None:
result = await self.session.execute(
select(Option).where(Option.id == option_id)
)
model = result.scalar_one_or_none()
return OptionDTO(model).to_domain() if model else None
async def get_all(self) -> list[DomainOption]:
result = await self.session.execute(select(Option))
models = list(result.scalars().all())
return [OptionDTO(model).to_domain() for model in models]
async def create(
self,
question_id: int,
text: str,
is_correct: bool = False,
explanation: str | None = None,
) -> DomainOption:
option = Option(
question_id=question_id,
text=text,
is_correct=is_correct,
explanation=explanation,
)
self.session.add(option)
await self.session.flush()
await self.session.refresh(option)
return OptionDTO(option).to_domain()
async def update(
self,
option_id: int,
text: str | None = None,
is_correct: bool | None = None,
explanation: str | None = None,
) -> DomainOption | None:
result = await self.session.execute(
select(Option).where(Option.id == option_id)
)
option = result.scalar_one_or_none()
if not option:
return None
if text is not None:
option.text = text
if is_correct is not None:
option.is_correct = is_correct
if explanation is not None:
option.explanation = explanation
await self.session.flush()
await self.session.refresh(option)
return OptionDTO(option).to_domain()
async def delete(self, option_id: int) -> bool:
result = await self.session.execute(
select(Option).where(Option.id == option_id)
)
option = result.scalar_one_or_none()
if not option:
return False
await self.session.delete(option)
await self.session.flush()
return True
@@ -0,0 +1,83 @@
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from trudex.domain.schemas import Question as DomainQuestion
from trudex.infrastructure.database.dto.question import QuestionDTO
from trudex.infrastructure.database.models import Question, QuestionType
class QuestionDAO:
def __init__(self, session: AsyncSession) -> None:
self.session: AsyncSession = session
async def get_by_id(self, question_id: int) -> DomainQuestion | None:
result = await self.session.execute(
select(Question).where(Question.id == question_id)
)
model = result.scalar_one_or_none()
return QuestionDTO(model).to_domain() if model else None
async def get_all(self) -> list[DomainQuestion]:
result = await self.session.execute(select(Question))
models = list(result.scalars().all())
return [QuestionDTO(model).to_domain() for model in models]
async def create(
self,
test_id: int,
text: str,
position: int = 0,
question_type: str = "single",
tg_file_id: str | None = None,
) -> DomainQuestion:
question = Question(
test_id=test_id,
text=text,
position=position,
question_type=QuestionType(question_type),
tg_file_id=tg_file_id,
)
self.session.add(question)
await self.session.flush()
await self.session.refresh(question)
return QuestionDTO(question).to_domain()
async def update(
self,
question_id: int,
text: str | None = None,
position: int | None = None,
question_type: str | None = None,
tg_file_id: str | None = None,
) -> DomainQuestion | None:
result = await self.session.execute(
select(Question).where(Question.id == question_id)
)
question = result.scalar_one_or_none()
if not question:
return None
if text is not None:
question.text = text
if position is not None:
question.position = position
if question_type is not None:
question.question_type = QuestionType(question_type)
if tg_file_id is not None:
question.tg_file_id = tg_file_id
await self.session.flush()
await self.session.refresh(question)
return QuestionDTO(question).to_domain()
async def delete(self, question_id: int) -> bool:
result = await self.session.execute(
select(Question).where(Question.id == question_id)
)
question = result.scalar_one_or_none()
if not question:
return False
await self.session.delete(question)
await self.session.flush()
return True
@@ -0,0 +1,81 @@
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from trudex.domain.schemas import Test as DomainTest
from trudex.infrastructure.database.dto.test import TestDTO
from trudex.infrastructure.database.models import Test
class TestDAO:
def __init__(self, session: AsyncSession) -> None:
self.session: AsyncSession = session
async def get_by_id(self, test_id: int) -> DomainTest | None:
result = await self.session.execute(
select(Test).where(Test.id == test_id)
)
model = result.scalar_one_or_none()
return TestDTO(model).to_domain() if model else None
async def get_all(self) -> list[DomainTest]:
result = await self.session.execute(select(Test))
models = list(result.scalars().all())
return [TestDTO(model).to_domain() for model in models]
async def create(
self,
title: str,
description: str | None = None,
for_group: int | None = None,
is_active: bool = True,
) -> DomainTest:
test = Test(
title=title,
description=description,
for_group=for_group,
is_active=is_active,
)
self.session.add(test)
await self.session.flush()
await self.session.refresh(test)
return TestDTO(test).to_domain()
async def update(
self,
test_id: int,
title: str | None = None,
description: str | None = None,
for_group: int | None = None,
is_active: bool | None = None,
) -> DomainTest | None:
result = await self.session.execute(
select(Test).where(Test.id == test_id)
)
test = result.scalar_one_or_none()
if not test:
return None
if title is not None:
test.title = title
if description is not None:
test.description = description
if for_group is not None:
test.for_group = for_group
if is_active is not None:
test.is_active = is_active
await self.session.flush()
await self.session.refresh(test)
return TestDTO(test).to_domain()
async def delete(self, test_id: int) -> bool:
result = await self.session.execute(
select(Test).where(Test.id == test_id)
)
test = result.scalar_one_or_none()
if not test:
return False
await self.session.delete(test)
await self.session.flush()
return True
@@ -22,20 +22,6 @@ class UserDAO:
models = list(result.scalars().all())
return [UserDTO(model).to_domain() for model in models]
async def get_by_group(self, group: int) -> list[DomainUser]:
result = await self.session.execute(
select(User).where(User.group == group)
)
models = list(result.scalars().all())
return [UserDTO(model).to_domain() for model in models]
async def get_admins(self) -> list[DomainUser]:
result = await self.session.execute(
select(User).where(User.is_admin == True)
)
models = list(result.scalars().all())
return [UserDTO(model).to_domain() for model in models]
async def create(
self,
user_id: int,
@@ -0,0 +1,16 @@
from trudex.domain.schemas import Option as DomainOption
from trudex.infrastructure.database.models import Option as OptionModel
class OptionDTO:
def __init__(self, model: OptionModel) -> None:
self.model: OptionModel = model
def to_domain(self) -> DomainOption:
return DomainOption(
id=self.model.id,
question_id=self.model.question_id,
text=self.model.text,
is_correct=self.model.is_correct,
explanation=self.model.explanation,
)
@@ -0,0 +1,17 @@
from trudex.domain.schemas import Question as DomainQuestion
from trudex.infrastructure.database.models import Question as QuestionModel
class QuestionDTO:
def __init__(self, model: QuestionModel) -> None:
self.model: QuestionModel = model
def to_domain(self) -> DomainQuestion:
return DomainQuestion(
id=self.model.id,
test_id=self.model.test_id,
text=self.model.text,
position=self.model.position,
question_type=self.model.question_type.value,
tg_file_id=self.model.tg_file_id,
)
@@ -0,0 +1,18 @@
from trudex.domain.schemas import Test as DomainTest
from trudex.infrastructure.database.models import Test as TestModel
class TestDTO:
def __init__(self, model: TestModel) -> None:
self.model: TestModel = model
def to_domain(self) -> DomainTest:
return DomainTest(
id=self.model.id,
title=self.model.title,
description=self.model.description,
for_group=self.model.for_group,
is_active=self.model.is_active,
created_at=self.model.created_at,
updated_at=self.model.updated_at,
)
+6 -2
View File
@@ -1,6 +1,5 @@
from datetime import datetime
from enum import Enum
from tokenize import group
from typing import final
from sqlalchemy import BigInteger, CheckConstraint, ForeignKey, Integer, String, Text, func
@@ -30,6 +29,7 @@ class QuestionType(str, Enum):
MULTIPLE = "multiple"
INPUT = "input"
@final
class Test(Base):
__tablename__ = "tests"
@@ -37,8 +37,10 @@ class Test(Base):
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(255))
description: Mapped[str | None] = mapped_column(Text)
for_group: Mapped[int | None] = mapped_column(default=None)
for_group: Mapped[int | None] = mapped_column(default=None)
is_active: Mapped[bool] = mapped_column(default=True)
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(server_default=func.now(), onupdate=func.now())
questions: Mapped[list["Question"]] = relationship(
back_populates="test",
@@ -46,6 +48,7 @@ class Test(Base):
order_by="Question.position"
)
@final
class Question(Base):
__tablename__ = "questions"
@@ -63,6 +66,7 @@ class Question(Base):
cascade="all, delete-orphan"
)
@final
class Option(Base):
__tablename__ = "options"
@@ -0,0 +1,155 @@
from typing import final
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from trudex.domain.schemas import Option, Question, Test
from trudex.infrastructure.database.dao.option import OptionDAO
from trudex.infrastructure.database.dao.question import QuestionDAO
from trudex.infrastructure.database.dao.test import TestDAO
from trudex.infrastructure.database.dto.option import OptionDTO
from trudex.infrastructure.database.dto.question import QuestionDTO
from trudex.infrastructure.database.dto.test import TestDTO
from trudex.infrastructure.database.models import (
Option as OptionModel,
Question as QuestionModel,
Test as TestModel,
)
@final
class TestRepository:
def __init__(self, session: AsyncSession) -> None:
self.session = session
self.test_dao = TestDAO(session)
self.question_dao = QuestionDAO(session)
self.option_dao = OptionDAO(session)
async def get_active_tests(self) -> list[Test]:
result = await self.session.execute(
select(TestModel).where(TestModel.is_active == True)
)
models = list(result.scalars().all())
return [TestDTO(model).to_domain() for model in models]
async def get_tests_by_group(self, group: int) -> list[Test]:
result = await self.session.execute(
select(TestModel).where(TestModel.for_group == group)
)
models = list(result.scalars().all())
return [TestDTO(model).to_domain() for model in models]
async def get_active_tests_by_group(self, group: int) -> list[Test]:
result = await self.session.execute(
select(TestModel)
.where(TestModel.for_group == group)
.where(TestModel.is_active == True)
)
models = list(result.scalars().all())
return [TestDTO(model).to_domain() for model in models]
async def get_test_with_questions(self, test_id: int) -> tuple[Test | None, list[Question]]:
test = await self.test_dao.get_by_id(test_id)
if not test:
return None, []
result = await self.session.execute(
select(TestModel)
.where(TestModel.id == test_id)
.options(selectinload(TestModel.questions))
)
test_model = result.scalar_one_or_none()
if not test_model:
return test, []
questions = [QuestionDTO(q).to_domain() for q in sorted(test_model.questions, key=lambda x: x.position)]
return test, questions
async def get_question_with_options(self, question_id: int) -> tuple[Question | None, list[Option]]:
question = await self.question_dao.get_by_id(question_id)
if not question:
return None, []
result = await self.session.execute(
select(QuestionModel)
.where(QuestionModel.id == question_id)
.options(selectinload(QuestionModel.options))
)
question_model = result.scalar_one_or_none()
if not question_model:
return question, []
options = [OptionDTO(o).to_domain() for o in question_model.options]
return question, options
async def get_correct_options_for_question(self, question_id: int) -> list[Option]:
result = await self.session.execute(
select(OptionModel)
.where(OptionModel.question_id == question_id)
.where(OptionModel.is_correct == True)
)
models = list(result.scalars().all())
return [OptionDTO(model).to_domain() for model in models]
async def get_full_test(self, test_id: int) -> tuple[Test | None, list[tuple[Question, list[Option]]]]:
test = await self.test_dao.get_by_id(test_id)
if not test:
return None, []
result = await self.session.execute(
select(TestModel)
.where(TestModel.id == test_id)
.options(
selectinload(TestModel.questions).selectinload(QuestionModel.options)
)
)
test_model = result.scalar_one_or_none()
if not test_model:
return test, []
questions_with_options: list[tuple[Question, list[Option]]] = []
for question_model in sorted(test_model.questions, key=lambda x: x.position):
question = QuestionDTO(question_model).to_domain()
options = [OptionDTO(o).to_domain() for o in question_model.options]
questions_with_options.append((question, options))
return test, questions_with_options
async def count_questions_in_test(self, test_id: int) -> int:
result = await self.session.execute(
select(func.count(QuestionModel.id))
.where(QuestionModel.test_id == test_id)
)
count = result.scalar_one()
return count
async def duplicate_test(self, test_id: int, new_title: str) -> Test | None:
test, questions_with_options = await self.get_full_test(test_id)
if not test:
return None
new_test = await self.test_dao.create(
title=new_title,
description=test.description,
for_group=test.for_group,
is_active=False,
)
for question, options in questions_with_options:
new_question = await self.question_dao.create(
test_id=new_test.id,
text=question.text,
position=question.position,
question_type=question.question_type,
tg_file_id=question.tg_file_id,
)
for option in options:
_ = await self.option_dao.create(
question_id=new_question.id,
text=option.text,
is_correct=option.is_correct,
explanation=option.explanation,
)
return new_test