diff --git a/src/quizzi/application/__main__.py b/src/quizzi/application/__main__.py
index 52df068..1b810b6 100644
--- a/src/quizzi/application/__main__.py
+++ b/src/quizzi/application/__main__.py
@@ -26,7 +26,7 @@ from quizzi.application.bot.user_dialogs.main_menu import user_menu_dialog
from quizzi.application.bot.user_dialogs.registration import registration_dialog
from quizzi.application.bot.user_dialogs.take_test import take_test_dialog
from quizzi.infrastructure.database.repo.user import UserRepository
-from quizzi.infrastructure.di import DatabaseProvider, SchedulerProvider
+from quizzi.infrastructure.di import DatabaseProvider, SchedulerProvider, ServiceProvider
from quizzi.infrastructure.utils.bot_commands import setup_bot_commands
from quizzi.infrastructure.utils.config import Config
@@ -68,6 +68,7 @@ async def main() -> None:
container = make_async_container(
DatabaseProvider(),
+ ServiceProvider(),
SchedulerProvider(),
context={Bot: bot, Config: config}
)
diff --git a/src/quizzi/application/bot/handlers.py b/src/quizzi/application/bot/handlers.py
index 4dbff34..dfee906 100644
--- a/src/quizzi/application/bot/handlers.py
+++ b/src/quizzi/application/bot/handlers.py
@@ -10,36 +10,29 @@ from dishka.integrations.aiogram import FromDishka
from quizzi.application.bot.admin_dialogs.states import AdminMenuSG
from quizzi.application.bot.creator_dialogs.states import CreatorMenuSG
from quizzi.application.bot.user_dialogs.states import UserDeeplinkSG, UserMenuSG, UserRegistrationSG
-from quizzi.infrastructure.database.dao.group import GroupDAO
-from quizzi.infrastructure.database.dao.test import TestDAO
-from quizzi.infrastructure.database.dao.user import UserDAO
-from quizzi.infrastructure.utils.config import Config
-from quizzi.infrastructure.utils.test_id_to_hash import decode_id
-from quizzi.infrastructure.utils.timezone import now_msk_naive
+from quizzi.service.test import TestService
+from quizzi.service.user import UserService
router = Router()
logger = logging.getLogger(__name__)
async def ensure_user_registered(
- user_dao: UserDAO,
- group_dao: GroupDAO,
+ user_service: UserService,
message: Message,
dialog_manager: DialogManager,
pending_test_id: int | None = None,
) -> bool:
assert message.from_user is not None
- existing_user = await user_dao.get_by_id(message.from_user.id)
- groups = await group_dao.get_all()
- has_groups = len(groups) > 0
+ result = await user_service.check_registration(message.from_user.id)
- start_data = {"user_id": message.from_user.id, "has_groups": has_groups}
+ start_data = {"user_id": message.from_user.id, "has_groups": result.has_groups}
if pending_test_id:
start_data["pending_test_id"] = pending_test_id
- if existing_user is None:
- await user_dao.create(
+ if result.user is None:
+ await user_service.create_user(
user_id=message.from_user.id,
first_name=message.from_user.first_name,
username=message.from_user.username,
@@ -52,10 +45,7 @@ async def ensure_user_registered(
)
return False
- needs_name = existing_user.name is None
- needs_group = has_groups and existing_user.group is None
-
- if needs_name:
+ if result.needs_name:
await dialog_manager.start(
UserRegistrationSG.input_name,
mode=StartMode.RESET_STACK,
@@ -63,7 +53,7 @@ async def ensure_user_registered(
)
return False
- if needs_group:
+ if result.needs_group:
await dialog_manager.start(
UserRegistrationSG.select_group,
mode=StartMode.RESET_STACK,
@@ -71,7 +61,7 @@ async def ensure_user_registered(
)
return False
- await user_dao.upsert(
+ await user_service.update_user_info(
user_id=message.from_user.id,
first_name=message.from_user.first_name,
username=message.from_user.username,
@@ -80,39 +70,13 @@ async def ensure_user_registered(
return True
-async def validate_deeplink_test(
- test_dao: TestDAO,
- user_dao: UserDAO,
- test_id: int,
- user_id: int,
-) -> tuple[bool, str]:
- test = await test_dao.get_by_id(test_id)
-
- if not test:
- return False, "❌ Тест не найден"
-
- if not test.is_active:
- return False, "❌ Тест деактивирован"
-
- if test.expires_at and test.expires_at < now_msk_naive():
- return False, "❌ Срок действия теста истек"
-
- user = await user_dao.get_by_id(user_id)
- if test.for_group and user and user.group != test.for_group:
- return False, f"❌ Тест доступен только для группы {test.for_group}"
-
- return True, ""
-
-
@router.message(CommandStart(deep_link=True))
async def start_with_deeplink(
message: Message,
command: CommandObject,
dialog_manager: DialogManager,
- user_dao: FromDishka[UserDAO],
- group_dao: FromDishka[GroupDAO],
- test_dao: FromDishka[TestDAO],
- config: FromDishka[Config],
+ user_service: FromDishka[UserService],
+ test_service: FromDishka[TestService],
) -> None:
assert message.from_user is not None
@@ -125,39 +89,36 @@ async def start_with_deeplink(
)
if not deeplink:
- await start_handler(message, user_dao, group_dao, dialog_manager)
+ await start_handler(message, user_service, dialog_manager)
return
- try:
- test_id = decode_id(deeplink, config.security.encode_key)
- except (ValueError, IndexError):
+ test_id = test_service.decode_test_hash(deeplink)
+ if test_id is None:
logger.warning("Invalid deeplink: user_id=%d, deeplink=%s", message.from_user.id, deeplink)
await message.answer("❌ Неверная ссылка на тест")
- await start_handler(message, user_dao, group_dao, dialog_manager)
+ await start_handler(message, user_service, dialog_manager)
return
is_registered = await ensure_user_registered(
- user_dao, group_dao, message, dialog_manager, pending_test_id=test_id
+ user_service, message, dialog_manager, pending_test_id=test_id
)
if not is_registered:
return
- is_valid, error = await validate_deeplink_test(
- test_dao, user_dao, test_id, message.from_user.id
- )
+ validation = await test_service.validate_test(test_id, message.from_user.id)
- if not is_valid:
+ if not validation.is_valid:
logger.info(
"Test validation failed: user_id=%d, test_id=%d, error=%s",
message.from_user.id,
test_id,
- error,
+ validation.error,
)
await dialog_manager.start(
UserDeeplinkSG.test_preview,
mode=StartMode.RESET_STACK,
- data={"test_id": test_id, "error": error}
+ data={"test_id": test_id, "error": validation.error}
)
return
@@ -173,8 +134,7 @@ async def start_with_deeplink(
async def start_handler(
message: Message,
dialog_manager: DialogManager,
- user_dao: FromDishka[UserDAO],
- group_dao: FromDishka[GroupDAO],
+ user_service: FromDishka[UserService],
) -> None:
assert message.from_user is not None
logger.info(
@@ -184,7 +144,7 @@ async def start_handler(
)
is_registered = await ensure_user_registered(
- user_dao, group_dao, message, dialog_manager
+ user_service, message, dialog_manager
)
if is_registered:
diff --git a/src/quizzi/application/bot/shared_dialogs/broadcast.py b/src/quizzi/application/bot/shared_dialogs/broadcast.py
index 17006f0..ff4fb4b 100644
--- a/src/quizzi/application/bot/shared_dialogs/broadcast.py
+++ b/src/quizzi/application/bot/shared_dialogs/broadcast.py
@@ -7,8 +7,7 @@ from dishka import FromDishka
from dishka.integrations.aiogram_dialog import inject
from quizzi.application.bot.shared_dialogs.states import SharedBroadcastSG
-from quizzi.infrastructure.database.dao.user import UserDAO
-from quizzi.infrastructure.utils.broadcast import broadcast_message
+from quizzi.service.broadcast import BroadcastService
async def on_broadcast_input(message: Message, _widget: MessageInput, manager: DialogManager):
@@ -18,7 +17,12 @@ async def on_broadcast_input(message: Message, _widget: MessageInput, manager: D
@inject
-async def on_broadcast_confirm(_callback: CallbackQuery, _button: Button, manager: DialogManager, user_dao: FromDishka[UserDAO]):
+async def on_broadcast_confirm(
+ _callback: CallbackQuery,
+ _button: Button,
+ manager: DialogManager,
+ broadcast_service: FromDishka[BroadcastService],
+):
message_id = manager.dialog_data.get("broadcast_message_id")
chat_id = manager.dialog_data.get("broadcast_chat_id")
@@ -33,7 +37,7 @@ async def on_broadcast_confirm(_callback: CallbackQuery, _button: Button, manage
await _callback.answer("Ошибка: бот не найден")
return
- stats = await broadcast_message(bot, message_id, chat_id, user_dao)
+ stats = await broadcast_service.broadcast_message(bot, message_id, chat_id)
stats_text = (
f"✅ Рассылка завершена\n\n"
diff --git a/src/quizzi/application/bot/shared_dialogs/tests.py b/src/quizzi/application/bot/shared_dialogs/tests.py
index 12d2c97..288adef 100644
--- a/src/quizzi/application/bot/shared_dialogs/tests.py
+++ b/src/quizzi/application/bot/shared_dialogs/tests.py
@@ -1,6 +1,5 @@
import asyncio
import functools
-import io
from datetime import date, datetime, time
from aiogram import Bot
@@ -11,18 +10,16 @@ from aiogram_dialog.widgets.kbd import Button, Calendar, Column, Row, ScrollingG
from aiogram_dialog.widgets.text import Const, Format
from dishka import FromDishka
from dishka.integrations.aiogram_dialog import inject
-from openpyxl import Workbook
-from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from quizzi.application.bot.shared_dialogs.states import SharedCreateTestSG, SharedTestsSG
from quizzi.infrastructure.database.dao.group import GroupDAO
from quizzi.infrastructure.database.dao.test import TestDAO
from quizzi.infrastructure.database.repo.test import TestRepository
from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository
-from quizzi.infrastructure.utils.config import Config
from quizzi.infrastructure.utils.qr_generator import generate_qr_bytes
-from quizzi.infrastructure.utils.test_id_to_hash import encode_id
from quizzi.infrastructure.utils.timezone import to_msk
+from quizzi.service.excel import ExcelService
+from quizzi.service.test import TestService
@inject
@@ -101,34 +98,38 @@ async def get_test_detail(test_dao: FromDishka[TestDAO], test_repo: FromDishka[T
@inject
-async def on_toggle_active(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
+async def on_toggle_active(
+ _callback: CallbackQuery,
+ _button: Button,
+ manager: DialogManager,
+ test_service: FromDishka[TestService],
+):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
- test = await test_dao.get_by_id(test_id)
-
- if test:
- await test_dao.update(test_id, is_active=not test.is_active)
- action = "деактивирован" if test.is_active else "активирован"
- await _callback.answer(f"✅ Тест {action}")
+ result = await test_service.toggle_test_active(test_id)
+ await _callback.answer(result.message)
+ if result.success:
await manager.switch_to(SharedTestsSG.test_detail)
@inject
-async def on_toggle_results_viewable(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
+async def on_toggle_results_viewable(
+ _callback: CallbackQuery,
+ _button: Button,
+ manager: DialogManager,
+ test_service: FromDishka[TestService],
+):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
- test = await test_dao.get_by_id(test_id)
-
- if test:
- await test_dao.update(test_id, are_results_viewable=not test.are_results_viewable)
- action = "скрыты" if test.are_results_viewable else "видны"
- await _callback.answer(f"✅ Результаты теперь {action}")
+ result = await test_service.toggle_results_viewable(test_id)
+ await _callback.answer(result.message)
+ if result.success:
await manager.switch_to(SharedTestsSG.test_detail)
@@ -235,7 +236,13 @@ async def on_export_stats(_callback: CallbackQuery, _button: Button, manager: Di
@inject
-async def on_share_test(_callback: CallbackQuery, _button: Button, manager: DialogManager, config: FromDishka[Config], bot_inst: FromDishka[Bot]):
+async def on_share_test(
+ _callback: CallbackQuery,
+ _button: Button,
+ manager: DialogManager,
+ test_service: FromDishka[TestService],
+ bot_inst: FromDishka[Bot],
+):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
@@ -243,11 +250,7 @@ async def on_share_test(_callback: CallbackQuery, _button: Button, manager: Dial
"share_link": "Ошибка: тест не найден"
}
- test_hash = encode_id(
- test_id,
- config.security.encode_key,
- config.security.encoded_string_length
- )
+ test_hash = test_service.encode_test_id(test_id)
bot_info = await bot_inst.get_me()
bot_username = bot_info.username or "your_bot"
@@ -314,13 +317,18 @@ async def get_delete_confirm_data(dialog_manager: DialogManager, test_dao: FromD
@inject
-async def on_confirm_delete(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
+async def on_confirm_delete(
+ _callback: CallbackQuery,
+ _button: Button,
+ manager: DialogManager,
+ test_service: FromDishka[TestService],
+):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
- deleted = await test_dao.delete(test_id)
+ deleted = await test_service.delete_test(test_id)
if deleted:
await _callback.answer("✅ Тест удалён")
await manager.switch_to(SharedTestsSG.tests_list)
@@ -329,7 +337,12 @@ async def on_confirm_delete(_callback: CallbackQuery, _button: Button, manager:
@inject
-async def on_password_input(message: Message, _widget: MessageInput, manager: DialogManager, test_dao: FromDishka[TestDAO]):
+async def on_password_input(
+ message: Message,
+ _widget: MessageInput,
+ manager: DialogManager,
+ test_service: FromDishka[TestService],
+):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await message.answer("❌ Тест не найден")
@@ -339,30 +352,36 @@ async def on_password_input(message: Message, _widget: MessageInput, manager: Di
await message.answer("❌ Пароль не может быть пустым")
return
- password = message.text.strip()
- if len(password) > 255:
- await message.answer("❌ Пароль слишком длинный (максимум 255 символов)")
- return
-
- await test_dao.update(test_id, password=password)
- await message.answer("✅ Пароль обновлен")
- await manager.switch_to(SharedTestsSG.test_detail)
+ result = await test_service.update_password(test_id, message.text.strip())
+ await message.answer(result.message)
+ if result.success:
+ await manager.switch_to(SharedTestsSG.test_detail)
@inject
-async def on_remove_password(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
+async def on_remove_password(
+ _callback: CallbackQuery,
+ _button: Button,
+ manager: DialogManager,
+ test_service: FromDishka[TestService],
+):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
- await test_dao.update(test_id, password=None)
- await _callback.answer("✅ Пароль удален")
+ result = await test_service.remove_password(test_id)
+ await _callback.answer(result.message)
await manager.switch_to(SharedTestsSG.test_detail)
@inject
-async def on_attempts_input_edit(message: Message, _widget: MessageInput, manager: DialogManager, test_dao: FromDishka[TestDAO]):
+async def on_attempts_input_edit(
+ message: Message,
+ _widget: MessageInput,
+ manager: DialogManager,
+ test_service: FromDishka[TestService],
+):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await message.answer("❌ Тест не найден")
@@ -373,40 +392,40 @@ async def on_attempts_input_edit(message: Message, _widget: MessageInput, manage
return
attempts_str = message.text.strip()
-
if not attempts_str.isdigit():
await message.answer("❌ Количество попыток должно быть числом")
return
- attempts = int(attempts_str)
-
- if attempts < 1:
- await message.answer("❌ Количество попыток должно быть больше 0")
- return
-
- if attempts > 100:
- await message.answer("❌ Количество попыток не может быть больше 100")
- return
-
- await test_dao.update(test_id, attempts=attempts)
- await message.answer("✅ Количество попыток обновлено")
- await manager.switch_to(SharedTestsSG.test_detail)
+ result = await test_service.update_attempts(test_id, int(attempts_str))
+ await message.answer(result.message)
+ if result.success:
+ await manager.switch_to(SharedTestsSG.test_detail)
@inject
-async def on_remove_attempts(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
+async def on_remove_attempts(
+ _callback: CallbackQuery,
+ _button: Button,
+ manager: DialogManager,
+ test_service: FromDishka[TestService],
+):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
- await test_dao.update(test_id, attempts=None)
- await _callback.answer("✅ Ограничение попыток удалено")
+ result = await test_service.remove_attempts(test_id)
+ await _callback.answer(result.message)
await manager.switch_to(SharedTestsSG.test_detail)
@inject
-async def on_time_limit_input(message: Message, _widget: MessageInput, manager: DialogManager, test_dao: FromDishka[TestDAO]):
+async def on_time_limit_input(
+ message: Message,
+ _widget: MessageInput,
+ manager: DialogManager,
+ test_service: FromDishka[TestService],
+):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await message.answer("❌ Тест не найден")
@@ -417,36 +436,30 @@ async def on_time_limit_input(message: Message, _widget: MessageInput, manager:
return
time_limit_str = message.text.strip()
-
if not time_limit_str.isdigit():
await message.answer("❌ Лимит времени должен быть числом (в минутах)")
return
- time_limit_minutes = int(time_limit_str)
-
- if time_limit_minutes < 1:
- await message.answer("❌ Лимит времени должен быть больше 0")
- return
-
- if time_limit_minutes > 1440:
- await message.answer("❌ Лимит времени не может быть больше 1440 минут (24 часа)")
- return
-
- time_limit_seconds = time_limit_minutes * 60
- await test_dao.update(test_id, time_limit=time_limit_seconds)
- await message.answer("✅ Лимит времени обновлен")
- await manager.switch_to(SharedTestsSG.test_detail)
+ result = await test_service.update_time_limit(test_id, int(time_limit_str))
+ await message.answer(result.message)
+ if result.success:
+ await manager.switch_to(SharedTestsSG.test_detail)
@inject
-async def on_remove_time_limit(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
+async def on_remove_time_limit(
+ _callback: CallbackQuery,
+ _button: Button,
+ manager: DialogManager,
+ test_service: FromDishka[TestService],
+):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
- await test_dao.update(test_id, time_limit=None)
- await _callback.answer("✅ Лимит времени удален")
+ result = await test_service.remove_time_limit(test_id)
+ await _callback.answer(result.message)
await manager.switch_to(SharedTestsSG.test_detail)
@@ -460,51 +473,73 @@ async def get_groups_for_edit(dialog_manager: DialogManager, group_dao: FromDish
@inject
-async def on_group_selected_for_test(_callback: CallbackQuery, _widget, manager: DialogManager, item_id: str, test_dao: FromDishka[TestDAO]):
+async def on_group_selected_for_test(
+ _callback: CallbackQuery,
+ _widget,
+ manager: DialogManager,
+ item_id: str,
+ test_service: FromDishka[TestService],
+):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
- await test_dao.update(test_id, for_group=int(item_id))
- await _callback.answer("✅ Группа обновлена")
+ result = await test_service.update_group(test_id, int(item_id))
+ await _callback.answer(result.message)
await manager.switch_to(SharedTestsSG.test_detail)
@inject
-async def on_remove_group(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
+async def on_remove_group(
+ _callback: CallbackQuery,
+ _button: Button,
+ manager: DialogManager,
+ test_service: FromDishka[TestService],
+):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
- await test_dao.update(test_id, for_group=None)
- await _callback.answer("✅ Тест теперь доступен для всех групп")
+ result = await test_service.remove_group(test_id)
+ await _callback.answer(result.message)
await manager.switch_to(SharedTestsSG.test_detail)
@inject
-async def on_date_selected_for_test(_callback, _widget, manager: DialogManager, selected_date: date, test_dao: FromDishka[TestDAO]):
+async def on_date_selected_for_test(
+ _callback,
+ _widget,
+ manager: DialogManager,
+ selected_date: date,
+ test_service: FromDishka[TestService],
+):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
expires_at = datetime.combine(selected_date, time.min)
- await test_dao.update(test_id, expires_at=expires_at)
- await _callback.answer("✅ Срок действия обновлен")
+ result = await test_service.update_expires(test_id, expires_at)
+ await _callback.answer(result.message)
await manager.switch_to(SharedTestsSG.test_detail)
@inject
-async def on_remove_expires(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
+async def on_remove_expires(
+ _callback: CallbackQuery,
+ _button: Button,
+ manager: DialogManager,
+ test_service: FromDishka[TestService],
+):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
- await test_dao.update(test_id, expires_at=None)
- await _callback.answer("✅ Срок действия удален")
+ result = await test_service.remove_expires(test_id)
+ await _callback.answer(result.message)
await manager.switch_to(SharedTestsSG.test_detail)
@@ -525,115 +560,13 @@ async def get_groups_for_export(dialog_manager: DialogManager, group_dao: FromDi
}
-def create_excel_report(
- test_title: str,
- group_number: int,
- stats: list[tuple[str, int | None, datetime | None, bool | None]],
-) -> bytes:
- wb = Workbook()
- ws = wb.active
- assert ws is not None
- ws.title = "Статистика"
-
- header_font = Font(bold=True, color="FFFFFF")
- header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
- header_alignment = Alignment(horizontal="center", vertical="center")
- thin_border = Border(
- left=Side(style="thin"),
- right=Side(style="thin"),
- top=Side(style="thin"),
- bottom=Side(style="thin"),
- )
-
- ws.merge_cells("A1:E1")
- ws["A1"] = f"Тест: {test_title}"
- ws["A1"].font = Font(bold=True, size=14)
- ws["A1"].alignment = Alignment(horizontal="center")
-
- ws.merge_cells("A2:E2")
- ws["A2"] = f"Группа: {group_number}"
- ws["A2"].font = Font(bold=True, size=12)
- ws["A2"].alignment = Alignment(horizontal="center")
-
- headers = ["ФИО", "Результат (%)", "Оценка", "Дата прохождения", "Статус"]
- for col, header in enumerate(headers, 1):
- cell = ws.cell(row=4, column=col, value=header)
- cell.font = header_font
- cell.fill = header_fill
- cell.alignment = header_alignment
- cell.border = thin_border
-
- passed_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
- failed_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
- not_passed_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid")
-
- grades: list[int] = []
-
- for row_idx, (name, score, finished_at, is_passed) in enumerate(stats, 5):
- ws.cell(row=row_idx, column=1, value=name).border = thin_border
-
- if score is not None:
- ws.cell(row=row_idx, column=2, value=score).border = thin_border
-
- grade = score // 10
- grades.append(grade)
- ws.cell(row=row_idx, column=3, value=grade).border = thin_border
-
- finished_msk = to_msk(finished_at) if finished_at else None
- date_str = finished_msk.strftime("%d.%m.%Y %H:%M") if finished_msk else "—"
- ws.cell(row=row_idx, column=4, value=date_str).border = thin_border
- status = "Пройден" if is_passed else "Не пройден"
- status_cell = ws.cell(row=row_idx, column=5, value=status)
- status_cell.border = thin_border
-
- for col in range(1, 6):
- ws.cell(row=row_idx, column=col).fill = passed_fill if is_passed else failed_fill
- else:
- ws.cell(row=row_idx, column=2, value="—").border = thin_border
- ws.cell(row=row_idx, column=3, value="—").border = thin_border
- ws.cell(row=row_idx, column=4, value="—").border = thin_border
- status_cell = ws.cell(row=row_idx, column=5, value="Не проходил")
- status_cell.border = thin_border
-
- for col in range(1, 6):
- ws.cell(row=row_idx, column=col).fill = not_passed_fill
-
- ws.column_dimensions["A"].width = 30
- ws.column_dimensions["B"].width = 15
- ws.column_dimensions["C"].width = 10
- ws.column_dimensions["D"].width = 20
- ws.column_dimensions["E"].width = 15
-
- total_users = len(stats)
- passed_users = sum(1 for _, score, _, is_passed in stats if score is not None and is_passed)
- attempted_users = sum(1 for _, score, _, _ in stats if score is not None)
-
- summary_row = len(stats) + 6
- ws.cell(row=summary_row, column=1, value="Итого:").font = Font(bold=True)
- ws.cell(row=summary_row + 1, column=1, value=f"Всего студентов: {total_users}")
- ws.cell(row=summary_row + 2, column=1, value=f"Прошли тест: {attempted_users}")
- ws.cell(row=summary_row + 3, column=1, value=f"Сдали: {passed_users}")
- if attempted_users > 0:
- success_rate = round(passed_users / attempted_users * 100)
- ws.cell(row=summary_row + 4, column=1, value=f"Процент сдачи: {success_rate}%")
- if grades:
- avg_grade = round(sum(grades) / len(grades), 1)
- ws.cell(row=summary_row + 5, column=1, value=f"Средняя оценка: {avg_grade}")
-
- output = io.BytesIO()
- wb.save(output)
- output.seek(0)
- return output.read()
-
-
@inject
async def on_group_selected_for_export(
_callback: CallbackQuery,
_widget: Select,
manager: DialogManager,
item_id: str,
- test_dao: FromDishka[TestDAO],
- attempt_repo: FromDishka[TestAttemptRepository],
+ excel_service: FromDishka[ExcelService],
) -> None:
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
@@ -643,26 +576,16 @@ async def on_group_selected_for_export(
assert _callback.message is not None
await _callback.answer("⏳ Формирую отчёт...")
- test = await test_dao.get_by_id(test_id)
- if not test:
- await _callback.message.answer("❌ Тест не найден")
- return
-
group_number = int(item_id)
- stats = await attempt_repo.get_group_test_statistics(test_id, group_number)
+ result = await excel_service.generate_group_report(test_id, group_number)
- if not stats:
- await _callback.message.answer(f"❌ В группе {group_number} нет студентов")
+ if not result.success or not result.data or not result.filename:
+ await _callback.message.answer(result.caption)
return
- excel_bytes = create_excel_report(test.title, group_number, stats)
-
- safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in test.title)[:30]
- filename = f"{safe_title}_group_{group_number}.xlsx"
-
await _callback.message.answer_document(
- document=BufferedInputFile(excel_bytes, filename=filename),
- caption=f"📊 Статистика по тесту\n\n📝 {test.title}\n🎓 Группа {group_number}",
+ document=BufferedInputFile(result.data, filename=result.filename),
+ caption=result.caption,
)
diff --git a/src/quizzi/infrastructure/di.py b/src/quizzi/infrastructure/di.py
index 3d6d39a..35e4386 100644
--- a/src/quizzi/infrastructure/di.py
+++ b/src/quizzi/infrastructure/di.py
@@ -20,6 +20,11 @@ from quizzi.infrastructure.database.repo.user import UserRepository
from quizzi.infrastructure.scheduling.tasks import deactivate_expired_tests, finish_expired_test_attempts, send_time_warning_notifications
from quizzi.infrastructure.utils.config import Config
from quizzi.infrastructure.utils.rate_limiter import PasswordRateLimiter
+from quizzi.service.broadcast import BroadcastService
+from quizzi.service.excel import ExcelService
+from quizzi.service.test import TestService
+from quizzi.service.test_attempt import TestAttemptService
+from quizzi.service.user import UserService
class DatabaseProvider(Provider):
@@ -80,6 +85,41 @@ class DatabaseProvider(Provider):
return TestAttemptRepository(session)
+class ServiceProvider(Provider):
+ @provide(scope=Scope.REQUEST)
+ def get_user_service(self, user_dao: UserDAO, group_dao: GroupDAO) -> UserService:
+ return UserService(user_dao, group_dao)
+
+ @provide(scope=Scope.REQUEST)
+ def get_test_service(
+ self,
+ test_dao: TestDAO,
+ test_repo: TestRepository,
+ attempt_repo: TestAttemptRepository,
+ user_dao: UserDAO,
+ config: Config,
+ ) -> TestService:
+ return TestService(test_dao, test_repo, attempt_repo, user_dao, config)
+
+ @provide(scope=Scope.REQUEST)
+ def get_test_attempt_service(
+ self,
+ test_dao: TestDAO,
+ test_repo: TestRepository,
+ attempt_repo: TestAttemptRepository,
+ answer_dao: UserAnswerDAO,
+ ) -> TestAttemptService:
+ return TestAttemptService(test_dao, test_repo, attempt_repo, answer_dao)
+
+ @provide(scope=Scope.REQUEST)
+ def get_broadcast_service(self, user_dao: UserDAO) -> BroadcastService:
+ return BroadcastService(user_dao)
+
+ @provide(scope=Scope.REQUEST)
+ def get_excel_service(self, test_dao: TestDAO, attempt_repo: TestAttemptRepository) -> ExcelService:
+ return ExcelService(test_dao, attempt_repo)
+
+
class SchedulerProvider(Provider):
@provide(scope = Scope.APP)
def get_scheduler(self, container: AsyncContainer, bot: Bot) -> AsyncIOScheduler:
diff --git a/src/quizzi/service/__init__.py b/src/quizzi/service/__init__.py
new file mode 100644
index 0000000..7721cac
--- /dev/null
+++ b/src/quizzi/service/__init__.py
@@ -0,0 +1 @@
+# Service layer - бизнес-логика приложения
diff --git a/src/quizzi/service/broadcast.py b/src/quizzi/service/broadcast.py
new file mode 100644
index 0000000..ab8bcf2
--- /dev/null
+++ b/src/quizzi/service/broadcast.py
@@ -0,0 +1,43 @@
+from dataclasses import dataclass
+
+from aiogram import Bot
+from aiogram.exceptions import TelegramAPIError
+
+from quizzi.infrastructure.database.dao.user import UserDAO
+
+
+@dataclass
+class BroadcastStats:
+ total: int
+ success: int
+ failed: int
+
+
+class BroadcastService:
+ def __init__(self, user_dao: UserDAO) -> None:
+ self._user_dao = user_dao
+
+ async def broadcast_message(
+ self,
+ bot: Bot,
+ message_id: int,
+ from_chat_id: int,
+ ) -> BroadcastStats:
+ users = await self._user_dao.get_all()
+
+ total = len(users)
+ success = 0
+ failed = 0
+
+ for user in users:
+ try:
+ await bot.copy_message(
+ chat_id=user.id,
+ from_chat_id=from_chat_id,
+ message_id=message_id,
+ )
+ success += 1
+ except TelegramAPIError:
+ failed += 1
+
+ return BroadcastStats(total=total, success=success, failed=failed)
diff --git a/src/quizzi/service/excel.py b/src/quizzi/service/excel.py
new file mode 100644
index 0000000..64ca8be
--- /dev/null
+++ b/src/quizzi/service/excel.py
@@ -0,0 +1,145 @@
+import io
+from dataclasses import dataclass
+from datetime import datetime
+
+from openpyxl import Workbook
+from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
+
+from quizzi.infrastructure.database.dao.test import TestDAO
+from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository
+from quizzi.infrastructure.utils.timezone import to_msk
+
+
+@dataclass
+class ExcelReportResult:
+ success: bool
+ data: bytes | None = None
+ filename: str | None = None
+ caption: str = ""
+
+
+class ExcelService:
+ def __init__(self, test_dao: TestDAO, attempt_repo: TestAttemptRepository) -> None:
+ self._test_dao = test_dao
+ self._attempt_repo = attempt_repo
+
+ def create_test_report(
+ self,
+ test_title: str,
+ group_number: int,
+ stats: list[tuple[str, int | None, datetime | None, bool | None]],
+ ) -> bytes:
+ wb = Workbook()
+ ws = wb.active
+ assert ws is not None
+ ws.title = "Статистика"
+
+ header_font = Font(bold=True, color="FFFFFF")
+ header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
+ header_alignment = Alignment(horizontal="center", vertical="center")
+ thin_border = Border(
+ left=Side(style="thin"),
+ right=Side(style="thin"),
+ top=Side(style="thin"),
+ bottom=Side(style="thin"),
+ )
+
+ ws.merge_cells("A1:E1")
+ ws["A1"] = f"Тест: {test_title}"
+ ws["A1"].font = Font(bold=True, size=14)
+ ws["A1"].alignment = Alignment(horizontal="center")
+
+ ws.merge_cells("A2:E2")
+ ws["A2"] = f"Группа: {group_number}"
+ ws["A2"].font = Font(bold=True, size=12)
+ ws["A2"].alignment = Alignment(horizontal="center")
+
+ headers = ["ФИО", "Результат (%)", "Оценка", "Дата прохождения", "Статус"]
+ for col, header in enumerate(headers, 1):
+ cell = ws.cell(row=4, column=col, value=header)
+ cell.font = header_font
+ cell.fill = header_fill
+ cell.alignment = header_alignment
+ cell.border = thin_border
+
+ passed_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
+ failed_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
+ not_passed_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid")
+
+ grades: list[int] = []
+
+ for row_idx, (name, score, finished_at, is_passed) in enumerate(stats, 5):
+ ws.cell(row=row_idx, column=1, value=name).border = thin_border
+
+ if score is not None:
+ ws.cell(row=row_idx, column=2, value=score).border = thin_border
+
+ grade = score // 10
+ grades.append(grade)
+ ws.cell(row=row_idx, column=3, value=grade).border = thin_border
+
+ finished_msk = to_msk(finished_at) if finished_at else None
+ date_str = finished_msk.strftime("%d.%m.%Y %H:%M") if finished_msk else "—"
+ ws.cell(row=row_idx, column=4, value=date_str).border = thin_border
+ status = "Пройден" if is_passed else "Не пройден"
+ status_cell = ws.cell(row=row_idx, column=5, value=status)
+ status_cell.border = thin_border
+
+ for col in range(1, 6):
+ ws.cell(row=row_idx, column=col).fill = passed_fill if is_passed else failed_fill
+ else:
+ ws.cell(row=row_idx, column=2, value="—").border = thin_border
+ ws.cell(row=row_idx, column=3, value="—").border = thin_border
+ ws.cell(row=row_idx, column=4, value="—").border = thin_border
+ status_cell = ws.cell(row=row_idx, column=5, value="Не проходил")
+ status_cell.border = thin_border
+
+ for col in range(1, 6):
+ ws.cell(row=row_idx, column=col).fill = not_passed_fill
+
+ ws.column_dimensions["A"].width = 30
+ ws.column_dimensions["B"].width = 15
+ ws.column_dimensions["C"].width = 10
+ ws.column_dimensions["D"].width = 20
+ ws.column_dimensions["E"].width = 15
+
+ total_users = len(stats)
+ passed_users = sum(1 for _, score, _, is_passed in stats if score is not None and is_passed)
+ attempted_users = sum(1 for _, score, _, _ in stats if score is not None)
+
+ summary_row = len(stats) + 6
+ ws.cell(row=summary_row, column=1, value="Итого:").font = Font(bold=True)
+ ws.cell(row=summary_row + 1, column=1, value=f"Всего студентов: {total_users}")
+ ws.cell(row=summary_row + 2, column=1, value=f"Прошли тест: {attempted_users}")
+ ws.cell(row=summary_row + 3, column=1, value=f"Сдали: {passed_users}")
+ if attempted_users > 0:
+ success_rate = round(passed_users / attempted_users * 100)
+ ws.cell(row=summary_row + 4, column=1, value=f"Процент сдачи: {success_rate}%")
+ if grades:
+ avg_grade = round(sum(grades) / len(grades), 1)
+ ws.cell(row=summary_row + 5, column=1, value=f"Средняя оценка: {avg_grade}")
+
+ output = io.BytesIO()
+ wb.save(output)
+ output.seek(0)
+ return output.read()
+
+ async def generate_group_report(self, test_id: int, group_number: int) -> ExcelReportResult:
+ test = await self._test_dao.get_by_id(test_id)
+ if not test:
+ return ExcelReportResult(success=False, caption="❌ Тест не найден")
+
+ stats = await self._attempt_repo.get_group_test_statistics(test_id, group_number)
+ if not stats:
+ return ExcelReportResult(success=False, caption=f"❌ В группе {group_number} нет студентов")
+
+ excel_bytes = self.create_test_report(test.title, group_number, stats)
+ safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in test.title)[:30]
+ filename = f"{safe_title}_group_{group_number}.xlsx"
+
+ return ExcelReportResult(
+ success=True,
+ data=excel_bytes,
+ filename=filename,
+ caption=f"📊 Статистика по тесту\n\n📝 {test.title}\n🎓 Группа {group_number}",
+ )
diff --git a/src/quizzi/service/test.py b/src/quizzi/service/test.py
new file mode 100644
index 0000000..8128f8a
--- /dev/null
+++ b/src/quizzi/service/test.py
@@ -0,0 +1,184 @@
+from dataclasses import dataclass
+from datetime import datetime
+
+from quizzi.domain.schemas import Test
+from quizzi.infrastructure.database.dao.test import TestDAO
+from quizzi.infrastructure.database.dao.user import UserDAO
+from quizzi.infrastructure.database.repo.test import TestRepository
+from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository
+from quizzi.infrastructure.utils.config import Config
+from quizzi.infrastructure.utils.test_id_to_hash import decode_id, encode_id
+from quizzi.infrastructure.utils.timezone import now_msk_naive
+
+
+@dataclass
+class TestValidationResult:
+ is_valid: bool
+ error: str = ""
+ test: Test | None = None
+
+
+@dataclass
+class TestAccessResult:
+ can_access: bool
+ error: str = ""
+ remaining_attempts: int | None = None
+
+
+@dataclass
+class TestUpdateResult:
+ success: bool
+ message: str
+
+
+class TestService:
+ def __init__(
+ self,
+ test_dao: TestDAO,
+ test_repo: TestRepository,
+ attempt_repo: TestAttemptRepository,
+ user_dao: UserDAO,
+ config: Config,
+ ) -> None:
+ self._test_dao = test_dao
+ self._test_repo = test_repo
+ self._attempt_repo = attempt_repo
+ self._user_dao = user_dao
+ self._config = config
+
+ def decode_test_hash(self, test_hash: str) -> int | None:
+ try:
+ return decode_id(test_hash, self._config.security.encode_key)
+ except (ValueError, IndexError):
+ return None
+
+ def encode_test_id(self, test_id: int) -> str:
+ return encode_id(
+ test_id,
+ self._config.security.encode_key,
+ self._config.security.encoded_string_length,
+ )
+
+ async def validate_test(self, test_id: int, user_id: int) -> TestValidationResult:
+ test = await self._test_dao.get_by_id(test_id)
+
+ if not test:
+ return TestValidationResult(is_valid=False, error="❌ Тест не найден")
+
+ if not test.is_active:
+ return TestValidationResult(is_valid=False, error="❌ Тест деактивирован", test=test)
+
+ if test.expires_at and test.expires_at < now_msk_naive():
+ return TestValidationResult(is_valid=False, error="❌ Срок действия теста истек", test=test)
+
+ user = await self._user_dao.get_by_id(user_id)
+ if test.for_group and user and user.group != test.for_group:
+ return TestValidationResult(
+ is_valid=False,
+ error=f"❌ Тест доступен только для группы {test.for_group}",
+ test=test,
+ )
+
+ return TestValidationResult(is_valid=True, test=test)
+
+ async def check_test_access(self, test_id: int, user_id: int) -> TestAccessResult:
+ test = await self._test_dao.get_by_id(test_id)
+
+ if not test:
+ return TestAccessResult(can_access=False, error="❌ Тест не найден")
+
+ if not test.is_active:
+ return TestAccessResult(can_access=False, error="❌ Тест деактивирован")
+
+ if test.expires_at and test.expires_at < now_msk_naive():
+ return TestAccessResult(can_access=False, error="❌ Срок действия теста истек")
+
+ if test.attempts:
+ attempts = await self._attempt_repo.get_user_test_attempts(user_id, test_id)
+ finished_attempts = [a for a in attempts if a.finished_at]
+ remaining = test.attempts - len(finished_attempts)
+
+ if remaining <= 0:
+ return TestAccessResult(
+ can_access=False,
+ error="❌ Вы исчерпали все попытки",
+ remaining_attempts=0,
+ )
+
+ return TestAccessResult(can_access=True, remaining_attempts=remaining)
+
+ return TestAccessResult(can_access=True)
+
+ async def get_available_tests(self, user_id: int, user_group: int | None) -> list[Test]:
+ return await self._test_repo.get_available_tests_for_user(user_id, user_group)
+
+ async def toggle_test_active(self, test_id: int) -> TestUpdateResult:
+ test = await self._test_dao.get_by_id(test_id)
+ if not test:
+ return TestUpdateResult(success=False, message="❌ Тест не найден")
+
+ await self._test_dao.update(test_id, is_active=not test.is_active)
+ action = "деактивирован" if test.is_active else "активирован"
+ return TestUpdateResult(success=True, message=f"✅ Тест {action}")
+
+ async def toggle_results_viewable(self, test_id: int) -> TestUpdateResult:
+ test = await self._test_dao.get_by_id(test_id)
+ if not test:
+ return TestUpdateResult(success=False, message="❌ Тест не найден")
+
+ await self._test_dao.update(test_id, are_results_viewable=not test.are_results_viewable)
+ action = "скрыты" if test.are_results_viewable else "видны"
+ return TestUpdateResult(success=True, message=f"✅ Результаты теперь {action}")
+
+ async def delete_test(self, test_id: int) -> bool:
+ return await self._test_dao.delete(test_id)
+
+ async def update_password(self, test_id: int, password: str) -> TestUpdateResult:
+ if len(password) > 255:
+ return TestUpdateResult(success=False, message="❌ Пароль слишком длинный (максимум 255 символов)")
+ await self._test_dao.update(test_id, password=password)
+ return TestUpdateResult(success=True, message="✅ Пароль обновлен")
+
+ async def remove_password(self, test_id: int) -> TestUpdateResult:
+ await self._test_dao.update(test_id, password=None)
+ return TestUpdateResult(success=True, message="✅ Пароль удален")
+
+ async def update_attempts(self, test_id: int, attempts: int) -> TestUpdateResult:
+ if attempts < 1:
+ return TestUpdateResult(success=False, message="❌ Количество попыток должно быть больше 0")
+ if attempts > 100:
+ return TestUpdateResult(success=False, message="❌ Количество попыток не может быть больше 100")
+ await self._test_dao.update(test_id, attempts=attempts)
+ return TestUpdateResult(success=True, message="✅ Количество попыток обновлено")
+
+ async def remove_attempts(self, test_id: int) -> TestUpdateResult:
+ await self._test_dao.update(test_id, attempts=None)
+ return TestUpdateResult(success=True, message="✅ Ограничение попыток удалено")
+
+ async def update_time_limit(self, test_id: int, minutes: int) -> TestUpdateResult:
+ if minutes < 1:
+ return TestUpdateResult(success=False, message="❌ Лимит времени должен быть больше 0")
+ if minutes > 1440:
+ return TestUpdateResult(success=False, message="❌ Лимит времени не может быть больше 1440 минут (24 часа)")
+ await self._test_dao.update(test_id, time_limit=minutes * 60)
+ return TestUpdateResult(success=True, message="✅ Лимит времени обновлен")
+
+ async def remove_time_limit(self, test_id: int) -> TestUpdateResult:
+ await self._test_dao.update(test_id, time_limit=None)
+ return TestUpdateResult(success=True, message="✅ Лимит времени удален")
+
+ async def update_group(self, test_id: int, group: int) -> TestUpdateResult:
+ await self._test_dao.update(test_id, for_group=group)
+ return TestUpdateResult(success=True, message="✅ Группа обновлена")
+
+ async def remove_group(self, test_id: int) -> TestUpdateResult:
+ await self._test_dao.update(test_id, for_group=None)
+ return TestUpdateResult(success=True, message="✅ Тест теперь доступен для всех групп")
+
+ async def update_expires(self, test_id: int, expires_at: datetime) -> TestUpdateResult:
+ await self._test_dao.update(test_id, expires_at=expires_at)
+ return TestUpdateResult(success=True, message="✅ Срок действия обновлен")
+
+ async def remove_expires(self, test_id: int) -> TestUpdateResult:
+ await self._test_dao.update(test_id, expires_at=None)
+ return TestUpdateResult(success=True, message="✅ Срок действия удален")
diff --git a/src/quizzi/service/test_attempt.py b/src/quizzi/service/test_attempt.py
new file mode 100644
index 0000000..fa689df
--- /dev/null
+++ b/src/quizzi/service/test_attempt.py
@@ -0,0 +1,204 @@
+from dataclasses import dataclass
+from datetime import datetime
+
+from quizzi.domain.schemas import QuestionType
+from quizzi.infrastructure.database.dao.test import TestDAO
+from quizzi.infrastructure.database.dao.user_answer import UserAnswerDAO
+from quizzi.infrastructure.database.repo.test import TestRepository
+from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository
+from quizzi.infrastructure.utils.timezone import now_msk_naive
+
+
+@dataclass
+class AttemptStartResult:
+ success: bool
+ attempt_id: int | None = None
+ questions: list[int] | None = None
+ started_at: datetime | None = None
+ error: str = ""
+
+
+@dataclass
+class AnswerResult:
+ success: bool
+ is_correct: bool = False
+ error: str = ""
+
+
+@dataclass
+class TestResult:
+ score: int
+ correct_count: int
+ total_questions: int
+ is_passed: bool
+
+
+class TestAttemptService:
+ def __init__(
+ self,
+ test_dao: TestDAO,
+ test_repo: TestRepository,
+ attempt_repo: TestAttemptRepository,
+ answer_dao: UserAnswerDAO,
+ ) -> None:
+ self._test_dao = test_dao
+ self._test_repo = test_repo
+ self._attempt_repo = attempt_repo
+ self._answer_dao = answer_dao
+
+ async def start_attempt(self, user_id: int, test_id: int) -> AttemptStartResult:
+ active_attempt = await self._attempt_repo.get_active_attempt(user_id, test_id)
+ if active_attempt:
+ await self._attempt_repo.attempt_dao.delete(active_attempt.id)
+
+ _, questions = await self._test_repo.get_test_with_questions(test_id)
+ if not questions:
+ return AttemptStartResult(success=False, error="❌ В тесте нет вопросов")
+
+ attempt = await self._attempt_repo.attempt_dao.create(user_id=user_id, test_id=test_id)
+ started_at = now_msk_naive()
+
+ return AttemptStartResult(
+ success=True,
+ attempt_id=attempt.id,
+ questions=[q.id for q in questions],
+ started_at=started_at,
+ )
+
+ async def cancel_attempt(self, attempt_id: int) -> bool:
+ return await self._attempt_repo.attempt_dao.delete(attempt_id)
+
+ async def save_single_answer(
+ self,
+ attempt_id: int,
+ question_id: int,
+ selected_option_id: int,
+ ) -> AnswerResult:
+ question, options = await self._test_repo.get_question_with_options(question_id)
+ if not question:
+ return AnswerResult(success=False, error="❌ Вопрос не найден")
+
+ correct_options = [opt for opt in options if opt.is_correct]
+ is_correct = any(opt.id == selected_option_id for opt in correct_options)
+ selected_text = next((opt.text for opt in options if opt.id == selected_option_id), "")
+
+ await self._answer_dao.create(
+ attempt_id=attempt_id,
+ question_id=question_id,
+ selected_option_id=selected_option_id,
+ text_answer=selected_text,
+ is_correct=is_correct,
+ )
+
+ return AnswerResult(success=True, is_correct=is_correct)
+
+ async def save_multiple_answer(
+ self,
+ attempt_id: int,
+ question_id: int,
+ selected_option_ids: list[int],
+ ) -> AnswerResult:
+ question, options = await self._test_repo.get_question_with_options(question_id)
+ if not question:
+ return AnswerResult(success=False, error="❌ Вопрос не найден")
+
+ selected_texts = sorted([opt.text for opt in options if opt.id in selected_option_ids])
+ correct_texts = sorted([opt.text for opt in options if opt.is_correct])
+ is_correct = selected_texts == correct_texts
+
+ await self._answer_dao.create(
+ attempt_id=attempt_id,
+ question_id=question_id,
+ text_answer="|".join(selected_texts),
+ is_correct=is_correct,
+ )
+
+ return AnswerResult(success=True, is_correct=is_correct)
+
+ async def save_text_answer(
+ self,
+ attempt_id: int,
+ question_id: int,
+ text_answer: str,
+ ) -> AnswerResult:
+ question, options = await self._test_repo.get_question_with_options(question_id)
+ if not question:
+ return AnswerResult(success=False, error="❌ Вопрос не найден")
+
+ correct_options = [opt for opt in options if opt.is_correct]
+ user_normalized = text_answer.lower().replace(" ", "")
+ is_correct = any(opt.text.lower().replace(" ", "") == user_normalized for opt in correct_options)
+
+ await self._answer_dao.create(
+ attempt_id=attempt_id,
+ question_id=question_id,
+ text_answer=text_answer,
+ is_correct=is_correct,
+ )
+
+ return AnswerResult(success=True, is_correct=is_correct)
+
+ async def finish_attempt(self, attempt_id: int, total_questions: int) -> TestResult:
+ correct_count = await self._attempt_repo.calculate_attempt_score(attempt_id)
+ score = round((correct_count / total_questions * 100)) if total_questions > 0 else 0
+ is_passed = score >= 50
+
+ await self._attempt_repo.finish_attempt(attempt_id, score, is_passed)
+
+ return TestResult(
+ score=score,
+ correct_count=correct_count,
+ total_questions=total_questions,
+ is_passed=is_passed,
+ )
+
+ async def finish_by_timeout(
+ self,
+ attempt_id: int,
+ questions: list[int],
+ user_answers: dict,
+ ) -> TestResult:
+ answered_question_ids = set()
+ answers = await self._attempt_repo.get_answers_for_attempt(attempt_id)
+ for answer in answers:
+ answered_question_ids.add(answer.question_id)
+
+ for question_id in questions:
+ if question_id in answered_question_ids:
+ continue
+
+ answer_data = user_answers.get(str(question_id))
+
+ if answer_data:
+ question, options = await self._test_repo.get_question_with_options(question_id)
+ if not question:
+ continue
+
+ if answer_data["type"] == "single":
+ await self.save_single_answer(attempt_id, question_id, answer_data["answer"])
+ elif answer_data["type"] == "multiple":
+ await self.save_multiple_answer(attempt_id, question_id, answer_data["answer"])
+ else:
+ await self._answer_dao.create(
+ attempt_id=attempt_id,
+ question_id=question_id,
+ text_answer=None,
+ is_correct=False,
+ )
+
+ return await self.finish_attempt(attempt_id, len(questions))
+
+ async def get_question_state(self, question_type: str):
+ from quizzi.application.bot.user_dialogs.states import UserTestSG
+
+ if question_type == QuestionType.SINGLE:
+ return UserTestSG.question_single
+ elif question_type == QuestionType.MULTIPLE:
+ return UserTestSG.question_multiple
+ else:
+ return UserTestSG.question_input
+
+ async def get_next_question_state(self, question_id: int):
+ question, _ = await self._test_repo.get_question_with_options(question_id)
+ question_type = question.question_type if question else QuestionType.SINGLE
+ return await self.get_question_state(question_type)
diff --git a/src/quizzi/service/user.py b/src/quizzi/service/user.py
new file mode 100644
index 0000000..ec25f1a
--- /dev/null
+++ b/src/quizzi/service/user.py
@@ -0,0 +1,75 @@
+from dataclasses import dataclass
+
+from quizzi.domain.schemas import User
+from quizzi.infrastructure.database.dao.group import GroupDAO
+from quizzi.infrastructure.database.dao.user import UserDAO
+
+
+@dataclass
+class RegistrationResult:
+ is_registered: bool
+ needs_name: bool = False
+ needs_group: bool = False
+ has_groups: bool = False
+ user: User | None = None
+
+
+class UserService:
+ def __init__(self, user_dao: UserDAO, group_dao: GroupDAO) -> None:
+ self._user_dao = user_dao
+ self._group_dao = group_dao
+
+ async def check_registration(self, user_id: int) -> RegistrationResult:
+ user = await self._user_dao.get_by_id(user_id)
+ groups = await self._group_dao.get_all()
+ has_groups = len(groups) > 0
+
+ if user is None:
+ return RegistrationResult(
+ is_registered=False,
+ needs_name=True,
+ needs_group=has_groups,
+ has_groups=has_groups,
+ )
+
+ needs_name = user.name is None
+ needs_group = has_groups and user.group is None
+
+ return RegistrationResult(
+ is_registered=not needs_name and not needs_group,
+ needs_name=needs_name,
+ needs_group=needs_group,
+ has_groups=has_groups,
+ user=user,
+ )
+
+ async def create_user(
+ self,
+ user_id: int,
+ first_name: str,
+ username: str | None = None,
+ last_name: str | None = None,
+ ) -> User:
+ return await self._user_dao.create(
+ user_id=user_id,
+ first_name=first_name,
+ username=username,
+ last_name=last_name,
+ )
+
+ async def update_user_info(
+ self,
+ user_id: int,
+ first_name: str,
+ username: str | None = None,
+ last_name: str | None = None,
+ ) -> User | None:
+ return await self._user_dao.upsert(
+ user_id=user_id,
+ first_name=first_name,
+ username=username,
+ last_name=last_name,
+ )
+
+ async def get_all_users(self) -> list[User]:
+ return await self._user_dao.get_all()