Merge pull request #1 from koloideal/dev

Dev
This commit is contained in:
kolo
2026-01-07 11:00:45 +03:00
committed by GitHub
26 changed files with 1045 additions and 135 deletions
@@ -0,0 +1,26 @@
"""add time_limit to test
Revision ID: b1c2d3e4f5a6
Revises: ca107b03ddf8
Create Date: 2026-01-06 12:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'b1c2d3e4f5a6'
down_revision: Union[str, None] = 'ca107b03ddf8'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column('tests', sa.Column('time_limit', sa.Integer(), nullable=True))
def downgrade() -> None:
op.drop_column('tests', 'time_limit')
@@ -0,0 +1,24 @@
"""add warning_sent_at to test_attempt
Revision ID: c4d5e6f7a8b9
Revises: b1c2d3e4f5a6
Create Date: 2026-01-06
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
revision: str = 'c4d5e6f7a8b9'
down_revision: str | None = 'b1c2d3e4f5a6'
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.add_column('test_attempts', sa.Column('warning_sent_at', sa.DateTime(), nullable=True))
def downgrade() -> None:
op.drop_column('test_attempts', 'warning_sent_at')
+6 -7
View File
@@ -1,14 +1,13 @@
[bot] [bot]
token = "1234567890:ABCdefGHIjklMNOpqrsTUVwxyz" token = "1234567890"
creator_id = 123456789 creator_id = 1234567890
[security] [security]
test_hash_salt = "your_secret_salt_here_change_in_production" encode_key = "encode_key"
test_hash_length = 16
[database] [database]
host = "localhost" host = "localhost"
port = 5432 port = 5432
user = "trudex_user" user = "postgres"
password = "secure_password" password = "passkey"
database = "trudex_db" database = "trudex"
+1
View File
@@ -19,6 +19,7 @@ dependencies = [
"qrcode[pil]>=8.2", "qrcode[pil]>=8.2",
"pycryptodome>=3.23.0", "pycryptodome>=3.23.0",
"json5>=0.13.0", "json5>=0.13.0",
"openpyxl>=3.1.0",
] ]
[dependency-groups] [dependency-groups]
@@ -100,11 +100,41 @@ async def on_attempts_input(message: Message, _widget: MessageInput, manager: Di
return return
manager.dialog_data["attempts"] = attempts manager.dialog_data["attempts"] = attempts
await manager.switch_to(SharedCreateTestSG.input_expires_at) await manager.switch_to(SharedCreateTestSG.input_time_limit)
async def on_skip_attempts(_callback: CallbackQuery, _button: Button, manager: DialogManager): async def on_skip_attempts(_callback: CallbackQuery, _button: Button, manager: DialogManager):
manager.dialog_data["attempts"] = None manager.dialog_data["attempts"] = None
await manager.switch_to(SharedCreateTestSG.input_time_limit)
async def on_time_limit_input(message: Message, _widget: MessageInput, manager: DialogManager):
if not message.text:
await message.answer("❌ Лимит времени не может быть пустым")
return
time_limit_str = message.text.strip()
if not time_limit_str.isdigit():
await message.answer("❌ Лимит времени должен быть числом (в минутах)")
return
time_limit_minutes = int(time_limit_str)
if time_limit_minutes < 1:
await message.answer("❌ Лимит времени должен быть больше 0")
return
if time_limit_minutes > 1440:
await message.answer("❌ Лимит времени не может быть больше 1440 минут (24 часа)")
return
manager.dialog_data["time_limit"] = time_limit_minutes * 60
await manager.switch_to(SharedCreateTestSG.input_expires_at)
async def on_skip_time_limit(_callback: CallbackQuery, _button: Button, manager: DialogManager):
manager.dialog_data["time_limit"] = None
await manager.switch_to(SharedCreateTestSG.input_expires_at) await manager.switch_to(SharedCreateTestSG.input_expires_at)
@@ -139,11 +169,13 @@ async def get_test_info(dialog_manager: DialogManager, **_kwargs):
description = dialog_manager.dialog_data.get("description", "") description = dialog_manager.dialog_data.get("description", "")
password = dialog_manager.dialog_data.get("password") password = dialog_manager.dialog_data.get("password")
attempts = dialog_manager.dialog_data.get("attempts") attempts = dialog_manager.dialog_data.get("attempts")
time_limit = dialog_manager.dialog_data.get("time_limit")
expires_at = dialog_manager.dialog_data.get("expires_at") expires_at = dialog_manager.dialog_data.get("expires_at")
for_group = dialog_manager.dialog_data.get("for_group") for_group = dialog_manager.dialog_data.get("for_group")
password_str = f"🔒 {password}" if password else "Без пароля" password_str = f"🔒 {password}" if password else "Без пароля"
attempts_str = f"🔄 {attempts}" if attempts else "♾️ Без ограничений" attempts_str = f"🔄 {attempts}" if attempts else "♾️ Без ограничений"
time_limit_str = f"⏱️ {time_limit // 60} мин" if time_limit else "⏱️ Без лимита"
expires_at_msk = to_msk(expires_at) expires_at_msk = to_msk(expires_at)
expires_str = expires_at_msk.strftime("%d.%m.%Y") if expires_at_msk else "Без срока" expires_str = expires_at_msk.strftime("%d.%m.%Y") if expires_at_msk else "Без срока"
group_str = str(for_group) if for_group else "Для всех" group_str = str(for_group) if for_group else "Для всех"
@@ -155,6 +187,7 @@ async def get_test_info(dialog_manager: DialogManager, **_kwargs):
f"<b>Описание:</b> {description}\n" f"<b>Описание:</b> {description}\n"
f"<b>Пароль:</b> {password_str}\n" f"<b>Пароль:</b> {password_str}\n"
f"<b>Попыток:</b> {attempts_str}\n" f"<b>Попыток:</b> {attempts_str}\n"
f"<b>Время:</b> {time_limit_str}\n"
f"<b>Истекает:</b> {expires_str}\n" f"<b>Истекает:</b> {expires_str}\n"
f"<b>Для группы:</b> {group_str}" f"<b>Для группы:</b> {group_str}"
) )
@@ -168,6 +201,7 @@ async def on_confirm_test(_callback: CallbackQuery, _button: Button, manager: Di
description = manager.dialog_data.get("description") description = manager.dialog_data.get("description")
password = manager.dialog_data.get("password") password = manager.dialog_data.get("password")
attempts = manager.dialog_data.get("attempts") attempts = manager.dialog_data.get("attempts")
time_limit = manager.dialog_data.get("time_limit")
expires_at = manager.dialog_data.get("expires_at") expires_at = manager.dialog_data.get("expires_at")
for_group = manager.dialog_data.get("for_group") for_group = manager.dialog_data.get("for_group")
@@ -176,6 +210,7 @@ async def on_confirm_test(_callback: CallbackQuery, _button: Button, manager: Di
description=description, description=description,
password=password, password=password,
attempts=attempts, attempts=attempts,
time_limit=time_limit,
expires_at=expires_at, expires_at=expires_at,
for_group=for_group, for_group=for_group,
) )
@@ -470,6 +505,12 @@ shared_create_test_dialog = Dialog(
Button(Const("⏭️ Без ограничений"), id="skip_attempts", on_click=on_skip_attempts), Button(Const("⏭️ Без ограничений"), id="skip_attempts", on_click=on_skip_attempts),
state=SharedCreateTestSG.input_attempts, state=SharedCreateTestSG.input_attempts,
), ),
Window(
Const("<b>⏱️ Лимит времени</b>\n\n🔢 <b>Введите лимит времени в минутах</b> (1-1440) или пропустите для неограниченного времени:"),
MessageInput(on_time_limit_input),
Button(Const("⏭️ Без лимита"), id="skip_time_limit", on_click=on_skip_time_limit),
state=SharedCreateTestSG.input_time_limit,
),
Window( Window(
Const("<b>📅 Срок действия</b>\n\n🗓 <b>Выберите дату истечения теста</b> или пропустите:"), Const("<b>📅 Срок действия</b>\n\n🗓 <b>Выберите дату истечения теста</b> или пропустите:"),
Calendar(id="calendar", on_click=on_date_selected), Calendar(id="calendar", on_click=on_date_selected),
@@ -15,10 +15,13 @@ class SharedTestsSG(StatesGroup):
edit_menu = State() edit_menu = State()
edit_password = State() edit_password = State()
edit_attempts = State() edit_attempts = State()
edit_time_limit = State()
edit_group = State() edit_group = State()
edit_expires = State() edit_expires = State()
statistics = State() statistics = State()
attempt_detail = State() attempt_detail = State()
export_select_group = State()
delete_confirm = State()
class SharedBroadcastSG(StatesGroup): class SharedBroadcastSG(StatesGroup):
@@ -38,6 +41,7 @@ class SharedCreateTestSG(StatesGroup):
input_description = State() input_description = State()
input_password = State() input_password = State()
input_attempts = State() input_attempts = State()
input_time_limit = State()
input_expires_at = State() input_expires_at = State()
input_for_group = State() input_for_group = State()
confirm_test_info = State() confirm_test_info = State()
@@ -36,6 +36,7 @@ SPEC_INFO = """<b>📋 Спецификация формата JSON</b>
"description": "Описание теста", "description": "Описание теста",
"password": null, "password": null,
"attempts": null, "attempts": null,
"time_limit": null,
"expires_at": null, "expires_at": null,
"for_group": null, "for_group": null,
"questions": [...] "questions": [...]
@@ -46,6 +47,7 @@ SPEC_INFO = """<b>📋 Спецификация формата JSON</b>
• <code>description</code> — описание (до 2000 символов) • <code>description</code> — описание (до 2000 символов)
• <code>password</code> — пароль для доступа или <code>null</code> • <code>password</code> — пароль для доступа или <code>null</code>
• <code>attempts</code> — лимит попыток (1-100) или <code>null</code> • <code>attempts</code> — лимит попыток (1-100) или <code>null</code>
• <code>time_limit</code> — лимит времени в секундах (1-86400) или <code>null</code>
• <code>expires_at</code> — срок действия в ISO формате или <code>null</code> • <code>expires_at</code> — срок действия в ISO формате или <code>null</code>
• <code>for_group</code> — номер группы или <code>null</code> для всех • <code>for_group</code> — номер группы или <code>null</code> для всех
@@ -90,6 +92,7 @@ TEMPLATE_ULTIMATE = """// ══════════════════
// ⚙️ НАСТРОЙКИ: // ⚙️ НАСТРОЙКИ:
// • Пароль: test2024 // • Пароль: test2024
// • Попыток: 5 // • Попыток: 5
// • Лимит времени: 1800 секунд (30 минут)
// • Срок действия: 31 декабря 2026, 23:59 // • Срок действия: 31 декабря 2026, 23:59
// • Для группы: 2024 (или null для всех) // • Для группы: 2024 (или null для всех)
// //
@@ -104,6 +107,7 @@ TEMPLATE_ULTIMATE = """// ══════════════════
// 💡 ПОДСКАЗКИ: // 💡 ПОДСКАЗКИ:
// • null означает "не задано" / "без ограничений" // • null означает "не задано" / "без ограничений"
// • expires_at в формате ISO 8601: YYYY-MM-DDTHH:MM:SS // • expires_at в формате ISO 8601: YYYY-MM-DDTHH:MM:SS
// • time_limit в секундах (1-86400), null для без ограничений
// • for_group - номер группы или null для всех пользователей // • for_group - номер группы или null для всех пользователей
// • image_url - URL изображения к вопросу (опционально) // • image_url - URL изображения к вопросу (опционально)
// //
@@ -114,6 +118,7 @@ TEMPLATE_ULTIMATE = """// ══════════════════
"description": "Полная демонстрация всех возможностей формата: разные типы вопросов, настройки доступа, ограничения по времени и группам", "description": "Полная демонстрация всех возможностей формата: разные типы вопросов, настройки доступа, ограничения по времени и группам",
"password": "test2024", "password": "test2024",
"attempts": 5, "attempts": 5,
"time_limit": 1800,
"expires_at": "2026-12-31T23:59:59", "expires_at": "2026-12-31T23:59:59",
"for_group": 2024, "for_group": 2024,
"questions": [ "questions": [
@@ -225,6 +230,7 @@ async def on_test_selected_for_export(
"description": test.description, "description": test.description,
"password": test.password, "password": test.password,
"attempts": test.attempts, "attempts": test.attempts,
"time_limit": test.time_limit,
"expires_at": test.expires_at.isoformat() if test.expires_at else None, "expires_at": test.expires_at.isoformat() if test.expires_at else None,
"for_group": test.for_group, "for_group": test.for_group,
"questions": [], "questions": [],
@@ -340,6 +346,7 @@ async def create_test_from_parsed(
description=parsed.description, description=parsed.description,
password=parsed.password, password=parsed.password,
attempts=parsed.attempts, attempts=parsed.attempts,
time_limit=parsed.time_limit,
expires_at=parsed.expires_at, expires_at=parsed.expires_at,
for_group=parsed.for_group, for_group=parsed.for_group,
is_active=False, is_active=False,
@@ -1,6 +1,6 @@
import asyncio import asyncio
import functools import functools
import json import io
from datetime import date, datetime, time from datetime import date, datetime, time
from aiogram import Bot from aiogram import Bot
@@ -11,9 +11,10 @@ from aiogram_dialog.widgets.kbd import Button, Calendar, Column, Row, ScrollingG
from aiogram_dialog.widgets.text import Const, Format from aiogram_dialog.widgets.text import Const, Format
from dishka import FromDishka from dishka import FromDishka
from dishka.integrations.aiogram_dialog import inject from dishka.integrations.aiogram_dialog import inject
from openpyxl import Workbook
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from quizzi.application.bot.shared_dialogs.states import SharedCreateTestSG, SharedTestsSG from quizzi.application.bot.shared_dialogs.states import SharedCreateTestSG, SharedTestsSG
from quizzi.domain.schemas import QuestionType
from quizzi.infrastructure.database.dao.group import GroupDAO from quizzi.infrastructure.database.dao.group import GroupDAO
from quizzi.infrastructure.database.dao.test import TestDAO from quizzi.infrastructure.database.dao.test import TestDAO
from quizzi.infrastructure.database.repo.test import TestRepository from quizzi.infrastructure.database.repo.test import TestRepository
@@ -68,6 +69,7 @@ async def get_test_detail(test_dao: FromDishka[TestDAO], test_repo: FromDishka[T
status = "🟢 Активен" if test.is_active else "🔴 Деактивирован" status = "🟢 Активен" if test.is_active else "🔴 Деактивирован"
password_str = f"🔒 {test.password}" if test.password else "🔓 Без пароля" password_str = f"🔒 {test.password}" if test.password else "🔓 Без пароля"
attempts_str = f"🔄 {test.attempts}" if test.attempts else "♾️ Без ограничений" attempts_str = f"🔄 {test.attempts}" if test.attempts else "♾️ Без ограничений"
time_limit_str = f"⏱️ {test.time_limit // 60} мин" if test.time_limit else "⏱️ Без лимита"
expires_str = f"📅 {to_msk(test.expires_at).strftime('%d.%m.%Y %H:%M')}" if test.expires_at else "📅 Без срока" expires_str = f"📅 {to_msk(test.expires_at).strftime('%d.%m.%Y %H:%M')}" if test.expires_at else "📅 Без срока"
group_str = f"🎓 Группа {test.for_group}" if test.for_group else "👥 Для всех" group_str = f"🎓 Группа {test.for_group}" if test.for_group else "👥 Для всех"
results_str = "👁 Результаты видны" if test.are_results_viewable else "🔒 Результаты скрыты" results_str = "👁 Результаты видны" if test.are_results_viewable else "🔒 Результаты скрыты"
@@ -80,6 +82,7 @@ async def get_test_detail(test_dao: FromDishka[TestDAO], test_repo: FromDishka[T
f"<b>Вопросов:</b> {questions_count}\n" f"<b>Вопросов:</b> {questions_count}\n"
f"<b>Пароль:</b> {password_str}\n" f"<b>Пароль:</b> {password_str}\n"
f"<b>Попытки:</b> {attempts_str}\n" f"<b>Попытки:</b> {attempts_str}\n"
f"<b>Время:</b> {time_limit_str}\n"
f"<b>Срок:</b> {expires_str}\n" f"<b>Срок:</b> {expires_str}\n"
f"<b>Группа:</b> {group_str}\n" f"<b>Группа:</b> {group_str}\n"
f"<b>Видимость:</b> {results_str}\n\n" f"<b>Видимость:</b> {results_str}\n\n"
@@ -227,85 +230,8 @@ async def get_attempt_detail(
return {"attempt_info": "\n".join(lines)} return {"attempt_info": "\n".join(lines)}
@inject async def on_export_stats(_callback: CallbackQuery, _button: Button, manager: DialogManager) -> None:
async def on_export_test( await manager.switch_to(SharedTestsSG.export_select_group)
_callback: CallbackQuery,
_button: Button,
manager: DialogManager,
test_repo: FromDishka[TestRepository],
) -> None:
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
assert _callback.message is not None
await _callback.answer("⏳ Экспортирую тест...")
test, questions_with_options = await test_repo.get_full_test(test_id)
if not test:
await _callback.message.answer("❌ Тест не найден")
return
export_data: dict = {
"title": test.title,
"description": test.description,
"password": test.password,
"attempts": test.attempts,
"expires_at": test.expires_at.isoformat() if test.expires_at else None,
"for_group": test.for_group,
"questions": [],
}
questions_list: list = export_data["questions"]
for question, options in questions_with_options:
question_data: dict = {
"question_type": question.question_type.value,
"question": question.text,
}
if question.question_type == QuestionType.INPUT:
correct_options = [o for o in options if o.is_correct]
if correct_options:
question_data["correct_answer"] = correct_options[0].text
else:
question_data["answers"] = [
{"option": o.text, "is_correct": o.is_correct}
for o in options
]
questions_list.append(question_data)
json_str = json.dumps(export_data, ensure_ascii=False, indent=2)
created_str = test.created_at.strftime("%d.%m.%Y %H:%M") if test.created_at else ""
updated_str = test.updated_at.strftime("%d.%m.%Y %H:%M") if test.updated_at else ""
questions_count = len(questions_with_options)
comment_header = f"""// ═══════════════════════════════════════════════════════════════
// ЭКСПОРТ ТЕСТА: {test.title}
// ═══════════════════════════════════════════════════════════════
//
// ❓ Вопросов: {questions_count}
// 📅 Создан: {created_str}
// 🔄 Обновлён: {updated_str}
//
// ═══════════════════════════════════════════════════════════════
"""
full_content = comment_header + json_str
safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in test.title)[:50]
filename = f"{safe_title}.json"
await _callback.message.answer_document(
document=BufferedInputFile(full_content.encode("utf-8"), filename=filename),
caption=f"📤 <b>Экспорт теста:</b> {test.title}",
)
@inject @inject
@@ -361,6 +287,10 @@ async def on_edit_attempts(_callback: CallbackQuery, _button: Button, manager: D
await manager.switch_to(SharedTestsSG.edit_attempts) await manager.switch_to(SharedTestsSG.edit_attempts)
async def on_edit_time_limit(_callback: CallbackQuery, _button: Button, manager: DialogManager):
await manager.switch_to(SharedTestsSG.edit_time_limit)
async def on_edit_group(_callback: CallbackQuery, _button: Button, manager: DialogManager): async def on_edit_group(_callback: CallbackQuery, _button: Button, manager: DialogManager):
await manager.switch_to(SharedTestsSG.edit_group) await manager.switch_to(SharedTestsSG.edit_group)
@@ -369,6 +299,35 @@ async def on_edit_expires(_callback: CallbackQuery, _button: Button, manager: Di
await manager.switch_to(SharedTestsSG.edit_expires) await manager.switch_to(SharedTestsSG.edit_expires)
async def on_delete_test(_callback: CallbackQuery, _button: Button, manager: DialogManager):
await manager.switch_to(SharedTestsSG.delete_confirm)
@inject
async def get_delete_confirm_data(dialog_manager: DialogManager, test_dao: FromDishka[TestDAO], **_kwargs):
test_id = dialog_manager.dialog_data.get("selected_test_id")
if not test_id:
return {"test_title": "Неизвестный тест"}
test = await test_dao.get_by_id(test_id)
return {"test_title": test.title if test else "Неизвестный тест"}
@inject
async def on_confirm_delete(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
deleted = await test_dao.delete(test_id)
if deleted:
await _callback.answer("✅ Тест удалён")
await manager.switch_to(SharedTestsSG.tests_list)
else:
await _callback.answer("❌ Не удалось удалить тест")
@inject @inject
async def on_password_input(message: Message, _widget: MessageInput, manager: DialogManager, test_dao: FromDishka[TestDAO]): async def on_password_input(message: Message, _widget: MessageInput, manager: DialogManager, test_dao: FromDishka[TestDAO]):
test_id = manager.dialog_data.get("selected_test_id") test_id = manager.dialog_data.get("selected_test_id")
@@ -446,6 +405,51 @@ async def on_remove_attempts(_callback: CallbackQuery, _button: Button, manager:
await manager.switch_to(SharedTestsSG.test_detail) await manager.switch_to(SharedTestsSG.test_detail)
@inject
async def on_time_limit_input(message: Message, _widget: MessageInput, manager: DialogManager, test_dao: FromDishka[TestDAO]):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await message.answer("❌ Тест не найден")
return
if not message.text:
await message.answer("❌ Лимит времени не может быть пустым")
return
time_limit_str = message.text.strip()
if not time_limit_str.isdigit():
await message.answer("❌ Лимит времени должен быть числом (в минутах)")
return
time_limit_minutes = int(time_limit_str)
if time_limit_minutes < 1:
await message.answer("❌ Лимит времени должен быть больше 0")
return
if time_limit_minutes > 1440:
await message.answer("❌ Лимит времени не может быть больше 1440 минут (24 часа)")
return
time_limit_seconds = time_limit_minutes * 60
await test_dao.update(test_id, time_limit=time_limit_seconds)
await message.answer("✅ Лимит времени обновлен")
await manager.switch_to(SharedTestsSG.test_detail)
@inject
async def on_remove_time_limit(_callback: CallbackQuery, _button: Button, manager: DialogManager, test_dao: FromDishka[TestDAO]):
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
await test_dao.update(test_id, time_limit=None)
await _callback.answer("✅ Лимит времени удален")
await manager.switch_to(SharedTestsSG.test_detail)
@inject @inject
async def get_groups_for_edit(dialog_manager: DialogManager, group_dao: FromDishka[GroupDAO], **_kwargs): async def get_groups_for_edit(dialog_manager: DialogManager, group_dao: FromDishka[GroupDAO], **_kwargs):
groups = await group_dao.get_all() groups = await group_dao.get_all()
@@ -512,6 +516,156 @@ async def on_back_clicked(_callback: CallbackQuery, _button: Button, manager: Di
await manager.done() await manager.done()
@inject
async def get_groups_for_export(dialog_manager: DialogManager, group_dao: FromDishka[GroupDAO], **_kwargs):
groups = await group_dao.get_all()
return {
"groups": [(f"🎓 {g.number}", str(g.number)) for g in groups],
"count": len(groups),
}
def create_excel_report(
test_title: str,
group_number: int,
stats: list[tuple[str, int | None, datetime | None, bool | None]],
) -> bytes:
wb = Workbook()
ws = wb.active
assert ws is not None
ws.title = "Статистика"
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
header_alignment = Alignment(horizontal="center", vertical="center")
thin_border = Border(
left=Side(style="thin"),
right=Side(style="thin"),
top=Side(style="thin"),
bottom=Side(style="thin"),
)
ws.merge_cells("A1:E1")
ws["A1"] = f"Тест: {test_title}"
ws["A1"].font = Font(bold=True, size=14)
ws["A1"].alignment = Alignment(horizontal="center")
ws.merge_cells("A2:E2")
ws["A2"] = f"Группа: {group_number}"
ws["A2"].font = Font(bold=True, size=12)
ws["A2"].alignment = Alignment(horizontal="center")
headers = ["ФИО", "Результат (%)", "Оценка", "Дата прохождения", "Статус"]
for col, header in enumerate(headers, 1):
cell = ws.cell(row=4, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_alignment
cell.border = thin_border
passed_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
failed_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
not_passed_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid")
grades: list[int] = []
for row_idx, (name, score, finished_at, is_passed) in enumerate(stats, 5):
ws.cell(row=row_idx, column=1, value=name).border = thin_border
if score is not None:
ws.cell(row=row_idx, column=2, value=score).border = thin_border
grade = score // 10
grades.append(grade)
ws.cell(row=row_idx, column=3, value=grade).border = thin_border
finished_msk = to_msk(finished_at) if finished_at else None
date_str = finished_msk.strftime("%d.%m.%Y %H:%M") if finished_msk else ""
ws.cell(row=row_idx, column=4, value=date_str).border = thin_border
status = "Пройден" if is_passed else "Не пройден"
status_cell = ws.cell(row=row_idx, column=5, value=status)
status_cell.border = thin_border
for col in range(1, 6):
ws.cell(row=row_idx, column=col).fill = passed_fill if is_passed else failed_fill
else:
ws.cell(row=row_idx, column=2, value="").border = thin_border
ws.cell(row=row_idx, column=3, value="").border = thin_border
ws.cell(row=row_idx, column=4, value="").border = thin_border
status_cell = ws.cell(row=row_idx, column=5, value="Не проходил")
status_cell.border = thin_border
for col in range(1, 6):
ws.cell(row=row_idx, column=col).fill = not_passed_fill
ws.column_dimensions["A"].width = 30
ws.column_dimensions["B"].width = 15
ws.column_dimensions["C"].width = 10
ws.column_dimensions["D"].width = 20
ws.column_dimensions["E"].width = 15
total_users = len(stats)
passed_users = sum(1 for _, score, _, is_passed in stats if score is not None and is_passed)
attempted_users = sum(1 for _, score, _, _ in stats if score is not None)
summary_row = len(stats) + 6
ws.cell(row=summary_row, column=1, value="Итого:").font = Font(bold=True)
ws.cell(row=summary_row + 1, column=1, value=f"Всего студентов: {total_users}")
ws.cell(row=summary_row + 2, column=1, value=f"Прошли тест: {attempted_users}")
ws.cell(row=summary_row + 3, column=1, value=f"Сдали: {passed_users}")
if attempted_users > 0:
success_rate = round(passed_users / attempted_users * 100)
ws.cell(row=summary_row + 4, column=1, value=f"Процент сдачи: {success_rate}%")
if grades:
avg_grade = round(sum(grades) / len(grades), 1)
ws.cell(row=summary_row + 5, column=1, value=f"Средняя оценка: {avg_grade}")
output = io.BytesIO()
wb.save(output)
output.seek(0)
return output.read()
@inject
async def on_group_selected_for_export(
_callback: CallbackQuery,
_widget: Select,
manager: DialogManager,
item_id: str,
test_dao: FromDishka[TestDAO],
attempt_repo: FromDishka[TestAttemptRepository],
) -> None:
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
assert _callback.message is not None
await _callback.answer("⏳ Формирую отчёт...")
test = await test_dao.get_by_id(test_id)
if not test:
await _callback.message.answer("❌ Тест не найден")
return
group_number = int(item_id)
stats = await attempt_repo.get_group_test_statistics(test_id, group_number)
if not stats:
await _callback.message.answer(f"В группе {group_number} нет студентов")
return
excel_bytes = create_excel_report(test.title, group_number, stats)
safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in test.title)[:30]
filename = f"{safe_title}_group_{group_number}.xlsx"
await _callback.message.answer_document(
document=BufferedInputFile(excel_bytes, filename=filename),
caption=f"📊 <b>Статистика по тесту</b>\n\n📝 {test.title}\n🎓 Группа {group_number}",
)
shared_tests_dialog = Dialog( shared_tests_dialog = Dialog(
Window( Window(
Format("<b>📝 Тесты</b>\n\nВсего: {count}"), Format("<b>📝 Тесты</b>\n\nВсего: {count}"),
@@ -549,7 +703,7 @@ shared_tests_dialog = Dialog(
), ),
Button(Const("📊 Статистика"), id="statistics", on_click=on_statistics), Button(Const("📊 Статистика"), id="statistics", on_click=on_statistics),
Button(Const("🔗 Поделиться"), id="share", on_click=on_share_test), Button(Const("🔗 Поделиться"), id="share", on_click=on_share_test),
Button(Const("📤 Экспорт"), id="export", on_click=on_export_test), Button(Const("📥 Экспорт"), id="export", on_click=on_export_stats),
Button(Const("✏️ Изменить"), id="edit_menu", on_click=on_edit_menu), Button(Const("✏️ Изменить"), id="edit_menu", on_click=on_edit_menu),
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_list), Button(Const("◀️ Назад"), id="back", on_click=on_back_to_list),
), ),
@@ -561,8 +715,10 @@ shared_tests_dialog = Dialog(
Column( Column(
Button(Const("🔑 Пароль"), id="edit_password", on_click=on_edit_password), Button(Const("🔑 Пароль"), id="edit_password", on_click=on_edit_password),
Button(Const("🔄 Попытки"), id="edit_attempts", on_click=on_edit_attempts), Button(Const("🔄 Попытки"), id="edit_attempts", on_click=on_edit_attempts),
Button(Const("⏱️ Лимит времени"), id="edit_time_limit", on_click=on_edit_time_limit),
Button(Const("👥 Группа"), id="edit_group", on_click=on_edit_group), Button(Const("👥 Группа"), id="edit_group", on_click=on_edit_group),
Button(Const("📅 Срок действия"), id="edit_expires", on_click=on_edit_expires), Button(Const("📅 Срок действия"), id="edit_expires", on_click=on_edit_expires),
Button(Const("🗑 Удалить тест"), id="delete_test", on_click=on_delete_test),
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_detail), Button(Const("◀️ Назад"), id="back", on_click=on_back_to_detail),
), ),
state=SharedTestsSG.edit_menu, state=SharedTestsSG.edit_menu,
@@ -585,6 +741,15 @@ shared_tests_dialog = Dialog(
), ),
state=SharedTestsSG.edit_attempts, state=SharedTestsSG.edit_attempts,
), ),
Window(
Const("<b>⏱️ Изменение лимита времени</b>\n\n🔢 <b>Введите лимит времени в минутах</b> (1-1440) или удалите ограничение:"),
MessageInput(on_time_limit_input),
Column(
Button(Const("🗑 Без лимита"), id="remove_time_limit", on_click=on_remove_time_limit),
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_edit_menu),
),
state=SharedTestsSG.edit_time_limit,
),
Window( Window(
Const("<b>👥 Изменение группы</b>\n\n🎓 <b>Выберите группу</b> или удалите привязку:"), Const("<b>👥 Изменение группы</b>\n\n🎓 <b>Выберите группу</b> или удалите привязку:"),
ScrollingGroup( ScrollingGroup(
@@ -642,4 +807,31 @@ shared_tests_dialog = Dialog(
state=SharedTestsSG.attempt_detail, state=SharedTestsSG.attempt_detail,
getter=get_attempt_detail, getter=get_attempt_detail,
), ),
Window(
Format("<b>📥 Экспорт статистики</b>\n\nВыберите группу для экспорта:\n\nВсего групп: {count}"),
ScrollingGroup(
Select(
Format("{item[0]}"),
id="export_group_select",
item_id_getter=lambda x: x[1],
items="groups",
on_click=on_group_selected_for_export,
),
id="export_groups_scroll",
width=2,
height=7,
),
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_detail),
state=SharedTestsSG.export_select_group,
getter=get_groups_for_export,
),
Window(
Format("<b>🗑 Удаление теста</b>\n\n⚠️ Вы уверены, что хотите удалить тест <b>{test_title}</b>?\n\n<i>Будут удалены все вопросы, варианты ответов и результаты прохождений.</i>"),
Row(
Button(Const("✅ Да, удалить"), id="confirm_delete", on_click=on_confirm_delete),
Button(Const("❌ Отмена"), id="cancel_delete", on_click=on_back_to_edit_menu),
),
state=SharedTestsSG.delete_confirm,
getter=get_delete_confirm_data,
),
) )
@@ -12,6 +12,7 @@ from quizzi.infrastructure.database.models import QuestionType
from quizzi.infrastructure.database.repo.test import TestRepository from quizzi.infrastructure.database.repo.test import TestRepository
from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository
from quizzi.infrastructure.utils.rate_limiter import PasswordRateLimiter from quizzi.infrastructure.utils.rate_limiter import PasswordRateLimiter
from quizzi.infrastructure.utils.timezone import now_msk_naive
@inject @inject
@@ -41,13 +42,15 @@ async def get_deeplink_test_data(
password_str = "🔒 Требуется пароль" if test.password else "🔓 Без пароля" password_str = "🔒 Требуется пароль" if test.password else "🔓 Без пароля"
attempts_str = f"🔄 Попыток: {test.attempts}" if test.attempts else "🔄 Попыток: ♾️" attempts_str = f"🔄 Попыток: {test.attempts}" if test.attempts else "🔄 Попыток: ♾️"
time_limit_str = f"⏱️ Время: {test.time_limit // 60} мин" if test.time_limit else "⏱️ Без лимита"
test_info = ( test_info = (
f"<b>📝 {test.title}</b>\n\n" f"<b>📝 {test.title}</b>\n\n"
f"<blockquote>{test.description or ''}</blockquote>\n\n" f"<blockquote>{test.description or ''}</blockquote>\n\n"
f"<b>Вопросов:</b> {questions_count}\n" f"<b>Вопросов:</b> {questions_count}\n"
f"{password_str}\n" f"{password_str}\n"
f"{attempts_str}" f"{attempts_str}\n"
f"{time_limit_str}"
) )
return {"test_info": test_info, "can_start": True, "has_password": bool(test.password)} return {"test_info": test_info, "can_start": True, "has_password": bool(test.password)}
@@ -90,15 +93,22 @@ async def on_start_deeplink_test(
if active_attempt: if active_attempt:
await attempt_repo.attempt_dao.delete(active_attempt.id) await attempt_repo.attempt_dao.delete(active_attempt.id)
if test.password: if test.time_limit:
await manager.start(UserTestSG.confirm_time_limit, mode=StartMode.NORMAL, data={
"test_id": test_id,
"time_limit": test.time_limit,
"has_password": bool(test.password),
})
elif test.password:
allowed, wait_time = await rate_limiter.check(user_id) allowed, wait_time = await rate_limiter.check(user_id)
if not allowed: if not allowed:
minutes = int(wait_time // 60) + 1 minutes = int(wait_time // 60) + 1
await _callback.answer(f"⏳ Слишком много попыток. Подождите {minutes} мин.", show_alert=True) await _callback.answer(f"⏳ Слишком много попыток. Подождите {minutes} мин.", show_alert=True)
return return
manager.dialog_data["time_limit"] = None
await manager.switch_to(UserDeeplinkSG.password_input) await manager.switch_to(UserDeeplinkSG.password_input)
else: else:
await start_test_without_password(manager, test_repo, attempt_repo, test_id, user_id) await start_test_without_password(manager, test_repo, attempt_repo, test_id, user_id, None)
async def start_test_without_password( async def start_test_without_password(
@@ -107,6 +117,7 @@ async def start_test_without_password(
attempt_repo: TestAttemptRepository, attempt_repo: TestAttemptRepository,
test_id: int, test_id: int,
user_id: int, user_id: int,
time_limit: int | None = None,
): ):
_, questions = await test_repo.get_test_with_questions(test_id) _, questions = await test_repo.get_test_with_questions(test_id)
@@ -114,6 +125,7 @@ async def start_test_without_password(
return return
attempt = await attempt_repo.attempt_dao.create(user_id=user_id, test_id=test_id) attempt = await attempt_repo.attempt_dao.create(user_id=user_id, test_id=test_id)
started_at = now_msk_naive()
first_question, _ = await test_repo.get_question_with_options(questions[0].id) first_question, _ = await test_repo.get_question_with_options(questions[0].id)
@@ -136,6 +148,8 @@ async def start_test_without_password(
"questions": [q.id for q in questions], "questions": [q.id for q in questions],
"current_question_index": 0, "current_question_index": 0,
"user_answers": {}, "user_answers": {},
"time_limit": time_limit,
"started_at": started_at.isoformat(),
} }
) )
@@ -168,8 +182,9 @@ async def on_deeplink_password_input(
if message.text and message.text.strip() == test.password: if message.text and message.text.strip() == test.password:
await message.answer("✅ Пароль верный") await message.answer("✅ Пароль верный")
time_limit = manager.dialog_data.get("time_limit")
await start_test_without_password( await start_test_without_password(
manager, test_repo, attempt_repo, test_id, message.from_user.id manager, test_repo, attempt_repo, test_id, message.from_user.id, time_limit
) )
else: else:
allowed, wait_time = await rate_limiter.check(message.from_user.id) allowed, wait_time = await rate_limiter.check(message.from_user.id)
@@ -278,6 +278,7 @@ async def get_test_detail(
password_str = "🔒 Требуется пароль" if test.password else "🔓 Без пароля" password_str = "🔒 Требуется пароль" if test.password else "🔓 Без пароля"
attempts_str = f"🔄 Попыток: {len(finished_attempts)}/{test.attempts}" if test.attempts else f"🔄 Попыток: {len(finished_attempts)}/♾️" attempts_str = f"🔄 Попыток: {len(finished_attempts)}/{test.attempts}" if test.attempts else f"🔄 Попыток: {len(finished_attempts)}/♾️"
time_limit_str = f"⏱️ Время: {test.time_limit // 60} мин" if test.time_limit else "⏱️ Без лимита"
expires_at_msk = to_msk(test.expires_at) expires_at_msk = to_msk(test.expires_at)
expires_str = f"📅 До {expires_at_msk.strftime('%d.%m.%Y %H:%M')}" if expires_at_msk else "📅 Без срока" expires_str = f"📅 До {expires_at_msk.strftime('%d.%m.%Y %H:%M')}" if expires_at_msk else "📅 Без срока"
@@ -289,6 +290,7 @@ async def get_test_detail(
f"<b>Вопросов:</b> {len(questions)}\n" f"<b>Вопросов:</b> {len(questions)}\n"
f"{password_str}\n" f"{password_str}\n"
f"{attempts_str}\n" f"{attempts_str}\n"
f"{time_limit_str}\n"
f"{expires_str}\n" f"{expires_str}\n"
f"{group_str}" f"{group_str}"
) )
@@ -13,11 +13,13 @@ class UserMenuSG(StatesGroup):
class UserTestSG(StatesGroup): class UserTestSG(StatesGroup):
password_input = State() password_input = State()
confirm_time_limit = State()
question_single = State() question_single = State()
question_multiple = State() question_multiple = State()
question_input = State() question_input = State()
results = State() results = State()
detailed_results = State() detailed_results = State()
time_expired = State()
class UserDeeplinkSG(StatesGroup): class UserDeeplinkSG(StatesGroup):
@@ -1,9 +1,11 @@
from datetime import datetime
from aiogram.enums import ContentType as AiogramContentType from aiogram.enums import ContentType as AiogramContentType
from aiogram.types import CallbackQuery, Message from aiogram.types import CallbackQuery, Message
from aiogram_dialog import Dialog, DialogManager, StartMode, Window from aiogram_dialog import Dialog, DialogManager, StartMode, Window
from aiogram_dialog.api.entities import MediaAttachment, MediaId from aiogram_dialog.api.entities import MediaAttachment, MediaId
from aiogram_dialog.widgets.input import MessageInput from aiogram_dialog.widgets.input import MessageInput
from aiogram_dialog.widgets.kbd import Button, Column, Multiselect, Radio from aiogram_dialog.widgets.kbd import Button, Column, Multiselect, Radio, Row
from aiogram_dialog.widgets.media import DynamicMedia from aiogram_dialog.widgets.media import DynamicMedia
from aiogram_dialog.widgets.text import Const, Format from aiogram_dialog.widgets.text import Const, Format
from dishka import FromDishka from dishka import FromDishka
@@ -28,6 +30,145 @@ async def get_state_for_question_type(question_type: str):
return UserTestSG.question_input return UserTestSG.question_input
def get_remaining_time(started_at: datetime, time_limit: int) -> int | None:
if not time_limit:
return None
elapsed = (now_msk_naive() - started_at).total_seconds()
remaining = time_limit - elapsed
return max(0, int(remaining))
def format_time(seconds: int) -> str:
if seconds >= 3600:
hours = seconds // 3600
minutes = (seconds % 3600) // 60
secs = seconds % 60
return f"{hours}:{minutes:02d}:{secs:02d}"
else:
minutes = seconds // 60
secs = seconds % 60
return f"{minutes}:{secs:02d}"
def is_time_expired(started_at: datetime, time_limit: int | None) -> bool:
if not time_limit:
return False
remaining = get_remaining_time(started_at, time_limit)
return remaining is not None and remaining <= 0
async def finish_test_by_timeout(
manager: DialogManager,
attempt_repo: TestAttemptRepository,
answer_dao: UserAnswerDAO,
test_repo: TestRepository,
attempt_id: int,
questions: list[int],
user_answers: dict,
are_results_viewable: bool = False,
):
answered_question_ids = set()
answers = await attempt_repo.get_answers_for_attempt(attempt_id)
for answer in answers:
answered_question_ids.add(answer.question_id)
for question_id in questions:
if question_id in answered_question_ids:
continue
answer_data = user_answers.get(str(question_id))
if answer_data:
question, options = await test_repo.get_question_with_options(question_id)
if not question:
continue
if answer_data["type"] == "single":
selected_option_id = answer_data["answer"]
correct_options = [opt for opt in options if opt.is_correct]
is_correct = any(opt.id == selected_option_id for opt in correct_options)
selected_text = next((opt.text for opt in options if opt.id == selected_option_id), "")
await answer_dao.create(
attempt_id=attempt_id,
question_id=question_id,
selected_option_id=selected_option_id,
text_answer=selected_text,
is_correct=is_correct,
)
elif answer_data["type"] == "multiple":
selected_option_ids = set(answer_data["answer"])
selected_texts = sorted([opt.text for opt in options if opt.id in selected_option_ids])
correct_texts = sorted([opt.text for opt in options if opt.is_correct])
is_correct = selected_texts == correct_texts
await answer_dao.create(
attempt_id=attempt_id,
question_id=question_id,
text_answer="|".join(selected_texts),
is_correct=is_correct,
)
else:
await answer_dao.create(
attempt_id=attempt_id,
question_id=question_id,
text_answer=None,
is_correct=False,
)
correct_count = await attempt_repo.calculate_attempt_score(attempt_id)
total_questions = len(questions)
score = round((correct_count / total_questions * 100)) if total_questions > 0 else 0
is_passed = score >= 50
await attempt_repo.finish_attempt(attempt_id, score, is_passed)
manager.dialog_data["score"] = score
manager.dialog_data["correct_count"] = correct_count
manager.dialog_data["total_questions"] = total_questions
manager.dialog_data["is_passed"] = is_passed
manager.dialog_data["are_results_viewable"] = are_results_viewable
manager.dialog_data["time_expired"] = True
await manager.switch_to(UserTestSG.time_expired)
async def check_time_and_finish_if_expired(
manager: DialogManager,
test_dao: TestDAO,
test_repo: TestRepository,
attempt_repo: TestAttemptRepository,
answer_dao: UserAnswerDAO,
) -> bool:
start_data = manager.start_data or {}
assert isinstance(start_data, dict)
test_id = manager.dialog_data.get("test_id") or start_data.get("test_id")
attempt_id = manager.dialog_data.get("attempt_id") or start_data.get("attempt_id")
started_at_str = manager.dialog_data.get("started_at") or start_data.get("started_at")
time_limit = manager.dialog_data.get("time_limit") or start_data.get("time_limit")
if not time_limit or not started_at_str or not attempt_id:
return False
started_at = datetime.fromisoformat(started_at_str) if isinstance(started_at_str, str) else started_at_str
if is_time_expired(started_at, time_limit):
questions = manager.dialog_data.get("questions") or start_data.get("questions", [])
user_answers = manager.dialog_data.get("user_answers", {})
test = await test_dao.get_by_id(test_id) if test_id else None
are_results_viewable = test.are_results_viewable if test else False
await finish_test_by_timeout(
manager, attempt_repo, answer_dao, test_repo,
attempt_id, questions, user_answers, are_results_viewable
)
return True
return False
@inject @inject
async def on_start_test( async def on_start_test(
_callback: CallbackQuery, _callback: CallbackQuery,
@@ -70,36 +211,121 @@ async def on_start_test(
if active_attempt: if active_attempt:
await attempt_repo.attempt_dao.delete(active_attempt.id) await attempt_repo.attempt_dao.delete(active_attempt.id)
if test.password: if test.time_limit:
await manager.start(UserTestSG.confirm_time_limit, mode=StartMode.NORMAL, data={
"test_id": test_id,
"time_limit": test.time_limit,
"has_password": bool(test.password),
})
elif test.password:
allowed, wait_time = await rate_limiter.check(user_id) allowed, wait_time = await rate_limiter.check(user_id)
if not allowed: if not allowed:
minutes = int(wait_time // 60) + 1 minutes = int(wait_time // 60) + 1
await _callback.answer(f"⏳ Слишком много попыток. Подождите {minutes} мин.", show_alert=True) await _callback.answer(f"⏳ Слишком много попыток. Подождите {minutes} мин.", show_alert=True)
return return
await manager.start(UserTestSG.password_input, mode=StartMode.NORMAL, data={"test_id": test_id}) await manager.start(UserTestSG.password_input, mode=StartMode.NORMAL, data={
"test_id": test_id,
"time_limit": None,
})
else: else:
_, questions = await test_repo.get_test_with_questions(test_id) await start_test_directly(manager, test_repo, attempt_repo, test_id, user_id, None)
if not questions:
await _callback.answer("❌ В тесте нет вопросов") async def get_confirm_time_limit_data(dialog_manager: DialogManager, **_kwargs):
return start_data = dialog_manager.start_data or {}
assert isinstance(start_data, dict)
attempt = await attempt_repo.attempt_dao.create(user_id=user_id, test_id=test_id) time_limit = start_data.get("time_limit", 0)
first_question, _ = await test_repo.get_question_with_options(questions[0].id) minutes = time_limit // 60
first_state = await get_state_for_question_type(first_question.question_type if first_question else QuestionType.SINGLE)
return {
await manager.start( "time_limit_text": (
first_state, f"⚠️ <b>ВНИМАНИЕ!</b>\n\n"
mode=StartMode.NORMAL, f"У этого теста установлен лимит времени:\n"
data={ f"<b>⏱️ {minutes} минут</b>\n\n"
"test_id": test_id, f"После начала теста таймер нельзя остановить.\n"
"attempt_id": attempt.id, f"Если время закончится, тест будет автоматически завершён.\n\n"
"questions": [q.id for q in questions], f"<b>Вы готовы начать?</b>"
"current_question_index": 0,
"user_answers": {},
}
) )
}
@inject
async def on_confirm_time_limit(
_callback: CallbackQuery,
_button: Button,
manager: DialogManager,
test_dao: FromDishka[TestDAO],
test_repo: FromDishka[TestRepository],
attempt_repo: FromDishka[TestAttemptRepository],
rate_limiter: FromDishka[PasswordRateLimiter],
):
assert _callback.from_user is not None
start_data = manager.start_data or {}
assert isinstance(start_data, dict)
test_id = start_data.get("test_id")
time_limit = start_data.get("time_limit")
has_password = start_data.get("has_password", False)
user_id = _callback.from_user.id
if not test_id:
await _callback.answer("❌ Тест не найден")
return
if has_password:
test = await test_dao.get_by_id(test_id)
if test and test.password:
allowed, wait_time = await rate_limiter.check(user_id)
if not allowed:
minutes = int(wait_time // 60) + 1
await _callback.answer(f"⏳ Слишком много попыток. Подождите {minutes} мин.", show_alert=True)
return
await manager.start(UserTestSG.password_input, mode=StartMode.NORMAL, data={
"test_id": test_id,
"time_limit": time_limit,
})
return
await start_test_directly(manager, test_repo, attempt_repo, test_id, user_id, time_limit)
async def on_cancel_time_limit(_callback: CallbackQuery, _button: Button, manager: DialogManager):
await manager.done()
async def start_test_directly(
manager: DialogManager,
test_repo: TestRepository,
attempt_repo: TestAttemptRepository,
test_id: int,
user_id: int,
time_limit: int | None,
):
_, questions = await test_repo.get_test_with_questions(test_id)
if not questions:
return
attempt = await attempt_repo.attempt_dao.create(user_id=user_id, test_id=test_id)
started_at = now_msk_naive()
first_question, _ = await test_repo.get_question_with_options(questions[0].id)
first_state = await get_state_for_question_type(first_question.question_type if first_question else QuestionType.SINGLE)
await manager.start(
first_state,
mode=StartMode.RESET_STACK,
data={
"test_id": test_id,
"attempt_id": attempt.id,
"questions": [q.id for q in questions],
"current_question_index": 0,
"user_answers": {},
"time_limit": time_limit,
"started_at": started_at.isoformat(),
}
)
@inject @inject
@@ -116,6 +342,7 @@ async def on_password_input(
start_data = manager.start_data or {} start_data = manager.start_data or {}
assert isinstance(start_data, dict) assert isinstance(start_data, dict)
test_id = start_data.get("test_id") test_id = start_data.get("test_id")
time_limit = start_data.get("time_limit")
if not test_id: if not test_id:
await message.answer("❌ Тест не найден") await message.answer("❌ Тест не найден")
@@ -137,14 +364,18 @@ async def on_password_input(
return return
attempt = await attempt_repo.attempt_dao.create(user_id=message.from_user.id, test_id=test_id) attempt = await attempt_repo.attempt_dao.create(user_id=message.from_user.id, test_id=test_id)
started_at = now_msk_naive()
first_question, _ = await test_repo.get_question_with_options(questions[0].id) first_question, _ = await test_repo.get_question_with_options(questions[0].id)
first_state = await get_state_for_question_type(first_question.question_type if first_question else QuestionType.SINGLE) first_state = await get_state_for_question_type(first_question.question_type if first_question else QuestionType.SINGLE)
manager.dialog_data["test_id"] = test_id
manager.dialog_data["attempt_id"] = attempt.id manager.dialog_data["attempt_id"] = attempt.id
manager.dialog_data["questions"] = [q.id for q in questions] manager.dialog_data["questions"] = [q.id for q in questions]
manager.dialog_data["current_question_index"] = 0 manager.dialog_data["current_question_index"] = 0
manager.dialog_data["user_answers"] = {} manager.dialog_data["user_answers"] = {}
manager.dialog_data["time_limit"] = time_limit
manager.dialog_data["started_at"] = started_at.isoformat()
await manager.switch_to(first_state) await manager.switch_to(first_state)
else: else:
@@ -184,6 +415,23 @@ async def get_question_data(
start_data = dialog_manager.start_data or {} start_data = dialog_manager.start_data or {}
assert isinstance(start_data, dict) assert isinstance(start_data, dict)
time_limit = dialog_manager.dialog_data.get("time_limit") or start_data.get("time_limit")
started_at_str = dialog_manager.dialog_data.get("started_at") or start_data.get("started_at")
timer_str = ""
if time_limit and started_at_str:
started_at = datetime.fromisoformat(started_at_str) if isinstance(started_at_str, str) else started_at_str
remaining = get_remaining_time(started_at, time_limit)
if remaining is not None:
if remaining <= 0:
return {
"question_text": "⏰ <b>Время истекло!</b>",
"options": [],
"media": None,
"time_expired": True,
}
timer_str = f" ⏱️ {format_time(remaining)}"
current_index = dialog_manager.dialog_data.get("current_question_index") current_index = dialog_manager.dialog_data.get("current_question_index")
if current_index is None: if current_index is None:
current_index = start_data.get("current_question_index", 0) current_index = start_data.get("current_question_index", 0)
@@ -209,13 +457,26 @@ async def get_question_data(
) )
return { return {
"question_text": f"<b>📝 Вопрос {progress}</b>\n\n<blockquote>{question.text}</blockquote>", "question_text": f"<b>📝 Вопрос {progress}{timer_str}</b>\n\n<blockquote>{question.text}</blockquote>",
"options": [(opt.text, str(opt.id)) for opt in options], "options": [(opt.text, str(opt.id)) for opt in options],
"media": media, "media": media,
} }
async def on_single_answer_selected(_callback: CallbackQuery, _widget, manager: DialogManager, item_id: str): @inject
async def on_single_answer_selected(
_callback: CallbackQuery,
_widget,
manager: DialogManager,
item_id: str,
test_dao: FromDishka[TestDAO],
test_repo: FromDishka[TestRepository],
attempt_repo: FromDishka[TestAttemptRepository],
answer_dao: FromDishka[UserAnswerDAO],
):
if await check_time_and_finish_if_expired(manager, test_dao, test_repo, attempt_repo, answer_dao):
return
start_data = manager.start_data or {} start_data = manager.start_data or {}
assert isinstance(start_data, dict) assert isinstance(start_data, dict)
current_index = manager.dialog_data.get("current_question_index") current_index = manager.dialog_data.get("current_question_index")
@@ -232,7 +493,20 @@ async def on_single_answer_selected(_callback: CallbackQuery, _widget, manager:
manager.dialog_data["user_answers"] = user_answers manager.dialog_data["user_answers"] = user_answers
async def on_multiple_answer_changed(_event, widget, manager: DialogManager, _data: str): @inject
async def on_multiple_answer_changed(
_event,
widget,
manager: DialogManager,
_data: str,
test_dao: FromDishka[TestDAO],
test_repo: FromDishka[TestRepository],
attempt_repo: FromDishka[TestAttemptRepository],
answer_dao: FromDishka[UserAnswerDAO],
):
if await check_time_and_finish_if_expired(manager, test_dao, test_repo, attempt_repo, answer_dao):
return
start_data = manager.start_data or {} start_data = manager.start_data or {}
assert isinstance(start_data, dict) assert isinstance(start_data, dict)
current_index = manager.dialog_data.get("current_question_index") current_index = manager.dialog_data.get("current_question_index")
@@ -261,6 +535,9 @@ async def on_text_answer_input(
answer_dao: FromDishka[UserAnswerDAO], answer_dao: FromDishka[UserAnswerDAO],
test_dao: FromDishka[TestDAO], test_dao: FromDishka[TestDAO],
): ):
if await check_time_and_finish_if_expired(manager, test_dao, test_repo, attempt_repo, answer_dao):
return
start_data = manager.start_data or {} start_data = manager.start_data or {}
assert isinstance(start_data, dict) assert isinstance(start_data, dict)
current_index = manager.dialog_data.get("current_question_index") current_index = manager.dialog_data.get("current_question_index")
@@ -321,6 +598,9 @@ async def on_next_question(
answer_dao: FromDishka[UserAnswerDAO], answer_dao: FromDishka[UserAnswerDAO],
test_dao: FromDishka[TestDAO], test_dao: FromDishka[TestDAO],
): ):
if await check_time_and_finish_if_expired(manager, test_dao, test_repo, attempt_repo, answer_dao):
return
start_data = manager.start_data or {} start_data = manager.start_data or {}
assert isinstance(start_data, dict) assert isinstance(start_data, dict)
current_index = manager.dialog_data.get("current_question_index") current_index = manager.dialog_data.get("current_question_index")
@@ -441,6 +721,29 @@ async def get_results_data(dialog_manager: DialogManager, **_kwargs):
return {"results_text": results_text, "are_results_viewable": are_results_viewable} return {"results_text": results_text, "are_results_viewable": are_results_viewable}
async def get_time_expired_data(dialog_manager: DialogManager, **_kwargs):
score = dialog_manager.dialog_data.get("score", 0)
correct_count = dialog_manager.dialog_data.get("correct_count", 0)
total_questions = dialog_manager.dialog_data.get("total_questions", 0)
is_passed = dialog_manager.dialog_data.get("is_passed", False)
are_results_viewable = dialog_manager.dialog_data.get("are_results_viewable", False)
if is_passed:
status = "✅ <b>Тест пройден!</b>"
else:
status = "❌ <b>Тест не пройден</b>"
results_text = (
f"⏰ <b>Время истекло!</b>\n\n"
f"{status}\n\n"
f"📊 <b>Результат:</b> {score}%\n"
f"✏️ <b>Правильных ответов:</b> {correct_count} из {total_questions}\n\n"
f"<i>Неотвеченные вопросы засчитаны как неправильные.</i>"
)
return {"results_text": results_text, "are_results_viewable": are_results_viewable}
async def on_back_to_menu(_callback: CallbackQuery, _button: Button, manager: DialogManager): async def on_back_to_menu(_callback: CallbackQuery, _button: Button, manager: DialogManager):
await manager.start(UserMenuSG.main, mode=StartMode.RESET_STACK) await manager.start(UserMenuSG.main, mode=StartMode.RESET_STACK)
@@ -494,6 +797,15 @@ async def get_detailed_results_data(
take_test_dialog = Dialog( take_test_dialog = Dialog(
Window(
Format("{time_limit_text}"),
Row(
Button(Const("✅ Начать"), id="confirm", on_click=on_confirm_time_limit),
Button(Const("❌ Отмена"), id="cancel", on_click=on_cancel_time_limit),
),
state=UserTestSG.confirm_time_limit,
getter=get_confirm_time_limit_data,
),
Window( Window(
Const("<b>🔑 Введите пароль для доступа к тесту:</b>"), Const("<b>🔑 Введите пароль для доступа к тесту:</b>"),
MessageInput(on_password_input), MessageInput(on_password_input),
@@ -555,6 +867,20 @@ take_test_dialog = Dialog(
state=UserTestSG.results, state=UserTestSG.results,
getter=get_results_data, getter=get_results_data,
), ),
Window(
Format("{results_text}"),
Column(
Button(
Const("📋 Подробные результаты"),
id="detailed",
on_click=on_show_detailed_results,
when="are_results_viewable",
),
Button(Const("◀️ В главное меню"), id="back", on_click=on_back_to_menu),
),
state=UserTestSG.time_expired,
getter=get_time_expired_data,
),
Window( Window(
Format("{detailed_text}"), Format("{detailed_text}"),
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_results), Button(Const("◀️ Назад"), id="back", on_click=on_back_to_results),
+2
View File
@@ -41,6 +41,7 @@ class Test:
password: str | None = None password: str | None = None
expires_at: datetime | None = None expires_at: datetime | None = None
attempts: int | None = None attempts: int | None = None
time_limit: int | None = None
is_active: bool = True is_active: bool = True
are_results_viewable: bool = False are_results_viewable: bool = False
created_at: datetime | None = None created_at: datetime | None = None
@@ -73,6 +74,7 @@ class TestAttempt:
test_id: int test_id: int
started_at: datetime started_at: datetime
finished_at: datetime | None = None finished_at: datetime | None = None
warning_sent_at: datetime | None = None
score: int = 0 score: int = 0
is_passed: bool = False is_passed: bool = False
+3
View File
@@ -24,6 +24,7 @@ class ParsedTest:
description: str | None description: str | None
password: str | None password: str | None
attempts: int | None attempts: int | None
time_limit: int | None
expires_at: datetime | None expires_at: datetime | None
for_group: int | None for_group: int | None
questions: list[ParsedQuestion] questions: list[ParsedQuestion]
@@ -53,6 +54,7 @@ class TestParser:
description = self._parse_string(data, "description", required=False, max_length=2000, errors=errors) description = self._parse_string(data, "description", required=False, max_length=2000, errors=errors)
password = self._parse_string(data, "password", required=False, max_length=255, errors=errors) password = self._parse_string(data, "password", required=False, max_length=255, errors=errors)
attempts = self._parse_int(data, "attempts", required=False, min_val=1, max_val=100, errors=errors) attempts = self._parse_int(data, "attempts", required=False, min_val=1, max_val=100, errors=errors)
time_limit = self._parse_int(data, "time_limit", required=False, min_val=1, max_val=86400, errors=errors)
expires_at = self._parse_datetime(data, "expires_at", required=False, errors=errors) expires_at = self._parse_datetime(data, "expires_at", required=False, errors=errors)
for_group = self._parse_int(data, "for_group", required=False, errors=errors) for_group = self._parse_int(data, "for_group", required=False, errors=errors)
@@ -68,6 +70,7 @@ class TestParser:
description=description, description=description,
password=password, password=password,
attempts=attempts, attempts=attempts,
time_limit=time_limit,
expires_at=expires_at, expires_at=expires_at,
for_group=for_group, for_group=for_group,
questions=questions, questions=questions,
@@ -10,7 +10,6 @@ from quizzi.infrastructure.database.models import Test
class _UNSET: class _UNSET:
"""Sentinel для различения None и "не передано"."""
pass pass
@@ -24,6 +23,7 @@ class TestUpdateFields(TypedDict, total=False):
password: str | None password: str | None
expires_at: datetime | None expires_at: datetime | None
attempts: int | None attempts: int | None
time_limit: int | None
is_active: bool is_active: bool
are_results_viewable: bool are_results_viewable: bool
@@ -41,7 +41,7 @@ class TestDAO:
async def get_all(self) -> list[DomainTest]: async def get_all(self) -> list[DomainTest]:
result = await self.session.execute( result = await self.session.execute(
select(Test).order_by(Test.created_at.desc()) select(Test).order_by(Test.is_active.desc(), Test.created_at.desc())
) )
models = list(result.scalars().all()) models = list(result.scalars().all())
return [TestDTO(model).to_domain() for model in models] return [TestDTO(model).to_domain() for model in models]
@@ -64,6 +64,7 @@ class TestDAO:
password: str | None = None, password: str | None = None,
expires_at: datetime | None = None, expires_at: datetime | None = None,
attempts: int | None = None, attempts: int | None = None,
time_limit: int | None = None,
is_active: bool = True, is_active: bool = True,
are_results_viewable: bool = False, are_results_viewable: bool = False,
) -> DomainTest: ) -> DomainTest:
@@ -74,6 +75,7 @@ class TestDAO:
password=password, password=password,
expires_at=expires_at, expires_at=expires_at,
attempts=attempts, attempts=attempts,
time_limit=time_limit,
is_active=is_active, is_active=is_active,
are_results_viewable=are_results_viewable, are_results_viewable=are_results_viewable,
) )
@@ -91,6 +93,7 @@ class TestDAO:
password: str | None | _UNSET = UNSET, password: str | None | _UNSET = UNSET,
expires_at: datetime | None | _UNSET = UNSET, expires_at: datetime | None | _UNSET = UNSET,
attempts: int | None | _UNSET = UNSET, attempts: int | None | _UNSET = UNSET,
time_limit: int | None | _UNSET = UNSET,
is_active: bool | _UNSET = UNSET, is_active: bool | _UNSET = UNSET,
are_results_viewable: bool | _UNSET = UNSET, are_results_viewable: bool | _UNSET = UNSET,
) -> DomainTest | None: ) -> DomainTest | None:
@@ -113,6 +116,8 @@ class TestDAO:
test.expires_at = expires_at test.expires_at = expires_at
if not isinstance(attempts, _UNSET): if not isinstance(attempts, _UNSET):
test.attempts = attempts test.attempts = attempts
if not isinstance(time_limit, _UNSET):
test.time_limit = time_limit
if not isinstance(is_active, _UNSET): if not isinstance(is_active, _UNSET):
test.is_active = is_active test.is_active = is_active
if not isinstance(are_results_viewable, _UNSET): if not isinstance(are_results_viewable, _UNSET):
@@ -46,6 +46,7 @@ class TestAttemptDAO:
self, self,
attempt_id: int, attempt_id: int,
finished_at: datetime | None = None, finished_at: datetime | None = None,
warning_sent_at: datetime | None = None,
score: int | None = None, score: int | None = None,
is_passed: bool | None = None, is_passed: bool | None = None,
) -> DomainTestAttempt | None: ) -> DomainTestAttempt | None:
@@ -58,6 +59,8 @@ class TestAttemptDAO:
if finished_at is not None: if finished_at is not None:
attempt.finished_at = finished_at attempt.finished_at = finished_at
if warning_sent_at is not None:
attempt.warning_sent_at = warning_sent_at
if score is not None: if score is not None:
attempt.score = score attempt.score = score
if is_passed is not None: if is_passed is not None:
@@ -9,7 +9,6 @@ from quizzi.infrastructure.database.models import User
class _UNSET: class _UNSET:
"""Sentinel для различения None и "не передано"."""
pass pass
@@ -15,6 +15,7 @@ class TestDTO:
password=self.model.password, password=self.model.password,
expires_at=self.model.expires_at, expires_at=self.model.expires_at,
attempts=self.model.attempts, attempts=self.model.attempts,
time_limit=self.model.time_limit,
is_active=self.model.is_active, is_active=self.model.is_active,
are_results_viewable=self.model.are_results_viewable, are_results_viewable=self.model.are_results_viewable,
created_at=self.model.created_at, created_at=self.model.created_at,
@@ -13,6 +13,7 @@ class TestAttemptDTO:
test_id=self.model.test_id, test_id=self.model.test_id,
started_at=self.model.started_at, started_at=self.model.started_at,
finished_at=self.model.finished_at, finished_at=self.model.finished_at,
warning_sent_at=self.model.warning_sent_at,
score=self.model.score, score=self.model.score,
is_passed=self.model.is_passed, is_passed=self.model.is_passed,
) )
+8 -1
View File
@@ -52,6 +52,7 @@ class Test(Base):
password: Mapped[str | None] = mapped_column(String(255), default=None) password: Mapped[str | None] = mapped_column(String(255), default=None)
expires_at: Mapped[datetime | None] = mapped_column(default=None) expires_at: Mapped[datetime | None] = mapped_column(default=None)
attempts: Mapped[int | None] = mapped_column(Integer, default=None) attempts: Mapped[int | None] = mapped_column(Integer, default=None)
time_limit: Mapped[int | None] = mapped_column(Integer, default=None)
is_active: Mapped[bool] = mapped_column(default=True) is_active: Mapped[bool] = mapped_column(default=True)
are_results_viewable: Mapped[bool] = mapped_column(default=False) are_results_viewable: Mapped[bool] = mapped_column(default=False)
created_at: Mapped[datetime] = mapped_column(server_default=func.now()) created_at: Mapped[datetime] = mapped_column(server_default=func.now())
@@ -62,6 +63,11 @@ class Test(Base):
cascade="all, delete-orphan", cascade="all, delete-orphan",
order_by="Question.position" order_by="Question.position"
) )
test_attempts: Mapped[list["TestAttempt"]] = relationship(
back_populates="test",
cascade="all, delete-orphan",
)
@final @final
@@ -104,11 +110,12 @@ class TestAttempt(Base):
test_id: Mapped[int] = mapped_column(ForeignKey("tests.id"), index=True) test_id: Mapped[int] = mapped_column(ForeignKey("tests.id"), index=True)
started_at: Mapped[datetime] = mapped_column(server_default=func.now()) started_at: Mapped[datetime] = mapped_column(server_default=func.now())
finished_at: Mapped[datetime | None] = mapped_column(default=None) finished_at: Mapped[datetime | None] = mapped_column(default=None)
warning_sent_at: Mapped[datetime | None] = mapped_column(default=None)
score: Mapped[int] = mapped_column(Integer, default=0) score: Mapped[int] = mapped_column(Integer, default=0)
is_passed: Mapped[bool] = mapped_column(default=False) is_passed: Mapped[bool] = mapped_column(default=False)
user: Mapped["User"] = relationship() user: Mapped["User"] = relationship()
test: Mapped["Test"] = relationship() test: Mapped["Test"] = relationship(back_populates="test_attempts")
answers: Mapped[list["UserAnswer"]] = relationship( answers: Mapped[list["UserAnswer"]] = relationship(
back_populates="attempt", back_populates="attempt",
cascade="all, delete-orphan" cascade="all, delete-orphan"
@@ -125,7 +125,6 @@ class TestRepository:
async def get_questions_with_options_by_ids( async def get_questions_with_options_by_ids(
self, question_ids: list[int] self, question_ids: list[int]
) -> dict[int, tuple[Question, list[Option]]]: ) -> dict[int, tuple[Question, list[Option]]]:
"""Загружает вопросы с опциями по списку ID за один запрос."""
if not question_ids: if not question_ids:
return {} return {}
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import final from typing import final
from sqlalchemy import func, select from sqlalchemy import func, select
@@ -9,7 +10,10 @@ from quizzi.infrastructure.database.dao.test_attempt import TestAttemptDAO
from quizzi.infrastructure.database.dao.user_answer import UserAnswerDAO from quizzi.infrastructure.database.dao.user_answer import UserAnswerDAO
from quizzi.infrastructure.database.dto.test_attempt import TestAttemptDTO from quizzi.infrastructure.database.dto.test_attempt import TestAttemptDTO
from quizzi.infrastructure.database.dto.user_answer import UserAnswerDTO from quizzi.infrastructure.database.dto.user_answer import UserAnswerDTO
from quizzi.infrastructure.database.models import Question as QuestionModel
from quizzi.infrastructure.database.models import Test as TestModel
from quizzi.infrastructure.database.models import TestAttempt as TestAttemptModel from quizzi.infrastructure.database.models import TestAttempt as TestAttemptModel
from quizzi.infrastructure.database.models import User as UserModel
from quizzi.infrastructure.database.models import UserAnswer as UserAnswerModel from quizzi.infrastructure.database.models import UserAnswer as UserAnswerModel
from quizzi.infrastructure.utils.timezone import now_msk_naive from quizzi.infrastructure.utils.timezone import now_msk_naive
@@ -173,8 +177,6 @@ class TestAttemptRepository:
} }
async def get_most_difficult_questions(self, test_id: int, limit: int = 10) -> list[tuple[int, float]]: async def get_most_difficult_questions(self, test_id: int, limit: int = 10) -> list[tuple[int, float]]:
from quizzi.infrastructure.database.models import Question as QuestionModel
result = await self.session.execute( result = await self.session.execute(
select( select(
UserAnswerModel.question_id, UserAnswerModel.question_id,
@@ -209,8 +211,6 @@ class TestAttemptRepository:
} }
async def get_finished_attempts_with_tests(self, user_id: int) -> list[tuple[TestAttempt, str]]: async def get_finished_attempts_with_tests(self, user_id: int) -> list[tuple[TestAttempt, str]]:
from quizzi.infrastructure.database.models import Test as TestModel
result = await self.session.execute( result = await self.session.execute(
select(TestAttemptModel, TestModel.title) select(TestAttemptModel, TestModel.title)
.join(TestModel, TestAttemptModel.test_id == TestModel.id) .join(TestModel, TestAttemptModel.test_id == TestModel.id)
@@ -222,8 +222,6 @@ class TestAttemptRepository:
return [(TestAttemptDTO(row[0]).to_domain(), row[1]) for row in rows] return [(TestAttemptDTO(row[0]).to_domain(), row[1]) for row in rows]
async def get_test_attempts_with_users(self, test_id: int) -> list[tuple[TestAttempt, str]]: async def get_test_attempts_with_users(self, test_id: int) -> list[tuple[TestAttempt, str]]:
from quizzi.infrastructure.database.models import User as UserModel
result = await self.session.execute( result = await self.session.execute(
select(TestAttemptModel, UserModel.name, UserModel.first_name) select(TestAttemptModel, UserModel.name, UserModel.first_name)
.join(UserModel, TestAttemptModel.user_id == UserModel.id) .join(UserModel, TestAttemptModel.user_id == UserModel.id)
@@ -233,3 +231,94 @@ class TestAttemptRepository:
) )
rows = result.all() rows = result.all()
return [(TestAttemptDTO(row[0]).to_domain(), row[1] or row[2]) for row in rows] return [(TestAttemptDTO(row[0]).to_domain(), row[1] or row[2]) for row in rows]
async def get_expired_active_attempts(self, now: datetime) -> list[tuple[TestAttempt, int]]:
result = await self.session.execute(
select(TestAttemptModel, TestModel.time_limit)
.join(TestModel, TestAttemptModel.test_id == TestModel.id)
.where(TestAttemptModel.finished_at.is_(None))
.where(TestModel.time_limit.isnot(None))
)
rows = result.all()
expired = []
for attempt_model, time_limit in rows:
if time_limit:
elapsed = (now - attempt_model.started_at).total_seconds()
if elapsed >= time_limit:
expired.append((TestAttemptDTO(attempt_model).to_domain(), time_limit))
return expired
async def get_attempts_needing_warning(self, now: datetime) -> list[tuple[TestAttempt, int, int]]:
result = await self.session.execute(
select(
TestAttemptModel,
TestModel.time_limit,
func.count(QuestionModel.id).label("questions_count")
)
.join(TestModel, TestAttemptModel.test_id == TestModel.id)
.join(QuestionModel, QuestionModel.test_id == TestModel.id)
.where(TestAttemptModel.finished_at.is_(None))
.where(TestAttemptModel.warning_sent_at.is_(None))
.where(TestModel.time_limit.isnot(None))
.group_by(TestAttemptModel.id, TestModel.time_limit)
)
rows = result.all()
needing_warning = []
for attempt_model, time_limit, questions_count in rows:
if time_limit and questions_count > 0:
elapsed = (now - attempt_model.started_at).total_seconds()
time_remaining = time_limit - elapsed
threshold = time_limit * 0.1
if time_remaining <= threshold and time_remaining > 0:
needing_warning.append((
TestAttemptDTO(attempt_model).to_domain(),
time_limit,
questions_count
))
return needing_warning
async def mark_warning_sent(self, attempt_id: int, sent_at: datetime) -> None:
await self.attempt_dao.update(attempt_id=attempt_id, warning_sent_at=sent_at)
async def get_group_test_statistics(
self, test_id: int, group_number: int
) -> list[tuple[str, int | None, datetime | None, bool | None]]:
result = await self.session.execute(
select(
UserModel.name,
UserModel.first_name,
TestAttemptModel.score,
TestAttemptModel.finished_at,
TestAttemptModel.is_passed,
)
.select_from(UserModel)
.outerjoin(
TestAttemptModel,
(TestAttemptModel.user_id == UserModel.id) &
(TestAttemptModel.test_id == test_id) &
(TestAttemptModel.finished_at.isnot(None))
)
.where(UserModel.group == group_number)
.order_by(UserModel.name, UserModel.first_name, TestAttemptModel.finished_at.desc())
)
rows = result.all()
user_best: dict[str, tuple[int | None, datetime | None, bool | None]] = {}
for name, first_name, score, finished_at, is_passed in rows:
display_name = name or first_name
if display_name not in user_best:
user_best[display_name] = (score, finished_at, is_passed)
elif score is not None:
current_score = user_best[display_name][0]
if current_score is None or score > current_score:
user_best[display_name] = (score, finished_at, is_passed)
return [
(name, score, finished_at, is_passed)
for name, (score, finished_at, is_passed) in sorted(user_best.items())
]
+19 -2
View File
@@ -1,6 +1,7 @@
import logging import logging
from collections.abc import AsyncIterable from collections.abc import AsyncIterable
from aiogram import Bot
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from dishka import AsyncContainer, Provider, Scope, provide from dishka import AsyncContainer, Provider, Scope, provide
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
@@ -16,7 +17,7 @@ from quizzi.infrastructure.database.dao.user_answer import UserAnswerDAO
from quizzi.infrastructure.database.repo.test import TestRepository from quizzi.infrastructure.database.repo.test import TestRepository
from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository
from quizzi.infrastructure.database.repo.user import UserRepository from quizzi.infrastructure.database.repo.user import UserRepository
from quizzi.infrastructure.scheduling.tasks import deactivate_expired_tests from quizzi.infrastructure.scheduling.tasks import deactivate_expired_tests, finish_expired_test_attempts, send_time_warning_notifications
from quizzi.infrastructure.utils.config import Config from quizzi.infrastructure.utils.config import Config
from quizzi.infrastructure.utils.rate_limiter import PasswordRateLimiter from quizzi.infrastructure.utils.rate_limiter import PasswordRateLimiter
@@ -81,7 +82,7 @@ class DatabaseProvider(Provider):
class SchedulerProvider(Provider): class SchedulerProvider(Provider):
@provide(scope = Scope.APP) @provide(scope = Scope.APP)
def get_scheduler(self, container: AsyncContainer) -> AsyncIOScheduler: def get_scheduler(self, container: AsyncContainer, bot: Bot) -> AsyncIOScheduler:
logging.getLogger('apscheduler').setLevel(logging.WARNING) logging.getLogger('apscheduler').setLevel(logging.WARNING)
scheduler = AsyncIOScheduler() scheduler = AsyncIOScheduler()
@@ -93,4 +94,20 @@ class SchedulerProvider(Provider):
id='deactivate_expired_tests', id='deactivate_expired_tests',
) )
scheduler.add_job(
finish_expired_test_attempts,
'interval',
minutes=1,
args=[container, bot],
id='finish_expired_test_attempts',
)
scheduler.add_job(
send_time_warning_notifications,
'interval',
seconds=10,
args=[container, bot],
id='send_time_warning_notifications',
)
return scheduler return scheduler
@@ -1,8 +1,14 @@
import logging import logging
from aiogram import Bot
from aiogram.exceptions import TelegramAPIError
from dishka import AsyncContainer from dishka import AsyncContainer
from sqlalchemy.exc import SQLAlchemyError
from quizzi.infrastructure.database.dao.test import TestDAO from quizzi.infrastructure.database.dao.test import TestDAO
from quizzi.infrastructure.database.dao.user_answer import UserAnswerDAO
from quizzi.infrastructure.database.repo.test import TestRepository
from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository
from quizzi.infrastructure.utils.timezone import now_msk_naive from quizzi.infrastructure.utils.timezone import now_msk_naive
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -17,3 +23,120 @@ async def deactivate_expired_tests(container: AsyncContainer) -> None:
for test in expired_tests: for test in expired_tests:
await test_dao.update(test.id, is_active=False) await test_dao.update(test.id, is_active=False)
logger.info("Деактивирован истёкший тест: id=%d, title=%s", test.id, test.title) logger.info("Деактивирован истёкший тест: id=%d, title=%s", test.id, test.title)
async def finish_expired_test_attempts(container: AsyncContainer, bot: Bot) -> None:
async with container() as request_container:
attempt_repo = await request_container.get(TestAttemptRepository)
test_repo = await request_container.get(TestRepository)
answer_dao = await request_container.get(UserAnswerDAO)
now = now_msk_naive()
expired_attempts = await attempt_repo.get_expired_active_attempts(now)
for attempt, _ in expired_attempts:
try:
test, questions_with_options = await test_repo.get_full_test(attempt.test_id)
if not test:
continue
question_ids = [q.id for q, _ in questions_with_options]
answered_question_ids = set()
answers = await attempt_repo.get_answers_for_attempt(attempt.id)
for answer in answers:
answered_question_ids.add(answer.question_id)
for question_id in question_ids:
if question_id not in answered_question_ids:
await answer_dao.create(
attempt_id=attempt.id,
question_id=question_id,
text_answer=None,
is_correct=False,
)
correct_count = await attempt_repo.calculate_attempt_score(attempt.id)
total_questions = len(question_ids)
score = round((correct_count / total_questions * 100)) if total_questions > 0 else 0
is_passed = score >= 50
await attempt_repo.finish_attempt(attempt.id, score, is_passed)
status = "пройден ✅" if is_passed else "не пройден ❌"
try:
await bot.send_message(
attempt.user_id,
f"⏰ <b>Время на прохождение теста истекло!</b>\n\n"
f"📝 <b>Тест:</b> {test.title}\n"
f"📊 <b>Результат:</b> {score}%\n"
f"🏆 <b>Статус:</b> {status}\n\n"
f"<i>Неотвеченные вопросы засчитаны как неправильные.</i>"
)
except TelegramAPIError as e:
logger.warning("Не удалось отправить уведомление пользователю %d: %s", attempt.user_id, e)
logger.info(
"Завершена просроченная попытка: attempt_id=%d, user_id=%d, test_id=%d, score=%d%%",
attempt.id, attempt.user_id, attempt.test_id, score
)
except SQLAlchemyError as e:
logger.error("Ошибка при завершении попытки %d: %s", attempt.id, e)
async def send_time_warning_notifications(container: AsyncContainer, bot: Bot) -> None:
async with container() as request_container:
attempt_repo = await request_container.get(TestAttemptRepository)
test_repo = await request_container.get(TestRepository)
now = now_msk_naive()
attempts_needing_warning = await attempt_repo.get_attempts_needing_warning(now)
for attempt, time_limit, questions_count in attempts_needing_warning:
try:
answers = await attempt_repo.get_answers_for_attempt(attempt.id)
if answers:
avg_time_per_question = time_limit / questions_count
time_since_start = (now - attempt.started_at).total_seconds()
expected_answers = int(time_since_start / avg_time_per_question)
if len(answers) >= expected_answers:
continue
test, _ = await test_repo.get_full_test(attempt.test_id)
if not test:
continue
elapsed = (now - attempt.started_at).total_seconds()
remaining_seconds = int(time_limit - elapsed)
remaining_minutes = remaining_seconds // 60
remaining_secs = remaining_seconds % 60
if remaining_minutes > 0:
time_str = f"{remaining_minutes} мин {remaining_secs} сек"
else:
time_str = f"{remaining_secs} сек"
try:
await bot.send_message(
attempt.user_id,
f"⚠️ <b>Внимание! Время заканчивается!</b>\n\n"
f"📝 <b>Тест:</b> {test.title}\n"
f"⏱️ <b>Осталось:</b> {time_str}\n\n"
f"<i>Поторопитесь с ответами!</i>"
)
except TelegramAPIError as e:
logger.warning("Не удалось отправить предупреждение пользователю %d: %s", attempt.user_id, e)
await attempt_repo.mark_warning_sent(attempt.id, now)
logger.info(
"Отправлено предупреждение о времени: attempt_id=%d, user_id=%d, remaining=%ds",
attempt.id, attempt.user_id, remaining_seconds
)
except SQLAlchemyError as e:
logger.error("Ошибка при отправке предупреждения для попытки %d: %s", attempt.id, e)
@@ -9,7 +9,6 @@ def now_msk() -> datetime:
def now_msk_naive() -> datetime: def now_msk_naive() -> datetime:
"""Возвращает текущее время в МСК без timezone info (для сохранения в БД)."""
return datetime.now(MSK_TZ).replace(tzinfo=None) return datetime.now(MSK_TZ).replace(tzinfo=None)
Generated
+23
View File
@@ -232,6 +232,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/b9/89381173b4f336e986d72471198614806cd313e0f85c143ccb677c310223/dishka-1.7.2-py3-none-any.whl", hash = "sha256:f6faa6ab321903926b825b3337d77172ee693450279b314434864978d01fbad3", size = 94774, upload-time = "2025-09-24T21:23:03.246Z" }, { url = "https://files.pythonhosted.org/packages/b7/b9/89381173b4f336e986d72471198614806cd313e0f85c143ccb677c310223/dishka-1.7.2-py3-none-any.whl", hash = "sha256:f6faa6ab321903926b825b3337d77172ee693450279b314434864978d01fbad3", size = 94774, upload-time = "2025-09-24T21:23:03.246Z" },
] ]
[[package]]
name = "et-xmlfile"
version = "2.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d3/38/af70d7ab1ae9d4da450eeec1fa3918940a5fafb9055e934af8d6eb0c2313/et_xmlfile-2.0.0.tar.gz", hash = "sha256:dab3f4764309081ce75662649be815c4c9081e88f0837825f90fd28317d4da54", size = 17234, upload-time = "2024-10-25T17:25:40.039Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c1/8b/5fe2cc11fee489817272089c4203e679c63b570a5aaeb18d852ae3cbba6a/et_xmlfile-2.0.0-py3-none-any.whl", hash = "sha256:7a91720bc756843502c3b7504c77b8fe44217c85c537d85037f0f536151b2caa", size = 18059, upload-time = "2024-10-25T17:25:39.051Z" },
]
[[package]] [[package]]
name = "frozenlist" name = "frozenlist"
version = "1.8.0" version = "1.8.0"
@@ -514,6 +523,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/da/7d22601b625e241d4f23ef1ebff8acfc60da633c9e7e7922e24d10f592b3/multidict-6.7.0-py3-none-any.whl", hash = "sha256:394fc5c42a333c9ffc3e421a4c85e08580d990e08b99f6bf35b4132114c5dcb3", size = 12317, upload-time = "2025-10-06T14:52:29.272Z" }, { url = "https://files.pythonhosted.org/packages/b7/da/7d22601b625e241d4f23ef1ebff8acfc60da633c9e7e7922e24d10f592b3/multidict-6.7.0-py3-none-any.whl", hash = "sha256:394fc5c42a333c9ffc3e421a4c85e08580d990e08b99f6bf35b4132114c5dcb3", size = 12317, upload-time = "2025-10-06T14:52:29.272Z" },
] ]
[[package]]
name = "openpyxl"
version = "3.1.5"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "et-xmlfile" },
]
sdist = { url = "https://files.pythonhosted.org/packages/3d/f9/88d94a75de065ea32619465d2f77b29a0469500e99012523b91cc4141cd1/openpyxl-3.1.5.tar.gz", hash = "sha256:cf0e3cf56142039133628b5acffe8ef0c12bc902d2aadd3e0fe5878dc08d1050", size = 186464, upload-time = "2024-06-28T14:03:44.161Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c0/da/977ded879c29cbd04de313843e76868e6e13408a94ed6b987245dc7c8506/openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2", size = 250910, upload-time = "2024-06-28T14:03:41.161Z" },
]
[[package]] [[package]]
name = "pillow" name = "pillow"
version = "12.1.0" version = "12.1.0"
@@ -730,6 +751,7 @@ dependencies = [
{ name = "dishka" }, { name = "dishka" },
{ name = "httpx" }, { name = "httpx" },
{ name = "json5" }, { name = "json5" },
{ name = "openpyxl" },
{ name = "pycryptodome" }, { name = "pycryptodome" },
{ name = "pydantic" }, { name = "pydantic" },
{ name = "qrcode", extra = ["pil"] }, { name = "qrcode", extra = ["pil"] },
@@ -753,6 +775,7 @@ requires-dist = [
{ name = "dishka", specifier = ">=1.7.2" }, { name = "dishka", specifier = ">=1.7.2" },
{ name = "httpx", specifier = ">=0.28.1" }, { name = "httpx", specifier = ">=0.28.1" },
{ name = "json5", specifier = ">=0.13.0" }, { name = "json5", specifier = ">=0.13.0" },
{ name = "openpyxl", specifier = ">=3.1.0" },
{ name = "pycryptodome", specifier = ">=3.23.0" }, { name = "pycryptodome", specifier = ">=3.23.0" },
{ name = "pydantic", specifier = ">=2.10.5" }, { name = "pydantic", specifier = ">=2.10.5" },
{ name = "qrcode", extras = ["pil"], specifier = ">=8.2" }, { name = "qrcode", extras = ["pil"], specifier = ">=8.2" },