From 7d2a734b7d7d0c800c155ad4ebe40b9725dc3d11 Mon Sep 17 00:00:00 2001 From: kolo Date: Tue, 6 Jan 2026 19:35:49 +0300 Subject: [PATCH 1/4] update --- .../b1c2d3e4f5a6_add_time_limit_to_test.py | 26 ++++++++ config.example.toml | 13 ++-- .../bot/shared_dialogs/create_test.py | 43 ++++++++++++- .../application/bot/shared_dialogs/states.py | 2 + .../bot/shared_dialogs/templates.py | 7 +++ .../application/bot/shared_dialogs/tests.py | 62 +++++++++++++++++++ .../application/bot/user_dialogs/deeplink.py | 4 +- .../application/bot/user_dialogs/main_menu.py | 2 + src/quizzi/domain/schemas.py | 1 + src/quizzi/domain/test_parser.py | 3 + .../infrastructure/database/dao/test.py | 6 ++ .../infrastructure/database/dto/test.py | 1 + src/quizzi/infrastructure/database/models.py | 1 + 13 files changed, 162 insertions(+), 9 deletions(-) create mode 100644 alembic/versions/b1c2d3e4f5a6_add_time_limit_to_test.py diff --git a/alembic/versions/b1c2d3e4f5a6_add_time_limit_to_test.py b/alembic/versions/b1c2d3e4f5a6_add_time_limit_to_test.py new file mode 100644 index 0000000..ff7e4b1 --- /dev/null +++ b/alembic/versions/b1c2d3e4f5a6_add_time_limit_to_test.py @@ -0,0 +1,26 @@ +"""add time_limit to test + +Revision ID: b1c2d3e4f5a6 +Revises: ca107b03ddf8 +Create Date: 2026-01-06 12:00:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'b1c2d3e4f5a6' +down_revision: Union[str, None] = 'ca107b03ddf8' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column('tests', sa.Column('time_limit', sa.Integer(), nullable=True)) + + +def downgrade() -> None: + op.drop_column('tests', 'time_limit') diff --git a/config.example.toml b/config.example.toml index 3600c62..96497a4 100644 --- a/config.example.toml +++ b/config.example.toml @@ -1,14 +1,13 @@ [bot] -token = "1234567890:ABCdefGHIjklMNOpqrsTUVwxyz" -creator_id = 123456789 +token = "1234567890" +creator_id = 1234567890 [security] -test_hash_salt = "your_secret_salt_here_change_in_production" -test_hash_length = 16 +encode_key = "encode_key" [database] host = "localhost" port = 5432 -user = "trudex_user" -password = "secure_password" -database = "trudex_db" +user = "postgres" +password = "passkey" +database = "trudex" diff --git a/src/quizzi/application/bot/shared_dialogs/create_test.py b/src/quizzi/application/bot/shared_dialogs/create_test.py index 268030f..1ffcdde 100644 --- a/src/quizzi/application/bot/shared_dialogs/create_test.py +++ b/src/quizzi/application/bot/shared_dialogs/create_test.py @@ -100,11 +100,41 @@ async def on_attempts_input(message: Message, _widget: MessageInput, manager: Di return manager.dialog_data["attempts"] = attempts - await manager.switch_to(SharedCreateTestSG.input_expires_at) + await manager.switch_to(SharedCreateTestSG.input_time_limit) async def on_skip_attempts(_callback: CallbackQuery, _button: Button, manager: DialogManager): manager.dialog_data["attempts"] = None + await manager.switch_to(SharedCreateTestSG.input_time_limit) + + +async def on_time_limit_input(message: Message, _widget: MessageInput, manager: DialogManager): + if not message.text: + await message.answer("❌ Лимит времени не может быть пустым") + return + + time_limit_str = message.text.strip() + + if not time_limit_str.isdigit(): + await message.answer("❌ Лимит времени должен быть числом (в минутах)") + return + + time_limit_minutes = int(time_limit_str) + + if time_limit_minutes < 1: + await message.answer("❌ Лимит времени должен быть больше 0") + return + + if time_limit_minutes > 1440: + await message.answer("❌ Лимит времени не может быть больше 1440 минут (24 часа)") + return + + manager.dialog_data["time_limit"] = time_limit_minutes * 60 + await manager.switch_to(SharedCreateTestSG.input_expires_at) + + +async def on_skip_time_limit(_callback: CallbackQuery, _button: Button, manager: DialogManager): + manager.dialog_data["time_limit"] = None await manager.switch_to(SharedCreateTestSG.input_expires_at) @@ -139,11 +169,13 @@ async def get_test_info(dialog_manager: DialogManager, **_kwargs): description = dialog_manager.dialog_data.get("description", "—") password = dialog_manager.dialog_data.get("password") attempts = dialog_manager.dialog_data.get("attempts") + time_limit = dialog_manager.dialog_data.get("time_limit") expires_at = dialog_manager.dialog_data.get("expires_at") for_group = dialog_manager.dialog_data.get("for_group") password_str = f"🔒 {password}" if password else "Без пароля" attempts_str = f"🔄 {attempts}" if attempts else "♾️ Без ограничений" + time_limit_str = f"⏱️ {time_limit // 60} мин" if time_limit else "⏱️ Без лимита" expires_at_msk = to_msk(expires_at) expires_str = expires_at_msk.strftime("%d.%m.%Y") if expires_at_msk else "Без срока" group_str = str(for_group) if for_group else "Для всех" @@ -155,6 +187,7 @@ async def get_test_info(dialog_manager: DialogManager, **_kwargs): f"Описание: {description}\n" f"Пароль: {password_str}\n" f"Попыток: {attempts_str}\n" + f"Время: {time_limit_str}\n" f"Истекает: {expires_str}\n" f"Для группы: {group_str}" ) @@ -168,6 +201,7 @@ async def on_confirm_test(_callback: CallbackQuery, _button: Button, manager: Di description = manager.dialog_data.get("description") password = manager.dialog_data.get("password") attempts = manager.dialog_data.get("attempts") + time_limit = manager.dialog_data.get("time_limit") expires_at = manager.dialog_data.get("expires_at") for_group = manager.dialog_data.get("for_group") @@ -176,6 +210,7 @@ async def on_confirm_test(_callback: CallbackQuery, _button: Button, manager: Di description=description, password=password, attempts=attempts, + time_limit=time_limit, expires_at=expires_at, for_group=for_group, ) @@ -470,6 +505,12 @@ shared_create_test_dialog = Dialog( Button(Const("⏭️ Без ограничений"), id="skip_attempts", on_click=on_skip_attempts), state=SharedCreateTestSG.input_attempts, ), + Window( + Const("⏱️ Лимит времени\n\n🔢 Введите лимит времени в минутах (1-1440) или пропустите для неограниченного времени:"), + MessageInput(on_time_limit_input), + Button(Const("⏭️ Без лимита"), id="skip_time_limit", on_click=on_skip_time_limit), + state=SharedCreateTestSG.input_time_limit, + ), Window( Const("📅 Срок действия\n\n🗓 Выберите дату истечения теста или пропустите:"), Calendar(id="calendar", on_click=on_date_selected), diff --git a/src/quizzi/application/bot/shared_dialogs/states.py b/src/quizzi/application/bot/shared_dialogs/states.py index 3f6255c..3a8e41e 100644 --- a/src/quizzi/application/bot/shared_dialogs/states.py +++ b/src/quizzi/application/bot/shared_dialogs/states.py @@ -15,6 +15,7 @@ class SharedTestsSG(StatesGroup): edit_menu = State() edit_password = State() edit_attempts = State() + edit_time_limit = State() edit_group = State() edit_expires = State() statistics = State() @@ -38,6 +39,7 @@ class SharedCreateTestSG(StatesGroup): input_description = State() input_password = State() input_attempts = State() + input_time_limit = State() input_expires_at = State() input_for_group = State() confirm_test_info = State() diff --git a/src/quizzi/application/bot/shared_dialogs/templates.py b/src/quizzi/application/bot/shared_dialogs/templates.py index 2be5ec3..0f4699e 100644 --- a/src/quizzi/application/bot/shared_dialogs/templates.py +++ b/src/quizzi/application/bot/shared_dialogs/templates.py @@ -36,6 +36,7 @@ SPEC_INFO = """📋 Спецификация формата JSON "description": "Описание теста", "password": null, "attempts": null, + "time_limit": null, "expires_at": null, "for_group": null, "questions": [...] @@ -46,6 +47,7 @@ SPEC_INFO = """📋 Спецификация формата JSONdescription — описание (до 2000 символов) • password — пароль для доступа или nullattempts — лимит попыток (1-100) или null +• time_limit — лимит времени в секундах (1-86400) или nullexpires_at — срок действия в ISO формате или nullfor_group — номер группы или null для всех @@ -90,6 +92,7 @@ TEMPLATE_ULTIMATE = """// ══════════════════ // ⚙️ НАСТРОЙКИ: // • Пароль: test2024 // • Попыток: 5 +// • Лимит времени: 1800 секунд (30 минут) // • Срок действия: 31 декабря 2026, 23:59 // • Для группы: 2024 (или null для всех) // @@ -104,6 +107,7 @@ TEMPLATE_ULTIMATE = """// ══════════════════ // 💡 ПОДСКАЗКИ: // • null означает "не задано" / "без ограничений" // • expires_at в формате ISO 8601: YYYY-MM-DDTHH:MM:SS +// • time_limit в секундах (1-86400), null для без ограничений // • for_group - номер группы или null для всех пользователей // • image_url - URL изображения к вопросу (опционально) // @@ -114,6 +118,7 @@ TEMPLATE_ULTIMATE = """// ══════════════════ "description": "Полная демонстрация всех возможностей формата: разные типы вопросов, настройки доступа, ограничения по времени и группам", "password": "test2024", "attempts": 5, + "time_limit": 1800, "expires_at": "2026-12-31T23:59:59", "for_group": 2024, "questions": [ @@ -225,6 +230,7 @@ async def on_test_selected_for_export( "description": test.description, "password": test.password, "attempts": test.attempts, + "time_limit": test.time_limit, "expires_at": test.expires_at.isoformat() if test.expires_at else None, "for_group": test.for_group, "questions": [], @@ -340,6 +346,7 @@ async def create_test_from_parsed( description=parsed.description, password=parsed.password, attempts=parsed.attempts, + time_limit=parsed.time_limit, expires_at=parsed.expires_at, for_group=parsed.for_group, is_active=False, diff --git a/src/quizzi/application/bot/shared_dialogs/tests.py b/src/quizzi/application/bot/shared_dialogs/tests.py index 93a45ca..14c7e2c 100644 --- a/src/quizzi/application/bot/shared_dialogs/tests.py +++ b/src/quizzi/application/bot/shared_dialogs/tests.py @@ -68,6 +68,7 @@ async def get_test_detail(test_dao: FromDishka[TestDAO], test_repo: FromDishka[T status = "🟢 Активен" if test.is_active else "🔴 Деактивирован" password_str = f"🔒 {test.password}" if test.password else "🔓 Без пароля" attempts_str = f"🔄 {test.attempts}" if test.attempts else "♾️ Без ограничений" + time_limit_str = f"⏱️ {test.time_limit // 60} мин" if test.time_limit else "⏱️ Без лимита" expires_str = f"📅 {to_msk(test.expires_at).strftime('%d.%m.%Y %H:%M')}" if test.expires_at else "📅 Без срока" group_str = f"🎓 Группа {test.for_group}" if test.for_group else "👥 Для всех" results_str = "👁 Результаты видны" if test.are_results_viewable else "🔒 Результаты скрыты" @@ -80,6 +81,7 @@ async def get_test_detail(test_dao: FromDishka[TestDAO], test_repo: FromDishka[T f"Вопросов: {questions_count}\n" f"Пароль: {password_str}\n" f"Попытки: {attempts_str}\n" + f"Время: {time_limit_str}\n" f"Срок: {expires_str}\n" f"Группа: {group_str}\n" f"Видимость: {results_str}\n\n" @@ -254,6 +256,7 @@ async def on_export_test( "description": test.description, "password": test.password, "attempts": test.attempts, + "time_limit": test.time_limit, "expires_at": test.expires_at.isoformat() if test.expires_at else None, "for_group": test.for_group, "questions": [], @@ -361,6 +364,10 @@ async def on_edit_attempts(_callback: CallbackQuery, _button: Button, manager: D await manager.switch_to(SharedTestsSG.edit_attempts) +async def on_edit_time_limit(_callback: CallbackQuery, _button: Button, manager: DialogManager): + await manager.switch_to(SharedTestsSG.edit_time_limit) + + async def on_edit_group(_callback: CallbackQuery, _button: Button, manager: DialogManager): await manager.switch_to(SharedTestsSG.edit_group) @@ -446,6 +453,51 @@ async def on_remove_attempts(_callback: CallbackQuery, _button: Button, manager: await manager.switch_to(SharedTestsSG.test_detail) +@inject +async def on_time_limit_input(message: Message, _widget: MessageInput, manager: DialogManager, test_dao: FromDishka[TestDAO]): + test_id = manager.dialog_data.get("selected_test_id") + if not test_id: + await message.answer("❌ Тест не найден") + return + + if not message.text: + await message.answer("❌ Лимит времени не может быть пустым") + return + + time_limit_str = message.text.strip() + + if not time_limit_str.isdigit(): + await message.answer("❌ Лимит времени должен быть числом (в минутах)") + return + + time_limit_minutes = int(time_limit_str) + + if time_limit_minutes < 1: + await message.answer("❌ Лимит времени должен быть больше 0") + return + + if time_limit_minutes > 1440: + await message.answer("❌ Лимит времени не может быть больше 1440 минут (24 часа)") + return + + time_limit_seconds = time_limit_minutes * 60 + await test_dao.update(test_id, time_limit=time_limit_seconds) + await message.answer("✅ Лимит времени обновлен") + await manager.switch_to(SharedTestsSG.test_detail) + + +@inject +async def on_remove_time_limit(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]): + test_id = manager.dialog_data.get("selected_test_id") + if not test_id: + await _callback.answer("❌ Тест не найден") + return + + await test_dao.update(test_id, time_limit=None) + await _callback.answer("✅ Лимит времени удален") + await manager.switch_to(SharedTestsSG.test_detail) + + @inject async def get_groups_for_edit(dialog_manager: DialogManager, group_dao: FromDishka[GroupDAO], **_kwargs): groups = await group_dao.get_all() @@ -561,6 +613,7 @@ shared_tests_dialog = Dialog( Column( Button(Const("🔑 Пароль"), id="edit_password", on_click=on_edit_password), Button(Const("🔄 Попытки"), id="edit_attempts", on_click=on_edit_attempts), + Button(Const("⏱️ Лимит времени"), id="edit_time_limit", on_click=on_edit_time_limit), Button(Const("👥 Группа"), id="edit_group", on_click=on_edit_group), Button(Const("📅 Срок действия"), id="edit_expires", on_click=on_edit_expires), Button(Const("◀️ Назад"), id="back", on_click=on_back_to_detail), @@ -585,6 +638,15 @@ shared_tests_dialog = Dialog( ), state=SharedTestsSG.edit_attempts, ), + Window( + Const("⏱️ Изменение лимита времени\n\n🔢 Введите лимит времени в минутах (1-1440) или удалите ограничение:"), + MessageInput(on_time_limit_input), + Column( + Button(Const("🗑 Без лимита"), id="remove_time_limit", on_click=on_remove_time_limit), + Button(Const("◀️ Назад"), id="back", on_click=on_back_to_edit_menu), + ), + state=SharedTestsSG.edit_time_limit, + ), Window( Const("👥 Изменение группы\n\n🎓 Выберите группу или удалите привязку:"), ScrollingGroup( diff --git a/src/quizzi/application/bot/user_dialogs/deeplink.py b/src/quizzi/application/bot/user_dialogs/deeplink.py index 74dcb61..30e1d3e 100644 --- a/src/quizzi/application/bot/user_dialogs/deeplink.py +++ b/src/quizzi/application/bot/user_dialogs/deeplink.py @@ -41,13 +41,15 @@ async def get_deeplink_test_data( password_str = "🔒 Требуется пароль" if test.password else "🔓 Без пароля" attempts_str = f"🔄 Попыток: {test.attempts}" if test.attempts else "🔄 Попыток: ♾️" + time_limit_str = f"⏱️ Время: {test.time_limit // 60} мин" if test.time_limit else "⏱️ Без лимита" test_info = ( f"📝 {test.title}\n\n" f"
{test.description or '—'}
\n\n" f"Вопросов: {questions_count}\n" f"{password_str}\n" - f"{attempts_str}" + f"{attempts_str}\n" + f"{time_limit_str}" ) return {"test_info": test_info, "can_start": True, "has_password": bool(test.password)} diff --git a/src/quizzi/application/bot/user_dialogs/main_menu.py b/src/quizzi/application/bot/user_dialogs/main_menu.py index 08de0fb..5bc8895 100644 --- a/src/quizzi/application/bot/user_dialogs/main_menu.py +++ b/src/quizzi/application/bot/user_dialogs/main_menu.py @@ -278,6 +278,7 @@ async def get_test_detail( password_str = "🔒 Требуется пароль" if test.password else "🔓 Без пароля" attempts_str = f"🔄 Попыток: {len(finished_attempts)}/{test.attempts}" if test.attempts else f"🔄 Попыток: {len(finished_attempts)}/♾️" + time_limit_str = f"⏱️ Время: {test.time_limit // 60} мин" if test.time_limit else "⏱️ Без лимита" expires_at_msk = to_msk(test.expires_at) expires_str = f"📅 До {expires_at_msk.strftime('%d.%m.%Y %H:%M')}" if expires_at_msk else "📅 Без срока" @@ -289,6 +290,7 @@ async def get_test_detail( f"Вопросов: {len(questions)}\n" f"{password_str}\n" f"{attempts_str}\n" + f"{time_limit_str}\n" f"{expires_str}\n" f"{group_str}" ) diff --git a/src/quizzi/domain/schemas.py b/src/quizzi/domain/schemas.py index 74db228..9573300 100644 --- a/src/quizzi/domain/schemas.py +++ b/src/quizzi/domain/schemas.py @@ -41,6 +41,7 @@ class Test: password: str | None = None expires_at: datetime | None = None attempts: int | None = None + time_limit: int | None = None is_active: bool = True are_results_viewable: bool = False created_at: datetime | None = None diff --git a/src/quizzi/domain/test_parser.py b/src/quizzi/domain/test_parser.py index fa72f6f..7a7df35 100644 --- a/src/quizzi/domain/test_parser.py +++ b/src/quizzi/domain/test_parser.py @@ -24,6 +24,7 @@ class ParsedTest: description: str | None password: str | None attempts: int | None + time_limit: int | None expires_at: datetime | None for_group: int | None questions: list[ParsedQuestion] @@ -53,6 +54,7 @@ class TestParser: description = self._parse_string(data, "description", required=False, max_length=2000, errors=errors) password = self._parse_string(data, "password", required=False, max_length=255, errors=errors) attempts = self._parse_int(data, "attempts", required=False, min_val=1, max_val=100, errors=errors) + time_limit = self._parse_int(data, "time_limit", required=False, min_val=1, max_val=86400, errors=errors) expires_at = self._parse_datetime(data, "expires_at", required=False, errors=errors) for_group = self._parse_int(data, "for_group", required=False, errors=errors) @@ -68,6 +70,7 @@ class TestParser: description=description, password=password, attempts=attempts, + time_limit=time_limit, expires_at=expires_at, for_group=for_group, questions=questions, diff --git a/src/quizzi/infrastructure/database/dao/test.py b/src/quizzi/infrastructure/database/dao/test.py index 5917259..eeec9ab 100644 --- a/src/quizzi/infrastructure/database/dao/test.py +++ b/src/quizzi/infrastructure/database/dao/test.py @@ -24,6 +24,7 @@ class TestUpdateFields(TypedDict, total=False): password: str | None expires_at: datetime | None attempts: int | None + time_limit: int | None is_active: bool are_results_viewable: bool @@ -64,6 +65,7 @@ class TestDAO: password: str | None = None, expires_at: datetime | None = None, attempts: int | None = None, + time_limit: int | None = None, is_active: bool = True, are_results_viewable: bool = False, ) -> DomainTest: @@ -74,6 +76,7 @@ class TestDAO: password=password, expires_at=expires_at, attempts=attempts, + time_limit=time_limit, is_active=is_active, are_results_viewable=are_results_viewable, ) @@ -91,6 +94,7 @@ class TestDAO: password: str | None | _UNSET = UNSET, expires_at: datetime | None | _UNSET = UNSET, attempts: int | None | _UNSET = UNSET, + time_limit: int | None | _UNSET = UNSET, is_active: bool | _UNSET = UNSET, are_results_viewable: bool | _UNSET = UNSET, ) -> DomainTest | None: @@ -113,6 +117,8 @@ class TestDAO: test.expires_at = expires_at if not isinstance(attempts, _UNSET): test.attempts = attempts + if not isinstance(time_limit, _UNSET): + test.time_limit = time_limit if not isinstance(is_active, _UNSET): test.is_active = is_active if not isinstance(are_results_viewable, _UNSET): diff --git a/src/quizzi/infrastructure/database/dto/test.py b/src/quizzi/infrastructure/database/dto/test.py index 796ff30..cb6edd0 100644 --- a/src/quizzi/infrastructure/database/dto/test.py +++ b/src/quizzi/infrastructure/database/dto/test.py @@ -15,6 +15,7 @@ class TestDTO: password=self.model.password, expires_at=self.model.expires_at, attempts=self.model.attempts, + time_limit=self.model.time_limit, is_active=self.model.is_active, are_results_viewable=self.model.are_results_viewable, created_at=self.model.created_at, diff --git a/src/quizzi/infrastructure/database/models.py b/src/quizzi/infrastructure/database/models.py index cef9774..af8dc9c 100644 --- a/src/quizzi/infrastructure/database/models.py +++ b/src/quizzi/infrastructure/database/models.py @@ -52,6 +52,7 @@ class Test(Base): password: Mapped[str | None] = mapped_column(String(255), default=None) expires_at: Mapped[datetime | None] = mapped_column(default=None) attempts: Mapped[int | None] = mapped_column(Integer, default=None) + time_limit: Mapped[int | None] = mapped_column(Integer, default=None) is_active: Mapped[bool] = mapped_column(default=True) are_results_viewable: Mapped[bool] = mapped_column(default=False) created_at: Mapped[datetime] = mapped_column(server_default=func.now()) From ebdc9954de556ce792f990cf82f0127a32df4201 Mon Sep 17 00:00:00 2001 From: kolo Date: Wed, 7 Jan 2026 00:10:25 +0300 Subject: [PATCH 2/4] update --- ...8b9_add_warning_sent_at_to_test_attempt.py | 24 ++ .../application/bot/user_dialogs/deeplink.py | 19 +- .../application/bot/user_dialogs/states.py | 2 + .../application/bot/user_dialogs/take_test.py | 380 ++++++++++++++++-- src/quizzi/domain/schemas.py | 1 + .../infrastructure/database/dao/test.py | 1 - .../database/dao/test_attempt.py | 3 + .../infrastructure/database/dao/user.py | 1 - .../database/dto/test_attempt.py | 1 + src/quizzi/infrastructure/database/models.py | 1 + .../infrastructure/database/repo/test.py | 1 - .../database/repo/test_attempt.py | 63 ++- src/quizzi/infrastructure/di.py | 21 +- src/quizzi/infrastructure/scheduling/tasks.py | 123 ++++++ src/quizzi/infrastructure/utils/timezone.py | 1 - 15 files changed, 600 insertions(+), 42 deletions(-) create mode 100644 alembic/versions/c4d5e6f7a8b9_add_warning_sent_at_to_test_attempt.py diff --git a/alembic/versions/c4d5e6f7a8b9_add_warning_sent_at_to_test_attempt.py b/alembic/versions/c4d5e6f7a8b9_add_warning_sent_at_to_test_attempt.py new file mode 100644 index 0000000..92fe672 --- /dev/null +++ b/alembic/versions/c4d5e6f7a8b9_add_warning_sent_at_to_test_attempt.py @@ -0,0 +1,24 @@ +"""add warning_sent_at to test_attempt + +Revision ID: c4d5e6f7a8b9 +Revises: b1c2d3e4f5a6 +Create Date: 2026-01-06 + +""" +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +revision: str = 'c4d5e6f7a8b9' +down_revision: str | None = 'b1c2d3e4f5a6' +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.add_column('test_attempts', sa.Column('warning_sent_at', sa.DateTime(), nullable=True)) + + +def downgrade() -> None: + op.drop_column('test_attempts', 'warning_sent_at') diff --git a/src/quizzi/application/bot/user_dialogs/deeplink.py b/src/quizzi/application/bot/user_dialogs/deeplink.py index 30e1d3e..93f9ae4 100644 --- a/src/quizzi/application/bot/user_dialogs/deeplink.py +++ b/src/quizzi/application/bot/user_dialogs/deeplink.py @@ -12,6 +12,7 @@ from quizzi.infrastructure.database.models import QuestionType from quizzi.infrastructure.database.repo.test import TestRepository from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository from quizzi.infrastructure.utils.rate_limiter import PasswordRateLimiter +from quizzi.infrastructure.utils.timezone import now_msk_naive @inject @@ -92,15 +93,22 @@ async def on_start_deeplink_test( if active_attempt: await attempt_repo.attempt_dao.delete(active_attempt.id) - if test.password: + if test.time_limit: + await manager.start(UserTestSG.confirm_time_limit, mode=StartMode.NORMAL, data={ + "test_id": test_id, + "time_limit": test.time_limit, + "has_password": bool(test.password), + }) + elif test.password: allowed, wait_time = await rate_limiter.check(user_id) if not allowed: minutes = int(wait_time // 60) + 1 await _callback.answer(f"⏳ Слишком много попыток. Подождите {minutes} мин.", show_alert=True) return + manager.dialog_data["time_limit"] = None await manager.switch_to(UserDeeplinkSG.password_input) else: - await start_test_without_password(manager, test_repo, attempt_repo, test_id, user_id) + await start_test_without_password(manager, test_repo, attempt_repo, test_id, user_id, None) async def start_test_without_password( @@ -109,6 +117,7 @@ async def start_test_without_password( attempt_repo: TestAttemptRepository, test_id: int, user_id: int, + time_limit: int | None = None, ): _, questions = await test_repo.get_test_with_questions(test_id) @@ -116,6 +125,7 @@ async def start_test_without_password( return attempt = await attempt_repo.attempt_dao.create(user_id=user_id, test_id=test_id) + started_at = now_msk_naive() first_question, _ = await test_repo.get_question_with_options(questions[0].id) @@ -138,6 +148,8 @@ async def start_test_without_password( "questions": [q.id for q in questions], "current_question_index": 0, "user_answers": {}, + "time_limit": time_limit, + "started_at": started_at.isoformat(), } ) @@ -170,8 +182,9 @@ async def on_deeplink_password_input( if message.text and message.text.strip() == test.password: await message.answer("✅ Пароль верный") + time_limit = manager.dialog_data.get("time_limit") await start_test_without_password( - manager, test_repo, attempt_repo, test_id, message.from_user.id + manager, test_repo, attempt_repo, test_id, message.from_user.id, time_limit ) else: allowed, wait_time = await rate_limiter.check(message.from_user.id) diff --git a/src/quizzi/application/bot/user_dialogs/states.py b/src/quizzi/application/bot/user_dialogs/states.py index 9f77f72..beb550c 100644 --- a/src/quizzi/application/bot/user_dialogs/states.py +++ b/src/quizzi/application/bot/user_dialogs/states.py @@ -13,11 +13,13 @@ class UserMenuSG(StatesGroup): class UserTestSG(StatesGroup): password_input = State() + confirm_time_limit = State() question_single = State() question_multiple = State() question_input = State() results = State() detailed_results = State() + time_expired = State() class UserDeeplinkSG(StatesGroup): diff --git a/src/quizzi/application/bot/user_dialogs/take_test.py b/src/quizzi/application/bot/user_dialogs/take_test.py index 6052234..807d14e 100644 --- a/src/quizzi/application/bot/user_dialogs/take_test.py +++ b/src/quizzi/application/bot/user_dialogs/take_test.py @@ -1,9 +1,11 @@ +from datetime import datetime + from aiogram.enums import ContentType as AiogramContentType from aiogram.types import CallbackQuery, Message from aiogram_dialog import Dialog, DialogManager, StartMode, Window from aiogram_dialog.api.entities import MediaAttachment, MediaId from aiogram_dialog.widgets.input import MessageInput -from aiogram_dialog.widgets.kbd import Button, Column, Multiselect, Radio +from aiogram_dialog.widgets.kbd import Button, Column, Multiselect, Radio, Row from aiogram_dialog.widgets.media import DynamicMedia from aiogram_dialog.widgets.text import Const, Format from dishka import FromDishka @@ -28,6 +30,145 @@ async def get_state_for_question_type(question_type: str): return UserTestSG.question_input +def get_remaining_time(started_at: datetime, time_limit: int) -> int | None: + if not time_limit: + return None + elapsed = (now_msk_naive() - started_at).total_seconds() + remaining = time_limit - elapsed + return max(0, int(remaining)) + + +def format_time(seconds: int) -> str: + if seconds >= 3600: + hours = seconds // 3600 + minutes = (seconds % 3600) // 60 + secs = seconds % 60 + return f"{hours}:{minutes:02d}:{secs:02d}" + else: + minutes = seconds // 60 + secs = seconds % 60 + return f"{minutes}:{secs:02d}" + + +def is_time_expired(started_at: datetime, time_limit: int | None) -> bool: + if not time_limit: + return False + remaining = get_remaining_time(started_at, time_limit) + return remaining is not None and remaining <= 0 + + +async def finish_test_by_timeout( + manager: DialogManager, + attempt_repo: TestAttemptRepository, + answer_dao: UserAnswerDAO, + test_repo: TestRepository, + attempt_id: int, + questions: list[int], + user_answers: dict, + are_results_viewable: bool = False, +): + answered_question_ids = set() + answers = await attempt_repo.get_answers_for_attempt(attempt_id) + for answer in answers: + answered_question_ids.add(answer.question_id) + + for question_id in questions: + if question_id in answered_question_ids: + continue + + answer_data = user_answers.get(str(question_id)) + + if answer_data: + question, options = await test_repo.get_question_with_options(question_id) + if not question: + continue + + if answer_data["type"] == "single": + selected_option_id = answer_data["answer"] + correct_options = [opt for opt in options if opt.is_correct] + is_correct = any(opt.id == selected_option_id for opt in correct_options) + selected_text = next((opt.text for opt in options if opt.id == selected_option_id), "") + + await answer_dao.create( + attempt_id=attempt_id, + question_id=question_id, + selected_option_id=selected_option_id, + text_answer=selected_text, + is_correct=is_correct, + ) + elif answer_data["type"] == "multiple": + selected_option_ids = set(answer_data["answer"]) + selected_texts = sorted([opt.text for opt in options if opt.id in selected_option_ids]) + correct_texts = sorted([opt.text for opt in options if opt.is_correct]) + is_correct = selected_texts == correct_texts + + await answer_dao.create( + attempt_id=attempt_id, + question_id=question_id, + text_answer="|".join(selected_texts), + is_correct=is_correct, + ) + else: + await answer_dao.create( + attempt_id=attempt_id, + question_id=question_id, + text_answer=None, + is_correct=False, + ) + + correct_count = await attempt_repo.calculate_attempt_score(attempt_id) + total_questions = len(questions) + score = round((correct_count / total_questions * 100)) if total_questions > 0 else 0 + is_passed = score >= 50 + + await attempt_repo.finish_attempt(attempt_id, score, is_passed) + + manager.dialog_data["score"] = score + manager.dialog_data["correct_count"] = correct_count + manager.dialog_data["total_questions"] = total_questions + manager.dialog_data["is_passed"] = is_passed + manager.dialog_data["are_results_viewable"] = are_results_viewable + manager.dialog_data["time_expired"] = True + + await manager.switch_to(UserTestSG.time_expired) + + +async def check_time_and_finish_if_expired( + manager: DialogManager, + test_dao: TestDAO, + test_repo: TestRepository, + attempt_repo: TestAttemptRepository, + answer_dao: UserAnswerDAO, +) -> bool: + start_data = manager.start_data or {} + assert isinstance(start_data, dict) + + test_id = manager.dialog_data.get("test_id") or start_data.get("test_id") + attempt_id = manager.dialog_data.get("attempt_id") or start_data.get("attempt_id") + started_at_str = manager.dialog_data.get("started_at") or start_data.get("started_at") + time_limit = manager.dialog_data.get("time_limit") or start_data.get("time_limit") + + if not time_limit or not started_at_str or not attempt_id: + return False + + started_at = datetime.fromisoformat(started_at_str) if isinstance(started_at_str, str) else started_at_str + + if is_time_expired(started_at, time_limit): + questions = manager.dialog_data.get("questions") or start_data.get("questions", []) + user_answers = manager.dialog_data.get("user_answers", {}) + + test = await test_dao.get_by_id(test_id) if test_id else None + are_results_viewable = test.are_results_viewable if test else False + + await finish_test_by_timeout( + manager, attempt_repo, answer_dao, test_repo, + attempt_id, questions, user_answers, are_results_viewable + ) + return True + + return False + + @inject async def on_start_test( _callback: CallbackQuery, @@ -70,36 +211,121 @@ async def on_start_test( if active_attempt: await attempt_repo.attempt_dao.delete(active_attempt.id) - if test.password: + if test.time_limit: + await manager.start(UserTestSG.confirm_time_limit, mode=StartMode.NORMAL, data={ + "test_id": test_id, + "time_limit": test.time_limit, + "has_password": bool(test.password), + }) + elif test.password: allowed, wait_time = await rate_limiter.check(user_id) if not allowed: minutes = int(wait_time // 60) + 1 await _callback.answer(f"⏳ Слишком много попыток. Подождите {minutes} мин.", show_alert=True) return - await manager.start(UserTestSG.password_input, mode=StartMode.NORMAL, data={"test_id": test_id}) + await manager.start(UserTestSG.password_input, mode=StartMode.NORMAL, data={ + "test_id": test_id, + "time_limit": None, + }) else: - _, questions = await test_repo.get_test_with_questions(test_id) - - if not questions: - await _callback.answer("❌ В тесте нет вопросов") - return - - attempt = await attempt_repo.attempt_dao.create(user_id=user_id, test_id=test_id) - - first_question, _ = await test_repo.get_question_with_options(questions[0].id) - first_state = await get_state_for_question_type(first_question.question_type if first_question else QuestionType.SINGLE) - - await manager.start( - first_state, - mode=StartMode.NORMAL, - data={ - "test_id": test_id, - "attempt_id": attempt.id, - "questions": [q.id for q in questions], - "current_question_index": 0, - "user_answers": {}, - } + await start_test_directly(manager, test_repo, attempt_repo, test_id, user_id, None) + + +async def get_confirm_time_limit_data(dialog_manager: DialogManager, **_kwargs): + start_data = dialog_manager.start_data or {} + assert isinstance(start_data, dict) + time_limit = start_data.get("time_limit", 0) + + minutes = time_limit // 60 + + return { + "time_limit_text": ( + f"⚠️ ВНИМАНИЕ!\n\n" + f"У этого теста установлен лимит времени:\n" + f"⏱️ {minutes} минут\n\n" + f"После начала теста таймер нельзя остановить.\n" + f"Если время закончится, тест будет автоматически завершён.\n\n" + f"Вы готовы начать?" ) + } + + +@inject +async def on_confirm_time_limit( + _callback: CallbackQuery, + _button: Button, + manager: DialogManager, + test_dao: FromDishka[TestDAO], + test_repo: FromDishka[TestRepository], + attempt_repo: FromDishka[TestAttemptRepository], + rate_limiter: FromDishka[PasswordRateLimiter], +): + assert _callback.from_user is not None + start_data = manager.start_data or {} + assert isinstance(start_data, dict) + + test_id = start_data.get("test_id") + time_limit = start_data.get("time_limit") + has_password = start_data.get("has_password", False) + user_id = _callback.from_user.id + + if not test_id: + await _callback.answer("❌ Тест не найден") + return + + if has_password: + test = await test_dao.get_by_id(test_id) + if test and test.password: + allowed, wait_time = await rate_limiter.check(user_id) + if not allowed: + minutes = int(wait_time // 60) + 1 + await _callback.answer(f"⏳ Слишком много попыток. Подождите {minutes} мин.", show_alert=True) + return + await manager.start(UserTestSG.password_input, mode=StartMode.NORMAL, data={ + "test_id": test_id, + "time_limit": time_limit, + }) + return + + await start_test_directly(manager, test_repo, attempt_repo, test_id, user_id, time_limit) + + +async def on_cancel_time_limit(_callback: CallbackQuery, _button: Button, manager: DialogManager): + await manager.done() + + +async def start_test_directly( + manager: DialogManager, + test_repo: TestRepository, + attempt_repo: TestAttemptRepository, + test_id: int, + user_id: int, + time_limit: int | None, +): + _, questions = await test_repo.get_test_with_questions(test_id) + + if not questions: + return + + attempt = await attempt_repo.attempt_dao.create(user_id=user_id, test_id=test_id) + started_at = now_msk_naive() + + first_question, _ = await test_repo.get_question_with_options(questions[0].id) + first_state = await get_state_for_question_type(first_question.question_type if first_question else QuestionType.SINGLE) + + await manager.start( + first_state, + mode=StartMode.RESET_STACK, + data={ + "test_id": test_id, + "attempt_id": attempt.id, + "questions": [q.id for q in questions], + "current_question_index": 0, + "user_answers": {}, + "time_limit": time_limit, + "started_at": started_at.isoformat(), + } + ) @inject @@ -116,6 +342,7 @@ async def on_password_input( start_data = manager.start_data or {} assert isinstance(start_data, dict) test_id = start_data.get("test_id") + time_limit = start_data.get("time_limit") if not test_id: await message.answer("❌ Тест не найден") @@ -137,14 +364,18 @@ async def on_password_input( return attempt = await attempt_repo.attempt_dao.create(user_id=message.from_user.id, test_id=test_id) + started_at = now_msk_naive() first_question, _ = await test_repo.get_question_with_options(questions[0].id) first_state = await get_state_for_question_type(first_question.question_type if first_question else QuestionType.SINGLE) + manager.dialog_data["test_id"] = test_id manager.dialog_data["attempt_id"] = attempt.id manager.dialog_data["questions"] = [q.id for q in questions] manager.dialog_data["current_question_index"] = 0 manager.dialog_data["user_answers"] = {} + manager.dialog_data["time_limit"] = time_limit + manager.dialog_data["started_at"] = started_at.isoformat() await manager.switch_to(first_state) else: @@ -184,6 +415,23 @@ async def get_question_data( start_data = dialog_manager.start_data or {} assert isinstance(start_data, dict) + time_limit = dialog_manager.dialog_data.get("time_limit") or start_data.get("time_limit") + started_at_str = dialog_manager.dialog_data.get("started_at") or start_data.get("started_at") + + timer_str = "" + if time_limit and started_at_str: + started_at = datetime.fromisoformat(started_at_str) if isinstance(started_at_str, str) else started_at_str + remaining = get_remaining_time(started_at, time_limit) + if remaining is not None: + if remaining <= 0: + return { + "question_text": "⏰ Время истекло!", + "options": [], + "media": None, + "time_expired": True, + } + timer_str = f" ⏱️ {format_time(remaining)}" + current_index = dialog_manager.dialog_data.get("current_question_index") if current_index is None: current_index = start_data.get("current_question_index", 0) @@ -209,13 +457,26 @@ async def get_question_data( ) return { - "question_text": f"📝 Вопрос {progress}\n\n
{question.text}
", + "question_text": f"📝 Вопрос {progress}{timer_str}\n\n
{question.text}
", "options": [(opt.text, str(opt.id)) for opt in options], "media": media, } -async def on_single_answer_selected(_callback: CallbackQuery, _widget, manager: DialogManager, item_id: str): +@inject +async def on_single_answer_selected( + _callback: CallbackQuery, + _widget, + manager: DialogManager, + item_id: str, + test_dao: FromDishka[TestDAO], + test_repo: FromDishka[TestRepository], + attempt_repo: FromDishka[TestAttemptRepository], + answer_dao: FromDishka[UserAnswerDAO], +): + if await check_time_and_finish_if_expired(manager, test_dao, test_repo, attempt_repo, answer_dao): + return + start_data = manager.start_data or {} assert isinstance(start_data, dict) current_index = manager.dialog_data.get("current_question_index") @@ -232,7 +493,20 @@ async def on_single_answer_selected(_callback: CallbackQuery, _widget, manager: manager.dialog_data["user_answers"] = user_answers -async def on_multiple_answer_changed(_event, widget, manager: DialogManager, _data: str): +@inject +async def on_multiple_answer_changed( + _event, + widget, + manager: DialogManager, + _data: str, + test_dao: FromDishka[TestDAO], + test_repo: FromDishka[TestRepository], + attempt_repo: FromDishka[TestAttemptRepository], + answer_dao: FromDishka[UserAnswerDAO], +): + if await check_time_and_finish_if_expired(manager, test_dao, test_repo, attempt_repo, answer_dao): + return + start_data = manager.start_data or {} assert isinstance(start_data, dict) current_index = manager.dialog_data.get("current_question_index") @@ -261,6 +535,9 @@ async def on_text_answer_input( answer_dao: FromDishka[UserAnswerDAO], test_dao: FromDishka[TestDAO], ): + if await check_time_and_finish_if_expired(manager, test_dao, test_repo, attempt_repo, answer_dao): + return + start_data = manager.start_data or {} assert isinstance(start_data, dict) current_index = manager.dialog_data.get("current_question_index") @@ -321,6 +598,9 @@ async def on_next_question( answer_dao: FromDishka[UserAnswerDAO], test_dao: FromDishka[TestDAO], ): + if await check_time_and_finish_if_expired(manager, test_dao, test_repo, attempt_repo, answer_dao): + return + start_data = manager.start_data or {} assert isinstance(start_data, dict) current_index = manager.dialog_data.get("current_question_index") @@ -441,6 +721,29 @@ async def get_results_data(dialog_manager: DialogManager, **_kwargs): return {"results_text": results_text, "are_results_viewable": are_results_viewable} +async def get_time_expired_data(dialog_manager: DialogManager, **_kwargs): + score = dialog_manager.dialog_data.get("score", 0) + correct_count = dialog_manager.dialog_data.get("correct_count", 0) + total_questions = dialog_manager.dialog_data.get("total_questions", 0) + is_passed = dialog_manager.dialog_data.get("is_passed", False) + are_results_viewable = dialog_manager.dialog_data.get("are_results_viewable", False) + + if is_passed: + status = "✅ Тест пройден!" + else: + status = "❌ Тест не пройден" + + results_text = ( + f"⏰ Время истекло!\n\n" + f"{status}\n\n" + f"📊 Результат: {score}%\n" + f"✏️ Правильных ответов: {correct_count} из {total_questions}\n\n" + f"Неотвеченные вопросы засчитаны как неправильные." + ) + + return {"results_text": results_text, "are_results_viewable": are_results_viewable} + + async def on_back_to_menu(_callback: CallbackQuery, _button: Button, manager: DialogManager): await manager.start(UserMenuSG.main, mode=StartMode.RESET_STACK) @@ -494,6 +797,15 @@ async def get_detailed_results_data( take_test_dialog = Dialog( + Window( + Format("{time_limit_text}"), + Row( + Button(Const("✅ Начать"), id="confirm", on_click=on_confirm_time_limit), + Button(Const("❌ Отмена"), id="cancel", on_click=on_cancel_time_limit), + ), + state=UserTestSG.confirm_time_limit, + getter=get_confirm_time_limit_data, + ), Window( Const("🔑 Введите пароль для доступа к тесту:"), MessageInput(on_password_input), @@ -555,6 +867,20 @@ take_test_dialog = Dialog( state=UserTestSG.results, getter=get_results_data, ), + Window( + Format("{results_text}"), + Column( + Button( + Const("📋 Подробные результаты"), + id="detailed", + on_click=on_show_detailed_results, + when="are_results_viewable", + ), + Button(Const("◀️ В главное меню"), id="back", on_click=on_back_to_menu), + ), + state=UserTestSG.time_expired, + getter=get_time_expired_data, + ), Window( Format("{detailed_text}"), Button(Const("◀️ Назад"), id="back", on_click=on_back_to_results), diff --git a/src/quizzi/domain/schemas.py b/src/quizzi/domain/schemas.py index 9573300..69bd577 100644 --- a/src/quizzi/domain/schemas.py +++ b/src/quizzi/domain/schemas.py @@ -74,6 +74,7 @@ class TestAttempt: test_id: int started_at: datetime finished_at: datetime | None = None + warning_sent_at: datetime | None = None score: int = 0 is_passed: bool = False diff --git a/src/quizzi/infrastructure/database/dao/test.py b/src/quizzi/infrastructure/database/dao/test.py index eeec9ab..1fbccf5 100644 --- a/src/quizzi/infrastructure/database/dao/test.py +++ b/src/quizzi/infrastructure/database/dao/test.py @@ -10,7 +10,6 @@ from quizzi.infrastructure.database.models import Test class _UNSET: - """Sentinel для различения None и "не передано".""" pass diff --git a/src/quizzi/infrastructure/database/dao/test_attempt.py b/src/quizzi/infrastructure/database/dao/test_attempt.py index f7f1ef8..a11de46 100644 --- a/src/quizzi/infrastructure/database/dao/test_attempt.py +++ b/src/quizzi/infrastructure/database/dao/test_attempt.py @@ -46,6 +46,7 @@ class TestAttemptDAO: self, attempt_id: int, finished_at: datetime | None = None, + warning_sent_at: datetime | None = None, score: int | None = None, is_passed: bool | None = None, ) -> DomainTestAttempt | None: @@ -58,6 +59,8 @@ class TestAttemptDAO: if finished_at is not None: attempt.finished_at = finished_at + if warning_sent_at is not None: + attempt.warning_sent_at = warning_sent_at if score is not None: attempt.score = score if is_passed is not None: diff --git a/src/quizzi/infrastructure/database/dao/user.py b/src/quizzi/infrastructure/database/dao/user.py index ab2bd0d..12c975f 100644 --- a/src/quizzi/infrastructure/database/dao/user.py +++ b/src/quizzi/infrastructure/database/dao/user.py @@ -9,7 +9,6 @@ from quizzi.infrastructure.database.models import User class _UNSET: - """Sentinel для различения None и "не передано".""" pass diff --git a/src/quizzi/infrastructure/database/dto/test_attempt.py b/src/quizzi/infrastructure/database/dto/test_attempt.py index 0cb01d9..e417ebe 100644 --- a/src/quizzi/infrastructure/database/dto/test_attempt.py +++ b/src/quizzi/infrastructure/database/dto/test_attempt.py @@ -13,6 +13,7 @@ class TestAttemptDTO: test_id=self.model.test_id, started_at=self.model.started_at, finished_at=self.model.finished_at, + warning_sent_at=self.model.warning_sent_at, score=self.model.score, is_passed=self.model.is_passed, ) diff --git a/src/quizzi/infrastructure/database/models.py b/src/quizzi/infrastructure/database/models.py index af8dc9c..897905f 100644 --- a/src/quizzi/infrastructure/database/models.py +++ b/src/quizzi/infrastructure/database/models.py @@ -105,6 +105,7 @@ class TestAttempt(Base): test_id: Mapped[int] = mapped_column(ForeignKey("tests.id"), index=True) started_at: Mapped[datetime] = mapped_column(server_default=func.now()) finished_at: Mapped[datetime | None] = mapped_column(default=None) + warning_sent_at: Mapped[datetime | None] = mapped_column(default=None) score: Mapped[int] = mapped_column(Integer, default=0) is_passed: Mapped[bool] = mapped_column(default=False) diff --git a/src/quizzi/infrastructure/database/repo/test.py b/src/quizzi/infrastructure/database/repo/test.py index e09b1ca..102904b 100644 --- a/src/quizzi/infrastructure/database/repo/test.py +++ b/src/quizzi/infrastructure/database/repo/test.py @@ -125,7 +125,6 @@ class TestRepository: async def get_questions_with_options_by_ids( self, question_ids: list[int] ) -> dict[int, tuple[Question, list[Option]]]: - """Загружает вопросы с опциями по списку ID за один запрос.""" if not question_ids: return {} diff --git a/src/quizzi/infrastructure/database/repo/test_attempt.py b/src/quizzi/infrastructure/database/repo/test_attempt.py index 1686ffb..a7e44d5 100644 --- a/src/quizzi/infrastructure/database/repo/test_attempt.py +++ b/src/quizzi/infrastructure/database/repo/test_attempt.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import final from sqlalchemy import func, select @@ -9,7 +10,10 @@ from quizzi.infrastructure.database.dao.test_attempt import TestAttemptDAO from quizzi.infrastructure.database.dao.user_answer import UserAnswerDAO from quizzi.infrastructure.database.dto.test_attempt import TestAttemptDTO from quizzi.infrastructure.database.dto.user_answer import UserAnswerDTO +from quizzi.infrastructure.database.models import Question as QuestionModel +from quizzi.infrastructure.database.models import Test as TestModel from quizzi.infrastructure.database.models import TestAttempt as TestAttemptModel +from quizzi.infrastructure.database.models import User as UserModel from quizzi.infrastructure.database.models import UserAnswer as UserAnswerModel from quizzi.infrastructure.utils.timezone import now_msk_naive @@ -173,8 +177,6 @@ class TestAttemptRepository: } async def get_most_difficult_questions(self, test_id: int, limit: int = 10) -> list[tuple[int, float]]: - from quizzi.infrastructure.database.models import Question as QuestionModel - result = await self.session.execute( select( UserAnswerModel.question_id, @@ -209,8 +211,6 @@ class TestAttemptRepository: } async def get_finished_attempts_with_tests(self, user_id: int) -> list[tuple[TestAttempt, str]]: - from quizzi.infrastructure.database.models import Test as TestModel - result = await self.session.execute( select(TestAttemptModel, TestModel.title) .join(TestModel, TestAttemptModel.test_id == TestModel.id) @@ -222,8 +222,6 @@ class TestAttemptRepository: return [(TestAttemptDTO(row[0]).to_domain(), row[1]) for row in rows] async def get_test_attempts_with_users(self, test_id: int) -> list[tuple[TestAttempt, str]]: - from quizzi.infrastructure.database.models import User as UserModel - result = await self.session.execute( select(TestAttemptModel, UserModel.name, UserModel.first_name) .join(UserModel, TestAttemptModel.user_id == UserModel.id) @@ -233,3 +231,56 @@ class TestAttemptRepository: ) rows = result.all() return [(TestAttemptDTO(row[0]).to_domain(), row[1] or row[2]) for row in rows] + + async def get_expired_active_attempts(self, now: datetime) -> list[tuple[TestAttempt, int]]: + result = await self.session.execute( + select(TestAttemptModel, TestModel.time_limit) + .join(TestModel, TestAttemptModel.test_id == TestModel.id) + .where(TestAttemptModel.finished_at.is_(None)) + .where(TestModel.time_limit.isnot(None)) + ) + rows = result.all() + + expired = [] + for attempt_model, time_limit in rows: + if time_limit: + elapsed = (now - attempt_model.started_at).total_seconds() + if elapsed >= time_limit: + expired.append((TestAttemptDTO(attempt_model).to_domain(), time_limit)) + + return expired + + async def get_attempts_needing_warning(self, now: datetime) -> list[tuple[TestAttempt, int, int]]: + result = await self.session.execute( + select( + TestAttemptModel, + TestModel.time_limit, + func.count(QuestionModel.id).label("questions_count") + ) + .join(TestModel, TestAttemptModel.test_id == TestModel.id) + .join(QuestionModel, QuestionModel.test_id == TestModel.id) + .where(TestAttemptModel.finished_at.is_(None)) + .where(TestAttemptModel.warning_sent_at.is_(None)) + .where(TestModel.time_limit.isnot(None)) + .group_by(TestAttemptModel.id, TestModel.time_limit) + ) + rows = result.all() + + needing_warning = [] + for attempt_model, time_limit, questions_count in rows: + if time_limit and questions_count > 0: + elapsed = (now - attempt_model.started_at).total_seconds() + time_remaining = time_limit - elapsed + threshold = time_limit * 0.1 + + if time_remaining <= threshold and time_remaining > 0: + needing_warning.append(( + TestAttemptDTO(attempt_model).to_domain(), + time_limit, + questions_count + )) + + return needing_warning + + async def mark_warning_sent(self, attempt_id: int, sent_at: datetime) -> None: + await self.attempt_dao.update(attempt_id=attempt_id, warning_sent_at=sent_at) diff --git a/src/quizzi/infrastructure/di.py b/src/quizzi/infrastructure/di.py index 6a8053b..3d6d39a 100644 --- a/src/quizzi/infrastructure/di.py +++ b/src/quizzi/infrastructure/di.py @@ -1,6 +1,7 @@ import logging from collections.abc import AsyncIterable +from aiogram import Bot from apscheduler.schedulers.asyncio import AsyncIOScheduler from dishka import AsyncContainer, Provider, Scope, provide from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker @@ -16,7 +17,7 @@ from quizzi.infrastructure.database.dao.user_answer import UserAnswerDAO from quizzi.infrastructure.database.repo.test import TestRepository from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository from quizzi.infrastructure.database.repo.user import UserRepository -from quizzi.infrastructure.scheduling.tasks import deactivate_expired_tests +from quizzi.infrastructure.scheduling.tasks import deactivate_expired_tests, finish_expired_test_attempts, send_time_warning_notifications from quizzi.infrastructure.utils.config import Config from quizzi.infrastructure.utils.rate_limiter import PasswordRateLimiter @@ -81,7 +82,7 @@ class DatabaseProvider(Provider): class SchedulerProvider(Provider): @provide(scope = Scope.APP) - def get_scheduler(self, container: AsyncContainer) -> AsyncIOScheduler: + def get_scheduler(self, container: AsyncContainer, bot: Bot) -> AsyncIOScheduler: logging.getLogger('apscheduler').setLevel(logging.WARNING) scheduler = AsyncIOScheduler() @@ -93,4 +94,20 @@ class SchedulerProvider(Provider): id='deactivate_expired_tests', ) + scheduler.add_job( + finish_expired_test_attempts, + 'interval', + minutes=1, + args=[container, bot], + id='finish_expired_test_attempts', + ) + + scheduler.add_job( + send_time_warning_notifications, + 'interval', + seconds=10, + args=[container, bot], + id='send_time_warning_notifications', + ) + return scheduler diff --git a/src/quizzi/infrastructure/scheduling/tasks.py b/src/quizzi/infrastructure/scheduling/tasks.py index da94a34..0236ebd 100644 --- a/src/quizzi/infrastructure/scheduling/tasks.py +++ b/src/quizzi/infrastructure/scheduling/tasks.py @@ -1,8 +1,14 @@ import logging +from aiogram import Bot +from aiogram.exceptions import TelegramAPIError from dishka import AsyncContainer +from sqlalchemy.exc import SQLAlchemyError from quizzi.infrastructure.database.dao.test import TestDAO +from quizzi.infrastructure.database.dao.user_answer import UserAnswerDAO +from quizzi.infrastructure.database.repo.test import TestRepository +from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository from quizzi.infrastructure.utils.timezone import now_msk_naive logger = logging.getLogger(__name__) @@ -17,3 +23,120 @@ async def deactivate_expired_tests(container: AsyncContainer) -> None: for test in expired_tests: await test_dao.update(test.id, is_active=False) logger.info("Деактивирован истёкший тест: id=%d, title=%s", test.id, test.title) + + +async def finish_expired_test_attempts(container: AsyncContainer, bot: Bot) -> None: + async with container() as request_container: + attempt_repo = await request_container.get(TestAttemptRepository) + test_repo = await request_container.get(TestRepository) + answer_dao = await request_container.get(UserAnswerDAO) + + now = now_msk_naive() + expired_attempts = await attempt_repo.get_expired_active_attempts(now) + + for attempt, _ in expired_attempts: + try: + test, questions_with_options = await test_repo.get_full_test(attempt.test_id) + if not test: + continue + + question_ids = [q.id for q, _ in questions_with_options] + + answered_question_ids = set() + answers = await attempt_repo.get_answers_for_attempt(attempt.id) + for answer in answers: + answered_question_ids.add(answer.question_id) + + for question_id in question_ids: + if question_id not in answered_question_ids: + await answer_dao.create( + attempt_id=attempt.id, + question_id=question_id, + text_answer=None, + is_correct=False, + ) + + correct_count = await attempt_repo.calculate_attempt_score(attempt.id) + total_questions = len(question_ids) + score = round((correct_count / total_questions * 100)) if total_questions > 0 else 0 + is_passed = score >= 50 + + await attempt_repo.finish_attempt(attempt.id, score, is_passed) + + status = "пройден ✅" if is_passed else "не пройден ❌" + + try: + await bot.send_message( + attempt.user_id, + f"⏰ Время на прохождение теста истекло!\n\n" + f"📝 Тест: {test.title}\n" + f"📊 Результат: {score}%\n" + f"🏆 Статус: {status}\n\n" + f"Неотвеченные вопросы засчитаны как неправильные." + ) + except TelegramAPIError as e: + logger.warning("Не удалось отправить уведомление пользователю %d: %s", attempt.user_id, e) + + logger.info( + "Завершена просроченная попытка: attempt_id=%d, user_id=%d, test_id=%d, score=%d%%", + attempt.id, attempt.user_id, attempt.test_id, score + ) + + except SQLAlchemyError as e: + logger.error("Ошибка при завершении попытки %d: %s", attempt.id, e) + + +async def send_time_warning_notifications(container: AsyncContainer, bot: Bot) -> None: + async with container() as request_container: + attempt_repo = await request_container.get(TestAttemptRepository) + test_repo = await request_container.get(TestRepository) + + now = now_msk_naive() + attempts_needing_warning = await attempt_repo.get_attempts_needing_warning(now) + + for attempt, time_limit, questions_count in attempts_needing_warning: + try: + answers = await attempt_repo.get_answers_for_attempt(attempt.id) + + if answers: + avg_time_per_question = time_limit / questions_count + time_since_start = (now - attempt.started_at).total_seconds() + expected_answers = int(time_since_start / avg_time_per_question) + + if len(answers) >= expected_answers: + continue + + test, _ = await test_repo.get_full_test(attempt.test_id) + if not test: + continue + + elapsed = (now - attempt.started_at).total_seconds() + remaining_seconds = int(time_limit - elapsed) + remaining_minutes = remaining_seconds // 60 + remaining_secs = remaining_seconds % 60 + + if remaining_minutes > 0: + time_str = f"{remaining_minutes} мин {remaining_secs} сек" + else: + time_str = f"{remaining_secs} сек" + + try: + await bot.send_message( + attempt.user_id, + f"⚠️ Внимание! Время заканчивается!\n\n" + f"📝 Тест: {test.title}\n" + f"⏱️ Осталось: {time_str}\n\n" + f"Поторопитесь с ответами!" + ) + except TelegramAPIError as e: + logger.warning("Не удалось отправить предупреждение пользователю %d: %s", attempt.user_id, e) + + await attempt_repo.mark_warning_sent(attempt.id, now) + + logger.info( + "Отправлено предупреждение о времени: attempt_id=%d, user_id=%d, remaining=%ds", + attempt.id, attempt.user_id, remaining_seconds + ) + + except SQLAlchemyError as e: + logger.error("Ошибка при отправке предупреждения для попытки %d: %s", attempt.id, e) diff --git a/src/quizzi/infrastructure/utils/timezone.py b/src/quizzi/infrastructure/utils/timezone.py index 6236e2b..7201d7c 100644 --- a/src/quizzi/infrastructure/utils/timezone.py +++ b/src/quizzi/infrastructure/utils/timezone.py @@ -9,7 +9,6 @@ def now_msk() -> datetime: def now_msk_naive() -> datetime: - """Возвращает текущее время в МСК без timezone info (для сохранения в БД).""" return datetime.now(MSK_TZ).replace(tzinfo=None) From 2d4ee6c77b7ff30c1e45fe1ac377a211887db81f Mon Sep 17 00:00:00 2001 From: kolo Date: Wed, 7 Jan 2026 01:28:34 +0300 Subject: [PATCH 3/4] update --- pyproject.toml | 1 + .../application/bot/shared_dialogs/states.py | 1 + .../application/bot/shared_dialogs/tests.py | 257 ++++++++++++------ .../database/repo/test_attempt.py | 38 +++ uv.lock | 23 ++ 5 files changed, 237 insertions(+), 83 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index eb586b6..a321e6b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ dependencies = [ "qrcode[pil]>=8.2", "pycryptodome>=3.23.0", "json5>=0.13.0", + "openpyxl>=3.1.0", ] [dependency-groups] diff --git a/src/quizzi/application/bot/shared_dialogs/states.py b/src/quizzi/application/bot/shared_dialogs/states.py index 3a8e41e..bfc7ac5 100644 --- a/src/quizzi/application/bot/shared_dialogs/states.py +++ b/src/quizzi/application/bot/shared_dialogs/states.py @@ -20,6 +20,7 @@ class SharedTestsSG(StatesGroup): edit_expires = State() statistics = State() attempt_detail = State() + export_select_group = State() class SharedBroadcastSG(StatesGroup): diff --git a/src/quizzi/application/bot/shared_dialogs/tests.py b/src/quizzi/application/bot/shared_dialogs/tests.py index 14c7e2c..a949384 100644 --- a/src/quizzi/application/bot/shared_dialogs/tests.py +++ b/src/quizzi/application/bot/shared_dialogs/tests.py @@ -1,6 +1,6 @@ import asyncio import functools -import json +import io from datetime import date, datetime, time from aiogram import Bot @@ -11,9 +11,10 @@ from aiogram_dialog.widgets.kbd import Button, Calendar, Column, Row, ScrollingG from aiogram_dialog.widgets.text import Const, Format from dishka import FromDishka from dishka.integrations.aiogram_dialog import inject +from openpyxl import Workbook +from openpyxl.styles import Alignment, Border, Font, PatternFill, Side from quizzi.application.bot.shared_dialogs.states import SharedCreateTestSG, SharedTestsSG -from quizzi.domain.schemas import QuestionType from quizzi.infrastructure.database.dao.group import GroupDAO from quizzi.infrastructure.database.dao.test import TestDAO from quizzi.infrastructure.database.repo.test import TestRepository @@ -229,86 +230,8 @@ async def get_attempt_detail( return {"attempt_info": "\n".join(lines)} -@inject -async def on_export_test( - _callback: CallbackQuery, - _button: Button, - manager: DialogManager, - test_repo: FromDishka[TestRepository], -) -> None: - test_id = manager.dialog_data.get("selected_test_id") - - if not test_id: - await _callback.answer("❌ Тест не найден") - return - - assert _callback.message is not None - await _callback.answer("⏳ Экспортирую тест...") - - test, questions_with_options = await test_repo.get_full_test(test_id) - - if not test: - await _callback.message.answer("❌ Тест не найден") - return - - export_data: dict = { - "title": test.title, - "description": test.description, - "password": test.password, - "attempts": test.attempts, - "time_limit": test.time_limit, - "expires_at": test.expires_at.isoformat() if test.expires_at else None, - "for_group": test.for_group, - "questions": [], - } - - questions_list: list = export_data["questions"] - - for question, options in questions_with_options: - question_data: dict = { - "question_type": question.question_type.value, - "question": question.text, - } - - if question.question_type == QuestionType.INPUT: - correct_options = [o for o in options if o.is_correct] - if correct_options: - question_data["correct_answer"] = correct_options[0].text - else: - question_data["answers"] = [ - {"option": o.text, "is_correct": o.is_correct} - for o in options - ] - - questions_list.append(question_data) - - json_str = json.dumps(export_data, ensure_ascii=False, indent=2) - - created_str = test.created_at.strftime("%d.%m.%Y %H:%M") if test.created_at else "—" - updated_str = test.updated_at.strftime("%d.%m.%Y %H:%M") if test.updated_at else "—" - questions_count = len(questions_with_options) - - comment_header = f"""// ═══════════════════════════════════════════════════════════════ -// ЭКСПОРТ ТЕСТА: {test.title} -// ═══════════════════════════════════════════════════════════════ -// -// ❓ Вопросов: {questions_count} -// 📅 Создан: {created_str} -// 🔄 Обновлён: {updated_str} -// -// ═══════════════════════════════════════════════════════════════ - -""" - - full_content = comment_header + json_str - - safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in test.title)[:50] - filename = f"{safe_title}.json" - - await _callback.message.answer_document( - document=BufferedInputFile(full_content.encode("utf-8"), filename=filename), - caption=f"📤 Экспорт теста: {test.title}", - ) +async def on_export_stats(_callback: CallbackQuery, _button: Button, manager: DialogManager) -> None: + await manager.switch_to(SharedTestsSG.export_select_group) @inject @@ -564,6 +487,156 @@ async def on_back_clicked(_callback: CallbackQuery, _button: Button, manager: Di await manager.done() +@inject +async def get_groups_for_export(dialog_manager: DialogManager, group_dao: FromDishka[GroupDAO], **_kwargs): + groups = await group_dao.get_all() + return { + "groups": [(f"🎓 {g.number}", str(g.number)) for g in groups], + "count": len(groups), + } + + +def create_excel_report( + test_title: str, + group_number: int, + stats: list[tuple[str, int | None, datetime | None, bool | None]], +) -> bytes: + wb = Workbook() + ws = wb.active + assert ws is not None + ws.title = "Статистика" + + header_font = Font(bold=True, color="FFFFFF") + header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid") + header_alignment = Alignment(horizontal="center", vertical="center") + thin_border = Border( + left=Side(style="thin"), + right=Side(style="thin"), + top=Side(style="thin"), + bottom=Side(style="thin"), + ) + + ws.merge_cells("A1:E1") + ws["A1"] = f"Тест: {test_title}" + ws["A1"].font = Font(bold=True, size=14) + ws["A1"].alignment = Alignment(horizontal="center") + + ws.merge_cells("A2:E2") + ws["A2"] = f"Группа: {group_number}" + ws["A2"].font = Font(bold=True, size=12) + ws["A2"].alignment = Alignment(horizontal="center") + + headers = ["ФИО", "Результат (%)", "Оценка", "Дата прохождения", "Статус"] + for col, header in enumerate(headers, 1): + cell = ws.cell(row=4, column=col, value=header) + cell.font = header_font + cell.fill = header_fill + cell.alignment = header_alignment + cell.border = thin_border + + passed_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid") + failed_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid") + not_passed_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid") + + grades: list[int] = [] + + for row_idx, (name, score, finished_at, is_passed) in enumerate(stats, 5): + ws.cell(row=row_idx, column=1, value=name).border = thin_border + + if score is not None: + ws.cell(row=row_idx, column=2, value=score).border = thin_border + + grade = score // 10 + grades.append(grade) + ws.cell(row=row_idx, column=3, value=grade).border = thin_border + + finished_msk = to_msk(finished_at) if finished_at else None + date_str = finished_msk.strftime("%d.%m.%Y %H:%M") if finished_msk else "—" + ws.cell(row=row_idx, column=4, value=date_str).border = thin_border + status = "Пройден" if is_passed else "Не пройден" + status_cell = ws.cell(row=row_idx, column=5, value=status) + status_cell.border = thin_border + + for col in range(1, 6): + ws.cell(row=row_idx, column=col).fill = passed_fill if is_passed else failed_fill + else: + ws.cell(row=row_idx, column=2, value="—").border = thin_border + ws.cell(row=row_idx, column=3, value="—").border = thin_border + ws.cell(row=row_idx, column=4, value="—").border = thin_border + status_cell = ws.cell(row=row_idx, column=5, value="Не проходил") + status_cell.border = thin_border + + for col in range(1, 6): + ws.cell(row=row_idx, column=col).fill = not_passed_fill + + ws.column_dimensions["A"].width = 30 + ws.column_dimensions["B"].width = 15 + ws.column_dimensions["C"].width = 10 + ws.column_dimensions["D"].width = 20 + ws.column_dimensions["E"].width = 15 + + total_users = len(stats) + passed_users = sum(1 for _, score, _, is_passed in stats if score is not None and is_passed) + attempted_users = sum(1 for _, score, _, _ in stats if score is not None) + + summary_row = len(stats) + 6 + ws.cell(row=summary_row, column=1, value="Итого:").font = Font(bold=True) + ws.cell(row=summary_row + 1, column=1, value=f"Всего студентов: {total_users}") + ws.cell(row=summary_row + 2, column=1, value=f"Прошли тест: {attempted_users}") + ws.cell(row=summary_row + 3, column=1, value=f"Сдали: {passed_users}") + if attempted_users > 0: + success_rate = round(passed_users / attempted_users * 100) + ws.cell(row=summary_row + 4, column=1, value=f"Процент сдачи: {success_rate}%") + if grades: + avg_grade = round(sum(grades) / len(grades), 1) + ws.cell(row=summary_row + 5, column=1, value=f"Средняя оценка: {avg_grade}") + + output = io.BytesIO() + wb.save(output) + output.seek(0) + return output.read() + + +@inject +async def on_group_selected_for_export( + _callback: CallbackQuery, + _widget: Select, + manager: DialogManager, + item_id: str, + test_dao: FromDishka[TestDAO], + attempt_repo: FromDishka[TestAttemptRepository], +) -> None: + test_id = manager.dialog_data.get("selected_test_id") + if not test_id: + await _callback.answer("❌ Тест не найден") + return + + assert _callback.message is not None + await _callback.answer("⏳ Формирую отчёт...") + + test = await test_dao.get_by_id(test_id) + if not test: + await _callback.message.answer("❌ Тест не найден") + return + + group_number = int(item_id) + stats = await attempt_repo.get_group_test_statistics(test_id, group_number) + + if not stats: + await _callback.message.answer(f"❌ В группе {group_number} нет студентов") + return + + excel_bytes = create_excel_report(test.title, group_number, stats) + + safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in test.title)[:30] + filename = f"{safe_title}_group_{group_number}.xlsx" + + await _callback.message.answer_document( + document=BufferedInputFile(excel_bytes, filename=filename), + caption=f"📊 Статистика по тесту\n\n📝 {test.title}\n🎓 Группа {group_number}", + ) + + shared_tests_dialog = Dialog( Window( Format("📝 Тесты\n\nВсего: {count}"), @@ -601,7 +674,7 @@ shared_tests_dialog = Dialog( ), Button(Const("📊 Статистика"), id="statistics", on_click=on_statistics), Button(Const("🔗 Поделиться"), id="share", on_click=on_share_test), - Button(Const("📤 Экспорт"), id="export", on_click=on_export_test), + Button(Const("📥 Экспорт"), id="export", on_click=on_export_stats), Button(Const("✏️ Изменить"), id="edit_menu", on_click=on_edit_menu), Button(Const("◀️ Назад"), id="back", on_click=on_back_to_list), ), @@ -704,4 +777,22 @@ shared_tests_dialog = Dialog( state=SharedTestsSG.attempt_detail, getter=get_attempt_detail, ), + Window( + Format("📥 Экспорт статистики\n\nВыберите группу для экспорта:\n\nВсего групп: {count}"), + ScrollingGroup( + Select( + Format("{item[0]}"), + id="export_group_select", + item_id_getter=lambda x: x[1], + items="groups", + on_click=on_group_selected_for_export, + ), + id="export_groups_scroll", + width=2, + height=7, + ), + Button(Const("◀️ Назад"), id="back", on_click=on_back_to_detail), + state=SharedTestsSG.export_select_group, + getter=get_groups_for_export, + ), ) diff --git a/src/quizzi/infrastructure/database/repo/test_attempt.py b/src/quizzi/infrastructure/database/repo/test_attempt.py index a7e44d5..524a9ea 100644 --- a/src/quizzi/infrastructure/database/repo/test_attempt.py +++ b/src/quizzi/infrastructure/database/repo/test_attempt.py @@ -284,3 +284,41 @@ class TestAttemptRepository: async def mark_warning_sent(self, attempt_id: int, sent_at: datetime) -> None: await self.attempt_dao.update(attempt_id=attempt_id, warning_sent_at=sent_at) + + async def get_group_test_statistics( + self, test_id: int, group_number: int + ) -> list[tuple[str, int | None, datetime | None, bool | None]]: + result = await self.session.execute( + select( + UserModel.name, + UserModel.first_name, + TestAttemptModel.score, + TestAttemptModel.finished_at, + TestAttemptModel.is_passed, + ) + .select_from(UserModel) + .outerjoin( + TestAttemptModel, + (TestAttemptModel.user_id == UserModel.id) & + (TestAttemptModel.test_id == test_id) & + (TestAttemptModel.finished_at.isnot(None)) + ) + .where(UserModel.group == group_number) + .order_by(UserModel.name, UserModel.first_name, TestAttemptModel.finished_at.desc()) + ) + rows = result.all() + + user_best: dict[str, tuple[int | None, datetime | None, bool | None]] = {} + for name, first_name, score, finished_at, is_passed in rows: + display_name = name or first_name + if display_name not in user_best: + user_best[display_name] = (score, finished_at, is_passed) + elif score is not None: + current_score = user_best[display_name][0] + if current_score is None or score > current_score: + user_best[display_name] = (score, finished_at, is_passed) + + return [ + (name, score, finished_at, is_passed) + for name, (score, finished_at, is_passed) in sorted(user_best.items()) + ] diff --git a/uv.lock b/uv.lock index 14ba0c3..3b077c8 100644 --- a/uv.lock +++ b/uv.lock @@ -232,6 +232,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b7/b9/89381173b4f336e986d72471198614806cd313e0f85c143ccb677c310223/dishka-1.7.2-py3-none-any.whl", hash = "sha256:f6faa6ab321903926b825b3337d77172ee693450279b314434864978d01fbad3", size = 94774, upload-time = "2025-09-24T21:23:03.246Z" }, ] +[[package]] +name = "et-xmlfile" +version = "2.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d3/38/af70d7ab1ae9d4da450eeec1fa3918940a5fafb9055e934af8d6eb0c2313/et_xmlfile-2.0.0.tar.gz", hash = "sha256:dab3f4764309081ce75662649be815c4c9081e88f0837825f90fd28317d4da54", size = 17234, upload-time = "2024-10-25T17:25:40.039Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c1/8b/5fe2cc11fee489817272089c4203e679c63b570a5aaeb18d852ae3cbba6a/et_xmlfile-2.0.0-py3-none-any.whl", hash = "sha256:7a91720bc756843502c3b7504c77b8fe44217c85c537d85037f0f536151b2caa", size = 18059, upload-time = "2024-10-25T17:25:39.051Z" }, +] + [[package]] name = "frozenlist" version = "1.8.0" @@ -514,6 +523,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b7/da/7d22601b625e241d4f23ef1ebff8acfc60da633c9e7e7922e24d10f592b3/multidict-6.7.0-py3-none-any.whl", hash = "sha256:394fc5c42a333c9ffc3e421a4c85e08580d990e08b99f6bf35b4132114c5dcb3", size = 12317, upload-time = "2025-10-06T14:52:29.272Z" }, ] +[[package]] +name = "openpyxl" +version = "3.1.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "et-xmlfile" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/3d/f9/88d94a75de065ea32619465d2f77b29a0469500e99012523b91cc4141cd1/openpyxl-3.1.5.tar.gz", hash = "sha256:cf0e3cf56142039133628b5acffe8ef0c12bc902d2aadd3e0fe5878dc08d1050", size = 186464, upload-time = "2024-06-28T14:03:44.161Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c0/da/977ded879c29cbd04de313843e76868e6e13408a94ed6b987245dc7c8506/openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2", size = 250910, upload-time = "2024-06-28T14:03:41.161Z" }, +] + [[package]] name = "pillow" version = "12.1.0" @@ -730,6 +751,7 @@ dependencies = [ { name = "dishka" }, { name = "httpx" }, { name = "json5" }, + { name = "openpyxl" }, { name = "pycryptodome" }, { name = "pydantic" }, { name = "qrcode", extra = ["pil"] }, @@ -753,6 +775,7 @@ requires-dist = [ { name = "dishka", specifier = ">=1.7.2" }, { name = "httpx", specifier = ">=0.28.1" }, { name = "json5", specifier = ">=0.13.0" }, + { name = "openpyxl", specifier = ">=3.1.0" }, { name = "pycryptodome", specifier = ">=3.23.0" }, { name = "pydantic", specifier = ">=2.10.5" }, { name = "qrcode", extras = ["pil"], specifier = ">=8.2" }, From 7cc194abe576c0c3f295c591c3f1773aa853ad42 Mon Sep 17 00:00:00 2001 From: kolo Date: Wed, 7 Jan 2026 10:58:25 +0300 Subject: [PATCH 4/4] update --- .../application/bot/shared_dialogs/states.py | 1 + .../application/bot/shared_dialogs/tests.py | 39 +++++++++++++++++++ .../infrastructure/database/dao/test.py | 2 +- src/quizzi/infrastructure/database/models.py | 7 +++- 4 files changed, 47 insertions(+), 2 deletions(-) diff --git a/src/quizzi/application/bot/shared_dialogs/states.py b/src/quizzi/application/bot/shared_dialogs/states.py index bfc7ac5..50041a2 100644 --- a/src/quizzi/application/bot/shared_dialogs/states.py +++ b/src/quizzi/application/bot/shared_dialogs/states.py @@ -21,6 +21,7 @@ class SharedTestsSG(StatesGroup): statistics = State() attempt_detail = State() export_select_group = State() + delete_confirm = State() class SharedBroadcastSG(StatesGroup): diff --git a/src/quizzi/application/bot/shared_dialogs/tests.py b/src/quizzi/application/bot/shared_dialogs/tests.py index a949384..12d2c97 100644 --- a/src/quizzi/application/bot/shared_dialogs/tests.py +++ b/src/quizzi/application/bot/shared_dialogs/tests.py @@ -299,6 +299,35 @@ async def on_edit_expires(_callback: CallbackQuery, _button: Button, manager: Di await manager.switch_to(SharedTestsSG.edit_expires) +async def on_delete_test(_callback: CallbackQuery, _button: Button, manager: DialogManager): + await manager.switch_to(SharedTestsSG.delete_confirm) + + +@inject +async def get_delete_confirm_data(dialog_manager: DialogManager, test_dao: FromDishka[TestDAO], **_kwargs): + test_id = dialog_manager.dialog_data.get("selected_test_id") + if not test_id: + return {"test_title": "Неизвестный тест"} + + test = await test_dao.get_by_id(test_id) + return {"test_title": test.title if test else "Неизвестный тест"} + + +@inject +async def on_confirm_delete(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]): + test_id = manager.dialog_data.get("selected_test_id") + if not test_id: + await _callback.answer("❌ Тест не найден") + return + + deleted = await test_dao.delete(test_id) + if deleted: + await _callback.answer("✅ Тест удалён") + await manager.switch_to(SharedTestsSG.tests_list) + else: + await _callback.answer("❌ Не удалось удалить тест") + + @inject async def on_password_input(message: Message, _widget: MessageInput, manager: DialogManager, test_dao: FromDishka[TestDAO]): test_id = manager.dialog_data.get("selected_test_id") @@ -689,6 +718,7 @@ shared_tests_dialog = Dialog( Button(Const("⏱️ Лимит времени"), id="edit_time_limit", on_click=on_edit_time_limit), Button(Const("👥 Группа"), id="edit_group", on_click=on_edit_group), Button(Const("📅 Срок действия"), id="edit_expires", on_click=on_edit_expires), + Button(Const("🗑 Удалить тест"), id="delete_test", on_click=on_delete_test), Button(Const("◀️ Назад"), id="back", on_click=on_back_to_detail), ), state=SharedTestsSG.edit_menu, @@ -795,4 +825,13 @@ shared_tests_dialog = Dialog( state=SharedTestsSG.export_select_group, getter=get_groups_for_export, ), + Window( + Format("🗑 Удаление теста\n\n⚠️ Вы уверены, что хотите удалить тест {test_title}?\n\nБудут удалены все вопросы, варианты ответов и результаты прохождений."), + Row( + Button(Const("✅ Да, удалить"), id="confirm_delete", on_click=on_confirm_delete), + Button(Const("❌ Отмена"), id="cancel_delete", on_click=on_back_to_edit_menu), + ), + state=SharedTestsSG.delete_confirm, + getter=get_delete_confirm_data, + ), ) diff --git a/src/quizzi/infrastructure/database/dao/test.py b/src/quizzi/infrastructure/database/dao/test.py index 1fbccf5..44a4069 100644 --- a/src/quizzi/infrastructure/database/dao/test.py +++ b/src/quizzi/infrastructure/database/dao/test.py @@ -41,7 +41,7 @@ class TestDAO: async def get_all(self) -> list[DomainTest]: result = await self.session.execute( - select(Test).order_by(Test.created_at.desc()) + select(Test).order_by(Test.is_active.desc(), Test.created_at.desc()) ) models = list(result.scalars().all()) return [TestDTO(model).to_domain() for model in models] diff --git a/src/quizzi/infrastructure/database/models.py b/src/quizzi/infrastructure/database/models.py index 897905f..68da7c3 100644 --- a/src/quizzi/infrastructure/database/models.py +++ b/src/quizzi/infrastructure/database/models.py @@ -63,6 +63,11 @@ class Test(Base): cascade="all, delete-orphan", order_by="Question.position" ) + + test_attempts: Mapped[list["TestAttempt"]] = relationship( + back_populates="test", + cascade="all, delete-orphan", + ) @final @@ -110,7 +115,7 @@ class TestAttempt(Base): is_passed: Mapped[bool] = mapped_column(default=False) user: Mapped["User"] = relationship() - test: Mapped["Test"] = relationship() + test: Mapped["Test"] = relationship(back_populates="test_attempts") answers: Mapped[list["UserAnswer"]] = relationship( back_populates="attempt", cascade="all, delete-orphan"