mirror of
https://github.com/koloideal/Quizzi.git
synced 2026-06-10 10:25:28 +03:00
@@ -1,51 +1,39 @@
|
|||||||
# Trudex
|
# 📚 Quizzi
|
||||||
|
|
||||||
Telegram платформа для тестирования по охране труда
|
Telegram-бот для создания и прохождения тестов с удобным интерфейсом и полным контролем над процессом.
|
||||||
|
|
||||||
## Архитектура
|
## 🎯 Зачем это нужно
|
||||||
|
|
||||||
Проект построен на принципах Clean Architecture с разделением на три слоя:
|
Если вам надоело собирать ответы в Google Forms, вручную считать баллы и рассылать результаты — Quizzi решает эти проблемы. Бот работает прямо в Telegram: студенты проходят тесты там, где им удобно, а преподаватели получают готовую статистику в Excel.
|
||||||
|
|
||||||
- **Application** - координация и UI логика (aiogram, aiogram-dialog)
|
## ✨ Что умеет бот
|
||||||
- **Domain** - бизнес-логика и доменные модели
|
|
||||||
- **Infrastructure** - технические детали (БД, API, планировщик)
|
|
||||||
|
|
||||||
## Технологический стек
|
### Для студентов
|
||||||
|
|
||||||
- aiogram 3.x - Telegram Bot API
|
Переходите по ссылке или QR-коду — и сразу попадаете в тест. Никаких регистраций и лишних шагов. Отвечаете на вопросы кнопками, видите таймер с оставшимся временем, получаете результат мгновенно после завершения. Если разрешено — можете пересдать и улучшить балл.
|
||||||
- aiogram-dialog - state machine для диалогов
|
|
||||||
- dishka - Dependency Injection
|
|
||||||
- SQLAlchemy 2.x async - ORM для PostgreSQL
|
|
||||||
- Alembic - миграции БД
|
|
||||||
- APScheduler - фоновые задачи
|
|
||||||
- httpx - HTTP-клиент
|
|
||||||
- Pydantic - валидация конфигурации
|
|
||||||
|
|
||||||
## Структура проекта
|
### Для преподавателей
|
||||||
|
|
||||||
```
|
Создавайте тесты с разными типами вопросов: один правильный ответ, несколько или свободный ввод. Прикрепляйте изображения к вопросам. Настраивайте под себя:
|
||||||
src/trudex/
|
|
||||||
├── application/ # Слой приложения
|
- ⏱️ Лимит времени на весь тест
|
||||||
│ └── bot/
|
- � К*оличество попыток пересдачи
|
||||||
│ ├── middlewares/ # Промежуточные обработчики
|
- 📅 Дату автоматической деактивации
|
||||||
│ ├── user_dialogs/ # Диалоги пользователей
|
- � Пароль для доступа
|
||||||
│ ├── admin_dialogs/# Диалоги администраторов
|
- 🎓 Привязку к конкретной группе
|
||||||
│ └── handlers.py # Обработчики событий
|
|
||||||
├── domain/ # Доменный слой
|
Делитесь тестом через ссылку или QR-код. Смотрите кто прошёл, с каким результатом, какие ответы давал. Выгружайте статистику по группе в Excel — с оценками, процентом сдачи и средним баллом.
|
||||||
│ └── schemas.py # Доменные модели
|
|
||||||
└── infrastructure/ # Инфраструктурный слой
|
## � Установка
|
||||||
├── api/ # Внешние API
|
|
||||||
├── database/ # Работа с БД
|
```bash
|
||||||
│ ├── dao/ # Data Access Objects
|
uv sync
|
||||||
│ ├── models.py # ORM модели
|
cp config.example.toml config.toml
|
||||||
│ └── config.py # Конфигурация БД
|
# отредактируйте config.toml
|
||||||
├── scheduling/ # Фоновые задачи
|
alembic upgrade head
|
||||||
└── utils/ # Утилиты и конфигурация
|
python -m quizzi.application
|
||||||
```
|
```
|
||||||
|
|
||||||
## Запуск
|
## 📄 Лицензия
|
||||||
|
|
||||||
1. Установить зависимости: `uv sync`
|
MIT
|
||||||
2. Настроить `config.toml`
|
|
||||||
3. Запустить миграции: `alembic upgrade head`
|
|
||||||
4. Запустить бота: `python -m trudex`
|
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ from quizzi.application.bot.user_dialogs.main_menu import user_menu_dialog
|
|||||||
from quizzi.application.bot.user_dialogs.registration import registration_dialog
|
from quizzi.application.bot.user_dialogs.registration import registration_dialog
|
||||||
from quizzi.application.bot.user_dialogs.take_test import take_test_dialog
|
from quizzi.application.bot.user_dialogs.take_test import take_test_dialog
|
||||||
from quizzi.infrastructure.database.repo.user import UserRepository
|
from quizzi.infrastructure.database.repo.user import UserRepository
|
||||||
from quizzi.infrastructure.di import DatabaseProvider, SchedulerProvider
|
from quizzi.infrastructure.di import DatabaseProvider, SchedulerProvider, ServiceProvider
|
||||||
from quizzi.infrastructure.utils.bot_commands import setup_bot_commands
|
from quizzi.infrastructure.utils.bot_commands import setup_bot_commands
|
||||||
from quizzi.infrastructure.utils.config import Config
|
from quizzi.infrastructure.utils.config import Config
|
||||||
|
|
||||||
@@ -68,6 +68,7 @@ async def main() -> None:
|
|||||||
|
|
||||||
container = make_async_container(
|
container = make_async_container(
|
||||||
DatabaseProvider(),
|
DatabaseProvider(),
|
||||||
|
ServiceProvider(),
|
||||||
SchedulerProvider(),
|
SchedulerProvider(),
|
||||||
context={Bot: bot, Config: config}
|
context={Bot: bot, Config: config}
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -10,36 +10,29 @@ from dishka.integrations.aiogram import FromDishka
|
|||||||
from quizzi.application.bot.admin_dialogs.states import AdminMenuSG
|
from quizzi.application.bot.admin_dialogs.states import AdminMenuSG
|
||||||
from quizzi.application.bot.creator_dialogs.states import CreatorMenuSG
|
from quizzi.application.bot.creator_dialogs.states import CreatorMenuSG
|
||||||
from quizzi.application.bot.user_dialogs.states import UserDeeplinkSG, UserMenuSG, UserRegistrationSG
|
from quizzi.application.bot.user_dialogs.states import UserDeeplinkSG, UserMenuSG, UserRegistrationSG
|
||||||
from quizzi.infrastructure.database.dao.group import GroupDAO
|
from quizzi.service.test import TestService
|
||||||
from quizzi.infrastructure.database.dao.test import TestDAO
|
from quizzi.service.user import UserService
|
||||||
from quizzi.infrastructure.database.dao.user import UserDAO
|
|
||||||
from quizzi.infrastructure.utils.config import Config
|
|
||||||
from quizzi.infrastructure.utils.test_id_to_hash import decode_id
|
|
||||||
from quizzi.infrastructure.utils.timezone import now_msk_naive
|
|
||||||
|
|
||||||
router = Router()
|
router = Router()
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def ensure_user_registered(
|
async def ensure_user_registered(
|
||||||
user_dao: UserDAO,
|
user_service: UserService,
|
||||||
group_dao: GroupDAO,
|
|
||||||
message: Message,
|
message: Message,
|
||||||
dialog_manager: DialogManager,
|
dialog_manager: DialogManager,
|
||||||
pending_test_id: int | None = None,
|
pending_test_id: int | None = None,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
assert message.from_user is not None
|
assert message.from_user is not None
|
||||||
|
|
||||||
existing_user = await user_dao.get_by_id(message.from_user.id)
|
result = await user_service.check_registration(message.from_user.id)
|
||||||
groups = await group_dao.get_all()
|
|
||||||
has_groups = len(groups) > 0
|
|
||||||
|
|
||||||
start_data = {"user_id": message.from_user.id, "has_groups": has_groups}
|
start_data = {"user_id": message.from_user.id, "has_groups": result.has_groups}
|
||||||
if pending_test_id:
|
if pending_test_id:
|
||||||
start_data["pending_test_id"] = pending_test_id
|
start_data["pending_test_id"] = pending_test_id
|
||||||
|
|
||||||
if existing_user is None:
|
if result.user is None:
|
||||||
await user_dao.create(
|
await user_service.create_user(
|
||||||
user_id=message.from_user.id,
|
user_id=message.from_user.id,
|
||||||
first_name=message.from_user.first_name,
|
first_name=message.from_user.first_name,
|
||||||
username=message.from_user.username,
|
username=message.from_user.username,
|
||||||
@@ -52,10 +45,7 @@ async def ensure_user_registered(
|
|||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
needs_name = existing_user.name is None
|
if result.needs_name:
|
||||||
needs_group = has_groups and existing_user.group is None
|
|
||||||
|
|
||||||
if needs_name:
|
|
||||||
await dialog_manager.start(
|
await dialog_manager.start(
|
||||||
UserRegistrationSG.input_name,
|
UserRegistrationSG.input_name,
|
||||||
mode=StartMode.RESET_STACK,
|
mode=StartMode.RESET_STACK,
|
||||||
@@ -63,7 +53,7 @@ async def ensure_user_registered(
|
|||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if needs_group:
|
if result.needs_group:
|
||||||
await dialog_manager.start(
|
await dialog_manager.start(
|
||||||
UserRegistrationSG.select_group,
|
UserRegistrationSG.select_group,
|
||||||
mode=StartMode.RESET_STACK,
|
mode=StartMode.RESET_STACK,
|
||||||
@@ -71,7 +61,7 @@ async def ensure_user_registered(
|
|||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
await user_dao.upsert(
|
await user_service.update_user_info(
|
||||||
user_id=message.from_user.id,
|
user_id=message.from_user.id,
|
||||||
first_name=message.from_user.first_name,
|
first_name=message.from_user.first_name,
|
||||||
username=message.from_user.username,
|
username=message.from_user.username,
|
||||||
@@ -80,39 +70,13 @@ async def ensure_user_registered(
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
async def validate_deeplink_test(
|
|
||||||
test_dao: TestDAO,
|
|
||||||
user_dao: UserDAO,
|
|
||||||
test_id: int,
|
|
||||||
user_id: int,
|
|
||||||
) -> tuple[bool, str]:
|
|
||||||
test = await test_dao.get_by_id(test_id)
|
|
||||||
|
|
||||||
if not test:
|
|
||||||
return False, "❌ Тест не найден"
|
|
||||||
|
|
||||||
if not test.is_active:
|
|
||||||
return False, "❌ Тест деактивирован"
|
|
||||||
|
|
||||||
if test.expires_at and test.expires_at < now_msk_naive():
|
|
||||||
return False, "❌ Срок действия теста истек"
|
|
||||||
|
|
||||||
user = await user_dao.get_by_id(user_id)
|
|
||||||
if test.for_group and user and user.group != test.for_group:
|
|
||||||
return False, f"❌ Тест доступен только для группы {test.for_group}"
|
|
||||||
|
|
||||||
return True, ""
|
|
||||||
|
|
||||||
|
|
||||||
@router.message(CommandStart(deep_link=True))
|
@router.message(CommandStart(deep_link=True))
|
||||||
async def start_with_deeplink(
|
async def start_with_deeplink(
|
||||||
message: Message,
|
message: Message,
|
||||||
command: CommandObject,
|
command: CommandObject,
|
||||||
dialog_manager: DialogManager,
|
dialog_manager: DialogManager,
|
||||||
user_dao: FromDishka[UserDAO],
|
user_service: FromDishka[UserService],
|
||||||
group_dao: FromDishka[GroupDAO],
|
test_service: FromDishka[TestService],
|
||||||
test_dao: FromDishka[TestDAO],
|
|
||||||
config: FromDishka[Config],
|
|
||||||
) -> None:
|
) -> None:
|
||||||
assert message.from_user is not None
|
assert message.from_user is not None
|
||||||
|
|
||||||
@@ -125,39 +89,36 @@ async def start_with_deeplink(
|
|||||||
)
|
)
|
||||||
|
|
||||||
if not deeplink:
|
if not deeplink:
|
||||||
await start_handler(message, user_dao, group_dao, dialog_manager)
|
await start_handler(message, user_service, dialog_manager)
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
test_id = test_service.decode_test_hash(deeplink)
|
||||||
test_id = decode_id(deeplink, config.security.encode_key)
|
if test_id is None:
|
||||||
except (ValueError, IndexError):
|
|
||||||
logger.warning("Invalid deeplink: user_id=%d, deeplink=%s", message.from_user.id, deeplink)
|
logger.warning("Invalid deeplink: user_id=%d, deeplink=%s", message.from_user.id, deeplink)
|
||||||
await message.answer("❌ Неверная ссылка на тест")
|
await message.answer("❌ Неверная ссылка на тест")
|
||||||
await start_handler(message, user_dao, group_dao, dialog_manager)
|
await start_handler(message, user_service, dialog_manager)
|
||||||
return
|
return
|
||||||
|
|
||||||
is_registered = await ensure_user_registered(
|
is_registered = await ensure_user_registered(
|
||||||
user_dao, group_dao, message, dialog_manager, pending_test_id=test_id
|
user_service, message, dialog_manager, pending_test_id=test_id
|
||||||
)
|
)
|
||||||
|
|
||||||
if not is_registered:
|
if not is_registered:
|
||||||
return
|
return
|
||||||
|
|
||||||
is_valid, error = await validate_deeplink_test(
|
validation = await test_service.validate_test(test_id, message.from_user.id)
|
||||||
test_dao, user_dao, test_id, message.from_user.id
|
|
||||||
)
|
|
||||||
|
|
||||||
if not is_valid:
|
if not validation.is_valid:
|
||||||
logger.info(
|
logger.info(
|
||||||
"Test validation failed: user_id=%d, test_id=%d, error=%s",
|
"Test validation failed: user_id=%d, test_id=%d, error=%s",
|
||||||
message.from_user.id,
|
message.from_user.id,
|
||||||
test_id,
|
test_id,
|
||||||
error,
|
validation.error,
|
||||||
)
|
)
|
||||||
await dialog_manager.start(
|
await dialog_manager.start(
|
||||||
UserDeeplinkSG.test_preview,
|
UserDeeplinkSG.test_preview,
|
||||||
mode=StartMode.RESET_STACK,
|
mode=StartMode.RESET_STACK,
|
||||||
data={"test_id": test_id, "error": error}
|
data={"test_id": test_id, "error": validation.error}
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -173,8 +134,7 @@ async def start_with_deeplink(
|
|||||||
async def start_handler(
|
async def start_handler(
|
||||||
message: Message,
|
message: Message,
|
||||||
dialog_manager: DialogManager,
|
dialog_manager: DialogManager,
|
||||||
user_dao: FromDishka[UserDAO],
|
user_service: FromDishka[UserService],
|
||||||
group_dao: FromDishka[GroupDAO],
|
|
||||||
) -> None:
|
) -> None:
|
||||||
assert message.from_user is not None
|
assert message.from_user is not None
|
||||||
logger.info(
|
logger.info(
|
||||||
@@ -184,7 +144,7 @@ async def start_handler(
|
|||||||
)
|
)
|
||||||
|
|
||||||
is_registered = await ensure_user_registered(
|
is_registered = await ensure_user_registered(
|
||||||
user_dao, group_dao, message, dialog_manager
|
user_service, message, dialog_manager
|
||||||
)
|
)
|
||||||
|
|
||||||
if is_registered:
|
if is_registered:
|
||||||
|
|||||||
@@ -7,8 +7,7 @@ from dishka import FromDishka
|
|||||||
from dishka.integrations.aiogram_dialog import inject
|
from dishka.integrations.aiogram_dialog import inject
|
||||||
|
|
||||||
from quizzi.application.bot.shared_dialogs.states import SharedBroadcastSG
|
from quizzi.application.bot.shared_dialogs.states import SharedBroadcastSG
|
||||||
from quizzi.infrastructure.database.dao.user import UserDAO
|
from quizzi.service.broadcast import BroadcastService
|
||||||
from quizzi.infrastructure.utils.broadcast import broadcast_message
|
|
||||||
|
|
||||||
|
|
||||||
async def on_broadcast_input(message: Message, _widget: MessageInput, manager: DialogManager):
|
async def on_broadcast_input(message: Message, _widget: MessageInput, manager: DialogManager):
|
||||||
@@ -18,7 +17,12 @@ async def on_broadcast_input(message: Message, _widget: MessageInput, manager: D
|
|||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_broadcast_confirm(_callback: CallbackQuery, _button: Button, manager: DialogManager, user_dao: FromDishka[UserDAO]):
|
async def on_broadcast_confirm(
|
||||||
|
_callback: CallbackQuery,
|
||||||
|
_button: Button,
|
||||||
|
manager: DialogManager,
|
||||||
|
broadcast_service: FromDishka[BroadcastService],
|
||||||
|
):
|
||||||
message_id = manager.dialog_data.get("broadcast_message_id")
|
message_id = manager.dialog_data.get("broadcast_message_id")
|
||||||
chat_id = manager.dialog_data.get("broadcast_chat_id")
|
chat_id = manager.dialog_data.get("broadcast_chat_id")
|
||||||
|
|
||||||
@@ -33,7 +37,7 @@ async def on_broadcast_confirm(_callback: CallbackQuery, _button: Button, manage
|
|||||||
await _callback.answer("Ошибка: бот не найден")
|
await _callback.answer("Ошибка: бот не найден")
|
||||||
return
|
return
|
||||||
|
|
||||||
stats = await broadcast_message(bot, message_id, chat_id, user_dao)
|
stats = await broadcast_service.broadcast_message(bot, message_id, chat_id)
|
||||||
|
|
||||||
stats_text = (
|
stats_text = (
|
||||||
f"✅ <b>Рассылка завершена</b>\n\n"
|
f"✅ <b>Рассылка завершена</b>\n\n"
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import functools
|
import functools
|
||||||
import io
|
|
||||||
from datetime import date, datetime, time
|
from datetime import date, datetime, time
|
||||||
|
|
||||||
from aiogram import Bot
|
from aiogram import Bot
|
||||||
@@ -11,18 +10,16 @@ from aiogram_dialog.widgets.kbd import Button, Calendar, Column, Row, ScrollingG
|
|||||||
from aiogram_dialog.widgets.text import Const, Format
|
from aiogram_dialog.widgets.text import Const, Format
|
||||||
from dishka import FromDishka
|
from dishka import FromDishka
|
||||||
from dishka.integrations.aiogram_dialog import inject
|
from dishka.integrations.aiogram_dialog import inject
|
||||||
from openpyxl import Workbook
|
|
||||||
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
|
|
||||||
|
|
||||||
from quizzi.application.bot.shared_dialogs.states import SharedCreateTestSG, SharedTestsSG
|
from quizzi.application.bot.shared_dialogs.states import SharedCreateTestSG, SharedTestsSG
|
||||||
from quizzi.infrastructure.database.dao.group import GroupDAO
|
from quizzi.infrastructure.database.dao.group import GroupDAO
|
||||||
from quizzi.infrastructure.database.dao.test import TestDAO
|
from quizzi.infrastructure.database.dao.test import TestDAO
|
||||||
from quizzi.infrastructure.database.repo.test import TestRepository
|
from quizzi.infrastructure.database.repo.test import TestRepository
|
||||||
from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository
|
from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository
|
||||||
from quizzi.infrastructure.utils.config import Config
|
|
||||||
from quizzi.infrastructure.utils.qr_generator import generate_qr_bytes
|
from quizzi.infrastructure.utils.qr_generator import generate_qr_bytes
|
||||||
from quizzi.infrastructure.utils.test_id_to_hash import encode_id
|
|
||||||
from quizzi.infrastructure.utils.timezone import to_msk
|
from quizzi.infrastructure.utils.timezone import to_msk
|
||||||
|
from quizzi.service.excel import ExcelService
|
||||||
|
from quizzi.service.test import TestService
|
||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
@@ -101,34 +98,38 @@ async def get_test_detail(test_dao: FromDishka[TestDAO], test_repo: FromDishka[T
|
|||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_toggle_active(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
|
async def on_toggle_active(
|
||||||
|
_callback: CallbackQuery,
|
||||||
|
_button: Button,
|
||||||
|
manager: DialogManager,
|
||||||
|
test_service: FromDishka[TestService],
|
||||||
|
):
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
if not test_id:
|
if not test_id:
|
||||||
await _callback.answer("❌ Тест не найден")
|
await _callback.answer("❌ Тест не найден")
|
||||||
return
|
return
|
||||||
|
|
||||||
test = await test_dao.get_by_id(test_id)
|
result = await test_service.toggle_test_active(test_id)
|
||||||
|
await _callback.answer(result.message)
|
||||||
if test:
|
if result.success:
|
||||||
await test_dao.update(test_id, is_active=not test.is_active)
|
|
||||||
action = "деактивирован" if test.is_active else "активирован"
|
|
||||||
await _callback.answer(f"✅ Тест {action}")
|
|
||||||
await manager.switch_to(SharedTestsSG.test_detail)
|
await manager.switch_to(SharedTestsSG.test_detail)
|
||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_toggle_results_viewable(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
|
async def on_toggle_results_viewable(
|
||||||
|
_callback: CallbackQuery,
|
||||||
|
_button: Button,
|
||||||
|
manager: DialogManager,
|
||||||
|
test_service: FromDishka[TestService],
|
||||||
|
):
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
if not test_id:
|
if not test_id:
|
||||||
await _callback.answer("❌ Тест не найден")
|
await _callback.answer("❌ Тест не найден")
|
||||||
return
|
return
|
||||||
|
|
||||||
test = await test_dao.get_by_id(test_id)
|
result = await test_service.toggle_results_viewable(test_id)
|
||||||
|
await _callback.answer(result.message)
|
||||||
if test:
|
if result.success:
|
||||||
await test_dao.update(test_id, are_results_viewable=not test.are_results_viewable)
|
|
||||||
action = "скрыты" if test.are_results_viewable else "видны"
|
|
||||||
await _callback.answer(f"✅ Результаты теперь {action}")
|
|
||||||
await manager.switch_to(SharedTestsSG.test_detail)
|
await manager.switch_to(SharedTestsSG.test_detail)
|
||||||
|
|
||||||
|
|
||||||
@@ -235,7 +236,13 @@ async def on_export_stats(_callback: CallbackQuery, _button: Button, manager: Di
|
|||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_share_test(_callback: CallbackQuery, _button: Button, manager: DialogManager, config: FromDishka[Config], bot_inst: FromDishka[Bot]):
|
async def on_share_test(
|
||||||
|
_callback: CallbackQuery,
|
||||||
|
_button: Button,
|
||||||
|
manager: DialogManager,
|
||||||
|
test_service: FromDishka[TestService],
|
||||||
|
bot_inst: FromDishka[Bot],
|
||||||
|
):
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
|
|
||||||
if not test_id:
|
if not test_id:
|
||||||
@@ -243,11 +250,7 @@ async def on_share_test(_callback: CallbackQuery, _button: Button, manager: Dial
|
|||||||
"share_link": "Ошибка: тест не найден"
|
"share_link": "Ошибка: тест не найден"
|
||||||
}
|
}
|
||||||
|
|
||||||
test_hash = encode_id(
|
test_hash = test_service.encode_test_id(test_id)
|
||||||
test_id,
|
|
||||||
config.security.encode_key,
|
|
||||||
config.security.encoded_string_length
|
|
||||||
)
|
|
||||||
|
|
||||||
bot_info = await bot_inst.get_me()
|
bot_info = await bot_inst.get_me()
|
||||||
bot_username = bot_info.username or "your_bot"
|
bot_username = bot_info.username or "your_bot"
|
||||||
@@ -314,13 +317,18 @@ async def get_delete_confirm_data(dialog_manager: DialogManager, test_dao: FromD
|
|||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_confirm_delete(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
|
async def on_confirm_delete(
|
||||||
|
_callback: CallbackQuery,
|
||||||
|
_button: Button,
|
||||||
|
manager: DialogManager,
|
||||||
|
test_service: FromDishka[TestService],
|
||||||
|
):
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
if not test_id:
|
if not test_id:
|
||||||
await _callback.answer("❌ Тест не найден")
|
await _callback.answer("❌ Тест не найден")
|
||||||
return
|
return
|
||||||
|
|
||||||
deleted = await test_dao.delete(test_id)
|
deleted = await test_service.delete_test(test_id)
|
||||||
if deleted:
|
if deleted:
|
||||||
await _callback.answer("✅ Тест удалён")
|
await _callback.answer("✅ Тест удалён")
|
||||||
await manager.switch_to(SharedTestsSG.tests_list)
|
await manager.switch_to(SharedTestsSG.tests_list)
|
||||||
@@ -329,7 +337,12 @@ async def on_confirm_delete(_callback: CallbackQuery, _button: Button, manager:
|
|||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_password_input(message: Message, _widget: MessageInput, manager: DialogManager, test_dao: FromDishka[TestDAO]):
|
async def on_password_input(
|
||||||
|
message: Message,
|
||||||
|
_widget: MessageInput,
|
||||||
|
manager: DialogManager,
|
||||||
|
test_service: FromDishka[TestService],
|
||||||
|
):
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
if not test_id:
|
if not test_id:
|
||||||
await message.answer("❌ Тест не найден")
|
await message.answer("❌ Тест не найден")
|
||||||
@@ -339,30 +352,36 @@ async def on_password_input(message: Message, _widget: MessageInput, manager: Di
|
|||||||
await message.answer("❌ Пароль не может быть пустым")
|
await message.answer("❌ Пароль не может быть пустым")
|
||||||
return
|
return
|
||||||
|
|
||||||
password = message.text.strip()
|
result = await test_service.update_password(test_id, message.text.strip())
|
||||||
if len(password) > 255:
|
await message.answer(result.message)
|
||||||
await message.answer("❌ Пароль слишком длинный (максимум 255 символов)")
|
if result.success:
|
||||||
return
|
|
||||||
|
|
||||||
await test_dao.update(test_id, password=password)
|
|
||||||
await message.answer("✅ Пароль обновлен")
|
|
||||||
await manager.switch_to(SharedTestsSG.test_detail)
|
await manager.switch_to(SharedTestsSG.test_detail)
|
||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_remove_password(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
|
async def on_remove_password(
|
||||||
|
_callback: CallbackQuery,
|
||||||
|
_button: Button,
|
||||||
|
manager: DialogManager,
|
||||||
|
test_service: FromDishka[TestService],
|
||||||
|
):
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
if not test_id:
|
if not test_id:
|
||||||
await _callback.answer("❌ Тест не найден")
|
await _callback.answer("❌ Тест не найден")
|
||||||
return
|
return
|
||||||
|
|
||||||
await test_dao.update(test_id, password=None)
|
result = await test_service.remove_password(test_id)
|
||||||
await _callback.answer("✅ Пароль удален")
|
await _callback.answer(result.message)
|
||||||
await manager.switch_to(SharedTestsSG.test_detail)
|
await manager.switch_to(SharedTestsSG.test_detail)
|
||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_attempts_input_edit(message: Message, _widget: MessageInput, manager: DialogManager, test_dao: FromDishka[TestDAO]):
|
async def on_attempts_input_edit(
|
||||||
|
message: Message,
|
||||||
|
_widget: MessageInput,
|
||||||
|
manager: DialogManager,
|
||||||
|
test_service: FromDishka[TestService],
|
||||||
|
):
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
if not test_id:
|
if not test_id:
|
||||||
await message.answer("❌ Тест не найден")
|
await message.answer("❌ Тест не найден")
|
||||||
@@ -373,40 +392,40 @@ async def on_attempts_input_edit(message: Message, _widget: MessageInput, manage
|
|||||||
return
|
return
|
||||||
|
|
||||||
attempts_str = message.text.strip()
|
attempts_str = message.text.strip()
|
||||||
|
|
||||||
if not attempts_str.isdigit():
|
if not attempts_str.isdigit():
|
||||||
await message.answer("❌ Количество попыток должно быть числом")
|
await message.answer("❌ Количество попыток должно быть числом")
|
||||||
return
|
return
|
||||||
|
|
||||||
attempts = int(attempts_str)
|
result = await test_service.update_attempts(test_id, int(attempts_str))
|
||||||
|
await message.answer(result.message)
|
||||||
if attempts < 1:
|
if result.success:
|
||||||
await message.answer("❌ Количество попыток должно быть больше 0")
|
|
||||||
return
|
|
||||||
|
|
||||||
if attempts > 100:
|
|
||||||
await message.answer("❌ Количество попыток не может быть больше 100")
|
|
||||||
return
|
|
||||||
|
|
||||||
await test_dao.update(test_id, attempts=attempts)
|
|
||||||
await message.answer("✅ Количество попыток обновлено")
|
|
||||||
await manager.switch_to(SharedTestsSG.test_detail)
|
await manager.switch_to(SharedTestsSG.test_detail)
|
||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_remove_attempts(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
|
async def on_remove_attempts(
|
||||||
|
_callback: CallbackQuery,
|
||||||
|
_button: Button,
|
||||||
|
manager: DialogManager,
|
||||||
|
test_service: FromDishka[TestService],
|
||||||
|
):
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
if not test_id:
|
if not test_id:
|
||||||
await _callback.answer("❌ Тест не найден")
|
await _callback.answer("❌ Тест не найден")
|
||||||
return
|
return
|
||||||
|
|
||||||
await test_dao.update(test_id, attempts=None)
|
result = await test_service.remove_attempts(test_id)
|
||||||
await _callback.answer("✅ Ограничение попыток удалено")
|
await _callback.answer(result.message)
|
||||||
await manager.switch_to(SharedTestsSG.test_detail)
|
await manager.switch_to(SharedTestsSG.test_detail)
|
||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_time_limit_input(message: Message, _widget: MessageInput, manager: DialogManager, test_dao: FromDishka[TestDAO]):
|
async def on_time_limit_input(
|
||||||
|
message: Message,
|
||||||
|
_widget: MessageInput,
|
||||||
|
manager: DialogManager,
|
||||||
|
test_service: FromDishka[TestService],
|
||||||
|
):
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
if not test_id:
|
if not test_id:
|
||||||
await message.answer("❌ Тест не найден")
|
await message.answer("❌ Тест не найден")
|
||||||
@@ -417,36 +436,30 @@ async def on_time_limit_input(message: Message, _widget: MessageInput, manager:
|
|||||||
return
|
return
|
||||||
|
|
||||||
time_limit_str = message.text.strip()
|
time_limit_str = message.text.strip()
|
||||||
|
|
||||||
if not time_limit_str.isdigit():
|
if not time_limit_str.isdigit():
|
||||||
await message.answer("❌ Лимит времени должен быть числом (в минутах)")
|
await message.answer("❌ Лимит времени должен быть числом (в минутах)")
|
||||||
return
|
return
|
||||||
|
|
||||||
time_limit_minutes = int(time_limit_str)
|
result = await test_service.update_time_limit(test_id, int(time_limit_str))
|
||||||
|
await message.answer(result.message)
|
||||||
if time_limit_minutes < 1:
|
if result.success:
|
||||||
await message.answer("❌ Лимит времени должен быть больше 0")
|
|
||||||
return
|
|
||||||
|
|
||||||
if time_limit_minutes > 1440:
|
|
||||||
await message.answer("❌ Лимит времени не может быть больше 1440 минут (24 часа)")
|
|
||||||
return
|
|
||||||
|
|
||||||
time_limit_seconds = time_limit_minutes * 60
|
|
||||||
await test_dao.update(test_id, time_limit=time_limit_seconds)
|
|
||||||
await message.answer("✅ Лимит времени обновлен")
|
|
||||||
await manager.switch_to(SharedTestsSG.test_detail)
|
await manager.switch_to(SharedTestsSG.test_detail)
|
||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_remove_time_limit(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
|
async def on_remove_time_limit(
|
||||||
|
_callback: CallbackQuery,
|
||||||
|
_button: Button,
|
||||||
|
manager: DialogManager,
|
||||||
|
test_service: FromDishka[TestService],
|
||||||
|
):
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
if not test_id:
|
if not test_id:
|
||||||
await _callback.answer("❌ Тест не найден")
|
await _callback.answer("❌ Тест не найден")
|
||||||
return
|
return
|
||||||
|
|
||||||
await test_dao.update(test_id, time_limit=None)
|
result = await test_service.remove_time_limit(test_id)
|
||||||
await _callback.answer("✅ Лимит времени удален")
|
await _callback.answer(result.message)
|
||||||
await manager.switch_to(SharedTestsSG.test_detail)
|
await manager.switch_to(SharedTestsSG.test_detail)
|
||||||
|
|
||||||
|
|
||||||
@@ -460,51 +473,73 @@ async def get_groups_for_edit(dialog_manager: DialogManager, group_dao: FromDish
|
|||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_group_selected_for_test(_callback: CallbackQuery, _widget, manager: DialogManager, item_id: str, test_dao: FromDishka[TestDAO]):
|
async def on_group_selected_for_test(
|
||||||
|
_callback: CallbackQuery,
|
||||||
|
_widget,
|
||||||
|
manager: DialogManager,
|
||||||
|
item_id: str,
|
||||||
|
test_service: FromDishka[TestService],
|
||||||
|
):
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
if not test_id:
|
if not test_id:
|
||||||
await _callback.answer("❌ Тест не найден")
|
await _callback.answer("❌ Тест не найден")
|
||||||
return
|
return
|
||||||
|
|
||||||
await test_dao.update(test_id, for_group=int(item_id))
|
result = await test_service.update_group(test_id, int(item_id))
|
||||||
await _callback.answer("✅ Группа обновлена")
|
await _callback.answer(result.message)
|
||||||
await manager.switch_to(SharedTestsSG.test_detail)
|
await manager.switch_to(SharedTestsSG.test_detail)
|
||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_remove_group(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
|
async def on_remove_group(
|
||||||
|
_callback: CallbackQuery,
|
||||||
|
_button: Button,
|
||||||
|
manager: DialogManager,
|
||||||
|
test_service: FromDishka[TestService],
|
||||||
|
):
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
if not test_id:
|
if not test_id:
|
||||||
await _callback.answer("❌ Тест не найден")
|
await _callback.answer("❌ Тест не найден")
|
||||||
return
|
return
|
||||||
|
|
||||||
await test_dao.update(test_id, for_group=None)
|
result = await test_service.remove_group(test_id)
|
||||||
await _callback.answer("✅ Тест теперь доступен для всех групп")
|
await _callback.answer(result.message)
|
||||||
await manager.switch_to(SharedTestsSG.test_detail)
|
await manager.switch_to(SharedTestsSG.test_detail)
|
||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_date_selected_for_test(_callback, _widget, manager: DialogManager, selected_date: date, test_dao: FromDishka[TestDAO]):
|
async def on_date_selected_for_test(
|
||||||
|
_callback,
|
||||||
|
_widget,
|
||||||
|
manager: DialogManager,
|
||||||
|
selected_date: date,
|
||||||
|
test_service: FromDishka[TestService],
|
||||||
|
):
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
if not test_id:
|
if not test_id:
|
||||||
await _callback.answer("❌ Тест не найден")
|
await _callback.answer("❌ Тест не найден")
|
||||||
return
|
return
|
||||||
|
|
||||||
expires_at = datetime.combine(selected_date, time.min)
|
expires_at = datetime.combine(selected_date, time.min)
|
||||||
await test_dao.update(test_id, expires_at=expires_at)
|
result = await test_service.update_expires(test_id, expires_at)
|
||||||
await _callback.answer("✅ Срок действия обновлен")
|
await _callback.answer(result.message)
|
||||||
await manager.switch_to(SharedTestsSG.test_detail)
|
await manager.switch_to(SharedTestsSG.test_detail)
|
||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_remove_expires(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
|
async def on_remove_expires(
|
||||||
|
_callback: CallbackQuery,
|
||||||
|
_button: Button,
|
||||||
|
manager: DialogManager,
|
||||||
|
test_service: FromDishka[TestService],
|
||||||
|
):
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
if not test_id:
|
if not test_id:
|
||||||
await _callback.answer("❌ Тест не найден")
|
await _callback.answer("❌ Тест не найден")
|
||||||
return
|
return
|
||||||
|
|
||||||
await test_dao.update(test_id, expires_at=None)
|
result = await test_service.remove_expires(test_id)
|
||||||
await _callback.answer("✅ Срок действия удален")
|
await _callback.answer(result.message)
|
||||||
await manager.switch_to(SharedTestsSG.test_detail)
|
await manager.switch_to(SharedTestsSG.test_detail)
|
||||||
|
|
||||||
|
|
||||||
@@ -525,115 +560,13 @@ async def get_groups_for_export(dialog_manager: DialogManager, group_dao: FromDi
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def create_excel_report(
|
|
||||||
test_title: str,
|
|
||||||
group_number: int,
|
|
||||||
stats: list[tuple[str, int | None, datetime | None, bool | None]],
|
|
||||||
) -> bytes:
|
|
||||||
wb = Workbook()
|
|
||||||
ws = wb.active
|
|
||||||
assert ws is not None
|
|
||||||
ws.title = "Статистика"
|
|
||||||
|
|
||||||
header_font = Font(bold=True, color="FFFFFF")
|
|
||||||
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
|
|
||||||
header_alignment = Alignment(horizontal="center", vertical="center")
|
|
||||||
thin_border = Border(
|
|
||||||
left=Side(style="thin"),
|
|
||||||
right=Side(style="thin"),
|
|
||||||
top=Side(style="thin"),
|
|
||||||
bottom=Side(style="thin"),
|
|
||||||
)
|
|
||||||
|
|
||||||
ws.merge_cells("A1:E1")
|
|
||||||
ws["A1"] = f"Тест: {test_title}"
|
|
||||||
ws["A1"].font = Font(bold=True, size=14)
|
|
||||||
ws["A1"].alignment = Alignment(horizontal="center")
|
|
||||||
|
|
||||||
ws.merge_cells("A2:E2")
|
|
||||||
ws["A2"] = f"Группа: {group_number}"
|
|
||||||
ws["A2"].font = Font(bold=True, size=12)
|
|
||||||
ws["A2"].alignment = Alignment(horizontal="center")
|
|
||||||
|
|
||||||
headers = ["ФИО", "Результат (%)", "Оценка", "Дата прохождения", "Статус"]
|
|
||||||
for col, header in enumerate(headers, 1):
|
|
||||||
cell = ws.cell(row=4, column=col, value=header)
|
|
||||||
cell.font = header_font
|
|
||||||
cell.fill = header_fill
|
|
||||||
cell.alignment = header_alignment
|
|
||||||
cell.border = thin_border
|
|
||||||
|
|
||||||
passed_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
|
|
||||||
failed_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
|
|
||||||
not_passed_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid")
|
|
||||||
|
|
||||||
grades: list[int] = []
|
|
||||||
|
|
||||||
for row_idx, (name, score, finished_at, is_passed) in enumerate(stats, 5):
|
|
||||||
ws.cell(row=row_idx, column=1, value=name).border = thin_border
|
|
||||||
|
|
||||||
if score is not None:
|
|
||||||
ws.cell(row=row_idx, column=2, value=score).border = thin_border
|
|
||||||
|
|
||||||
grade = score // 10
|
|
||||||
grades.append(grade)
|
|
||||||
ws.cell(row=row_idx, column=3, value=grade).border = thin_border
|
|
||||||
|
|
||||||
finished_msk = to_msk(finished_at) if finished_at else None
|
|
||||||
date_str = finished_msk.strftime("%d.%m.%Y %H:%M") if finished_msk else "—"
|
|
||||||
ws.cell(row=row_idx, column=4, value=date_str).border = thin_border
|
|
||||||
status = "Пройден" if is_passed else "Не пройден"
|
|
||||||
status_cell = ws.cell(row=row_idx, column=5, value=status)
|
|
||||||
status_cell.border = thin_border
|
|
||||||
|
|
||||||
for col in range(1, 6):
|
|
||||||
ws.cell(row=row_idx, column=col).fill = passed_fill if is_passed else failed_fill
|
|
||||||
else:
|
|
||||||
ws.cell(row=row_idx, column=2, value="—").border = thin_border
|
|
||||||
ws.cell(row=row_idx, column=3, value="—").border = thin_border
|
|
||||||
ws.cell(row=row_idx, column=4, value="—").border = thin_border
|
|
||||||
status_cell = ws.cell(row=row_idx, column=5, value="Не проходил")
|
|
||||||
status_cell.border = thin_border
|
|
||||||
|
|
||||||
for col in range(1, 6):
|
|
||||||
ws.cell(row=row_idx, column=col).fill = not_passed_fill
|
|
||||||
|
|
||||||
ws.column_dimensions["A"].width = 30
|
|
||||||
ws.column_dimensions["B"].width = 15
|
|
||||||
ws.column_dimensions["C"].width = 10
|
|
||||||
ws.column_dimensions["D"].width = 20
|
|
||||||
ws.column_dimensions["E"].width = 15
|
|
||||||
|
|
||||||
total_users = len(stats)
|
|
||||||
passed_users = sum(1 for _, score, _, is_passed in stats if score is not None and is_passed)
|
|
||||||
attempted_users = sum(1 for _, score, _, _ in stats if score is not None)
|
|
||||||
|
|
||||||
summary_row = len(stats) + 6
|
|
||||||
ws.cell(row=summary_row, column=1, value="Итого:").font = Font(bold=True)
|
|
||||||
ws.cell(row=summary_row + 1, column=1, value=f"Всего студентов: {total_users}")
|
|
||||||
ws.cell(row=summary_row + 2, column=1, value=f"Прошли тест: {attempted_users}")
|
|
||||||
ws.cell(row=summary_row + 3, column=1, value=f"Сдали: {passed_users}")
|
|
||||||
if attempted_users > 0:
|
|
||||||
success_rate = round(passed_users / attempted_users * 100)
|
|
||||||
ws.cell(row=summary_row + 4, column=1, value=f"Процент сдачи: {success_rate}%")
|
|
||||||
if grades:
|
|
||||||
avg_grade = round(sum(grades) / len(grades), 1)
|
|
||||||
ws.cell(row=summary_row + 5, column=1, value=f"Средняя оценка: {avg_grade}")
|
|
||||||
|
|
||||||
output = io.BytesIO()
|
|
||||||
wb.save(output)
|
|
||||||
output.seek(0)
|
|
||||||
return output.read()
|
|
||||||
|
|
||||||
|
|
||||||
@inject
|
@inject
|
||||||
async def on_group_selected_for_export(
|
async def on_group_selected_for_export(
|
||||||
_callback: CallbackQuery,
|
_callback: CallbackQuery,
|
||||||
_widget: Select,
|
_widget: Select,
|
||||||
manager: DialogManager,
|
manager: DialogManager,
|
||||||
item_id: str,
|
item_id: str,
|
||||||
test_dao: FromDishka[TestDAO],
|
excel_service: FromDishka[ExcelService],
|
||||||
attempt_repo: FromDishka[TestAttemptRepository],
|
|
||||||
) -> None:
|
) -> None:
|
||||||
test_id = manager.dialog_data.get("selected_test_id")
|
test_id = manager.dialog_data.get("selected_test_id")
|
||||||
if not test_id:
|
if not test_id:
|
||||||
@@ -643,26 +576,16 @@ async def on_group_selected_for_export(
|
|||||||
assert _callback.message is not None
|
assert _callback.message is not None
|
||||||
await _callback.answer("⏳ Формирую отчёт...")
|
await _callback.answer("⏳ Формирую отчёт...")
|
||||||
|
|
||||||
test = await test_dao.get_by_id(test_id)
|
|
||||||
if not test:
|
|
||||||
await _callback.message.answer("❌ Тест не найден")
|
|
||||||
return
|
|
||||||
|
|
||||||
group_number = int(item_id)
|
group_number = int(item_id)
|
||||||
stats = await attempt_repo.get_group_test_statistics(test_id, group_number)
|
result = await excel_service.generate_group_report(test_id, group_number)
|
||||||
|
|
||||||
if not stats:
|
if not result.success or not result.data or not result.filename:
|
||||||
await _callback.message.answer(f"❌ В группе {group_number} нет студентов")
|
await _callback.message.answer(result.caption)
|
||||||
return
|
return
|
||||||
|
|
||||||
excel_bytes = create_excel_report(test.title, group_number, stats)
|
|
||||||
|
|
||||||
safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in test.title)[:30]
|
|
||||||
filename = f"{safe_title}_group_{group_number}.xlsx"
|
|
||||||
|
|
||||||
await _callback.message.answer_document(
|
await _callback.message.answer_document(
|
||||||
document=BufferedInputFile(excel_bytes, filename=filename),
|
document=BufferedInputFile(result.data, filename=result.filename),
|
||||||
caption=f"📊 <b>Статистика по тесту</b>\n\n📝 {test.title}\n🎓 Группа {group_number}",
|
caption=result.caption,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -20,6 +20,11 @@ from quizzi.infrastructure.database.repo.user import UserRepository
|
|||||||
from quizzi.infrastructure.scheduling.tasks import deactivate_expired_tests, finish_expired_test_attempts, send_time_warning_notifications
|
from quizzi.infrastructure.scheduling.tasks import deactivate_expired_tests, finish_expired_test_attempts, send_time_warning_notifications
|
||||||
from quizzi.infrastructure.utils.config import Config
|
from quizzi.infrastructure.utils.config import Config
|
||||||
from quizzi.infrastructure.utils.rate_limiter import PasswordRateLimiter
|
from quizzi.infrastructure.utils.rate_limiter import PasswordRateLimiter
|
||||||
|
from quizzi.service.broadcast import BroadcastService
|
||||||
|
from quizzi.service.excel import ExcelService
|
||||||
|
from quizzi.service.test import TestService
|
||||||
|
from quizzi.service.test_attempt import TestAttemptService
|
||||||
|
from quizzi.service.user import UserService
|
||||||
|
|
||||||
|
|
||||||
class DatabaseProvider(Provider):
|
class DatabaseProvider(Provider):
|
||||||
@@ -80,6 +85,41 @@ class DatabaseProvider(Provider):
|
|||||||
return TestAttemptRepository(session)
|
return TestAttemptRepository(session)
|
||||||
|
|
||||||
|
|
||||||
|
class ServiceProvider(Provider):
|
||||||
|
@provide(scope=Scope.REQUEST)
|
||||||
|
def get_user_service(self, user_dao: UserDAO, group_dao: GroupDAO) -> UserService:
|
||||||
|
return UserService(user_dao, group_dao)
|
||||||
|
|
||||||
|
@provide(scope=Scope.REQUEST)
|
||||||
|
def get_test_service(
|
||||||
|
self,
|
||||||
|
test_dao: TestDAO,
|
||||||
|
test_repo: TestRepository,
|
||||||
|
attempt_repo: TestAttemptRepository,
|
||||||
|
user_dao: UserDAO,
|
||||||
|
config: Config,
|
||||||
|
) -> TestService:
|
||||||
|
return TestService(test_dao, test_repo, attempt_repo, user_dao, config)
|
||||||
|
|
||||||
|
@provide(scope=Scope.REQUEST)
|
||||||
|
def get_test_attempt_service(
|
||||||
|
self,
|
||||||
|
test_dao: TestDAO,
|
||||||
|
test_repo: TestRepository,
|
||||||
|
attempt_repo: TestAttemptRepository,
|
||||||
|
answer_dao: UserAnswerDAO,
|
||||||
|
) -> TestAttemptService:
|
||||||
|
return TestAttemptService(test_dao, test_repo, attempt_repo, answer_dao)
|
||||||
|
|
||||||
|
@provide(scope=Scope.REQUEST)
|
||||||
|
def get_broadcast_service(self, user_dao: UserDAO) -> BroadcastService:
|
||||||
|
return BroadcastService(user_dao)
|
||||||
|
|
||||||
|
@provide(scope=Scope.REQUEST)
|
||||||
|
def get_excel_service(self, test_dao: TestDAO, attempt_repo: TestAttemptRepository) -> ExcelService:
|
||||||
|
return ExcelService(test_dao, attempt_repo)
|
||||||
|
|
||||||
|
|
||||||
class SchedulerProvider(Provider):
|
class SchedulerProvider(Provider):
|
||||||
@provide(scope = Scope.APP)
|
@provide(scope = Scope.APP)
|
||||||
def get_scheduler(self, container: AsyncContainer, bot: Bot) -> AsyncIOScheduler:
|
def get_scheduler(self, container: AsyncContainer, bot: Bot) -> AsyncIOScheduler:
|
||||||
|
|||||||
@@ -0,0 +1 @@
|
|||||||
|
# Service layer - бизнес-логика приложения
|
||||||
@@ -0,0 +1,43 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from aiogram import Bot
|
||||||
|
from aiogram.exceptions import TelegramAPIError
|
||||||
|
|
||||||
|
from quizzi.infrastructure.database.dao.user import UserDAO
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class BroadcastStats:
|
||||||
|
total: int
|
||||||
|
success: int
|
||||||
|
failed: int
|
||||||
|
|
||||||
|
|
||||||
|
class BroadcastService:
|
||||||
|
def __init__(self, user_dao: UserDAO) -> None:
|
||||||
|
self._user_dao = user_dao
|
||||||
|
|
||||||
|
async def broadcast_message(
|
||||||
|
self,
|
||||||
|
bot: Bot,
|
||||||
|
message_id: int,
|
||||||
|
from_chat_id: int,
|
||||||
|
) -> BroadcastStats:
|
||||||
|
users = await self._user_dao.get_all()
|
||||||
|
|
||||||
|
total = len(users)
|
||||||
|
success = 0
|
||||||
|
failed = 0
|
||||||
|
|
||||||
|
for user in users:
|
||||||
|
try:
|
||||||
|
await bot.copy_message(
|
||||||
|
chat_id=user.id,
|
||||||
|
from_chat_id=from_chat_id,
|
||||||
|
message_id=message_id,
|
||||||
|
)
|
||||||
|
success += 1
|
||||||
|
except TelegramAPIError:
|
||||||
|
failed += 1
|
||||||
|
|
||||||
|
return BroadcastStats(total=total, success=success, failed=failed)
|
||||||
@@ -0,0 +1,145 @@
|
|||||||
|
import io
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from openpyxl import Workbook
|
||||||
|
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
|
||||||
|
|
||||||
|
from quizzi.infrastructure.database.dao.test import TestDAO
|
||||||
|
from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository
|
||||||
|
from quizzi.infrastructure.utils.timezone import to_msk
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ExcelReportResult:
|
||||||
|
success: bool
|
||||||
|
data: bytes | None = None
|
||||||
|
filename: str | None = None
|
||||||
|
caption: str = ""
|
||||||
|
|
||||||
|
|
||||||
|
class ExcelService:
|
||||||
|
def __init__(self, test_dao: TestDAO, attempt_repo: TestAttemptRepository) -> None:
|
||||||
|
self._test_dao = test_dao
|
||||||
|
self._attempt_repo = attempt_repo
|
||||||
|
|
||||||
|
def create_test_report(
|
||||||
|
self,
|
||||||
|
test_title: str,
|
||||||
|
group_number: int,
|
||||||
|
stats: list[tuple[str, int | None, datetime | None, bool | None]],
|
||||||
|
) -> bytes:
|
||||||
|
wb = Workbook()
|
||||||
|
ws = wb.active
|
||||||
|
assert ws is not None
|
||||||
|
ws.title = "Статистика"
|
||||||
|
|
||||||
|
header_font = Font(bold=True, color="FFFFFF")
|
||||||
|
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
|
||||||
|
header_alignment = Alignment(horizontal="center", vertical="center")
|
||||||
|
thin_border = Border(
|
||||||
|
left=Side(style="thin"),
|
||||||
|
right=Side(style="thin"),
|
||||||
|
top=Side(style="thin"),
|
||||||
|
bottom=Side(style="thin"),
|
||||||
|
)
|
||||||
|
|
||||||
|
ws.merge_cells("A1:E1")
|
||||||
|
ws["A1"] = f"Тест: {test_title}"
|
||||||
|
ws["A1"].font = Font(bold=True, size=14)
|
||||||
|
ws["A1"].alignment = Alignment(horizontal="center")
|
||||||
|
|
||||||
|
ws.merge_cells("A2:E2")
|
||||||
|
ws["A2"] = f"Группа: {group_number}"
|
||||||
|
ws["A2"].font = Font(bold=True, size=12)
|
||||||
|
ws["A2"].alignment = Alignment(horizontal="center")
|
||||||
|
|
||||||
|
headers = ["ФИО", "Результат (%)", "Оценка", "Дата прохождения", "Статус"]
|
||||||
|
for col, header in enumerate(headers, 1):
|
||||||
|
cell = ws.cell(row=4, column=col, value=header)
|
||||||
|
cell.font = header_font
|
||||||
|
cell.fill = header_fill
|
||||||
|
cell.alignment = header_alignment
|
||||||
|
cell.border = thin_border
|
||||||
|
|
||||||
|
passed_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
|
||||||
|
failed_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
|
||||||
|
not_passed_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid")
|
||||||
|
|
||||||
|
grades: list[int] = []
|
||||||
|
|
||||||
|
for row_idx, (name, score, finished_at, is_passed) in enumerate(stats, 5):
|
||||||
|
ws.cell(row=row_idx, column=1, value=name).border = thin_border
|
||||||
|
|
||||||
|
if score is not None:
|
||||||
|
ws.cell(row=row_idx, column=2, value=score).border = thin_border
|
||||||
|
|
||||||
|
grade = score // 10
|
||||||
|
grades.append(grade)
|
||||||
|
ws.cell(row=row_idx, column=3, value=grade).border = thin_border
|
||||||
|
|
||||||
|
finished_msk = to_msk(finished_at) if finished_at else None
|
||||||
|
date_str = finished_msk.strftime("%d.%m.%Y %H:%M") if finished_msk else "—"
|
||||||
|
ws.cell(row=row_idx, column=4, value=date_str).border = thin_border
|
||||||
|
status = "Пройден" if is_passed else "Не пройден"
|
||||||
|
status_cell = ws.cell(row=row_idx, column=5, value=status)
|
||||||
|
status_cell.border = thin_border
|
||||||
|
|
||||||
|
for col in range(1, 6):
|
||||||
|
ws.cell(row=row_idx, column=col).fill = passed_fill if is_passed else failed_fill
|
||||||
|
else:
|
||||||
|
ws.cell(row=row_idx, column=2, value="—").border = thin_border
|
||||||
|
ws.cell(row=row_idx, column=3, value="—").border = thin_border
|
||||||
|
ws.cell(row=row_idx, column=4, value="—").border = thin_border
|
||||||
|
status_cell = ws.cell(row=row_idx, column=5, value="Не проходил")
|
||||||
|
status_cell.border = thin_border
|
||||||
|
|
||||||
|
for col in range(1, 6):
|
||||||
|
ws.cell(row=row_idx, column=col).fill = not_passed_fill
|
||||||
|
|
||||||
|
ws.column_dimensions["A"].width = 30
|
||||||
|
ws.column_dimensions["B"].width = 15
|
||||||
|
ws.column_dimensions["C"].width = 10
|
||||||
|
ws.column_dimensions["D"].width = 20
|
||||||
|
ws.column_dimensions["E"].width = 15
|
||||||
|
|
||||||
|
total_users = len(stats)
|
||||||
|
passed_users = sum(1 for _, score, _, is_passed in stats if score is not None and is_passed)
|
||||||
|
attempted_users = sum(1 for _, score, _, _ in stats if score is not None)
|
||||||
|
|
||||||
|
summary_row = len(stats) + 6
|
||||||
|
ws.cell(row=summary_row, column=1, value="Итого:").font = Font(bold=True)
|
||||||
|
ws.cell(row=summary_row + 1, column=1, value=f"Всего студентов: {total_users}")
|
||||||
|
ws.cell(row=summary_row + 2, column=1, value=f"Прошли тест: {attempted_users}")
|
||||||
|
ws.cell(row=summary_row + 3, column=1, value=f"Сдали: {passed_users}")
|
||||||
|
if attempted_users > 0:
|
||||||
|
success_rate = round(passed_users / attempted_users * 100)
|
||||||
|
ws.cell(row=summary_row + 4, column=1, value=f"Процент сдачи: {success_rate}%")
|
||||||
|
if grades:
|
||||||
|
avg_grade = round(sum(grades) / len(grades), 1)
|
||||||
|
ws.cell(row=summary_row + 5, column=1, value=f"Средняя оценка: {avg_grade}")
|
||||||
|
|
||||||
|
output = io.BytesIO()
|
||||||
|
wb.save(output)
|
||||||
|
output.seek(0)
|
||||||
|
return output.read()
|
||||||
|
|
||||||
|
async def generate_group_report(self, test_id: int, group_number: int) -> ExcelReportResult:
|
||||||
|
test = await self._test_dao.get_by_id(test_id)
|
||||||
|
if not test:
|
||||||
|
return ExcelReportResult(success=False, caption="❌ Тест не найден")
|
||||||
|
|
||||||
|
stats = await self._attempt_repo.get_group_test_statistics(test_id, group_number)
|
||||||
|
if not stats:
|
||||||
|
return ExcelReportResult(success=False, caption=f"❌ В группе {group_number} нет студентов")
|
||||||
|
|
||||||
|
excel_bytes = self.create_test_report(test.title, group_number, stats)
|
||||||
|
safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in test.title)[:30]
|
||||||
|
filename = f"{safe_title}_group_{group_number}.xlsx"
|
||||||
|
|
||||||
|
return ExcelReportResult(
|
||||||
|
success=True,
|
||||||
|
data=excel_bytes,
|
||||||
|
filename=filename,
|
||||||
|
caption=f"📊 <b>Статистика по тесту</b>\n\n📝 {test.title}\n🎓 Группа {group_number}",
|
||||||
|
)
|
||||||
@@ -0,0 +1,184 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from quizzi.domain.schemas import Test
|
||||||
|
from quizzi.infrastructure.database.dao.test import TestDAO
|
||||||
|
from quizzi.infrastructure.database.dao.user import UserDAO
|
||||||
|
from quizzi.infrastructure.database.repo.test import TestRepository
|
||||||
|
from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository
|
||||||
|
from quizzi.infrastructure.utils.config import Config
|
||||||
|
from quizzi.infrastructure.utils.test_id_to_hash import decode_id, encode_id
|
||||||
|
from quizzi.infrastructure.utils.timezone import now_msk_naive
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TestValidationResult:
|
||||||
|
is_valid: bool
|
||||||
|
error: str = ""
|
||||||
|
test: Test | None = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TestAccessResult:
|
||||||
|
can_access: bool
|
||||||
|
error: str = ""
|
||||||
|
remaining_attempts: int | None = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TestUpdateResult:
|
||||||
|
success: bool
|
||||||
|
message: str
|
||||||
|
|
||||||
|
|
||||||
|
class TestService:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
test_dao: TestDAO,
|
||||||
|
test_repo: TestRepository,
|
||||||
|
attempt_repo: TestAttemptRepository,
|
||||||
|
user_dao: UserDAO,
|
||||||
|
config: Config,
|
||||||
|
) -> None:
|
||||||
|
self._test_dao = test_dao
|
||||||
|
self._test_repo = test_repo
|
||||||
|
self._attempt_repo = attempt_repo
|
||||||
|
self._user_dao = user_dao
|
||||||
|
self._config = config
|
||||||
|
|
||||||
|
def decode_test_hash(self, test_hash: str) -> int | None:
|
||||||
|
try:
|
||||||
|
return decode_id(test_hash, self._config.security.encode_key)
|
||||||
|
except (ValueError, IndexError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def encode_test_id(self, test_id: int) -> str:
|
||||||
|
return encode_id(
|
||||||
|
test_id,
|
||||||
|
self._config.security.encode_key,
|
||||||
|
self._config.security.encoded_string_length,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def validate_test(self, test_id: int, user_id: int) -> TestValidationResult:
|
||||||
|
test = await self._test_dao.get_by_id(test_id)
|
||||||
|
|
||||||
|
if not test:
|
||||||
|
return TestValidationResult(is_valid=False, error="❌ Тест не найден")
|
||||||
|
|
||||||
|
if not test.is_active:
|
||||||
|
return TestValidationResult(is_valid=False, error="❌ Тест деактивирован", test=test)
|
||||||
|
|
||||||
|
if test.expires_at and test.expires_at < now_msk_naive():
|
||||||
|
return TestValidationResult(is_valid=False, error="❌ Срок действия теста истек", test=test)
|
||||||
|
|
||||||
|
user = await self._user_dao.get_by_id(user_id)
|
||||||
|
if test.for_group and user and user.group != test.for_group:
|
||||||
|
return TestValidationResult(
|
||||||
|
is_valid=False,
|
||||||
|
error=f"❌ Тест доступен только для группы {test.for_group}",
|
||||||
|
test=test,
|
||||||
|
)
|
||||||
|
|
||||||
|
return TestValidationResult(is_valid=True, test=test)
|
||||||
|
|
||||||
|
async def check_test_access(self, test_id: int, user_id: int) -> TestAccessResult:
|
||||||
|
test = await self._test_dao.get_by_id(test_id)
|
||||||
|
|
||||||
|
if not test:
|
||||||
|
return TestAccessResult(can_access=False, error="❌ Тест не найден")
|
||||||
|
|
||||||
|
if not test.is_active:
|
||||||
|
return TestAccessResult(can_access=False, error="❌ Тест деактивирован")
|
||||||
|
|
||||||
|
if test.expires_at and test.expires_at < now_msk_naive():
|
||||||
|
return TestAccessResult(can_access=False, error="❌ Срок действия теста истек")
|
||||||
|
|
||||||
|
if test.attempts:
|
||||||
|
attempts = await self._attempt_repo.get_user_test_attempts(user_id, test_id)
|
||||||
|
finished_attempts = [a for a in attempts if a.finished_at]
|
||||||
|
remaining = test.attempts - len(finished_attempts)
|
||||||
|
|
||||||
|
if remaining <= 0:
|
||||||
|
return TestAccessResult(
|
||||||
|
can_access=False,
|
||||||
|
error="❌ Вы исчерпали все попытки",
|
||||||
|
remaining_attempts=0,
|
||||||
|
)
|
||||||
|
|
||||||
|
return TestAccessResult(can_access=True, remaining_attempts=remaining)
|
||||||
|
|
||||||
|
return TestAccessResult(can_access=True)
|
||||||
|
|
||||||
|
async def get_available_tests(self, user_id: int, user_group: int | None) -> list[Test]:
|
||||||
|
return await self._test_repo.get_available_tests_for_user(user_id, user_group)
|
||||||
|
|
||||||
|
async def toggle_test_active(self, test_id: int) -> TestUpdateResult:
|
||||||
|
test = await self._test_dao.get_by_id(test_id)
|
||||||
|
if not test:
|
||||||
|
return TestUpdateResult(success=False, message="❌ Тест не найден")
|
||||||
|
|
||||||
|
await self._test_dao.update(test_id, is_active=not test.is_active)
|
||||||
|
action = "деактивирован" if test.is_active else "активирован"
|
||||||
|
return TestUpdateResult(success=True, message=f"✅ Тест {action}")
|
||||||
|
|
||||||
|
async def toggle_results_viewable(self, test_id: int) -> TestUpdateResult:
|
||||||
|
test = await self._test_dao.get_by_id(test_id)
|
||||||
|
if not test:
|
||||||
|
return TestUpdateResult(success=False, message="❌ Тест не найден")
|
||||||
|
|
||||||
|
await self._test_dao.update(test_id, are_results_viewable=not test.are_results_viewable)
|
||||||
|
action = "скрыты" if test.are_results_viewable else "видны"
|
||||||
|
return TestUpdateResult(success=True, message=f"✅ Результаты теперь {action}")
|
||||||
|
|
||||||
|
async def delete_test(self, test_id: int) -> bool:
|
||||||
|
return await self._test_dao.delete(test_id)
|
||||||
|
|
||||||
|
async def update_password(self, test_id: int, password: str) -> TestUpdateResult:
|
||||||
|
if len(password) > 255:
|
||||||
|
return TestUpdateResult(success=False, message="❌ Пароль слишком длинный (максимум 255 символов)")
|
||||||
|
await self._test_dao.update(test_id, password=password)
|
||||||
|
return TestUpdateResult(success=True, message="✅ Пароль обновлен")
|
||||||
|
|
||||||
|
async def remove_password(self, test_id: int) -> TestUpdateResult:
|
||||||
|
await self._test_dao.update(test_id, password=None)
|
||||||
|
return TestUpdateResult(success=True, message="✅ Пароль удален")
|
||||||
|
|
||||||
|
async def update_attempts(self, test_id: int, attempts: int) -> TestUpdateResult:
|
||||||
|
if attempts < 1:
|
||||||
|
return TestUpdateResult(success=False, message="❌ Количество попыток должно быть больше 0")
|
||||||
|
if attempts > 100:
|
||||||
|
return TestUpdateResult(success=False, message="❌ Количество попыток не может быть больше 100")
|
||||||
|
await self._test_dao.update(test_id, attempts=attempts)
|
||||||
|
return TestUpdateResult(success=True, message="✅ Количество попыток обновлено")
|
||||||
|
|
||||||
|
async def remove_attempts(self, test_id: int) -> TestUpdateResult:
|
||||||
|
await self._test_dao.update(test_id, attempts=None)
|
||||||
|
return TestUpdateResult(success=True, message="✅ Ограничение попыток удалено")
|
||||||
|
|
||||||
|
async def update_time_limit(self, test_id: int, minutes: int) -> TestUpdateResult:
|
||||||
|
if minutes < 1:
|
||||||
|
return TestUpdateResult(success=False, message="❌ Лимит времени должен быть больше 0")
|
||||||
|
if minutes > 1440:
|
||||||
|
return TestUpdateResult(success=False, message="❌ Лимит времени не может быть больше 1440 минут (24 часа)")
|
||||||
|
await self._test_dao.update(test_id, time_limit=minutes * 60)
|
||||||
|
return TestUpdateResult(success=True, message="✅ Лимит времени обновлен")
|
||||||
|
|
||||||
|
async def remove_time_limit(self, test_id: int) -> TestUpdateResult:
|
||||||
|
await self._test_dao.update(test_id, time_limit=None)
|
||||||
|
return TestUpdateResult(success=True, message="✅ Лимит времени удален")
|
||||||
|
|
||||||
|
async def update_group(self, test_id: int, group: int) -> TestUpdateResult:
|
||||||
|
await self._test_dao.update(test_id, for_group=group)
|
||||||
|
return TestUpdateResult(success=True, message="✅ Группа обновлена")
|
||||||
|
|
||||||
|
async def remove_group(self, test_id: int) -> TestUpdateResult:
|
||||||
|
await self._test_dao.update(test_id, for_group=None)
|
||||||
|
return TestUpdateResult(success=True, message="✅ Тест теперь доступен для всех групп")
|
||||||
|
|
||||||
|
async def update_expires(self, test_id: int, expires_at: datetime) -> TestUpdateResult:
|
||||||
|
await self._test_dao.update(test_id, expires_at=expires_at)
|
||||||
|
return TestUpdateResult(success=True, message="✅ Срок действия обновлен")
|
||||||
|
|
||||||
|
async def remove_expires(self, test_id: int) -> TestUpdateResult:
|
||||||
|
await self._test_dao.update(test_id, expires_at=None)
|
||||||
|
return TestUpdateResult(success=True, message="✅ Срок действия удален")
|
||||||
@@ -0,0 +1,204 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from quizzi.domain.schemas import QuestionType
|
||||||
|
from quizzi.infrastructure.database.dao.test import TestDAO
|
||||||
|
from quizzi.infrastructure.database.dao.user_answer import UserAnswerDAO
|
||||||
|
from quizzi.infrastructure.database.repo.test import TestRepository
|
||||||
|
from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository
|
||||||
|
from quizzi.infrastructure.utils.timezone import now_msk_naive
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AttemptStartResult:
|
||||||
|
success: bool
|
||||||
|
attempt_id: int | None = None
|
||||||
|
questions: list[int] | None = None
|
||||||
|
started_at: datetime | None = None
|
||||||
|
error: str = ""
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AnswerResult:
|
||||||
|
success: bool
|
||||||
|
is_correct: bool = False
|
||||||
|
error: str = ""
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TestResult:
|
||||||
|
score: int
|
||||||
|
correct_count: int
|
||||||
|
total_questions: int
|
||||||
|
is_passed: bool
|
||||||
|
|
||||||
|
|
||||||
|
class TestAttemptService:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
test_dao: TestDAO,
|
||||||
|
test_repo: TestRepository,
|
||||||
|
attempt_repo: TestAttemptRepository,
|
||||||
|
answer_dao: UserAnswerDAO,
|
||||||
|
) -> None:
|
||||||
|
self._test_dao = test_dao
|
||||||
|
self._test_repo = test_repo
|
||||||
|
self._attempt_repo = attempt_repo
|
||||||
|
self._answer_dao = answer_dao
|
||||||
|
|
||||||
|
async def start_attempt(self, user_id: int, test_id: int) -> AttemptStartResult:
|
||||||
|
active_attempt = await self._attempt_repo.get_active_attempt(user_id, test_id)
|
||||||
|
if active_attempt:
|
||||||
|
await self._attempt_repo.attempt_dao.delete(active_attempt.id)
|
||||||
|
|
||||||
|
_, questions = await self._test_repo.get_test_with_questions(test_id)
|
||||||
|
if not questions:
|
||||||
|
return AttemptStartResult(success=False, error="❌ В тесте нет вопросов")
|
||||||
|
|
||||||
|
attempt = await self._attempt_repo.attempt_dao.create(user_id=user_id, test_id=test_id)
|
||||||
|
started_at = now_msk_naive()
|
||||||
|
|
||||||
|
return AttemptStartResult(
|
||||||
|
success=True,
|
||||||
|
attempt_id=attempt.id,
|
||||||
|
questions=[q.id for q in questions],
|
||||||
|
started_at=started_at,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def cancel_attempt(self, attempt_id: int) -> bool:
|
||||||
|
return await self._attempt_repo.attempt_dao.delete(attempt_id)
|
||||||
|
|
||||||
|
async def save_single_answer(
|
||||||
|
self,
|
||||||
|
attempt_id: int,
|
||||||
|
question_id: int,
|
||||||
|
selected_option_id: int,
|
||||||
|
) -> AnswerResult:
|
||||||
|
question, options = await self._test_repo.get_question_with_options(question_id)
|
||||||
|
if not question:
|
||||||
|
return AnswerResult(success=False, error="❌ Вопрос не найден")
|
||||||
|
|
||||||
|
correct_options = [opt for opt in options if opt.is_correct]
|
||||||
|
is_correct = any(opt.id == selected_option_id for opt in correct_options)
|
||||||
|
selected_text = next((opt.text for opt in options if opt.id == selected_option_id), "")
|
||||||
|
|
||||||
|
await self._answer_dao.create(
|
||||||
|
attempt_id=attempt_id,
|
||||||
|
question_id=question_id,
|
||||||
|
selected_option_id=selected_option_id,
|
||||||
|
text_answer=selected_text,
|
||||||
|
is_correct=is_correct,
|
||||||
|
)
|
||||||
|
|
||||||
|
return AnswerResult(success=True, is_correct=is_correct)
|
||||||
|
|
||||||
|
async def save_multiple_answer(
|
||||||
|
self,
|
||||||
|
attempt_id: int,
|
||||||
|
question_id: int,
|
||||||
|
selected_option_ids: list[int],
|
||||||
|
) -> AnswerResult:
|
||||||
|
question, options = await self._test_repo.get_question_with_options(question_id)
|
||||||
|
if not question:
|
||||||
|
return AnswerResult(success=False, error="❌ Вопрос не найден")
|
||||||
|
|
||||||
|
selected_texts = sorted([opt.text for opt in options if opt.id in selected_option_ids])
|
||||||
|
correct_texts = sorted([opt.text for opt in options if opt.is_correct])
|
||||||
|
is_correct = selected_texts == correct_texts
|
||||||
|
|
||||||
|
await self._answer_dao.create(
|
||||||
|
attempt_id=attempt_id,
|
||||||
|
question_id=question_id,
|
||||||
|
text_answer="|".join(selected_texts),
|
||||||
|
is_correct=is_correct,
|
||||||
|
)
|
||||||
|
|
||||||
|
return AnswerResult(success=True, is_correct=is_correct)
|
||||||
|
|
||||||
|
async def save_text_answer(
|
||||||
|
self,
|
||||||
|
attempt_id: int,
|
||||||
|
question_id: int,
|
||||||
|
text_answer: str,
|
||||||
|
) -> AnswerResult:
|
||||||
|
question, options = await self._test_repo.get_question_with_options(question_id)
|
||||||
|
if not question:
|
||||||
|
return AnswerResult(success=False, error="❌ Вопрос не найден")
|
||||||
|
|
||||||
|
correct_options = [opt for opt in options if opt.is_correct]
|
||||||
|
user_normalized = text_answer.lower().replace(" ", "")
|
||||||
|
is_correct = any(opt.text.lower().replace(" ", "") == user_normalized for opt in correct_options)
|
||||||
|
|
||||||
|
await self._answer_dao.create(
|
||||||
|
attempt_id=attempt_id,
|
||||||
|
question_id=question_id,
|
||||||
|
text_answer=text_answer,
|
||||||
|
is_correct=is_correct,
|
||||||
|
)
|
||||||
|
|
||||||
|
return AnswerResult(success=True, is_correct=is_correct)
|
||||||
|
|
||||||
|
async def finish_attempt(self, attempt_id: int, total_questions: int) -> TestResult:
|
||||||
|
correct_count = await self._attempt_repo.calculate_attempt_score(attempt_id)
|
||||||
|
score = round((correct_count / total_questions * 100)) if total_questions > 0 else 0
|
||||||
|
is_passed = score >= 50
|
||||||
|
|
||||||
|
await self._attempt_repo.finish_attempt(attempt_id, score, is_passed)
|
||||||
|
|
||||||
|
return TestResult(
|
||||||
|
score=score,
|
||||||
|
correct_count=correct_count,
|
||||||
|
total_questions=total_questions,
|
||||||
|
is_passed=is_passed,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def finish_by_timeout(
|
||||||
|
self,
|
||||||
|
attempt_id: int,
|
||||||
|
questions: list[int],
|
||||||
|
user_answers: dict,
|
||||||
|
) -> TestResult:
|
||||||
|
answered_question_ids = set()
|
||||||
|
answers = await self._attempt_repo.get_answers_for_attempt(attempt_id)
|
||||||
|
for answer in answers:
|
||||||
|
answered_question_ids.add(answer.question_id)
|
||||||
|
|
||||||
|
for question_id in questions:
|
||||||
|
if question_id in answered_question_ids:
|
||||||
|
continue
|
||||||
|
|
||||||
|
answer_data = user_answers.get(str(question_id))
|
||||||
|
|
||||||
|
if answer_data:
|
||||||
|
question, options = await self._test_repo.get_question_with_options(question_id)
|
||||||
|
if not question:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if answer_data["type"] == "single":
|
||||||
|
await self.save_single_answer(attempt_id, question_id, answer_data["answer"])
|
||||||
|
elif answer_data["type"] == "multiple":
|
||||||
|
await self.save_multiple_answer(attempt_id, question_id, answer_data["answer"])
|
||||||
|
else:
|
||||||
|
await self._answer_dao.create(
|
||||||
|
attempt_id=attempt_id,
|
||||||
|
question_id=question_id,
|
||||||
|
text_answer=None,
|
||||||
|
is_correct=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
return await self.finish_attempt(attempt_id, len(questions))
|
||||||
|
|
||||||
|
async def get_question_state(self, question_type: str):
|
||||||
|
from quizzi.application.bot.user_dialogs.states import UserTestSG
|
||||||
|
|
||||||
|
if question_type == QuestionType.SINGLE:
|
||||||
|
return UserTestSG.question_single
|
||||||
|
elif question_type == QuestionType.MULTIPLE:
|
||||||
|
return UserTestSG.question_multiple
|
||||||
|
else:
|
||||||
|
return UserTestSG.question_input
|
||||||
|
|
||||||
|
async def get_next_question_state(self, question_id: int):
|
||||||
|
question, _ = await self._test_repo.get_question_with_options(question_id)
|
||||||
|
question_type = question.question_type if question else QuestionType.SINGLE
|
||||||
|
return await self.get_question_state(question_type)
|
||||||
@@ -0,0 +1,75 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from quizzi.domain.schemas import User
|
||||||
|
from quizzi.infrastructure.database.dao.group import GroupDAO
|
||||||
|
from quizzi.infrastructure.database.dao.user import UserDAO
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class RegistrationResult:
|
||||||
|
is_registered: bool
|
||||||
|
needs_name: bool = False
|
||||||
|
needs_group: bool = False
|
||||||
|
has_groups: bool = False
|
||||||
|
user: User | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class UserService:
|
||||||
|
def __init__(self, user_dao: UserDAO, group_dao: GroupDAO) -> None:
|
||||||
|
self._user_dao = user_dao
|
||||||
|
self._group_dao = group_dao
|
||||||
|
|
||||||
|
async def check_registration(self, user_id: int) -> RegistrationResult:
|
||||||
|
user = await self._user_dao.get_by_id(user_id)
|
||||||
|
groups = await self._group_dao.get_all()
|
||||||
|
has_groups = len(groups) > 0
|
||||||
|
|
||||||
|
if user is None:
|
||||||
|
return RegistrationResult(
|
||||||
|
is_registered=False,
|
||||||
|
needs_name=True,
|
||||||
|
needs_group=has_groups,
|
||||||
|
has_groups=has_groups,
|
||||||
|
)
|
||||||
|
|
||||||
|
needs_name = user.name is None
|
||||||
|
needs_group = has_groups and user.group is None
|
||||||
|
|
||||||
|
return RegistrationResult(
|
||||||
|
is_registered=not needs_name and not needs_group,
|
||||||
|
needs_name=needs_name,
|
||||||
|
needs_group=needs_group,
|
||||||
|
has_groups=has_groups,
|
||||||
|
user=user,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def create_user(
|
||||||
|
self,
|
||||||
|
user_id: int,
|
||||||
|
first_name: str,
|
||||||
|
username: str | None = None,
|
||||||
|
last_name: str | None = None,
|
||||||
|
) -> User:
|
||||||
|
return await self._user_dao.create(
|
||||||
|
user_id=user_id,
|
||||||
|
first_name=first_name,
|
||||||
|
username=username,
|
||||||
|
last_name=last_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def update_user_info(
|
||||||
|
self,
|
||||||
|
user_id: int,
|
||||||
|
first_name: str,
|
||||||
|
username: str | None = None,
|
||||||
|
last_name: str | None = None,
|
||||||
|
) -> User | None:
|
||||||
|
return await self._user_dao.upsert(
|
||||||
|
user_id=user_id,
|
||||||
|
first_name=first_name,
|
||||||
|
username=username,
|
||||||
|
last_name=last_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def get_all_users(self) -> list[User]:
|
||||||
|
return await self._user_dao.get_all()
|
||||||
Reference in New Issue
Block a user