From 2d4ee6c77b7ff30c1e45fe1ac377a211887db81f Mon Sep 17 00:00:00 2001 From: kolo Date: Wed, 7 Jan 2026 01:28:34 +0300 Subject: [PATCH] update --- pyproject.toml | 1 + .../application/bot/shared_dialogs/states.py | 1 + .../application/bot/shared_dialogs/tests.py | 257 ++++++++++++------ .../database/repo/test_attempt.py | 38 +++ uv.lock | 23 ++ 5 files changed, 237 insertions(+), 83 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index eb586b6..a321e6b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ dependencies = [ "qrcode[pil]>=8.2", "pycryptodome>=3.23.0", "json5>=0.13.0", + "openpyxl>=3.1.0", ] [dependency-groups] diff --git a/src/quizzi/application/bot/shared_dialogs/states.py b/src/quizzi/application/bot/shared_dialogs/states.py index 3a8e41e..bfc7ac5 100644 --- a/src/quizzi/application/bot/shared_dialogs/states.py +++ b/src/quizzi/application/bot/shared_dialogs/states.py @@ -20,6 +20,7 @@ class SharedTestsSG(StatesGroup): edit_expires = State() statistics = State() attempt_detail = State() + export_select_group = State() class SharedBroadcastSG(StatesGroup): diff --git a/src/quizzi/application/bot/shared_dialogs/tests.py b/src/quizzi/application/bot/shared_dialogs/tests.py index 14c7e2c..a949384 100644 --- a/src/quizzi/application/bot/shared_dialogs/tests.py +++ b/src/quizzi/application/bot/shared_dialogs/tests.py @@ -1,6 +1,6 @@ import asyncio import functools -import json +import io from datetime import date, datetime, time from aiogram import Bot @@ -11,9 +11,10 @@ from aiogram_dialog.widgets.kbd import Button, Calendar, Column, Row, ScrollingG from aiogram_dialog.widgets.text import Const, Format from dishka import FromDishka from dishka.integrations.aiogram_dialog import inject +from openpyxl import Workbook +from openpyxl.styles import Alignment, Border, Font, PatternFill, Side from quizzi.application.bot.shared_dialogs.states import SharedCreateTestSG, SharedTestsSG -from quizzi.domain.schemas import QuestionType from quizzi.infrastructure.database.dao.group import GroupDAO from quizzi.infrastructure.database.dao.test import TestDAO from quizzi.infrastructure.database.repo.test import TestRepository @@ -229,86 +230,8 @@ async def get_attempt_detail( return {"attempt_info": "\n".join(lines)} -@inject -async def on_export_test( - _callback: CallbackQuery, - _button: Button, - manager: DialogManager, - test_repo: FromDishka[TestRepository], -) -> None: - test_id = manager.dialog_data.get("selected_test_id") - - if not test_id: - await _callback.answer("❌ Тест не найден") - return - - assert _callback.message is not None - await _callback.answer("⏳ Экспортирую тест...") - - test, questions_with_options = await test_repo.get_full_test(test_id) - - if not test: - await _callback.message.answer("❌ Тест не найден") - return - - export_data: dict = { - "title": test.title, - "description": test.description, - "password": test.password, - "attempts": test.attempts, - "time_limit": test.time_limit, - "expires_at": test.expires_at.isoformat() if test.expires_at else None, - "for_group": test.for_group, - "questions": [], - } - - questions_list: list = export_data["questions"] - - for question, options in questions_with_options: - question_data: dict = { - "question_type": question.question_type.value, - "question": question.text, - } - - if question.question_type == QuestionType.INPUT: - correct_options = [o for o in options if o.is_correct] - if correct_options: - question_data["correct_answer"] = correct_options[0].text - else: - question_data["answers"] = [ - {"option": o.text, "is_correct": o.is_correct} - for o in options - ] - - questions_list.append(question_data) - - json_str = json.dumps(export_data, ensure_ascii=False, indent=2) - - created_str = test.created_at.strftime("%d.%m.%Y %H:%M") if test.created_at else "—" - updated_str = test.updated_at.strftime("%d.%m.%Y %H:%M") if test.updated_at else "—" - questions_count = len(questions_with_options) - - comment_header = f"""// ═══════════════════════════════════════════════════════════════ -// ЭКСПОРТ ТЕСТА: {test.title} -// ═══════════════════════════════════════════════════════════════ -// -// ❓ Вопросов: {questions_count} -// 📅 Создан: {created_str} -// 🔄 Обновлён: {updated_str} -// -// ═══════════════════════════════════════════════════════════════ - -""" - - full_content = comment_header + json_str - - safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in test.title)[:50] - filename = f"{safe_title}.json" - - await _callback.message.answer_document( - document=BufferedInputFile(full_content.encode("utf-8"), filename=filename), - caption=f"📤 Экспорт теста: {test.title}", - ) +async def on_export_stats(_callback: CallbackQuery, _button: Button, manager: DialogManager) -> None: + await manager.switch_to(SharedTestsSG.export_select_group) @inject @@ -564,6 +487,156 @@ async def on_back_clicked(_callback: CallbackQuery, _button: Button, manager: Di await manager.done() +@inject +async def get_groups_for_export(dialog_manager: DialogManager, group_dao: FromDishka[GroupDAO], **_kwargs): + groups = await group_dao.get_all() + return { + "groups": [(f"🎓 {g.number}", str(g.number)) for g in groups], + "count": len(groups), + } + + +def create_excel_report( + test_title: str, + group_number: int, + stats: list[tuple[str, int | None, datetime | None, bool | None]], +) -> bytes: + wb = Workbook() + ws = wb.active + assert ws is not None + ws.title = "Статистика" + + header_font = Font(bold=True, color="FFFFFF") + header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid") + header_alignment = Alignment(horizontal="center", vertical="center") + thin_border = Border( + left=Side(style="thin"), + right=Side(style="thin"), + top=Side(style="thin"), + bottom=Side(style="thin"), + ) + + ws.merge_cells("A1:E1") + ws["A1"] = f"Тест: {test_title}" + ws["A1"].font = Font(bold=True, size=14) + ws["A1"].alignment = Alignment(horizontal="center") + + ws.merge_cells("A2:E2") + ws["A2"] = f"Группа: {group_number}" + ws["A2"].font = Font(bold=True, size=12) + ws["A2"].alignment = Alignment(horizontal="center") + + headers = ["ФИО", "Результат (%)", "Оценка", "Дата прохождения", "Статус"] + for col, header in enumerate(headers, 1): + cell = ws.cell(row=4, column=col, value=header) + cell.font = header_font + cell.fill = header_fill + cell.alignment = header_alignment + cell.border = thin_border + + passed_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid") + failed_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid") + not_passed_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid") + + grades: list[int] = [] + + for row_idx, (name, score, finished_at, is_passed) in enumerate(stats, 5): + ws.cell(row=row_idx, column=1, value=name).border = thin_border + + if score is not None: + ws.cell(row=row_idx, column=2, value=score).border = thin_border + + grade = score // 10 + grades.append(grade) + ws.cell(row=row_idx, column=3, value=grade).border = thin_border + + finished_msk = to_msk(finished_at) if finished_at else None + date_str = finished_msk.strftime("%d.%m.%Y %H:%M") if finished_msk else "—" + ws.cell(row=row_idx, column=4, value=date_str).border = thin_border + status = "Пройден" if is_passed else "Не пройден" + status_cell = ws.cell(row=row_idx, column=5, value=status) + status_cell.border = thin_border + + for col in range(1, 6): + ws.cell(row=row_idx, column=col).fill = passed_fill if is_passed else failed_fill + else: + ws.cell(row=row_idx, column=2, value="—").border = thin_border + ws.cell(row=row_idx, column=3, value="—").border = thin_border + ws.cell(row=row_idx, column=4, value="—").border = thin_border + status_cell = ws.cell(row=row_idx, column=5, value="Не проходил") + status_cell.border = thin_border + + for col in range(1, 6): + ws.cell(row=row_idx, column=col).fill = not_passed_fill + + ws.column_dimensions["A"].width = 30 + ws.column_dimensions["B"].width = 15 + ws.column_dimensions["C"].width = 10 + ws.column_dimensions["D"].width = 20 + ws.column_dimensions["E"].width = 15 + + total_users = len(stats) + passed_users = sum(1 for _, score, _, is_passed in stats if score is not None and is_passed) + attempted_users = sum(1 for _, score, _, _ in stats if score is not None) + + summary_row = len(stats) + 6 + ws.cell(row=summary_row, column=1, value="Итого:").font = Font(bold=True) + ws.cell(row=summary_row + 1, column=1, value=f"Всего студентов: {total_users}") + ws.cell(row=summary_row + 2, column=1, value=f"Прошли тест: {attempted_users}") + ws.cell(row=summary_row + 3, column=1, value=f"Сдали: {passed_users}") + if attempted_users > 0: + success_rate = round(passed_users / attempted_users * 100) + ws.cell(row=summary_row + 4, column=1, value=f"Процент сдачи: {success_rate}%") + if grades: + avg_grade = round(sum(grades) / len(grades), 1) + ws.cell(row=summary_row + 5, column=1, value=f"Средняя оценка: {avg_grade}") + + output = io.BytesIO() + wb.save(output) + output.seek(0) + return output.read() + + +@inject +async def on_group_selected_for_export( + _callback: CallbackQuery, + _widget: Select, + manager: DialogManager, + item_id: str, + test_dao: FromDishka[TestDAO], + attempt_repo: FromDishka[TestAttemptRepository], +) -> None: + test_id = manager.dialog_data.get("selected_test_id") + if not test_id: + await _callback.answer("❌ Тест не найден") + return + + assert _callback.message is not None + await _callback.answer("⏳ Формирую отчёт...") + + test = await test_dao.get_by_id(test_id) + if not test: + await _callback.message.answer("❌ Тест не найден") + return + + group_number = int(item_id) + stats = await attempt_repo.get_group_test_statistics(test_id, group_number) + + if not stats: + await _callback.message.answer(f"❌ В группе {group_number} нет студентов") + return + + excel_bytes = create_excel_report(test.title, group_number, stats) + + safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in test.title)[:30] + filename = f"{safe_title}_group_{group_number}.xlsx" + + await _callback.message.answer_document( + document=BufferedInputFile(excel_bytes, filename=filename), + caption=f"📊 Статистика по тесту\n\n📝 {test.title}\n🎓 Группа {group_number}", + ) + + shared_tests_dialog = Dialog( Window( Format("📝 Тесты\n\nВсего: {count}"), @@ -601,7 +674,7 @@ shared_tests_dialog = Dialog( ), Button(Const("📊 Статистика"), id="statistics", on_click=on_statistics), Button(Const("🔗 Поделиться"), id="share", on_click=on_share_test), - Button(Const("📤 Экспорт"), id="export", on_click=on_export_test), + Button(Const("📥 Экспорт"), id="export", on_click=on_export_stats), Button(Const("✏️ Изменить"), id="edit_menu", on_click=on_edit_menu), Button(Const("◀️ Назад"), id="back", on_click=on_back_to_list), ), @@ -704,4 +777,22 @@ shared_tests_dialog = Dialog( state=SharedTestsSG.attempt_detail, getter=get_attempt_detail, ), + Window( + Format("📥 Экспорт статистики\n\nВыберите группу для экспорта:\n\nВсего групп: {count}"), + ScrollingGroup( + Select( + Format("{item[0]}"), + id="export_group_select", + item_id_getter=lambda x: x[1], + items="groups", + on_click=on_group_selected_for_export, + ), + id="export_groups_scroll", + width=2, + height=7, + ), + Button(Const("◀️ Назад"), id="back", on_click=on_back_to_detail), + state=SharedTestsSG.export_select_group, + getter=get_groups_for_export, + ), ) diff --git a/src/quizzi/infrastructure/database/repo/test_attempt.py b/src/quizzi/infrastructure/database/repo/test_attempt.py index a7e44d5..524a9ea 100644 --- a/src/quizzi/infrastructure/database/repo/test_attempt.py +++ b/src/quizzi/infrastructure/database/repo/test_attempt.py @@ -284,3 +284,41 @@ class TestAttemptRepository: async def mark_warning_sent(self, attempt_id: int, sent_at: datetime) -> None: await self.attempt_dao.update(attempt_id=attempt_id, warning_sent_at=sent_at) + + async def get_group_test_statistics( + self, test_id: int, group_number: int + ) -> list[tuple[str, int | None, datetime | None, bool | None]]: + result = await self.session.execute( + select( + UserModel.name, + UserModel.first_name, + TestAttemptModel.score, + TestAttemptModel.finished_at, + TestAttemptModel.is_passed, + ) + .select_from(UserModel) + .outerjoin( + TestAttemptModel, + (TestAttemptModel.user_id == UserModel.id) & + (TestAttemptModel.test_id == test_id) & + (TestAttemptModel.finished_at.isnot(None)) + ) + .where(UserModel.group == group_number) + .order_by(UserModel.name, UserModel.first_name, TestAttemptModel.finished_at.desc()) + ) + rows = result.all() + + user_best: dict[str, tuple[int | None, datetime | None, bool | None]] = {} + for name, first_name, score, finished_at, is_passed in rows: + display_name = name or first_name + if display_name not in user_best: + user_best[display_name] = (score, finished_at, is_passed) + elif score is not None: + current_score = user_best[display_name][0] + if current_score is None or score > current_score: + user_best[display_name] = (score, finished_at, is_passed) + + return [ + (name, score, finished_at, is_passed) + for name, (score, finished_at, is_passed) in sorted(user_best.items()) + ] diff --git a/uv.lock b/uv.lock index 14ba0c3..3b077c8 100644 --- a/uv.lock +++ b/uv.lock @@ -232,6 +232,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b7/b9/89381173b4f336e986d72471198614806cd313e0f85c143ccb677c310223/dishka-1.7.2-py3-none-any.whl", hash = "sha256:f6faa6ab321903926b825b3337d77172ee693450279b314434864978d01fbad3", size = 94774, upload-time = "2025-09-24T21:23:03.246Z" }, ] +[[package]] +name = "et-xmlfile" +version = "2.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d3/38/af70d7ab1ae9d4da450eeec1fa3918940a5fafb9055e934af8d6eb0c2313/et_xmlfile-2.0.0.tar.gz", hash = "sha256:dab3f4764309081ce75662649be815c4c9081e88f0837825f90fd28317d4da54", size = 17234, upload-time = "2024-10-25T17:25:40.039Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c1/8b/5fe2cc11fee489817272089c4203e679c63b570a5aaeb18d852ae3cbba6a/et_xmlfile-2.0.0-py3-none-any.whl", hash = "sha256:7a91720bc756843502c3b7504c77b8fe44217c85c537d85037f0f536151b2caa", size = 18059, upload-time = "2024-10-25T17:25:39.051Z" }, +] + [[package]] name = "frozenlist" version = "1.8.0" @@ -514,6 +523,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b7/da/7d22601b625e241d4f23ef1ebff8acfc60da633c9e7e7922e24d10f592b3/multidict-6.7.0-py3-none-any.whl", hash = "sha256:394fc5c42a333c9ffc3e421a4c85e08580d990e08b99f6bf35b4132114c5dcb3", size = 12317, upload-time = "2025-10-06T14:52:29.272Z" }, ] +[[package]] +name = "openpyxl" +version = "3.1.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "et-xmlfile" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/3d/f9/88d94a75de065ea32619465d2f77b29a0469500e99012523b91cc4141cd1/openpyxl-3.1.5.tar.gz", hash = "sha256:cf0e3cf56142039133628b5acffe8ef0c12bc902d2aadd3e0fe5878dc08d1050", size = 186464, upload-time = "2024-06-28T14:03:44.161Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c0/da/977ded879c29cbd04de313843e76868e6e13408a94ed6b987245dc7c8506/openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2", size = 250910, upload-time = "2024-06-28T14:03:41.161Z" }, +] + [[package]] name = "pillow" version = "12.1.0" @@ -730,6 +751,7 @@ dependencies = [ { name = "dishka" }, { name = "httpx" }, { name = "json5" }, + { name = "openpyxl" }, { name = "pycryptodome" }, { name = "pydantic" }, { name = "qrcode", extra = ["pil"] }, @@ -753,6 +775,7 @@ requires-dist = [ { name = "dishka", specifier = ">=1.7.2" }, { name = "httpx", specifier = ">=0.28.1" }, { name = "json5", specifier = ">=0.13.0" }, + { name = "openpyxl", specifier = ">=3.1.0" }, { name = "pycryptodome", specifier = ">=3.23.0" }, { name = "pydantic", specifier = ">=2.10.5" }, { name = "qrcode", extras = ["pil"], specifier = ">=8.2" },