From a82fb437d508d7147f32205d11f6d2f7ba8f6426 Mon Sep 17 00:00:00 2001 From: kolo Date: Sat, 3 Jan 2026 17:16:30 +0300 Subject: [PATCH] commit --- src/trudex/application/__main__.py | 2 + .../application/bot/user_dialogs/main_menu.py | 58 +- .../application/bot/user_dialogs/states.py | 10 + .../application/bot/user_dialogs/take_test.py | 501 ++++++++++++++++++ 4 files changed, 570 insertions(+), 1 deletion(-) create mode 100644 src/trudex/application/bot/user_dialogs/take_test.py diff --git a/src/trudex/application/__main__.py b/src/trudex/application/__main__.py index 62674a8..e6ada8d 100644 --- a/src/trudex/application/__main__.py +++ b/src/trudex/application/__main__.py @@ -38,6 +38,7 @@ from trudex.application.bot.middlewares.reject_not_creator import \ from trudex.application.bot.user_dialogs.main_menu import user_menu_dialog from trudex.application.bot.user_dialogs.registration import \ registration_dialog +from trudex.application.bot.user_dialogs.take_test import take_test_dialog from trudex.infrastructure.database.repo.user import UserRepository from trudex.infrastructure.di import DatabaseProvider, SchedulerProvider from trudex.infrastructure.utils.bot_commands import setup_bot_commands @@ -62,6 +63,7 @@ async def main() -> None: dp.include_routers( router, user_menu_dialog, + take_test_dialog, registration_dialog, admin_menu_dialog, admin_users_dialog, diff --git a/src/trudex/application/bot/user_dialogs/main_menu.py b/src/trudex/application/bot/user_dialogs/main_menu.py index 4254bed..de777ff 100644 --- a/src/trudex/application/bot/user_dialogs/main_menu.py +++ b/src/trudex/application/bot/user_dialogs/main_menu.py @@ -9,6 +9,7 @@ from dishka import FromDishka from dishka.integrations.aiogram_dialog import inject from trudex.application.bot.user_dialogs.states import UserMenuSG +from trudex.application.bot.user_dialogs.take_test import on_start_test from trudex.infrastructure.database.dao.group import GroupDAO from trudex.infrastructure.database.dao.user import UserDAO from trudex.infrastructure.database.repo.test import TestRepository @@ -172,7 +173,53 @@ async def get_available_tests( async def on_test_selected(_callback: CallbackQuery, _widget: Select, manager: DialogManager, item_id: str): - await _callback.answer("🚧 В разработке") + manager.dialog_data["selected_test_id"] = int(item_id) + await manager.switch_to(UserMenuSG.test_detail) + + +async def on_back_to_tests(_callback: CallbackQuery, _button: Button, manager: DialogManager): + await manager.switch_to(UserMenuSG.available_tests) + + +@inject +async def get_test_detail( + dialog_manager: DialogManager, + test_repo: FromDishka[TestRepository], + attempt_repo: FromDishka[TestAttemptRepository], + user_dao: FromDishka[UserDAO], + **_kwargs +): + test_id = dialog_manager.dialog_data.get("selected_test_id") + user_id = dialog_manager.event.from_user.id + + if not test_id: + return {"test_info": "❌ Тест не найден"} + + test, questions = await test_repo.get_test_with_questions(test_id) + + if not test: + return {"test_info": "❌ Тест не найден"} + + user = await user_dao.get_by_id(user_id) + attempts = await attempt_repo.get_user_test_attempts(user_id, test_id) + finished_attempts = [a for a in attempts if a.finished_at] + + password_str = f"🔒 Требуется пароль" if test.password else "🔓 Без пароля" + attempts_str = f"🔄 Попыток: {len(finished_attempts)}/{test.attempts}" if test.attempts else f"🔄 Попыток: {len(finished_attempts)}/♾️" + expires_str = f"📅 До {test.expires_at.strftime('%d.%m.%Y %H:%M')}" if test.expires_at else "📅 Без срока" + group_str = f"🎓 Для группы {test.for_group}" if test.for_group else "👥 Для всех" + + test_info = ( + f"📝 {test.title}\n\n" + f"
{test.description or '—'}
\n\n" + f"Вопросов: {len(questions)}\n" + f"{password_str}\n" + f"{attempts_str}\n" + f"{expires_str}\n" + f"{group_str}" + ) + + return {"test_info": test_info} user_menu_dialog = Dialog( @@ -207,6 +254,15 @@ user_menu_dialog = Dialog( state=UserMenuSG.available_tests, getter=get_available_tests, ), + Window( + Format("{test_info}"), + Column( + Button(Const("▶️ Пройти тест"), id="start_test", on_click=on_start_test), + Button(Const("◀️ Назад"), id="back", on_click=on_back_to_tests), + ), + state=UserMenuSG.test_detail, + getter=get_test_detail, + ), Window( Const("✏️ Изменение имени\n\nВведите новое имя:"), MessageInput(on_name_input), diff --git a/src/trudex/application/bot/user_dialogs/states.py b/src/trudex/application/bot/user_dialogs/states.py index d34d148..b28c737 100644 --- a/src/trudex/application/bot/user_dialogs/states.py +++ b/src/trudex/application/bot/user_dialogs/states.py @@ -4,10 +4,20 @@ from aiogram.fsm.state import State, StatesGroup class UserMenuSG(StatesGroup): main = State() available_tests = State() + test_detail = State() edit_name = State() edit_group = State() +class UserTestSG(StatesGroup): + password_input = State() + question_single = State() + question_multiple = State() + question_input = State() + results = State() + detailed_results = State() + + class UserRegistrationSG(StatesGroup): input_name = State() select_group = State() diff --git a/src/trudex/application/bot/user_dialogs/take_test.py b/src/trudex/application/bot/user_dialogs/take_test.py new file mode 100644 index 0000000..6117ba5 --- /dev/null +++ b/src/trudex/application/bot/user_dialogs/take_test.py @@ -0,0 +1,501 @@ +from datetime import datetime + +from aiogram.types import CallbackQuery, Message +from aiogram_dialog import Dialog, DialogManager, StartMode, Window +from aiogram_dialog.widgets.input import MessageInput +from aiogram_dialog.widgets.kbd import Button, Column, Multiselect, Radio +from aiogram_dialog.widgets.text import Const, Format +from dishka import FromDishka +from dishka.integrations.aiogram_dialog import inject + +from trudex.application.bot.user_dialogs.states import UserMenuSG, UserTestSG +from trudex.infrastructure.database.dao.test import TestDAO +from trudex.infrastructure.database.dao.user_answer import UserAnswerDAO +from trudex.infrastructure.database.models import QuestionType +from trudex.infrastructure.database.repo.test import TestRepository +from trudex.infrastructure.database.repo.test_attempt import TestAttemptRepository + + +async def get_state_for_question_type(question_type: str): + if question_type == QuestionType.SINGLE: + return UserTestSG.question_single + elif question_type == QuestionType.MULTIPLE: + return UserTestSG.question_multiple + else: + return UserTestSG.question_input + + +@inject +async def on_start_test( + _callback: CallbackQuery, + _button: Button, + manager: DialogManager, + test_dao: FromDishka[TestDAO], + test_repo: FromDishka[TestRepository], + attempt_repo: FromDishka[TestAttemptRepository], +): + test_id = manager.dialog_data.get("selected_test_id") + user_id = _callback.from_user.id + + if not test_id: + await _callback.answer("❌ Тест не найден") + return + + test = await test_dao.get_by_id(test_id) + if not test: + await _callback.answer("❌ Тест не найден") + return + + if not test.is_active: + await _callback.answer("❌ Тест деактивирован") + return + + if test.expires_at and test.expires_at < datetime.utcnow(): + await _callback.answer("❌ Срок действия теста истек") + return + + if test.attempts: + attempts = await attempt_repo.get_user_test_attempts(user_id, test_id) + finished_attempts = [a for a in attempts if a.finished_at] + if len(finished_attempts) >= test.attempts: + await _callback.answer("❌ Вы исчерпали все попытки") + return + + active_attempt = await attempt_repo.get_active_attempt(user_id, test_id) + if active_attempt: + await _callback.answer("❌ У вас уже есть активная попытка") + return + + if test.password: + await manager.start(UserTestSG.password_input, mode=StartMode.NORMAL, data={"test_id": test_id}) + else: + _, questions = await test_repo.get_test_with_questions(test_id) + + if not questions: + await _callback.answer("❌ В тесте нет вопросов") + return + + attempt = await attempt_repo.attempt_dao.create(user_id=user_id, test_id=test_id) + + first_question, _ = await test_repo.get_question_with_options(questions[0].id) + first_state = await get_state_for_question_type(first_question.question_type if first_question else QuestionType.SINGLE) + + await manager.start( + first_state, + mode=StartMode.NORMAL, + data={ + "test_id": test_id, + "attempt_id": attempt.id, + "questions": [q.id for q in questions], + "current_question_index": 0, + "user_answers": {}, + } + ) + + +@inject +async def on_password_input( + message: Message, + _widget: MessageInput, + manager: DialogManager, + test_dao: FromDishka[TestDAO], + test_repo: FromDishka[TestRepository], + attempt_repo: FromDishka[TestAttemptRepository], +): + start_data = manager.start_data or {} + test_id = start_data.get("test_id") + + if not test_id: + await message.answer("❌ Тест не найден") + return + + test = await test_dao.get_by_id(test_id) + + if not test or not test.password: + await message.answer("❌ Ошибка проверки пароля") + return + + if message.text and message.text.strip() == test.password: + await message.answer("✅ Пароль верный, начинаем тест") + + _, questions = await test_repo.get_test_with_questions(test_id) + + if not questions: + await message.answer("❌ В тесте нет вопросов") + return + + attempt = await attempt_repo.attempt_dao.create(user_id=message.from_user.id, test_id=test_id) + + first_question, _ = await test_repo.get_question_with_options(questions[0].id) + first_state = await get_state_for_question_type(first_question.question_type if first_question else QuestionType.SINGLE) + + manager.dialog_data["attempt_id"] = attempt.id + manager.dialog_data["questions"] = [q.id for q in questions] + manager.dialog_data["current_question_index"] = 0 + manager.dialog_data["user_answers"] = {} + + await manager.switch_to(first_state) + else: + await message.answer("❌ Неверный пароль") + + +async def on_cancel_test(_callback: CallbackQuery, _button: Button, manager: DialogManager): + await _callback.answer("Тест отменен") + await manager.start(UserMenuSG.available_tests, mode=StartMode.RESET_STACK) + + +@inject +async def get_question_data( + dialog_manager: DialogManager, + test_repo: FromDishka[TestRepository], + **_kwargs +): + start_data = dialog_manager.start_data or {} + + current_index = dialog_manager.dialog_data.get("current_question_index") + if current_index is None: + current_index = start_data.get("current_question_index", 0) + + questions = dialog_manager.dialog_data.get("questions") or start_data.get("questions", []) + + if not questions or current_index >= len(questions): + return {"question_text": "Ошибка", "options": []} + + question_id = questions[current_index] + question, options = await test_repo.get_question_with_options(question_id) + + if not question: + return {"question_text": "Ошибка", "options": []} + + progress = f"{current_index + 1}/{len(questions)}" + + return { + "question_text": f"Вопрос {progress}\n\n{question.text}", + "options": [(opt.text, str(opt.id)) for opt in options], + } + + +async def on_single_answer_selected(_callback: CallbackQuery, _widget, manager: DialogManager, item_id: str): + start_data = manager.start_data or {} + current_index = manager.dialog_data.get("current_question_index") + if current_index is None: + current_index = start_data.get("current_question_index", 0) + questions = manager.dialog_data.get("questions") or start_data.get("questions", []) + + if not questions or current_index >= len(questions): + return + + question_id = questions[current_index] + user_answers = manager.dialog_data.get("user_answers", {}) + user_answers[str(question_id)] = {"type": "single", "answer": int(item_id)} + manager.dialog_data["user_answers"] = user_answers + + +async def on_multiple_answer_changed(_event, widget, manager: DialogManager, _data: str): + start_data = manager.start_data or {} + current_index = manager.dialog_data.get("current_question_index") + if current_index is None: + current_index = start_data.get("current_question_index", 0) + questions = manager.dialog_data.get("questions") or start_data.get("questions", []) + + if not questions or current_index >= len(questions): + return + + question_id = questions[current_index] + selected = widget.get_checked() + + user_answers = manager.dialog_data.get("user_answers", {}) + user_answers[str(question_id)] = {"type": "multiple", "answer": [int(x) for x in selected]} + manager.dialog_data["user_answers"] = user_answers + + +@inject +async def on_text_answer_input( + message: Message, + _widget: MessageInput, + manager: DialogManager, + test_repo: FromDishka[TestRepository], + attempt_repo: FromDishka[TestAttemptRepository], + answer_dao: FromDishka[UserAnswerDAO], +): + start_data = manager.start_data or {} + current_index = manager.dialog_data.get("current_question_index") + if current_index is None: + current_index = start_data.get("current_question_index", 0) + questions = manager.dialog_data.get("questions") or start_data.get("questions", []) + + if not questions or current_index >= len(questions): + return + + question_id = questions[current_index] + text_answer = message.text.strip() if message.text else "" + attempt_id = manager.dialog_data.get("attempt_id") or start_data.get("attempt_id") + + if not attempt_id: + await message.answer("❌ Ошибка попытки") + return + + question, options = await test_repo.get_question_with_options(question_id) + if not question: + await message.answer("❌ Вопрос не найден") + return + + correct_options = [opt for opt in options if opt.is_correct] + user_normalized = text_answer.lower().replace(" ", "") + is_correct = any(opt.text.lower().replace(" ", "") == user_normalized for opt in correct_options) + + await answer_dao.create( + attempt_id=attempt_id, + question_id=question_id, + text_answer=text_answer, + is_correct=is_correct, + ) + + if current_index + 1 >= len(questions): + await finish_test(manager, attempt_repo, attempt_id, len(questions)) + else: + next_index = current_index + 1 + manager.dialog_data["current_question_index"] = next_index + + next_question_id = questions[next_index] + next_question, _ = await test_repo.get_question_with_options(next_question_id) + next_state = await get_state_for_question_type(next_question.question_type if next_question else QuestionType.SINGLE) + + await manager.switch_to(next_state) + + +@inject +async def on_next_question( + _callback: CallbackQuery, + _button: Button, + manager: DialogManager, + test_repo: FromDishka[TestRepository], + attempt_repo: FromDishka[TestAttemptRepository], + answer_dao: FromDishka[UserAnswerDAO], +): + start_data = manager.start_data or {} + current_index = manager.dialog_data.get("current_question_index") + if current_index is None: + current_index = start_data.get("current_question_index", 0) + questions = manager.dialog_data.get("questions") or start_data.get("questions", []) + + if not questions or current_index >= len(questions): + await _callback.answer("❌ Ошибка") + return + + question_id = questions[current_index] + user_answers = manager.dialog_data.get("user_answers", {}) + + if str(question_id) not in user_answers: + await _callback.answer("❌ Выберите ответ") + return + + answer_data = user_answers[str(question_id)] + attempt_id = manager.dialog_data.get("attempt_id") or start_data.get("attempt_id") + + if not attempt_id: + await _callback.answer("❌ Ошибка попытки") + return + + question, options = await test_repo.get_question_with_options(question_id) + + if not question: + await _callback.answer("❌ Вопрос не найден") + return + + if answer_data["type"] == "single": + selected_option_id = answer_data["answer"] + correct_options = [opt for opt in options if opt.is_correct] + is_correct = any(opt.id == selected_option_id for opt in correct_options) + + selected_text = next((opt.text for opt in options if opt.id == selected_option_id), "") + + await answer_dao.create( + attempt_id=attempt_id, + question_id=question_id, + selected_option_id=selected_option_id, + text_answer=selected_text, + is_correct=is_correct, + ) + + elif answer_data["type"] == "multiple": + selected_option_ids = set(answer_data["answer"]) + correct_option_ids = {opt.id for opt in options if opt.is_correct} + + selected_texts = sorted([opt.text for opt in options if opt.id in selected_option_ids]) + correct_texts = sorted([opt.text for opt in options if opt.is_correct]) + is_correct = selected_texts == correct_texts + + await answer_dao.create( + attempt_id=attempt_id, + question_id=question_id, + text_answer="|".join(selected_texts), + is_correct=is_correct, + ) + + if current_index + 1 >= len(questions): + await finish_test(manager, attempt_repo, attempt_id, len(questions)) + else: + next_index = current_index + 1 + manager.dialog_data["current_question_index"] = next_index + + next_question_id = questions[next_index] + next_question, _ = await test_repo.get_question_with_options(next_question_id) + next_state = await get_state_for_question_type(next_question.question_type if next_question else QuestionType.SINGLE) + + await manager.switch_to(next_state) + + +async def finish_test(manager: DialogManager, attempt_repo: TestAttemptRepository, attempt_id: int, total_questions: int): + correct_count = await attempt_repo.calculate_attempt_score(attempt_id) + + score = round((correct_count / total_questions * 100)) if total_questions > 0 else 0 + is_passed = score >= 50 + + await attempt_repo.finish_attempt(attempt_id, score, is_passed) + + manager.dialog_data["score"] = score + manager.dialog_data["correct_count"] = correct_count + manager.dialog_data["total_questions"] = total_questions + manager.dialog_data["is_passed"] = is_passed + + await manager.switch_to(UserTestSG.results) + + +async def get_results_data(dialog_manager: DialogManager, **_kwargs): + score = dialog_manager.dialog_data.get("score", 0) + correct_count = dialog_manager.dialog_data.get("correct_count", 0) + total_questions = dialog_manager.dialog_data.get("total_questions", 0) + is_passed = dialog_manager.dialog_data.get("is_passed", False) + + status = "✅ Тест пройден!" if is_passed else "❌ Тест не пройден" + + results_text = ( + f"{status}\n\n" + f"Результат: {score}%\n" + f"Правильных ответов: {correct_count}/{total_questions}" + ) + + return {"results_text": results_text} + + +async def on_back_to_menu(_callback: CallbackQuery, _button: Button, manager: DialogManager): + await manager.start(UserMenuSG.main, mode=StartMode.RESET_STACK) + + +async def on_show_detailed_results(_callback: CallbackQuery, _button: Button, manager: DialogManager): + await manager.switch_to(UserTestSG.detailed_results) + + +async def on_back_to_results(_callback: CallbackQuery, _button: Button, manager: DialogManager): + await manager.switch_to(UserTestSG.results) + + +@inject +async def get_detailed_results_data( + dialog_manager: DialogManager, + attempt_repo: FromDishka[TestAttemptRepository], + test_repo: FromDishka[TestRepository], + **_kwargs +): + start_data = dialog_manager.start_data or {} + attempt_id = dialog_manager.dialog_data.get("attempt_id") or start_data.get("attempt_id") + + if not attempt_id: + return {"detailed_text": "Ошибка загрузки результатов"} + + answers = await attempt_repo.get_answers_for_attempt(attempt_id) + + lines = ["📋 Подробные результаты:\n"] + + for i, answer in enumerate(answers, 1): + question, options = await test_repo.get_question_with_options(answer.question_id) + if not question: + continue + + correct_options = [opt for opt in options if opt.is_correct] + correct_texts = [opt.text for opt in correct_options] + + status = "✅" if answer.is_correct else "❌" + + user_answer = answer.text_answer or "" + if "|" in user_answer: + user_answer = ", ".join(user_answer.split("|")) + + question_text = question.text.split(" (Тест:")[0] if " (Тест:" in question.text else question.text + + lines.append(f"{status} Вопрос {i}: {question_text}") + lines.append(f"Ваш ответ: {user_answer or '—'}") + lines.append(f"Правильный ответ: {', '.join(correct_texts)}\n") + + return {"detailed_text": "\n".join(lines)} + + +take_test_dialog = Dialog( + Window( + Const("🔑 Введите пароль для доступа к тесту:"), + MessageInput(on_password_input), + Button(Const("❌ Отмена"), id="cancel", on_click=on_cancel_test), + state=UserTestSG.password_input, + ), + Window( + Format("{question_text}\n\nВыберите один вариант ответа:"), + Column( + Radio( + Format("🔘 {item[0]}"), + Format("⚪ {item[0]}"), + id="single_answer", + item_id_getter=lambda x: x[1], + items="options", + on_click=on_single_answer_selected, + ), + ), + Column( + Button(Const("➡️ Далее"), id="next", on_click=on_next_question), + Button(Const("❌ Отмена"), id="cancel", on_click=on_cancel_test), + ), + state=UserTestSG.question_single, + getter=get_question_data, + ), + Window( + Format("{question_text}\n\nВыберите несколько вариантов ответа:"), + Column( + Multiselect( + Format("✅ {item[0]}"), + Format("⬜ {item[0]}"), + id="multiple_answer", + item_id_getter=lambda x: x[1], + items="options", + on_state_changed=on_multiple_answer_changed, + ), + ), + Column( + Button(Const("➡️ Далее"), id="next", on_click=on_next_question), + Button(Const("❌ Отмена"), id="cancel", on_click=on_cancel_test), + ), + state=UserTestSG.question_multiple, + getter=get_question_data, + ), + Window( + Format("{question_text}\n\nВведите ответ:"), + MessageInput(on_text_answer_input), + Button(Const("❌ Отмена"), id="cancel", on_click=on_cancel_test), + state=UserTestSG.question_input, + getter=get_question_data, + ), + Window( + Format("{results_text}"), + Column( + Button(Const("📋 Подробные результаты"), id="detailed", on_click=on_show_detailed_results), + Button(Const("◀️ В главное меню"), id="back", on_click=on_back_to_menu), + ), + state=UserTestSG.results, + getter=get_results_data, + ), + Window( + Format("{detailed_text}"), + Button(Const("◀️ Назад"), id="back", on_click=on_back_to_results), + state=UserTestSG.detailed_results, + getter=get_detailed_results_data, + ), +)