mirror of
https://github.com/koloideal/Quizzi.git
synced 2026-06-10 10:25:28 +03:00
291 lines
9.9 KiB
Python
291 lines
9.9 KiB
Python
import html
from datetime import datetime, timedelta, timezone

from aiogram.types import CallbackQuery, Message
from aiogram_dialog import Dialog, DialogManager, Window
from aiogram_dialog.widgets.input import MessageInput
from aiogram_dialog.widgets.kbd import Button, Column, Row, ScrollingGroup, Select
from aiogram_dialog.widgets.text import Const, Format
from dishka import FromDishka
from dishka.integrations.aiogram_dialog import inject

from trudex.application.bot.user_dialogs.states import UserMenuSG
from trudex.application.bot.user_dialogs.take_test import on_start_test
from trudex.infrastructure.database.dao.group import GroupDAO
from trudex.infrastructure.database.dao.user import UserDAO
from trudex.infrastructure.database.repo.test import TestRepository
from trudex.infrastructure.database.repo.test_attempt import TestAttemptRepository
|
||
|
||
|
||
@inject
async def get_user_data(
    dialog_manager: DialogManager,
    user_dao: FromDishka[UserDAO],
    attempt_repo: FromDishka[TestAttemptRepository],
    **_kwargs
):
    """Build the greeting and statistics text for the main menu window.

    Looks up the user who triggered the current event and, if found, that
    user's attempt statistics (``total_attempts`` and ``avg_score`` keys).

    Returns:
        A dict with a single ``user_info`` key holding the text to render.
        When the user is unknown the text is a "not found" message.
    """
    user_id = dialog_manager.event.from_user.id
    user = await user_dao.get_by_id(user_id)

    if not user:
        return {"user_info": "❌ Пользователь не найден"}

    # Statistics are only needed for a known user, so query them after the
    # existence check instead of before it.
    stats = await attempt_repo.get_user_stats(user_id)

    # The name is user-controlled and ends up inside HTML markup (<b>...</b>),
    # so it must be escaped or a "<" / "&" in it would corrupt the message.
    # str() keeps the previous rendering if neither name field is set.
    name = html.escape(str(user.name or user.first_name), quote=False)
    group_str = f"🎓 Группа {user.group}" if user.group else "👤 Группа не указана"

    if stats["total_attempts"] > 0:
        accuracy_str = f"📊 Средняя точность: <b>{stats['avg_score']}%</b>"
        tests_str = f"📝 Пройдено тестов: <b>{stats['total_attempts']}</b>"
    else:
        accuracy_str = "📊 Средняя точность: <b>—</b>"
        tests_str = "📝 Пройдено тестов: <b>0</b>"

    user_info = (
        f"<b>👋 Привет, {name}!</b>\n\n"
        f"<blockquote>{group_str}</blockquote>\n\n"
        f"{tests_str}\n"
        f"{accuracy_str}"
    )

    return {"user_info": user_info}
|
||
|
||
|
||
def can_edit_field(updated_at: datetime | None) -> bool:
|
||
if updated_at is None:
|
||
return True
|
||
return datetime.utcnow() - updated_at >= timedelta(hours=24)
|
||
|
||
|
||
def get_remaining_time(updated_at: datetime) -> str:
    """Format how long is left until a field can be edited again.

    Args:
        updated_at: When the field was last changed. A naive value is
            interpreted as UTC; timezone-aware values are accepted as well.

    Returns:
        The time left of the 24-hour cooldown as ``"<hours>ч <minutes>м"``.
        If the cooldown has already elapsed the result is ``"0ч 0м"``
        rather than a negative duration.
    """
    # Same naive/aware handling as the cooldown check: avoid the deprecated
    # datetime.utcnow() and never subtract naive from aware values.
    now = datetime.now(timezone.utc)
    if updated_at.tzinfo is None:
        now = now.replace(tzinfo=None)

    remaining = timedelta(hours=24) - (now - updated_at)
    # Clamp at zero: floor division on a negative duration would otherwise
    # produce output such as "-1ч 59м".
    seconds_left = max(int(remaining.total_seconds()), 0)
    hours, leftover = divmod(seconds_left, 3600)
    minutes = leftover // 60
    return f"{hours}ч {minutes}м"
|
||
|
||
|
||
@inject
async def on_edit_name_clicked(
    _callback: CallbackQuery,
    _button: Button,
    manager: DialogManager,
    user_dao: FromDishka[UserDAO]
):
    """Handle the "edit name" button on the main menu.

    Switches the dialog to the name-editing state when the edit cooldown
    allows it; otherwise answers the callback with the time left. An unknown
    user gets a "not found" callback answer.
    """
    profile = await user_dao.get_by_id(_callback.from_user.id)
    if not profile:
        await _callback.answer("❌ Пользователь не найден")
        return

    last_change = profile.name_updated_at
    if can_edit_field(last_change):
        await manager.switch_to(UserMenuSG.edit_name)
        return

    # Still inside the cooldown window: tell the user how long to wait.
    wait_for = get_remaining_time(last_change)
    await _callback.answer(f"⏳ Изменить можно через {wait_for}")
|
||
|
||
|
||
@inject
async def on_edit_group_clicked(
    _callback: CallbackQuery,
    _button: Button,
    manager: DialogManager,
    user_dao: FromDishka[UserDAO]
):
    """Handle the "edit group" button on the main menu.

    Switches the dialog to the group-selection state when the edit cooldown
    allows it; otherwise answers the callback with the time left. An unknown
    user gets a "not found" callback answer.
    """
    profile = await user_dao.get_by_id(_callback.from_user.id)
    if not profile:
        await _callback.answer("❌ Пользователь не найден")
        return

    last_change = profile.group_updated_at
    if can_edit_field(last_change):
        await manager.switch_to(UserMenuSG.edit_group)
        return

    # Still inside the cooldown window: tell the user how long to wait.
    wait_for = get_remaining_time(last_change)
    await _callback.answer(f"⏳ Изменить можно через {wait_for}")
|
||
|
||
|
||
async def on_tests_clicked(_callback: CallbackQuery, _button: Button, manager: DialogManager):
    """Switch the dialog to the list of available tests."""
    target_state = UserMenuSG.available_tests
    await manager.switch_to(target_state)
|
||
|
||
|
||
async def on_results_clicked(_callback: CallbackQuery, _button: Button, _manager: DialogManager):
    """Answer the "my results" button with an "in development" notice."""
    notice = "🚧 В разработке"
    await _callback.answer(notice)
|
||
|
||
|
||
async def on_back_to_main(_callback: CallbackQuery, _button: Button, manager: DialogManager):
    """Switch the dialog back to the main menu state."""
    target_state = UserMenuSG.main
    await manager.switch_to(target_state)
|
||
|
||
|
||
@inject
async def on_name_input(
    message: Message,
    _widget: MessageInput,
    manager: DialogManager,
    user_dao: FromDishka[UserDAO]
):
    """Validate and store a new display name sent as a text message.

    The text is stripped of surrounding whitespace; anything shorter than two
    characters (including a message without text) is rejected with a reply.
    Otherwise the name, cut to 128 characters, is saved together with the
    current UTC time, a confirmation is sent and the dialog returns to the
    main menu.
    """
    candidate = (message.text or "").strip()
    if len(candidate) < 2:
        await message.answer("❌ Имя должно содержать минимум 2 символа")
        return

    await user_dao.update(
        message.from_user.id,
        name=candidate[:128],
        name_updated_at=datetime.utcnow(),
    )
    await message.answer("✅ Имя обновлено")
    await manager.switch_to(UserMenuSG.main)
|
||
|
||
|
||
@inject
async def get_groups_data(group_dao: FromDishka[GroupDAO], **_kwargs):
    """Provide the selectable groups for the group-editing window.

    Returns:
        A dict whose ``groups`` key is a list of ``(id, label)`` pairs; both
        elements are the group's number rendered as a string.
    """
    all_groups = await group_dao.get_all()

    options = []
    for group in all_groups:
        options.append((str(group.number), str(group.number)))

    return {"groups": options}
|
||
|
||
|
||
@inject
async def on_group_selected(
    _callback: CallbackQuery,
    _widget,
    manager: DialogManager,
    item_id: str,
    user_dao: FromDishka[UserDAO]
):
    """Store the group picked in the selection list.

    ``item_id`` is converted to ``int`` and saved as the user's group along
    with the current UTC time; the callback is then answered with a
    confirmation and the dialog returns to the main menu.
    """
    chosen_group = int(item_id)
    changed_at = datetime.utcnow()

    await user_dao.update(
        _callback.from_user.id,
        group=chosen_group,
        group_updated_at=changed_at,
    )
    await _callback.answer("✅ Группа обновлена")
    await manager.switch_to(UserMenuSG.main)
|
||
|
||
|
||
@inject
async def get_available_tests(
    dialog_manager: DialogManager,
    user_dao: FromDishka[UserDAO],
    test_repo: FromDishka[TestRepository],
    **_kwargs
):
    """Provide the tests the current user can take.

    Returns:
        A dict with ``tests`` (a list of ``(button label, test id)`` pairs)
        and ``count`` (how many tests were found). Both are empty/zero when
        the user is unknown.
    """
    user_id = dialog_manager.event.from_user.id
    user = await user_dao.get_by_id(user_id)
    if not user:
        return {"tests": [], "count": 0}

    available = await test_repo.get_available_tests_for_user(user_id, user.group)

    options = []
    for test in available:
        options.append((f"📝 {test.title}", test.id))

    return {"tests": options, "count": len(available)}
|
||
|
||
|
||
async def on_test_selected(_callback: CallbackQuery, _widget: Select, manager: DialogManager, item_id: str):
    """Remember the chosen test id in the dialog data and open its details."""
    chosen_test_id = int(item_id)
    manager.dialog_data["selected_test_id"] = chosen_test_id
    await manager.switch_to(UserMenuSG.test_detail)
|
||
|
||
|
||
async def on_back_to_tests(_callback: CallbackQuery, _button: Button, manager: DialogManager):
    """Switch the dialog back to the list of available tests."""
    target_state = UserMenuSG.available_tests
    await manager.switch_to(target_state)
|
||
|
||
|
||
@inject
async def get_test_detail(
    dialog_manager: DialogManager,
    test_repo: FromDishka[TestRepository],
    attempt_repo: FromDishka[TestAttemptRepository],
    user_dao: FromDishka[UserDAO],
    **_kwargs
):
    """Build the description text for the test stored in the dialog data.

    Reads ``selected_test_id`` from the dialog data, loads the test with its
    questions and the current user's attempts, and renders title,
    description, question count, password requirement, used/allowed attempts,
    expiry and target group.

    ``user_dao`` is kept in the signature for backward compatibility but is
    no longer queried: the user record it used to load was never used.

    Returns:
        A dict with a single ``test_info`` key. The text is a "not found"
        message when no test id is stored or the test does not exist.
    """
    test_id = dialog_manager.dialog_data.get("selected_test_id")
    user_id = dialog_manager.event.from_user.id

    if not test_id:
        return {"test_info": "❌ Тест не найден"}

    test, questions = await test_repo.get_test_with_questions(test_id)

    if not test:
        return {"test_info": "❌ Тест не найден"}

    attempts = await attempt_repo.get_user_test_attempts(user_id, test_id)
    # Only attempts that were actually finished count against the limit.
    finished_count = sum(1 for attempt in attempts if attempt.finished_at)

    # Title and description are free text placed inside HTML markup; escape
    # them so characters like "<" or "&" cannot break the rendered message.
    # NOTE(review): assumes these fields hold plain text, not intentional
    # HTML markup — confirm with the test-authoring flow.
    title = html.escape(str(test.title), quote=False)
    description = html.escape(str(test.description), quote=False) if test.description else "—"

    password_str = "🔒 Требуется пароль" if test.password else "🔓 Без пароля"
    # A falsy attempts limit is shown as unlimited.
    attempts_limit = test.attempts if test.attempts else "♾️"
    attempts_str = f"🔄 Попыток: {finished_count}/{attempts_limit}"
    expires_str = f"📅 До {test.expires_at.strftime('%d.%m.%Y %H:%M')}" if test.expires_at else "📅 Без срока"
    group_str = f"🎓 Для группы {test.for_group}" if test.for_group else "👥 Для всех"

    test_info = (
        f"<b>📝 {title}</b>\n\n"
        f"<blockquote>{description}</blockquote>\n\n"
        f"<b>Вопросов:</b> {len(questions)}\n"
        f"{password_str}\n"
        f"{attempts_str}\n"
        f"{expires_str}\n"
        f"{group_str}"
    )

    return {"test_info": test_info}
|
||
|
||
|
||
# User menu dialog: main menu, available tests list, test details,
# name editing and group editing windows.
user_menu_dialog = Dialog(
    # Main menu: greeting/statistics text plus navigation and edit buttons.
    Window(
        Format("{user_info}"),
        Column(
            Button(Const("📝 Доступные тесты"), id="tests", on_click=on_tests_clicked),
            Button(Const("📊 Мои результаты"), id="results", on_click=on_results_clicked),
        ),
        Row(
            Button(Const("✏️ Имя"), id="edit_name", on_click=on_edit_name_clicked),
            Button(Const("🎓 Группа"), id="edit_group", on_click=on_edit_group_clicked),
        ),
        state=UserMenuSG.main,
        getter=get_user_data,
    ),
    # Available tests: items are (label, test id) pairs from the getter.
    Window(
        Format("<b>📝 Доступные тесты</b>\n\nВсего: {count}"),
        ScrollingGroup(
            Select(
                Format("{item[0]}"),
                id="test_select",
                item_id_getter=lambda entry: entry[1],
                items="tests",
                on_click=on_test_selected,
            ),
            id="tests_scroll",
            width=1,
            height=7,
        ),
        Button(Const("◀️ Назад"), id="back", on_click=on_back_to_main),
        state=UserMenuSG.available_tests,
        getter=get_available_tests,
    ),
    # Test details: description text with start/back buttons.
    Window(
        Format("{test_info}"),
        Column(
            Button(Const("▶️ Пройти тест"), id="start_test", on_click=on_start_test),
            Button(Const("◀️ Назад"), id="back", on_click=on_back_to_tests),
        ),
        state=UserMenuSG.test_detail,
        getter=get_test_detail,
    ),
    # Name editing: the next text message is handled by on_name_input.
    Window(
        Const("<b>✏️ Изменение имени</b>\n\nВведите новое имя:"),
        MessageInput(on_name_input),
        Button(Const("◀️ Назад"), id="back", on_click=on_back_to_main),
        state=UserMenuSG.edit_name,
    ),
    # Group editing: items are (id, label) pairs from the getter.
    Window(
        Const("<b>🎓 Изменение группы</b>\n\nВыберите группу:"),
        ScrollingGroup(
            Select(
                Format("{item[1]}"),
                id="groups",
                item_id_getter=lambda entry: entry[0],
                items="groups",
                on_click=on_group_selected,
            ),
            id="groups_scroll",
            width=2,
            height=7,
        ),
        Button(Const("◀️ Назад"), id="back", on_click=on_back_to_main),
        state=UserMenuSG.edit_group,
        getter=get_groups_data,
    ),
)
|