From a75d017aa753ed3ba1f560d8b46bbc7c32061127 Mon Sep 17 00:00:00 2001 From: kolo Date: Wed, 7 Jan 2026 22:37:54 +0300 Subject: [PATCH] update --- src/quizzi/application/__main__.py | 3 +- src/quizzi/application/bot/handlers.py | 86 ++--- .../bot/shared_dialogs/broadcast.py | 12 +- .../application/bot/shared_dialogs/tests.py | 335 +++++++----------- src/quizzi/infrastructure/di.py | 40 +++ src/quizzi/service/__init__.py | 1 + src/quizzi/service/broadcast.py | 43 +++ src/quizzi/service/excel.py | 145 ++++++++ src/quizzi/service/test.py | 184 ++++++++++ src/quizzi/service/test_attempt.py | 204 +++++++++++ src/quizzi/service/user.py | 75 ++++ 11 files changed, 854 insertions(+), 274 deletions(-) create mode 100644 src/quizzi/service/__init__.py create mode 100644 src/quizzi/service/broadcast.py create mode 100644 src/quizzi/service/excel.py create mode 100644 src/quizzi/service/test.py create mode 100644 src/quizzi/service/test_attempt.py create mode 100644 src/quizzi/service/user.py diff --git a/src/quizzi/application/__main__.py b/src/quizzi/application/__main__.py index 52df068..1b810b6 100644 --- a/src/quizzi/application/__main__.py +++ b/src/quizzi/application/__main__.py @@ -26,7 +26,7 @@ from quizzi.application.bot.user_dialogs.main_menu import user_menu_dialog from quizzi.application.bot.user_dialogs.registration import registration_dialog from quizzi.application.bot.user_dialogs.take_test import take_test_dialog from quizzi.infrastructure.database.repo.user import UserRepository -from quizzi.infrastructure.di import DatabaseProvider, SchedulerProvider +from quizzi.infrastructure.di import DatabaseProvider, SchedulerProvider, ServiceProvider from quizzi.infrastructure.utils.bot_commands import setup_bot_commands from quizzi.infrastructure.utils.config import Config @@ -68,6 +68,7 @@ async def main() -> None: container = make_async_container( DatabaseProvider(), + ServiceProvider(), SchedulerProvider(), context={Bot: bot, Config: config} ) diff --git a/src/quizzi/application/bot/handlers.py b/src/quizzi/application/bot/handlers.py index 4dbff34..dfee906 100644 --- a/src/quizzi/application/bot/handlers.py +++ b/src/quizzi/application/bot/handlers.py @@ -10,36 +10,29 @@ from dishka.integrations.aiogram import FromDishka from quizzi.application.bot.admin_dialogs.states import AdminMenuSG from quizzi.application.bot.creator_dialogs.states import CreatorMenuSG from quizzi.application.bot.user_dialogs.states import UserDeeplinkSG, UserMenuSG, UserRegistrationSG -from quizzi.infrastructure.database.dao.group import GroupDAO -from quizzi.infrastructure.database.dao.test import TestDAO -from quizzi.infrastructure.database.dao.user import UserDAO -from quizzi.infrastructure.utils.config import Config -from quizzi.infrastructure.utils.test_id_to_hash import decode_id -from quizzi.infrastructure.utils.timezone import now_msk_naive +from quizzi.service.test import TestService +from quizzi.service.user import UserService router = Router() logger = logging.getLogger(__name__) async def ensure_user_registered( - user_dao: UserDAO, - group_dao: GroupDAO, + user_service: UserService, message: Message, dialog_manager: DialogManager, pending_test_id: int | None = None, ) -> bool: assert message.from_user is not None - existing_user = await user_dao.get_by_id(message.from_user.id) - groups = await group_dao.get_all() - has_groups = len(groups) > 0 + result = await user_service.check_registration(message.from_user.id) - start_data = {"user_id": message.from_user.id, "has_groups": has_groups} + start_data = {"user_id": message.from_user.id, "has_groups": result.has_groups} if pending_test_id: start_data["pending_test_id"] = pending_test_id - if existing_user is None: - await user_dao.create( + if result.user is None: + await user_service.create_user( user_id=message.from_user.id, first_name=message.from_user.first_name, username=message.from_user.username, @@ -52,10 +45,7 @@ async def ensure_user_registered( ) return False - needs_name = existing_user.name is None - needs_group = has_groups and existing_user.group is None - - if needs_name: + if result.needs_name: await dialog_manager.start( UserRegistrationSG.input_name, mode=StartMode.RESET_STACK, @@ -63,7 +53,7 @@ async def ensure_user_registered( ) return False - if needs_group: + if result.needs_group: await dialog_manager.start( UserRegistrationSG.select_group, mode=StartMode.RESET_STACK, @@ -71,7 +61,7 @@ async def ensure_user_registered( ) return False - await user_dao.upsert( + await user_service.update_user_info( user_id=message.from_user.id, first_name=message.from_user.first_name, username=message.from_user.username, @@ -80,39 +70,13 @@ async def ensure_user_registered( return True -async def validate_deeplink_test( - test_dao: TestDAO, - user_dao: UserDAO, - test_id: int, - user_id: int, -) -> tuple[bool, str]: - test = await test_dao.get_by_id(test_id) - - if not test: - return False, "❌ Тест не найден" - - if not test.is_active: - return False, "❌ Тест деактивирован" - - if test.expires_at and test.expires_at < now_msk_naive(): - return False, "❌ Срок действия теста истек" - - user = await user_dao.get_by_id(user_id) - if test.for_group and user and user.group != test.for_group: - return False, f"❌ Тест доступен только для группы {test.for_group}" - - return True, "" - - @router.message(CommandStart(deep_link=True)) async def start_with_deeplink( message: Message, command: CommandObject, dialog_manager: DialogManager, - user_dao: FromDishka[UserDAO], - group_dao: FromDishka[GroupDAO], - test_dao: FromDishka[TestDAO], - config: FromDishka[Config], + user_service: FromDishka[UserService], + test_service: FromDishka[TestService], ) -> None: assert message.from_user is not None @@ -125,39 +89,36 @@ async def start_with_deeplink( ) if not deeplink: - await start_handler(message, user_dao, group_dao, dialog_manager) + await start_handler(message, user_service, dialog_manager) return - try: - test_id = decode_id(deeplink, config.security.encode_key) - except (ValueError, IndexError): + test_id = test_service.decode_test_hash(deeplink) + if test_id is None: logger.warning("Invalid deeplink: user_id=%d, deeplink=%s", message.from_user.id, deeplink) await message.answer("❌ Неверная ссылка на тест") - await start_handler(message, user_dao, group_dao, dialog_manager) + await start_handler(message, user_service, dialog_manager) return is_registered = await ensure_user_registered( - user_dao, group_dao, message, dialog_manager, pending_test_id=test_id + user_service, message, dialog_manager, pending_test_id=test_id ) if not is_registered: return - is_valid, error = await validate_deeplink_test( - test_dao, user_dao, test_id, message.from_user.id - ) + validation = await test_service.validate_test(test_id, message.from_user.id) - if not is_valid: + if not validation.is_valid: logger.info( "Test validation failed: user_id=%d, test_id=%d, error=%s", message.from_user.id, test_id, - error, + validation.error, ) await dialog_manager.start( UserDeeplinkSG.test_preview, mode=StartMode.RESET_STACK, - data={"test_id": test_id, "error": error} + data={"test_id": test_id, "error": validation.error} ) return @@ -173,8 +134,7 @@ async def start_with_deeplink( async def start_handler( message: Message, dialog_manager: DialogManager, - user_dao: FromDishka[UserDAO], - group_dao: FromDishka[GroupDAO], + user_service: FromDishka[UserService], ) -> None: assert message.from_user is not None logger.info( @@ -184,7 +144,7 @@ async def start_handler( ) is_registered = await ensure_user_registered( - user_dao, group_dao, message, dialog_manager + user_service, message, dialog_manager ) if is_registered: diff --git a/src/quizzi/application/bot/shared_dialogs/broadcast.py b/src/quizzi/application/bot/shared_dialogs/broadcast.py index 17006f0..ff4fb4b 100644 --- a/src/quizzi/application/bot/shared_dialogs/broadcast.py +++ b/src/quizzi/application/bot/shared_dialogs/broadcast.py @@ -7,8 +7,7 @@ from dishka import FromDishka from dishka.integrations.aiogram_dialog import inject from quizzi.application.bot.shared_dialogs.states import SharedBroadcastSG -from quizzi.infrastructure.database.dao.user import UserDAO -from quizzi.infrastructure.utils.broadcast import broadcast_message +from quizzi.service.broadcast import BroadcastService async def on_broadcast_input(message: Message, _widget: MessageInput, manager: DialogManager): @@ -18,7 +17,12 @@ async def on_broadcast_input(message: Message, _widget: MessageInput, manager: D @inject -async def on_broadcast_confirm(_callback: CallbackQuery, _button: Button, manager: DialogManager, user_dao: FromDishka[UserDAO]): +async def on_broadcast_confirm( + _callback: CallbackQuery, + _button: Button, + manager: DialogManager, + broadcast_service: FromDishka[BroadcastService], +): message_id = manager.dialog_data.get("broadcast_message_id") chat_id = manager.dialog_data.get("broadcast_chat_id") @@ -33,7 +37,7 @@ async def on_broadcast_confirm(_callback: CallbackQuery, _button: Button, manage await _callback.answer("Ошибка: бот не найден") return - stats = await broadcast_message(bot, message_id, chat_id, user_dao) + stats = await broadcast_service.broadcast_message(bot, message_id, chat_id) stats_text = ( f"✅ Рассылка завершена\n\n" diff --git a/src/quizzi/application/bot/shared_dialogs/tests.py b/src/quizzi/application/bot/shared_dialogs/tests.py index 12d2c97..288adef 100644 --- a/src/quizzi/application/bot/shared_dialogs/tests.py +++ b/src/quizzi/application/bot/shared_dialogs/tests.py @@ -1,6 +1,5 @@ import asyncio import functools -import io from datetime import date, datetime, time from aiogram import Bot @@ -11,18 +10,16 @@ from aiogram_dialog.widgets.kbd import Button, Calendar, Column, Row, ScrollingG from aiogram_dialog.widgets.text import Const, Format from dishka import FromDishka from dishka.integrations.aiogram_dialog import inject -from openpyxl import Workbook -from openpyxl.styles import Alignment, Border, Font, PatternFill, Side from quizzi.application.bot.shared_dialogs.states import SharedCreateTestSG, SharedTestsSG from quizzi.infrastructure.database.dao.group import GroupDAO from quizzi.infrastructure.database.dao.test import TestDAO from quizzi.infrastructure.database.repo.test import TestRepository from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository -from quizzi.infrastructure.utils.config import Config from quizzi.infrastructure.utils.qr_generator import generate_qr_bytes -from quizzi.infrastructure.utils.test_id_to_hash import encode_id from quizzi.infrastructure.utils.timezone import to_msk +from quizzi.service.excel import ExcelService +from quizzi.service.test import TestService @inject @@ -101,34 +98,38 @@ async def get_test_detail(test_dao: FromDishka[TestDAO], test_repo: FromDishka[T @inject -async def on_toggle_active(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]): +async def on_toggle_active( + _callback: CallbackQuery, + _button: Button, + manager: DialogManager, + test_service: FromDishka[TestService], +): test_id = manager.dialog_data.get("selected_test_id") if not test_id: await _callback.answer("❌ Тест не найден") return - test = await test_dao.get_by_id(test_id) - - if test: - await test_dao.update(test_id, is_active=not test.is_active) - action = "деактивирован" if test.is_active else "активирован" - await _callback.answer(f"✅ Тест {action}") + result = await test_service.toggle_test_active(test_id) + await _callback.answer(result.message) + if result.success: await manager.switch_to(SharedTestsSG.test_detail) @inject -async def on_toggle_results_viewable(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]): +async def on_toggle_results_viewable( + _callback: CallbackQuery, + _button: Button, + manager: DialogManager, + test_service: FromDishka[TestService], +): test_id = manager.dialog_data.get("selected_test_id") if not test_id: await _callback.answer("❌ Тест не найден") return - test = await test_dao.get_by_id(test_id) - - if test: - await test_dao.update(test_id, are_results_viewable=not test.are_results_viewable) - action = "скрыты" if test.are_results_viewable else "видны" - await _callback.answer(f"✅ Результаты теперь {action}") + result = await test_service.toggle_results_viewable(test_id) + await _callback.answer(result.message) + if result.success: await manager.switch_to(SharedTestsSG.test_detail) @@ -235,7 +236,13 @@ async def on_export_stats(_callback: CallbackQuery, _button: Button, manager: Di @inject -async def on_share_test(_callback: CallbackQuery, _button: Button, manager: DialogManager, config: FromDishka[Config], bot_inst: FromDishka[Bot]): +async def on_share_test( + _callback: CallbackQuery, + _button: Button, + manager: DialogManager, + test_service: FromDishka[TestService], + bot_inst: FromDishka[Bot], +): test_id = manager.dialog_data.get("selected_test_id") if not test_id: @@ -243,11 +250,7 @@ async def on_share_test(_callback: CallbackQuery, _button: Button, manager: Dial "share_link": "Ошибка: тест не найден" } - test_hash = encode_id( - test_id, - config.security.encode_key, - config.security.encoded_string_length - ) + test_hash = test_service.encode_test_id(test_id) bot_info = await bot_inst.get_me() bot_username = bot_info.username or "your_bot" @@ -314,13 +317,18 @@ async def get_delete_confirm_data(dialog_manager: DialogManager, test_dao: FromD @inject -async def on_confirm_delete(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]): +async def on_confirm_delete( + _callback: CallbackQuery, + _button: Button, + manager: DialogManager, + test_service: FromDishka[TestService], +): test_id = manager.dialog_data.get("selected_test_id") if not test_id: await _callback.answer("❌ Тест не найден") return - deleted = await test_dao.delete(test_id) + deleted = await test_service.delete_test(test_id) if deleted: await _callback.answer("✅ Тест удалён") await manager.switch_to(SharedTestsSG.tests_list) @@ -329,7 +337,12 @@ async def on_confirm_delete(_callback: CallbackQuery, _button: Button, manager: @inject -async def on_password_input(message: Message, _widget: MessageInput, manager: DialogManager, test_dao: FromDishka[TestDAO]): +async def on_password_input( + message: Message, + _widget: MessageInput, + manager: DialogManager, + test_service: FromDishka[TestService], +): test_id = manager.dialog_data.get("selected_test_id") if not test_id: await message.answer("❌ Тест не найден") @@ -339,30 +352,36 @@ async def on_password_input(message: Message, _widget: MessageInput, manager: Di await message.answer("❌ Пароль не может быть пустым") return - password = message.text.strip() - if len(password) > 255: - await message.answer("❌ Пароль слишком длинный (максимум 255 символов)") - return - - await test_dao.update(test_id, password=password) - await message.answer("✅ Пароль обновлен") - await manager.switch_to(SharedTestsSG.test_detail) + result = await test_service.update_password(test_id, message.text.strip()) + await message.answer(result.message) + if result.success: + await manager.switch_to(SharedTestsSG.test_detail) @inject -async def on_remove_password(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]): +async def on_remove_password( + _callback: CallbackQuery, + _button: Button, + manager: DialogManager, + test_service: FromDishka[TestService], +): test_id = manager.dialog_data.get("selected_test_id") if not test_id: await _callback.answer("❌ Тест не найден") return - await test_dao.update(test_id, password=None) - await _callback.answer("✅ Пароль удален") + result = await test_service.remove_password(test_id) + await _callback.answer(result.message) await manager.switch_to(SharedTestsSG.test_detail) @inject -async def on_attempts_input_edit(message: Message, _widget: MessageInput, manager: DialogManager, test_dao: FromDishka[TestDAO]): +async def on_attempts_input_edit( + message: Message, + _widget: MessageInput, + manager: DialogManager, + test_service: FromDishka[TestService], +): test_id = manager.dialog_data.get("selected_test_id") if not test_id: await message.answer("❌ Тест не найден") @@ -373,40 +392,40 @@ async def on_attempts_input_edit(message: Message, _widget: MessageInput, manage return attempts_str = message.text.strip() - if not attempts_str.isdigit(): await message.answer("❌ Количество попыток должно быть числом") return - attempts = int(attempts_str) - - if attempts < 1: - await message.answer("❌ Количество попыток должно быть больше 0") - return - - if attempts > 100: - await message.answer("❌ Количество попыток не может быть больше 100") - return - - await test_dao.update(test_id, attempts=attempts) - await message.answer("✅ Количество попыток обновлено") - await manager.switch_to(SharedTestsSG.test_detail) + result = await test_service.update_attempts(test_id, int(attempts_str)) + await message.answer(result.message) + if result.success: + await manager.switch_to(SharedTestsSG.test_detail) @inject -async def on_remove_attempts(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]): +async def on_remove_attempts( + _callback: CallbackQuery, + _button: Button, + manager: DialogManager, + test_service: FromDishka[TestService], +): test_id = manager.dialog_data.get("selected_test_id") if not test_id: await _callback.answer("❌ Тест не найден") return - await test_dao.update(test_id, attempts=None) - await _callback.answer("✅ Ограничение попыток удалено") + result = await test_service.remove_attempts(test_id) + await _callback.answer(result.message) await manager.switch_to(SharedTestsSG.test_detail) @inject -async def on_time_limit_input(message: Message, _widget: MessageInput, manager: DialogManager, test_dao: FromDishka[TestDAO]): +async def on_time_limit_input( + message: Message, + _widget: MessageInput, + manager: DialogManager, + test_service: FromDishka[TestService], +): test_id = manager.dialog_data.get("selected_test_id") if not test_id: await message.answer("❌ Тест не найден") @@ -417,36 +436,30 @@ async def on_time_limit_input(message: Message, _widget: MessageInput, manager: return time_limit_str = message.text.strip() - if not time_limit_str.isdigit(): await message.answer("❌ Лимит времени должен быть числом (в минутах)") return - time_limit_minutes = int(time_limit_str) - - if time_limit_minutes < 1: - await message.answer("❌ Лимит времени должен быть больше 0") - return - - if time_limit_minutes > 1440: - await message.answer("❌ Лимит времени не может быть больше 1440 минут (24 часа)") - return - - time_limit_seconds = time_limit_minutes * 60 - await test_dao.update(test_id, time_limit=time_limit_seconds) - await message.answer("✅ Лимит времени обновлен") - await manager.switch_to(SharedTestsSG.test_detail) + result = await test_service.update_time_limit(test_id, int(time_limit_str)) + await message.answer(result.message) + if result.success: + await manager.switch_to(SharedTestsSG.test_detail) @inject -async def on_remove_time_limit(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]): +async def on_remove_time_limit( + _callback: CallbackQuery, + _button: Button, + manager: DialogManager, + test_service: FromDishka[TestService], +): test_id = manager.dialog_data.get("selected_test_id") if not test_id: await _callback.answer("❌ Тест не найден") return - await test_dao.update(test_id, time_limit=None) - await _callback.answer("✅ Лимит времени удален") + result = await test_service.remove_time_limit(test_id) + await _callback.answer(result.message) await manager.switch_to(SharedTestsSG.test_detail) @@ -460,51 +473,73 @@ async def get_groups_for_edit(dialog_manager: DialogManager, group_dao: FromDish @inject -async def on_group_selected_for_test(_callback: CallbackQuery, _widget, manager: DialogManager, item_id: str, test_dao: FromDishka[TestDAO]): +async def on_group_selected_for_test( + _callback: CallbackQuery, + _widget, + manager: DialogManager, + item_id: str, + test_service: FromDishka[TestService], +): test_id = manager.dialog_data.get("selected_test_id") if not test_id: await _callback.answer("❌ Тест не найден") return - await test_dao.update(test_id, for_group=int(item_id)) - await _callback.answer("✅ Группа обновлена") + result = await test_service.update_group(test_id, int(item_id)) + await _callback.answer(result.message) await manager.switch_to(SharedTestsSG.test_detail) @inject -async def on_remove_group(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]): +async def on_remove_group( + _callback: CallbackQuery, + _button: Button, + manager: DialogManager, + test_service: FromDishka[TestService], +): test_id = manager.dialog_data.get("selected_test_id") if not test_id: await _callback.answer("❌ Тест не найден") return - await test_dao.update(test_id, for_group=None) - await _callback.answer("✅ Тест теперь доступен для всех групп") + result = await test_service.remove_group(test_id) + await _callback.answer(result.message) await manager.switch_to(SharedTestsSG.test_detail) @inject -async def on_date_selected_for_test(_callback, _widget, manager: DialogManager, selected_date: date, test_dao: FromDishka[TestDAO]): +async def on_date_selected_for_test( + _callback, + _widget, + manager: DialogManager, + selected_date: date, + test_service: FromDishka[TestService], +): test_id = manager.dialog_data.get("selected_test_id") if not test_id: await _callback.answer("❌ Тест не найден") return expires_at = datetime.combine(selected_date, time.min) - await test_dao.update(test_id, expires_at=expires_at) - await _callback.answer("✅ Срок действия обновлен") + result = await test_service.update_expires(test_id, expires_at) + await _callback.answer(result.message) await manager.switch_to(SharedTestsSG.test_detail) @inject -async def on_remove_expires(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]): +async def on_remove_expires( + _callback: CallbackQuery, + _button: Button, + manager: DialogManager, + test_service: FromDishka[TestService], +): test_id = manager.dialog_data.get("selected_test_id") if not test_id: await _callback.answer("❌ Тест не найден") return - await test_dao.update(test_id, expires_at=None) - await _callback.answer("✅ Срок действия удален") + result = await test_service.remove_expires(test_id) + await _callback.answer(result.message) await manager.switch_to(SharedTestsSG.test_detail) @@ -525,115 +560,13 @@ async def get_groups_for_export(dialog_manager: DialogManager, group_dao: FromDi } -def create_excel_report( - test_title: str, - group_number: int, - stats: list[tuple[str, int | None, datetime | None, bool | None]], -) -> bytes: - wb = Workbook() - ws = wb.active - assert ws is not None - ws.title = "Статистика" - - header_font = Font(bold=True, color="FFFFFF") - header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid") - header_alignment = Alignment(horizontal="center", vertical="center") - thin_border = Border( - left=Side(style="thin"), - right=Side(style="thin"), - top=Side(style="thin"), - bottom=Side(style="thin"), - ) - - ws.merge_cells("A1:E1") - ws["A1"] = f"Тест: {test_title}" - ws["A1"].font = Font(bold=True, size=14) - ws["A1"].alignment = Alignment(horizontal="center") - - ws.merge_cells("A2:E2") - ws["A2"] = f"Группа: {group_number}" - ws["A2"].font = Font(bold=True, size=12) - ws["A2"].alignment = Alignment(horizontal="center") - - headers = ["ФИО", "Результат (%)", "Оценка", "Дата прохождения", "Статус"] - for col, header in enumerate(headers, 1): - cell = ws.cell(row=4, column=col, value=header) - cell.font = header_font - cell.fill = header_fill - cell.alignment = header_alignment - cell.border = thin_border - - passed_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid") - failed_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid") - not_passed_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid") - - grades: list[int] = [] - - for row_idx, (name, score, finished_at, is_passed) in enumerate(stats, 5): - ws.cell(row=row_idx, column=1, value=name).border = thin_border - - if score is not None: - ws.cell(row=row_idx, column=2, value=score).border = thin_border - - grade = score // 10 - grades.append(grade) - ws.cell(row=row_idx, column=3, value=grade).border = thin_border - - finished_msk = to_msk(finished_at) if finished_at else None - date_str = finished_msk.strftime("%d.%m.%Y %H:%M") if finished_msk else "—" - ws.cell(row=row_idx, column=4, value=date_str).border = thin_border - status = "Пройден" if is_passed else "Не пройден" - status_cell = ws.cell(row=row_idx, column=5, value=status) - status_cell.border = thin_border - - for col in range(1, 6): - ws.cell(row=row_idx, column=col).fill = passed_fill if is_passed else failed_fill - else: - ws.cell(row=row_idx, column=2, value="—").border = thin_border - ws.cell(row=row_idx, column=3, value="—").border = thin_border - ws.cell(row=row_idx, column=4, value="—").border = thin_border - status_cell = ws.cell(row=row_idx, column=5, value="Не проходил") - status_cell.border = thin_border - - for col in range(1, 6): - ws.cell(row=row_idx, column=col).fill = not_passed_fill - - ws.column_dimensions["A"].width = 30 - ws.column_dimensions["B"].width = 15 - ws.column_dimensions["C"].width = 10 - ws.column_dimensions["D"].width = 20 - ws.column_dimensions["E"].width = 15 - - total_users = len(stats) - passed_users = sum(1 for _, score, _, is_passed in stats if score is not None and is_passed) - attempted_users = sum(1 for _, score, _, _ in stats if score is not None) - - summary_row = len(stats) + 6 - ws.cell(row=summary_row, column=1, value="Итого:").font = Font(bold=True) - ws.cell(row=summary_row + 1, column=1, value=f"Всего студентов: {total_users}") - ws.cell(row=summary_row + 2, column=1, value=f"Прошли тест: {attempted_users}") - ws.cell(row=summary_row + 3, column=1, value=f"Сдали: {passed_users}") - if attempted_users > 0: - success_rate = round(passed_users / attempted_users * 100) - ws.cell(row=summary_row + 4, column=1, value=f"Процент сдачи: {success_rate}%") - if grades: - avg_grade = round(sum(grades) / len(grades), 1) - ws.cell(row=summary_row + 5, column=1, value=f"Средняя оценка: {avg_grade}") - - output = io.BytesIO() - wb.save(output) - output.seek(0) - return output.read() - - @inject async def on_group_selected_for_export( _callback: CallbackQuery, _widget: Select, manager: DialogManager, item_id: str, - test_dao: FromDishka[TestDAO], - attempt_repo: FromDishka[TestAttemptRepository], + excel_service: FromDishka[ExcelService], ) -> None: test_id = manager.dialog_data.get("selected_test_id") if not test_id: @@ -643,26 +576,16 @@ async def on_group_selected_for_export( assert _callback.message is not None await _callback.answer("⏳ Формирую отчёт...") - test = await test_dao.get_by_id(test_id) - if not test: - await _callback.message.answer("❌ Тест не найден") - return - group_number = int(item_id) - stats = await attempt_repo.get_group_test_statistics(test_id, group_number) + result = await excel_service.generate_group_report(test_id, group_number) - if not stats: - await _callback.message.answer(f"❌ В группе {group_number} нет студентов") + if not result.success or not result.data or not result.filename: + await _callback.message.answer(result.caption) return - excel_bytes = create_excel_report(test.title, group_number, stats) - - safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in test.title)[:30] - filename = f"{safe_title}_group_{group_number}.xlsx" - await _callback.message.answer_document( - document=BufferedInputFile(excel_bytes, filename=filename), - caption=f"📊 Статистика по тесту\n\n📝 {test.title}\n🎓 Группа {group_number}", + document=BufferedInputFile(result.data, filename=result.filename), + caption=result.caption, ) diff --git a/src/quizzi/infrastructure/di.py b/src/quizzi/infrastructure/di.py index 3d6d39a..35e4386 100644 --- a/src/quizzi/infrastructure/di.py +++ b/src/quizzi/infrastructure/di.py @@ -20,6 +20,11 @@ from quizzi.infrastructure.database.repo.user import UserRepository from quizzi.infrastructure.scheduling.tasks import deactivate_expired_tests, finish_expired_test_attempts, send_time_warning_notifications from quizzi.infrastructure.utils.config import Config from quizzi.infrastructure.utils.rate_limiter import PasswordRateLimiter +from quizzi.service.broadcast import BroadcastService +from quizzi.service.excel import ExcelService +from quizzi.service.test import TestService +from quizzi.service.test_attempt import TestAttemptService +from quizzi.service.user import UserService class DatabaseProvider(Provider): @@ -80,6 +85,41 @@ class DatabaseProvider(Provider): return TestAttemptRepository(session) +class ServiceProvider(Provider): + @provide(scope=Scope.REQUEST) + def get_user_service(self, user_dao: UserDAO, group_dao: GroupDAO) -> UserService: + return UserService(user_dao, group_dao) + + @provide(scope=Scope.REQUEST) + def get_test_service( + self, + test_dao: TestDAO, + test_repo: TestRepository, + attempt_repo: TestAttemptRepository, + user_dao: UserDAO, + config: Config, + ) -> TestService: + return TestService(test_dao, test_repo, attempt_repo, user_dao, config) + + @provide(scope=Scope.REQUEST) + def get_test_attempt_service( + self, + test_dao: TestDAO, + test_repo: TestRepository, + attempt_repo: TestAttemptRepository, + answer_dao: UserAnswerDAO, + ) -> TestAttemptService: + return TestAttemptService(test_dao, test_repo, attempt_repo, answer_dao) + + @provide(scope=Scope.REQUEST) + def get_broadcast_service(self, user_dao: UserDAO) -> BroadcastService: + return BroadcastService(user_dao) + + @provide(scope=Scope.REQUEST) + def get_excel_service(self, test_dao: TestDAO, attempt_repo: TestAttemptRepository) -> ExcelService: + return ExcelService(test_dao, attempt_repo) + + class SchedulerProvider(Provider): @provide(scope = Scope.APP) def get_scheduler(self, container: AsyncContainer, bot: Bot) -> AsyncIOScheduler: diff --git a/src/quizzi/service/__init__.py b/src/quizzi/service/__init__.py new file mode 100644 index 0000000..7721cac --- /dev/null +++ b/src/quizzi/service/__init__.py @@ -0,0 +1 @@ +# Service layer - бизнес-логика приложения diff --git a/src/quizzi/service/broadcast.py b/src/quizzi/service/broadcast.py new file mode 100644 index 0000000..ab8bcf2 --- /dev/null +++ b/src/quizzi/service/broadcast.py @@ -0,0 +1,43 @@ +from dataclasses import dataclass + +from aiogram import Bot +from aiogram.exceptions import TelegramAPIError + +from quizzi.infrastructure.database.dao.user import UserDAO + + +@dataclass +class BroadcastStats: + total: int + success: int + failed: int + + +class BroadcastService: + def __init__(self, user_dao: UserDAO) -> None: + self._user_dao = user_dao + + async def broadcast_message( + self, + bot: Bot, + message_id: int, + from_chat_id: int, + ) -> BroadcastStats: + users = await self._user_dao.get_all() + + total = len(users) + success = 0 + failed = 0 + + for user in users: + try: + await bot.copy_message( + chat_id=user.id, + from_chat_id=from_chat_id, + message_id=message_id, + ) + success += 1 + except TelegramAPIError: + failed += 1 + + return BroadcastStats(total=total, success=success, failed=failed) diff --git a/src/quizzi/service/excel.py b/src/quizzi/service/excel.py new file mode 100644 index 0000000..64ca8be --- /dev/null +++ b/src/quizzi/service/excel.py @@ -0,0 +1,145 @@ +import io +from dataclasses import dataclass +from datetime import datetime + +from openpyxl import Workbook +from openpyxl.styles import Alignment, Border, Font, PatternFill, Side + +from quizzi.infrastructure.database.dao.test import TestDAO +from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository +from quizzi.infrastructure.utils.timezone import to_msk + + +@dataclass +class ExcelReportResult: + success: bool + data: bytes | None = None + filename: str | None = None + caption: str = "" + + +class ExcelService: + def __init__(self, test_dao: TestDAO, attempt_repo: TestAttemptRepository) -> None: + self._test_dao = test_dao + self._attempt_repo = attempt_repo + + def create_test_report( + self, + test_title: str, + group_number: int, + stats: list[tuple[str, int | None, datetime | None, bool | None]], + ) -> bytes: + wb = Workbook() + ws = wb.active + assert ws is not None + ws.title = "Статистика" + + header_font = Font(bold=True, color="FFFFFF") + header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid") + header_alignment = Alignment(horizontal="center", vertical="center") + thin_border = Border( + left=Side(style="thin"), + right=Side(style="thin"), + top=Side(style="thin"), + bottom=Side(style="thin"), + ) + + ws.merge_cells("A1:E1") + ws["A1"] = f"Тест: {test_title}" + ws["A1"].font = Font(bold=True, size=14) + ws["A1"].alignment = Alignment(horizontal="center") + + ws.merge_cells("A2:E2") + ws["A2"] = f"Группа: {group_number}" + ws["A2"].font = Font(bold=True, size=12) + ws["A2"].alignment = Alignment(horizontal="center") + + headers = ["ФИО", "Результат (%)", "Оценка", "Дата прохождения", "Статус"] + for col, header in enumerate(headers, 1): + cell = ws.cell(row=4, column=col, value=header) + cell.font = header_font + cell.fill = header_fill + cell.alignment = header_alignment + cell.border = thin_border + + passed_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid") + failed_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid") + not_passed_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid") + + grades: list[int] = [] + + for row_idx, (name, score, finished_at, is_passed) in enumerate(stats, 5): + ws.cell(row=row_idx, column=1, value=name).border = thin_border + + if score is not None: + ws.cell(row=row_idx, column=2, value=score).border = thin_border + + grade = score // 10 + grades.append(grade) + ws.cell(row=row_idx, column=3, value=grade).border = thin_border + + finished_msk = to_msk(finished_at) if finished_at else None + date_str = finished_msk.strftime("%d.%m.%Y %H:%M") if finished_msk else "—" + ws.cell(row=row_idx, column=4, value=date_str).border = thin_border + status = "Пройден" if is_passed else "Не пройден" + status_cell = ws.cell(row=row_idx, column=5, value=status) + status_cell.border = thin_border + + for col in range(1, 6): + ws.cell(row=row_idx, column=col).fill = passed_fill if is_passed else failed_fill + else: + ws.cell(row=row_idx, column=2, value="—").border = thin_border + ws.cell(row=row_idx, column=3, value="—").border = thin_border + ws.cell(row=row_idx, column=4, value="—").border = thin_border + status_cell = ws.cell(row=row_idx, column=5, value="Не проходил") + status_cell.border = thin_border + + for col in range(1, 6): + ws.cell(row=row_idx, column=col).fill = not_passed_fill + + ws.column_dimensions["A"].width = 30 + ws.column_dimensions["B"].width = 15 + ws.column_dimensions["C"].width = 10 + ws.column_dimensions["D"].width = 20 + ws.column_dimensions["E"].width = 15 + + total_users = len(stats) + passed_users = sum(1 for _, score, _, is_passed in stats if score is not None and is_passed) + attempted_users = sum(1 for _, score, _, _ in stats if score is not None) + + summary_row = len(stats) + 6 + ws.cell(row=summary_row, column=1, value="Итого:").font = Font(bold=True) + ws.cell(row=summary_row + 1, column=1, value=f"Всего студентов: {total_users}") + ws.cell(row=summary_row + 2, column=1, value=f"Прошли тест: {attempted_users}") + ws.cell(row=summary_row + 3, column=1, value=f"Сдали: {passed_users}") + if attempted_users > 0: + success_rate = round(passed_users / attempted_users * 100) + ws.cell(row=summary_row + 4, column=1, value=f"Процент сдачи: {success_rate}%") + if grades: + avg_grade = round(sum(grades) / len(grades), 1) + ws.cell(row=summary_row + 5, column=1, value=f"Средняя оценка: {avg_grade}") + + output = io.BytesIO() + wb.save(output) + output.seek(0) + return output.read() + + async def generate_group_report(self, test_id: int, group_number: int) -> ExcelReportResult: + test = await self._test_dao.get_by_id(test_id) + if not test: + return ExcelReportResult(success=False, caption="❌ Тест не найден") + + stats = await self._attempt_repo.get_group_test_statistics(test_id, group_number) + if not stats: + return ExcelReportResult(success=False, caption=f"❌ В группе {group_number} нет студентов") + + excel_bytes = self.create_test_report(test.title, group_number, stats) + safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in test.title)[:30] + filename = f"{safe_title}_group_{group_number}.xlsx" + + return ExcelReportResult( + success=True, + data=excel_bytes, + filename=filename, + caption=f"📊 Статистика по тесту\n\n📝 {test.title}\n🎓 Группа {group_number}", + ) diff --git a/src/quizzi/service/test.py b/src/quizzi/service/test.py new file mode 100644 index 0000000..8128f8a --- /dev/null +++ b/src/quizzi/service/test.py @@ -0,0 +1,184 @@ +from dataclasses import dataclass +from datetime import datetime + +from quizzi.domain.schemas import Test +from quizzi.infrastructure.database.dao.test import TestDAO +from quizzi.infrastructure.database.dao.user import UserDAO +from quizzi.infrastructure.database.repo.test import TestRepository +from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository +from quizzi.infrastructure.utils.config import Config +from quizzi.infrastructure.utils.test_id_to_hash import decode_id, encode_id +from quizzi.infrastructure.utils.timezone import now_msk_naive + + +@dataclass +class TestValidationResult: + is_valid: bool + error: str = "" + test: Test | None = None + + +@dataclass +class TestAccessResult: + can_access: bool + error: str = "" + remaining_attempts: int | None = None + + +@dataclass +class TestUpdateResult: + success: bool + message: str + + +class TestService: + def __init__( + self, + test_dao: TestDAO, + test_repo: TestRepository, + attempt_repo: TestAttemptRepository, + user_dao: UserDAO, + config: Config, + ) -> None: + self._test_dao = test_dao + self._test_repo = test_repo + self._attempt_repo = attempt_repo + self._user_dao = user_dao + self._config = config + + def decode_test_hash(self, test_hash: str) -> int | None: + try: + return decode_id(test_hash, self._config.security.encode_key) + except (ValueError, IndexError): + return None + + def encode_test_id(self, test_id: int) -> str: + return encode_id( + test_id, + self._config.security.encode_key, + self._config.security.encoded_string_length, + ) + + async def validate_test(self, test_id: int, user_id: int) -> TestValidationResult: + test = await self._test_dao.get_by_id(test_id) + + if not test: + return TestValidationResult(is_valid=False, error="❌ Тест не найден") + + if not test.is_active: + return TestValidationResult(is_valid=False, error="❌ Тест деактивирован", test=test) + + if test.expires_at and test.expires_at < now_msk_naive(): + return TestValidationResult(is_valid=False, error="❌ Срок действия теста истек", test=test) + + user = await self._user_dao.get_by_id(user_id) + if test.for_group and user and user.group != test.for_group: + return TestValidationResult( + is_valid=False, + error=f"❌ Тест доступен только для группы {test.for_group}", + test=test, + ) + + return TestValidationResult(is_valid=True, test=test) + + async def check_test_access(self, test_id: int, user_id: int) -> TestAccessResult: + test = await self._test_dao.get_by_id(test_id) + + if not test: + return TestAccessResult(can_access=False, error="❌ Тест не найден") + + if not test.is_active: + return TestAccessResult(can_access=False, error="❌ Тест деактивирован") + + if test.expires_at and test.expires_at < now_msk_naive(): + return TestAccessResult(can_access=False, error="❌ Срок действия теста истек") + + if test.attempts: + attempts = await self._attempt_repo.get_user_test_attempts(user_id, test_id) + finished_attempts = [a for a in attempts if a.finished_at] + remaining = test.attempts - len(finished_attempts) + + if remaining <= 0: + return TestAccessResult( + can_access=False, + error="❌ Вы исчерпали все попытки", + remaining_attempts=0, + ) + + return TestAccessResult(can_access=True, remaining_attempts=remaining) + + return TestAccessResult(can_access=True) + + async def get_available_tests(self, user_id: int, user_group: int | None) -> list[Test]: + return await self._test_repo.get_available_tests_for_user(user_id, user_group) + + async def toggle_test_active(self, test_id: int) -> TestUpdateResult: + test = await self._test_dao.get_by_id(test_id) + if not test: + return TestUpdateResult(success=False, message="❌ Тест не найден") + + await self._test_dao.update(test_id, is_active=not test.is_active) + action = "деактивирован" if test.is_active else "активирован" + return TestUpdateResult(success=True, message=f"✅ Тест {action}") + + async def toggle_results_viewable(self, test_id: int) -> TestUpdateResult: + test = await self._test_dao.get_by_id(test_id) + if not test: + return TestUpdateResult(success=False, message="❌ Тест не найден") + + await self._test_dao.update(test_id, are_results_viewable=not test.are_results_viewable) + action = "скрыты" if test.are_results_viewable else "видны" + return TestUpdateResult(success=True, message=f"✅ Результаты теперь {action}") + + async def delete_test(self, test_id: int) -> bool: + return await self._test_dao.delete(test_id) + + async def update_password(self, test_id: int, password: str) -> TestUpdateResult: + if len(password) > 255: + return TestUpdateResult(success=False, message="❌ Пароль слишком длинный (максимум 255 символов)") + await self._test_dao.update(test_id, password=password) + return TestUpdateResult(success=True, message="✅ Пароль обновлен") + + async def remove_password(self, test_id: int) -> TestUpdateResult: + await self._test_dao.update(test_id, password=None) + return TestUpdateResult(success=True, message="✅ Пароль удален") + + async def update_attempts(self, test_id: int, attempts: int) -> TestUpdateResult: + if attempts < 1: + return TestUpdateResult(success=False, message="❌ Количество попыток должно быть больше 0") + if attempts > 100: + return TestUpdateResult(success=False, message="❌ Количество попыток не может быть больше 100") + await self._test_dao.update(test_id, attempts=attempts) + return TestUpdateResult(success=True, message="✅ Количество попыток обновлено") + + async def remove_attempts(self, test_id: int) -> TestUpdateResult: + await self._test_dao.update(test_id, attempts=None) + return TestUpdateResult(success=True, message="✅ Ограничение попыток удалено") + + async def update_time_limit(self, test_id: int, minutes: int) -> TestUpdateResult: + if minutes < 1: + return TestUpdateResult(success=False, message="❌ Лимит времени должен быть больше 0") + if minutes > 1440: + return TestUpdateResult(success=False, message="❌ Лимит времени не может быть больше 1440 минут (24 часа)") + await self._test_dao.update(test_id, time_limit=minutes * 60) + return TestUpdateResult(success=True, message="✅ Лимит времени обновлен") + + async def remove_time_limit(self, test_id: int) -> TestUpdateResult: + await self._test_dao.update(test_id, time_limit=None) + return TestUpdateResult(success=True, message="✅ Лимит времени удален") + + async def update_group(self, test_id: int, group: int) -> TestUpdateResult: + await self._test_dao.update(test_id, for_group=group) + return TestUpdateResult(success=True, message="✅ Группа обновлена") + + async def remove_group(self, test_id: int) -> TestUpdateResult: + await self._test_dao.update(test_id, for_group=None) + return TestUpdateResult(success=True, message="✅ Тест теперь доступен для всех групп") + + async def update_expires(self, test_id: int, expires_at: datetime) -> TestUpdateResult: + await self._test_dao.update(test_id, expires_at=expires_at) + return TestUpdateResult(success=True, message="✅ Срок действия обновлен") + + async def remove_expires(self, test_id: int) -> TestUpdateResult: + await self._test_dao.update(test_id, expires_at=None) + return TestUpdateResult(success=True, message="✅ Срок действия удален") diff --git a/src/quizzi/service/test_attempt.py b/src/quizzi/service/test_attempt.py new file mode 100644 index 0000000..fa689df --- /dev/null +++ b/src/quizzi/service/test_attempt.py @@ -0,0 +1,204 @@ +from dataclasses import dataclass +from datetime import datetime + +from quizzi.domain.schemas import QuestionType +from quizzi.infrastructure.database.dao.test import TestDAO +from quizzi.infrastructure.database.dao.user_answer import UserAnswerDAO +from quizzi.infrastructure.database.repo.test import TestRepository +from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository +from quizzi.infrastructure.utils.timezone import now_msk_naive + + +@dataclass +class AttemptStartResult: + success: bool + attempt_id: int | None = None + questions: list[int] | None = None + started_at: datetime | None = None + error: str = "" + + +@dataclass +class AnswerResult: + success: bool + is_correct: bool = False + error: str = "" + + +@dataclass +class TestResult: + score: int + correct_count: int + total_questions: int + is_passed: bool + + +class TestAttemptService: + def __init__( + self, + test_dao: TestDAO, + test_repo: TestRepository, + attempt_repo: TestAttemptRepository, + answer_dao: UserAnswerDAO, + ) -> None: + self._test_dao = test_dao + self._test_repo = test_repo + self._attempt_repo = attempt_repo + self._answer_dao = answer_dao + + async def start_attempt(self, user_id: int, test_id: int) -> AttemptStartResult: + active_attempt = await self._attempt_repo.get_active_attempt(user_id, test_id) + if active_attempt: + await self._attempt_repo.attempt_dao.delete(active_attempt.id) + + _, questions = await self._test_repo.get_test_with_questions(test_id) + if not questions: + return AttemptStartResult(success=False, error="❌ В тесте нет вопросов") + + attempt = await self._attempt_repo.attempt_dao.create(user_id=user_id, test_id=test_id) + started_at = now_msk_naive() + + return AttemptStartResult( + success=True, + attempt_id=attempt.id, + questions=[q.id for q in questions], + started_at=started_at, + ) + + async def cancel_attempt(self, attempt_id: int) -> bool: + return await self._attempt_repo.attempt_dao.delete(attempt_id) + + async def save_single_answer( + self, + attempt_id: int, + question_id: int, + selected_option_id: int, + ) -> AnswerResult: + question, options = await self._test_repo.get_question_with_options(question_id) + if not question: + return AnswerResult(success=False, error="❌ Вопрос не найден") + + correct_options = [opt for opt in options if opt.is_correct] + is_correct = any(opt.id == selected_option_id for opt in correct_options) + selected_text = next((opt.text for opt in options if opt.id == selected_option_id), "") + + await self._answer_dao.create( + attempt_id=attempt_id, + question_id=question_id, + selected_option_id=selected_option_id, + text_answer=selected_text, + is_correct=is_correct, + ) + + return AnswerResult(success=True, is_correct=is_correct) + + async def save_multiple_answer( + self, + attempt_id: int, + question_id: int, + selected_option_ids: list[int], + ) -> AnswerResult: + question, options = await self._test_repo.get_question_with_options(question_id) + if not question: + return AnswerResult(success=False, error="❌ Вопрос не найден") + + selected_texts = sorted([opt.text for opt in options if opt.id in selected_option_ids]) + correct_texts = sorted([opt.text for opt in options if opt.is_correct]) + is_correct = selected_texts == correct_texts + + await self._answer_dao.create( + attempt_id=attempt_id, + question_id=question_id, + text_answer="|".join(selected_texts), + is_correct=is_correct, + ) + + return AnswerResult(success=True, is_correct=is_correct) + + async def save_text_answer( + self, + attempt_id: int, + question_id: int, + text_answer: str, + ) -> AnswerResult: + question, options = await self._test_repo.get_question_with_options(question_id) + if not question: + return AnswerResult(success=False, error="❌ Вопрос не найден") + + correct_options = [opt for opt in options if opt.is_correct] + user_normalized = text_answer.lower().replace(" ", "") + is_correct = any(opt.text.lower().replace(" ", "") == user_normalized for opt in correct_options) + + await self._answer_dao.create( + attempt_id=attempt_id, + question_id=question_id, + text_answer=text_answer, + is_correct=is_correct, + ) + + return AnswerResult(success=True, is_correct=is_correct) + + async def finish_attempt(self, attempt_id: int, total_questions: int) -> TestResult: + correct_count = await self._attempt_repo.calculate_attempt_score(attempt_id) + score = round((correct_count / total_questions * 100)) if total_questions > 0 else 0 + is_passed = score >= 50 + + await self._attempt_repo.finish_attempt(attempt_id, score, is_passed) + + return TestResult( + score=score, + correct_count=correct_count, + total_questions=total_questions, + is_passed=is_passed, + ) + + async def finish_by_timeout( + self, + attempt_id: int, + questions: list[int], + user_answers: dict, + ) -> TestResult: + answered_question_ids = set() + answers = await self._attempt_repo.get_answers_for_attempt(attempt_id) + for answer in answers: + answered_question_ids.add(answer.question_id) + + for question_id in questions: + if question_id in answered_question_ids: + continue + + answer_data = user_answers.get(str(question_id)) + + if answer_data: + question, options = await self._test_repo.get_question_with_options(question_id) + if not question: + continue + + if answer_data["type"] == "single": + await self.save_single_answer(attempt_id, question_id, answer_data["answer"]) + elif answer_data["type"] == "multiple": + await self.save_multiple_answer(attempt_id, question_id, answer_data["answer"]) + else: + await self._answer_dao.create( + attempt_id=attempt_id, + question_id=question_id, + text_answer=None, + is_correct=False, + ) + + return await self.finish_attempt(attempt_id, len(questions)) + + async def get_question_state(self, question_type: str): + from quizzi.application.bot.user_dialogs.states import UserTestSG + + if question_type == QuestionType.SINGLE: + return UserTestSG.question_single + elif question_type == QuestionType.MULTIPLE: + return UserTestSG.question_multiple + else: + return UserTestSG.question_input + + async def get_next_question_state(self, question_id: int): + question, _ = await self._test_repo.get_question_with_options(question_id) + question_type = question.question_type if question else QuestionType.SINGLE + return await self.get_question_state(question_type) diff --git a/src/quizzi/service/user.py b/src/quizzi/service/user.py new file mode 100644 index 0000000..ec25f1a --- /dev/null +++ b/src/quizzi/service/user.py @@ -0,0 +1,75 @@ +from dataclasses import dataclass + +from quizzi.domain.schemas import User +from quizzi.infrastructure.database.dao.group import GroupDAO +from quizzi.infrastructure.database.dao.user import UserDAO + + +@dataclass +class RegistrationResult: + is_registered: bool + needs_name: bool = False + needs_group: bool = False + has_groups: bool = False + user: User | None = None + + +class UserService: + def __init__(self, user_dao: UserDAO, group_dao: GroupDAO) -> None: + self._user_dao = user_dao + self._group_dao = group_dao + + async def check_registration(self, user_id: int) -> RegistrationResult: + user = await self._user_dao.get_by_id(user_id) + groups = await self._group_dao.get_all() + has_groups = len(groups) > 0 + + if user is None: + return RegistrationResult( + is_registered=False, + needs_name=True, + needs_group=has_groups, + has_groups=has_groups, + ) + + needs_name = user.name is None + needs_group = has_groups and user.group is None + + return RegistrationResult( + is_registered=not needs_name and not needs_group, + needs_name=needs_name, + needs_group=needs_group, + has_groups=has_groups, + user=user, + ) + + async def create_user( + self, + user_id: int, + first_name: str, + username: str | None = None, + last_name: str | None = None, + ) -> User: + return await self._user_dao.create( + user_id=user_id, + first_name=first_name, + username=username, + last_name=last_name, + ) + + async def update_user_info( + self, + user_id: int, + first_name: str, + username: str | None = None, + last_name: str | None = None, + ) -> User | None: + return await self._user_dao.upsert( + user_id=user_id, + first_name=first_name, + username=username, + last_name=last_name, + ) + + async def get_all_users(self) -> list[User]: + return await self._user_dao.get_all()