mirror of
https://github.com/koloideal/Quizzi.git
synced 2026-06-10 18:35:28 +03:00
Initial commit
This commit is contained in:
@@ -1,8 +1,8 @@
|
|||||||
"""tests
|
"""tests
|
||||||
|
|
||||||
Revision ID: 780dec53b460
|
Revision ID: 59dd00dc1990
|
||||||
Revises: 409f04b7b544
|
Revises: 409f04b7b544
|
||||||
Create Date: 2025-12-31 01:09:25.135116
|
Create Date: 2026-01-01 03:02:33.134535
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from collections.abc import Sequence
|
from collections.abc import Sequence
|
||||||
@@ -11,7 +11,7 @@ from alembic import op
|
|||||||
import sqlalchemy as sa
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
revision: str = '780dec53b460'
|
revision: str = '59dd00dc1990'
|
||||||
down_revision: str | None = '409f04b7b544'
|
down_revision: str | None = '409f04b7b544'
|
||||||
branch_labels: str | Sequence[str] | None = None
|
branch_labels: str | Sequence[str] | None = None
|
||||||
depends_on: str | Sequence[str] | None = None
|
depends_on: str | Sequence[str] | None = None
|
||||||
@@ -25,6 +25,8 @@ def upgrade() -> None:
|
|||||||
sa.Column('description', sa.Text(), nullable=True),
|
sa.Column('description', sa.Text(), nullable=True),
|
||||||
sa.Column('for_group', sa.Integer(), nullable=True),
|
sa.Column('for_group', sa.Integer(), nullable=True),
|
||||||
sa.Column('is_active', sa.Boolean(), nullable=False),
|
sa.Column('is_active', sa.Boolean(), nullable=False),
|
||||||
|
sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False),
|
||||||
|
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False),
|
||||||
sa.PrimaryKeyConstraint('id')
|
sa.PrimaryKeyConstraint('id')
|
||||||
)
|
)
|
||||||
op.create_table('questions',
|
op.create_table('questions',
|
||||||
@@ -25,6 +25,9 @@ dev = [
|
|||||||
"watchfiles>=1.1.1",
|
"watchfiles>=1.1.1",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[tool.pyright]
|
||||||
|
typeCheckingMode = "standard"
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
requires = ["hatchling"]
|
requires = ["hatchling"]
|
||||||
build-backend = "hatchling.build"
|
build-backend = "hatchling.build"
|
||||||
|
|||||||
@@ -0,0 +1,37 @@
|
|||||||
|
from typing import Any, Callable
|
||||||
|
from collections.abc import Awaitable
|
||||||
|
|
||||||
|
from aiogram import BaseMiddleware
|
||||||
|
from aiogram.types import Message, TelegramObject
|
||||||
|
from dishka import AsyncContainer
|
||||||
|
|
||||||
|
from trudex.infrastructure.database.repo import UserRepository
|
||||||
|
|
||||||
|
|
||||||
|
class RejectNotAdminMiddleware(BaseMiddleware):
|
||||||
|
async def __call__(
|
||||||
|
self,
|
||||||
|
handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
|
||||||
|
event: TelegramObject,
|
||||||
|
data: dict[str, Any],
|
||||||
|
) -> Any:
|
||||||
|
if not isinstance(event, Message):
|
||||||
|
return await handler(event, data)
|
||||||
|
|
||||||
|
assert event.from_user is not None
|
||||||
|
|
||||||
|
container: AsyncContainer = data["dishka_container"]
|
||||||
|
user_id = event.from_user.id
|
||||||
|
admin_commands = ["/admin"]
|
||||||
|
if event.text:
|
||||||
|
if event.text.strip() in admin_commands:
|
||||||
|
users_dao: UserRepository = await container.get(UserRepository)
|
||||||
|
admins = await users_dao.get_admins()
|
||||||
|
if user_id in [admin.id for admin in admins]:
|
||||||
|
return await handler(event, data)
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
return await handler(event, data)
|
||||||
|
else:
|
||||||
|
return await handler(event, data)
|
||||||
@@ -1 +1,5 @@
|
|||||||
|
|
||||||
|
from .test import TestDAO as TestDAO
|
||||||
|
from .user import UserDAO as UserDAO
|
||||||
|
from .question import QuestionDAO as QuestionDAO
|
||||||
|
from .option import OptionDAO as OptionDAO
|
||||||
@@ -0,0 +1,4 @@
|
|||||||
|
from trudex.infrastructure.database.repo.test import TestRepository
|
||||||
|
from trudex.infrastructure.database.repo.user import UserRepository
|
||||||
|
|
||||||
|
__all__ = ["TestRepository", "UserRepository"]
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
from typing import final
|
from typing import final
|
||||||
|
|
||||||
from sqlalchemy import func, select
|
from sqlalchemy import func, select
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
from sqlalchemy.orm import selectinload
|
from sqlalchemy.orm import selectinload
|
||||||
@@ -145,11 +146,11 @@ class TestRepository:
|
|||||||
)
|
)
|
||||||
|
|
||||||
for option in options:
|
for option in options:
|
||||||
_ = await self.option_dao.create(
|
await self.option_dao.create(
|
||||||
question_id=new_question.id,
|
question_id=new_question.id,
|
||||||
text=option.text,
|
text=option.text,
|
||||||
is_correct=option.is_correct,
|
is_correct=option.is_correct,
|
||||||
explanation=option.explanation,
|
explanation=option.explanation,
|
||||||
)
|
)
|
||||||
|
|
||||||
return new_test
|
return new_test
|
||||||
|
|||||||
@@ -0,0 +1,61 @@
|
|||||||
|
from typing import final
|
||||||
|
|
||||||
|
from sqlalchemy import func, select
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from trudex.domain.schemas import User
|
||||||
|
from trudex.infrastructure.database.dao.user import UserDAO
|
||||||
|
from trudex.infrastructure.database.dto.user import UserDTO
|
||||||
|
from trudex.infrastructure.database.models import User as UserModel
|
||||||
|
|
||||||
|
|
||||||
|
@final
|
||||||
|
class UserRepository:
|
||||||
|
def __init__(self, session: AsyncSession) -> None:
|
||||||
|
self.session = session
|
||||||
|
self.user_dao = UserDAO(session)
|
||||||
|
|
||||||
|
async def get_admins(self) -> list[User]:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(UserModel).where(UserModel.is_admin == True)
|
||||||
|
)
|
||||||
|
models = list(result.scalars().all())
|
||||||
|
return [UserDTO(model).to_domain() for model in models]
|
||||||
|
|
||||||
|
async def get_users_by_group(self, group: int) -> list[User]:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(UserModel).where(UserModel.group == group)
|
||||||
|
)
|
||||||
|
models = list(result.scalars().all())
|
||||||
|
return [UserDTO(model).to_domain() for model in models]
|
||||||
|
|
||||||
|
async def get_users_without_group(self) -> list[User]:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(UserModel).where(UserModel.group == None)
|
||||||
|
)
|
||||||
|
models = list(result.scalars().all())
|
||||||
|
return [UserDTO(model).to_domain() for model in models]
|
||||||
|
|
||||||
|
async def is_admin(self, user_id: int) -> bool:
|
||||||
|
user = await self.user_dao.get_by_id(user_id)
|
||||||
|
return user.is_admin if user else False
|
||||||
|
|
||||||
|
async def has_group(self, user_id: int) -> bool:
|
||||||
|
user = await self.user_dao.get_by_id(user_id)
|
||||||
|
return user.group is not None if user else False
|
||||||
|
|
||||||
|
async def count_users_by_group(self, group: int) -> int:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(func.count(UserModel.id))
|
||||||
|
.where(UserModel.group == group)
|
||||||
|
)
|
||||||
|
count = result.scalar_one()
|
||||||
|
return count
|
||||||
|
|
||||||
|
async def count_admins(self) -> int:
|
||||||
|
result = await self.session.execute(
|
||||||
|
select(func.count(UserModel.id))
|
||||||
|
.where(UserModel.is_admin == True)
|
||||||
|
)
|
||||||
|
count = result.scalar_one()
|
||||||
|
return count
|
||||||
@@ -4,6 +4,12 @@ from dishka import Provider, Scope, provide
|
|||||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||||
|
|
||||||
from trudex.infrastructure.database.config import new_session_maker
|
from trudex.infrastructure.database.config import new_session_maker
|
||||||
|
from trudex.infrastructure.database.dao.option import OptionDAO
|
||||||
|
from trudex.infrastructure.database.dao.question import QuestionDAO
|
||||||
|
from trudex.infrastructure.database.dao.test import TestDAO
|
||||||
|
from trudex.infrastructure.database.dao.user import UserDAO
|
||||||
|
from trudex.infrastructure.database.repo.test import TestRepository
|
||||||
|
from trudex.infrastructure.database.repo.user import UserRepository
|
||||||
from trudex.infrastructure.utils.config import Config
|
from trudex.infrastructure.utils.config import Config
|
||||||
|
|
||||||
|
|
||||||
@@ -20,5 +26,25 @@ class DatabaseProvider(Provider):
|
|||||||
yield session
|
yield session
|
||||||
|
|
||||||
@provide(scope=Scope.REQUEST)
|
@provide(scope=Scope.REQUEST)
|
||||||
async def get_users_dao(self, session: AsyncSession) -> UsersDAO:
|
def get_user_dao(self, session: AsyncSession) -> UserDAO:
|
||||||
return UsersDAO(session)
|
return UserDAO(session)
|
||||||
|
|
||||||
|
@provide(scope=Scope.REQUEST)
|
||||||
|
def get_test_dao(self, session: AsyncSession) -> TestDAO:
|
||||||
|
return TestDAO(session)
|
||||||
|
|
||||||
|
@provide(scope=Scope.REQUEST)
|
||||||
|
def get_question_dao(self, session: AsyncSession) -> QuestionDAO:
|
||||||
|
return QuestionDAO(session)
|
||||||
|
|
||||||
|
@provide(scope=Scope.REQUEST)
|
||||||
|
def get_option_dao(self, session: AsyncSession) -> OptionDAO:
|
||||||
|
return OptionDAO(session)
|
||||||
|
|
||||||
|
@provide(scope=Scope.REQUEST)
|
||||||
|
def get_user_repository(self, session: AsyncSession) -> UserRepository:
|
||||||
|
return UserRepository(session)
|
||||||
|
|
||||||
|
@provide(scope=Scope.REQUEST)
|
||||||
|
def get_test_repository(self, session: AsyncSession) -> TestRepository:
|
||||||
|
return TestRepository(session)
|
||||||
|
|||||||
Reference in New Issue
Block a user