mirror of
https://github.com/koloideal/Quizzi.git
synced 2026-06-10 18:35:28 +03:00
commit
This commit is contained in:
@@ -1,26 +1,190 @@
|
||||
from aiogram.types import CallbackQuery
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from aiogram.types import CallbackQuery, Message
|
||||
from aiogram_dialog import Dialog, DialogManager, Window
|
||||
from aiogram_dialog.widgets.kbd import Button, Column
|
||||
from aiogram_dialog.widgets.text import Const
|
||||
from aiogram_dialog.widgets.input import MessageInput
|
||||
from aiogram_dialog.widgets.kbd import Button, Column, Row, ScrollingGroup, Select
|
||||
from aiogram_dialog.widgets.text import Const, Format
|
||||
from dishka import FromDishka
|
||||
from dishka.integrations.aiogram_dialog import inject
|
||||
|
||||
from trudex.application.bot.user_dialogs.states import UserMenuSG
|
||||
from trudex.infrastructure.database.dao.group import GroupDAO
|
||||
from trudex.infrastructure.database.dao.user import UserDAO
|
||||
from trudex.infrastructure.database.repo.test_attempt import TestAttemptRepository
|
||||
|
||||
|
||||
async def on_tests_clicked(_callback: CallbackQuery, _button: Button, _manager: DialogManager) -> None:
|
||||
await _callback.answer("Доступные тесты")
|
||||
@inject
|
||||
async def get_user_data(
|
||||
dialog_manager: DialogManager,
|
||||
user_dao: FromDishka[UserDAO],
|
||||
attempt_repo: FromDishka[TestAttemptRepository],
|
||||
**_kwargs
|
||||
):
|
||||
user_id = dialog_manager.event.from_user.id
|
||||
user = await user_dao.get_by_id(user_id)
|
||||
stats = await attempt_repo.get_user_stats(user_id)
|
||||
|
||||
if not user:
|
||||
return {"user_info": "❌ Пользователь не найден"}
|
||||
|
||||
name = user.name or user.first_name
|
||||
group_str = f"🎓 Группа {user.group}" if user.group else "👤 Группа не указана"
|
||||
|
||||
if stats["total_attempts"] > 0:
|
||||
accuracy_str = f"📊 Средняя точность: <b>{stats['avg_score']}%</b>"
|
||||
tests_str = f"📝 Пройдено тестов: <b>{stats['total_attempts']}</b>"
|
||||
else:
|
||||
accuracy_str = "📊 Средняя точность: <b>—</b>"
|
||||
tests_str = "📝 Пройдено тестов: <b>0</b>"
|
||||
|
||||
user_info = (
|
||||
f"<b>👋 Привет, {name}!</b>\n\n"
|
||||
f"<blockquote>{group_str}</blockquote>\n\n"
|
||||
f"{tests_str}\n"
|
||||
f"{accuracy_str}"
|
||||
)
|
||||
|
||||
return {"user_info": user_info}
|
||||
|
||||
|
||||
async def on_results_clicked(_callback: CallbackQuery, _button: Button, _manager: DialogManager) -> None:
|
||||
await _callback.answer("Мои результаты")
|
||||
def can_edit_field(updated_at: datetime | None) -> bool:
|
||||
if updated_at is None:
|
||||
return True
|
||||
return datetime.utcnow() - updated_at >= timedelta(hours=24)
|
||||
|
||||
|
||||
def get_remaining_time(updated_at: datetime) -> str:
|
||||
remaining = timedelta(hours=24) - (datetime.utcnow() - updated_at)
|
||||
hours = int(remaining.total_seconds() // 3600)
|
||||
minutes = int((remaining.total_seconds() % 3600) // 60)
|
||||
return f"{hours}ч {minutes}м"
|
||||
|
||||
|
||||
@inject
|
||||
async def on_edit_name_clicked(
|
||||
_callback: CallbackQuery,
|
||||
_button: Button,
|
||||
manager: DialogManager,
|
||||
user_dao: FromDishka[UserDAO]
|
||||
):
|
||||
user = await user_dao.get_by_id(_callback.from_user.id)
|
||||
if not user:
|
||||
await _callback.answer("❌ Пользователь не найден")
|
||||
return
|
||||
|
||||
if not can_edit_field(user.name_updated_at):
|
||||
remaining = get_remaining_time(user.name_updated_at)
|
||||
await _callback.answer(f"⏳ Изменить можно через {remaining}")
|
||||
return
|
||||
|
||||
await manager.switch_to(UserMenuSG.edit_name)
|
||||
|
||||
|
||||
@inject
|
||||
async def on_edit_group_clicked(
|
||||
_callback: CallbackQuery,
|
||||
_button: Button,
|
||||
manager: DialogManager,
|
||||
user_dao: FromDishka[UserDAO]
|
||||
):
|
||||
user = await user_dao.get_by_id(_callback.from_user.id)
|
||||
if not user:
|
||||
await _callback.answer("❌ Пользователь не найден")
|
||||
return
|
||||
|
||||
if not can_edit_field(user.group_updated_at):
|
||||
remaining = get_remaining_time(user.group_updated_at)
|
||||
await _callback.answer(f"⏳ Изменить можно через {remaining}")
|
||||
return
|
||||
|
||||
await manager.switch_to(UserMenuSG.edit_group)
|
||||
|
||||
|
||||
async def on_tests_clicked(_callback: CallbackQuery, _button: Button, _manager: DialogManager):
|
||||
await _callback.answer("🚧 В разработке")
|
||||
|
||||
|
||||
async def on_results_clicked(_callback: CallbackQuery, _button: Button, _manager: DialogManager):
|
||||
await _callback.answer("🚧 В разработке")
|
||||
|
||||
|
||||
async def on_back_to_main(_callback: CallbackQuery, _button: Button, manager: DialogManager):
|
||||
await manager.switch_to(UserMenuSG.main)
|
||||
|
||||
|
||||
@inject
|
||||
async def on_name_input(
|
||||
message: Message,
|
||||
_widget: MessageInput,
|
||||
manager: DialogManager,
|
||||
user_dao: FromDishka[UserDAO]
|
||||
):
|
||||
if not message.text or len(message.text.strip()) < 2:
|
||||
await message.answer("❌ Имя должно содержать минимум 2 символа")
|
||||
return
|
||||
|
||||
name = message.text.strip()[:128]
|
||||
await user_dao.update(message.from_user.id, name=name, name_updated_at=datetime.utcnow())
|
||||
await message.answer("✅ Имя обновлено")
|
||||
await manager.switch_to(UserMenuSG.main)
|
||||
|
||||
|
||||
@inject
|
||||
async def get_groups_data(group_dao: FromDishka[GroupDAO], **_kwargs):
|
||||
groups = await group_dao.get_all()
|
||||
return {"groups": [(str(g.number), str(g.number)) for g in groups]}
|
||||
|
||||
|
||||
@inject
|
||||
async def on_group_selected(
|
||||
_callback: CallbackQuery,
|
||||
_widget,
|
||||
manager: DialogManager,
|
||||
item_id: str,
|
||||
user_dao: FromDishka[UserDAO]
|
||||
):
|
||||
await user_dao.update(_callback.from_user.id, group=int(item_id), group_updated_at=datetime.utcnow())
|
||||
await _callback.answer("✅ Группа обновлена")
|
||||
await manager.switch_to(UserMenuSG.main)
|
||||
|
||||
|
||||
user_menu_dialog = Dialog(
|
||||
Window(
|
||||
Const("📚 <b>Главное меню</b>\n\nВыберите раздел:"),
|
||||
Format("{user_info}"),
|
||||
Column(
|
||||
Button(Const("📝 Доступные тесты"), id="tests", on_click=on_tests_clicked),
|
||||
Button(Const("📊 Мои результаты"), id="results", on_click=on_results_clicked),
|
||||
),
|
||||
Row(
|
||||
Button(Const("✏️ Имя"), id="edit_name", on_click=on_edit_name_clicked),
|
||||
Button(Const("🎓 Группа"), id="edit_group", on_click=on_edit_group_clicked),
|
||||
),
|
||||
state=UserMenuSG.main,
|
||||
getter=get_user_data,
|
||||
),
|
||||
Window(
|
||||
Const("<b>✏️ Изменение имени</b>\n\nВведите новое имя:"),
|
||||
MessageInput(on_name_input),
|
||||
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_main),
|
||||
state=UserMenuSG.edit_name,
|
||||
),
|
||||
Window(
|
||||
Const("<b>🎓 Изменение группы</b>\n\nВыберите группу:"),
|
||||
ScrollingGroup(
|
||||
Select(
|
||||
Format("{item[1]}"),
|
||||
id="groups",
|
||||
item_id_getter=lambda x: x[0],
|
||||
items="groups",
|
||||
on_click=on_group_selected,
|
||||
),
|
||||
id="groups_scroll",
|
||||
width=2,
|
||||
height=7,
|
||||
),
|
||||
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_main),
|
||||
state=UserMenuSG.edit_group,
|
||||
getter=get_groups_data,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -3,6 +3,8 @@ from aiogram.fsm.state import State, StatesGroup
|
||||
|
||||
class UserMenuSG(StatesGroup):
|
||||
main = State()
|
||||
edit_name = State()
|
||||
edit_group = State()
|
||||
|
||||
|
||||
class UserRegistrationSG(StatesGroup):
|
||||
|
||||
@@ -11,6 +11,8 @@ class User:
|
||||
name: str | None = None
|
||||
group: int | None = None
|
||||
is_admin: bool = False
|
||||
name_updated_at: datetime | None = None
|
||||
group_updated_at: datetime | None = None
|
||||
created_at: datetime | None = None
|
||||
updated_at: datetime | None = None
|
||||
|
||||
|
||||
@@ -78,3 +78,10 @@ class TestAttemptDAO:
|
||||
await self.session.delete(attempt)
|
||||
await self.session.flush()
|
||||
return True
|
||||
|
||||
async def get_by_user_id(self, user_id: int) -> list[DomainTestAttempt]:
|
||||
result = await self.session.execute(
|
||||
select(TestAttempt).where(TestAttempt.user_id == user_id)
|
||||
)
|
||||
models = list(result.scalars().all())
|
||||
return [TestAttemptDTO(model).to_domain() for model in models]
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
@@ -55,6 +57,8 @@ class UserDAO:
|
||||
name: str | None = None,
|
||||
group: int | None = None,
|
||||
is_admin: bool | None = None,
|
||||
name_updated_at: datetime | None = None,
|
||||
group_updated_at: datetime | None = None,
|
||||
) -> DomainUser | None:
|
||||
result = await self.session.execute(
|
||||
select(User).where(User.id == user_id)
|
||||
@@ -75,6 +79,10 @@ class UserDAO:
|
||||
user.group = group
|
||||
if is_admin is not None:
|
||||
user.is_admin = is_admin
|
||||
if name_updated_at is not None:
|
||||
user.name_updated_at = name_updated_at
|
||||
if group_updated_at is not None:
|
||||
user.group_updated_at = group_updated_at
|
||||
|
||||
await self.session.flush()
|
||||
await self.session.refresh(user)
|
||||
|
||||
@@ -15,6 +15,8 @@ class UserDTO:
|
||||
name=self.model.name,
|
||||
group=self.model.group,
|
||||
is_admin=self.model.is_admin,
|
||||
name_updated_at=self.model.name_updated_at,
|
||||
group_updated_at=self.model.group_updated_at,
|
||||
created_at=self.model.created_at,
|
||||
updated_at=self.model.updated_at,
|
||||
)
|
||||
|
||||
@@ -22,6 +22,8 @@ class User(Base):
|
||||
name: Mapped[str | None] = mapped_column(String(128))
|
||||
group: Mapped[int | None] = mapped_column(CheckConstraint("group >= 1000 AND group <= 9999"))
|
||||
is_admin: Mapped[bool] = mapped_column(default=False)
|
||||
name_updated_at: Mapped[datetime | None] = mapped_column(default=None)
|
||||
group_updated_at: Mapped[datetime | None] = mapped_column(default=None)
|
||||
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
|
||||
updated_at: Mapped[datetime] = mapped_column(server_default=func.now(), onupdate=func.now())
|
||||
|
||||
|
||||
@@ -195,3 +195,19 @@ class TestAttemptRepository:
|
||||
|
||||
rows = result.all()
|
||||
return [(row.question_id, row.correct / row.total if row.total > 0 else 0.0) for row in rows]
|
||||
|
||||
async def get_user_stats(self, user_id: int) -> dict:
|
||||
result = await self.session.execute(
|
||||
select(
|
||||
func.count(TestAttemptModel.id).label("total_attempts"),
|
||||
func.avg(TestAttemptModel.score).label("avg_score"),
|
||||
).where(
|
||||
TestAttemptModel.user_id == user_id,
|
||||
TestAttemptModel.finished_at.isnot(None)
|
||||
)
|
||||
)
|
||||
row = result.one()
|
||||
return {
|
||||
"total_attempts": row.total_attempts or 0,
|
||||
"avg_score": round(row.avg_score, 1) if row.avg_score else 0,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user