This commit is contained in:
2026-01-07 01:28:34 +03:00
parent ebdc9954de
commit 2d4ee6c77b
5 changed files with 237 additions and 83 deletions
+1
View File
@@ -19,6 +19,7 @@ dependencies = [
"qrcode[pil]>=8.2",
"pycryptodome>=3.23.0",
"json5>=0.13.0",
"openpyxl>=3.1.0",
]
[dependency-groups]
@@ -20,6 +20,7 @@ class SharedTestsSG(StatesGroup):
edit_expires = State()
statistics = State()
attempt_detail = State()
export_select_group = State()
class SharedBroadcastSG(StatesGroup):
@@ -1,6 +1,6 @@
import asyncio
import functools
import json
import io
from datetime import date, datetime, time
from aiogram import Bot
@@ -11,9 +11,10 @@ from aiogram_dialog.widgets.kbd import Button, Calendar, Column, Row, ScrollingG
from aiogram_dialog.widgets.text import Const, Format
from dishka import FromDishka
from dishka.integrations.aiogram_dialog import inject
from openpyxl import Workbook
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from quizzi.application.bot.shared_dialogs.states import SharedCreateTestSG, SharedTestsSG
from quizzi.domain.schemas import QuestionType
from quizzi.infrastructure.database.dao.group import GroupDAO
from quizzi.infrastructure.database.dao.test import TestDAO
from quizzi.infrastructure.database.repo.test import TestRepository
@@ -229,86 +230,8 @@ async def get_attempt_detail(
return {"attempt_info": "\n".join(lines)}
@inject
async def on_export_test(
_callback: CallbackQuery,
_button: Button,
manager: DialogManager,
test_repo: FromDishka[TestRepository],
) -> None:
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
assert _callback.message is not None
await _callback.answer("⏳ Экспортирую тест...")
test, questions_with_options = await test_repo.get_full_test(test_id)
if not test:
await _callback.message.answer("❌ Тест не найден")
return
export_data: dict = {
"title": test.title,
"description": test.description,
"password": test.password,
"attempts": test.attempts,
"time_limit": test.time_limit,
"expires_at": test.expires_at.isoformat() if test.expires_at else None,
"for_group": test.for_group,
"questions": [],
}
questions_list: list = export_data["questions"]
for question, options in questions_with_options:
question_data: dict = {
"question_type": question.question_type.value,
"question": question.text,
}
if question.question_type == QuestionType.INPUT:
correct_options = [o for o in options if o.is_correct]
if correct_options:
question_data["correct_answer"] = correct_options[0].text
else:
question_data["answers"] = [
{"option": o.text, "is_correct": o.is_correct}
for o in options
]
questions_list.append(question_data)
json_str = json.dumps(export_data, ensure_ascii=False, indent=2)
created_str = test.created_at.strftime("%d.%m.%Y %H:%M") if test.created_at else ""
updated_str = test.updated_at.strftime("%d.%m.%Y %H:%M") if test.updated_at else ""
questions_count = len(questions_with_options)
comment_header = f"""// ═══════════════════════════════════════════════════════════════
// ЭКСПОРТ ТЕСТА: {test.title}
// ═══════════════════════════════════════════════════════════════
//
// ❓ Вопросов: {questions_count}
// 📅 Создан: {created_str}
// 🔄 Обновлён: {updated_str}
//
// ═══════════════════════════════════════════════════════════════
"""
full_content = comment_header + json_str
safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in test.title)[:50]
filename = f"{safe_title}.json"
await _callback.message.answer_document(
document=BufferedInputFile(full_content.encode("utf-8"), filename=filename),
caption=f"📤 <b>Экспорт теста:</b> {test.title}",
)
async def on_export_stats(_callback: CallbackQuery, _button: Button, manager: DialogManager) -> None:
await manager.switch_to(SharedTestsSG.export_select_group)
@inject
@@ -564,6 +487,156 @@ async def on_back_clicked(_callback: CallbackQuery, _button: Button, manager: Di
await manager.done()
@inject
async def get_groups_for_export(dialog_manager: DialogManager, group_dao: FromDishka[GroupDAO], **_kwargs):
groups = await group_dao.get_all()
return {
"groups": [(f"🎓 {g.number}", str(g.number)) for g in groups],
"count": len(groups),
}
def create_excel_report(
test_title: str,
group_number: int,
stats: list[tuple[str, int | None, datetime | None, bool | None]],
) -> bytes:
wb = Workbook()
ws = wb.active
assert ws is not None
ws.title = "Статистика"
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
header_alignment = Alignment(horizontal="center", vertical="center")
thin_border = Border(
left=Side(style="thin"),
right=Side(style="thin"),
top=Side(style="thin"),
bottom=Side(style="thin"),
)
ws.merge_cells("A1:E1")
ws["A1"] = f"Тест: {test_title}"
ws["A1"].font = Font(bold=True, size=14)
ws["A1"].alignment = Alignment(horizontal="center")
ws.merge_cells("A2:E2")
ws["A2"] = f"Группа: {group_number}"
ws["A2"].font = Font(bold=True, size=12)
ws["A2"].alignment = Alignment(horizontal="center")
headers = ["ФИО", "Результат (%)", "Оценка", "Дата прохождения", "Статус"]
for col, header in enumerate(headers, 1):
cell = ws.cell(row=4, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_alignment
cell.border = thin_border
passed_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
failed_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
not_passed_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid")
grades: list[int] = []
for row_idx, (name, score, finished_at, is_passed) in enumerate(stats, 5):
ws.cell(row=row_idx, column=1, value=name).border = thin_border
if score is not None:
ws.cell(row=row_idx, column=2, value=score).border = thin_border
grade = score // 10
grades.append(grade)
ws.cell(row=row_idx, column=3, value=grade).border = thin_border
finished_msk = to_msk(finished_at) if finished_at else None
date_str = finished_msk.strftime("%d.%m.%Y %H:%M") if finished_msk else ""
ws.cell(row=row_idx, column=4, value=date_str).border = thin_border
status = "Пройден" if is_passed else "Не пройден"
status_cell = ws.cell(row=row_idx, column=5, value=status)
status_cell.border = thin_border
for col in range(1, 6):
ws.cell(row=row_idx, column=col).fill = passed_fill if is_passed else failed_fill
else:
ws.cell(row=row_idx, column=2, value="").border = thin_border
ws.cell(row=row_idx, column=3, value="").border = thin_border
ws.cell(row=row_idx, column=4, value="").border = thin_border
status_cell = ws.cell(row=row_idx, column=5, value="Не проходил")
status_cell.border = thin_border
for col in range(1, 6):
ws.cell(row=row_idx, column=col).fill = not_passed_fill
ws.column_dimensions["A"].width = 30
ws.column_dimensions["B"].width = 15
ws.column_dimensions["C"].width = 10
ws.column_dimensions["D"].width = 20
ws.column_dimensions["E"].width = 15
total_users = len(stats)
passed_users = sum(1 for _, score, _, is_passed in stats if score is not None and is_passed)
attempted_users = sum(1 for _, score, _, _ in stats if score is not None)
summary_row = len(stats) + 6
ws.cell(row=summary_row, column=1, value="Итого:").font = Font(bold=True)
ws.cell(row=summary_row + 1, column=1, value=f"Всего студентов: {total_users}")
ws.cell(row=summary_row + 2, column=1, value=f"Прошли тест: {attempted_users}")
ws.cell(row=summary_row + 3, column=1, value=f"Сдали: {passed_users}")
if attempted_users > 0:
success_rate = round(passed_users / attempted_users * 100)
ws.cell(row=summary_row + 4, column=1, value=f"Процент сдачи: {success_rate}%")
if grades:
avg_grade = round(sum(grades) / len(grades), 1)
ws.cell(row=summary_row + 5, column=1, value=f"Средняя оценка: {avg_grade}")
output = io.BytesIO()
wb.save(output)
output.seek(0)
return output.read()
@inject
async def on_group_selected_for_export(
_callback: CallbackQuery,
_widget: Select,
manager: DialogManager,
item_id: str,
test_dao: FromDishka[TestDAO],
attempt_repo: FromDishka[TestAttemptRepository],
) -> None:
test_id = manager.dialog_data.get("selected_test_id")
if not test_id:
await _callback.answer("❌ Тест не найден")
return
assert _callback.message is not None
await _callback.answer("⏳ Формирую отчёт...")
test = await test_dao.get_by_id(test_id)
if not test:
await _callback.message.answer("❌ Тест не найден")
return
group_number = int(item_id)
stats = await attempt_repo.get_group_test_statistics(test_id, group_number)
if not stats:
await _callback.message.answer(f"В группе {group_number} нет студентов")
return
excel_bytes = create_excel_report(test.title, group_number, stats)
safe_title = "".join(c if c.isalnum() or c in "-_" else "_" for c in test.title)[:30]
filename = f"{safe_title}_group_{group_number}.xlsx"
await _callback.message.answer_document(
document=BufferedInputFile(excel_bytes, filename=filename),
caption=f"📊 <b>Статистика по тесту</b>\n\n📝 {test.title}\n🎓 Группа {group_number}",
)
shared_tests_dialog = Dialog(
Window(
Format("<b>📝 Тесты</b>\n\nВсего: {count}"),
@@ -601,7 +674,7 @@ shared_tests_dialog = Dialog(
),
Button(Const("📊 Статистика"), id="statistics", on_click=on_statistics),
Button(Const("🔗 Поделиться"), id="share", on_click=on_share_test),
Button(Const("📤 Экспорт"), id="export", on_click=on_export_test),
Button(Const("📥 Экспорт"), id="export", on_click=on_export_stats),
Button(Const("✏️ Изменить"), id="edit_menu", on_click=on_edit_menu),
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_list),
),
@@ -704,4 +777,22 @@ shared_tests_dialog = Dialog(
state=SharedTestsSG.attempt_detail,
getter=get_attempt_detail,
),
Window(
Format("<b>📥 Экспорт статистики</b>\n\nВыберите группу для экспорта:\n\nВсего групп: {count}"),
ScrollingGroup(
Select(
Format("{item[0]}"),
id="export_group_select",
item_id_getter=lambda x: x[1],
items="groups",
on_click=on_group_selected_for_export,
),
id="export_groups_scroll",
width=2,
height=7,
),
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_detail),
state=SharedTestsSG.export_select_group,
getter=get_groups_for_export,
),
)
@@ -284,3 +284,41 @@ class TestAttemptRepository:
async def mark_warning_sent(self, attempt_id: int, sent_at: datetime) -> None:
await self.attempt_dao.update(attempt_id=attempt_id, warning_sent_at=sent_at)
async def get_group_test_statistics(
self, test_id: int, group_number: int
) -> list[tuple[str, int | None, datetime | None, bool | None]]:
result = await self.session.execute(
select(
UserModel.name,
UserModel.first_name,
TestAttemptModel.score,
TestAttemptModel.finished_at,
TestAttemptModel.is_passed,
)
.select_from(UserModel)
.outerjoin(
TestAttemptModel,
(TestAttemptModel.user_id == UserModel.id) &
(TestAttemptModel.test_id == test_id) &
(TestAttemptModel.finished_at.isnot(None))
)
.where(UserModel.group == group_number)
.order_by(UserModel.name, UserModel.first_name, TestAttemptModel.finished_at.desc())
)
rows = result.all()
user_best: dict[str, tuple[int | None, datetime | None, bool | None]] = {}
for name, first_name, score, finished_at, is_passed in rows:
display_name = name or first_name
if display_name not in user_best:
user_best[display_name] = (score, finished_at, is_passed)
elif score is not None:
current_score = user_best[display_name][0]
if current_score is None or score > current_score:
user_best[display_name] = (score, finished_at, is_passed)
return [
(name, score, finished_at, is_passed)
for name, (score, finished_at, is_passed) in sorted(user_best.items())
]
Generated
+23
View File
@@ -232,6 +232,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/b9/89381173b4f336e986d72471198614806cd313e0f85c143ccb677c310223/dishka-1.7.2-py3-none-any.whl", hash = "sha256:f6faa6ab321903926b825b3337d77172ee693450279b314434864978d01fbad3", size = 94774, upload-time = "2025-09-24T21:23:03.246Z" },
]
[[package]]
name = "et-xmlfile"
version = "2.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d3/38/af70d7ab1ae9d4da450eeec1fa3918940a5fafb9055e934af8d6eb0c2313/et_xmlfile-2.0.0.tar.gz", hash = "sha256:dab3f4764309081ce75662649be815c4c9081e88f0837825f90fd28317d4da54", size = 17234, upload-time = "2024-10-25T17:25:40.039Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c1/8b/5fe2cc11fee489817272089c4203e679c63b570a5aaeb18d852ae3cbba6a/et_xmlfile-2.0.0-py3-none-any.whl", hash = "sha256:7a91720bc756843502c3b7504c77b8fe44217c85c537d85037f0f536151b2caa", size = 18059, upload-time = "2024-10-25T17:25:39.051Z" },
]
[[package]]
name = "frozenlist"
version = "1.8.0"
@@ -514,6 +523,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/da/7d22601b625e241d4f23ef1ebff8acfc60da633c9e7e7922e24d10f592b3/multidict-6.7.0-py3-none-any.whl", hash = "sha256:394fc5c42a333c9ffc3e421a4c85e08580d990e08b99f6bf35b4132114c5dcb3", size = 12317, upload-time = "2025-10-06T14:52:29.272Z" },
]
[[package]]
name = "openpyxl"
version = "3.1.5"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "et-xmlfile" },
]
sdist = { url = "https://files.pythonhosted.org/packages/3d/f9/88d94a75de065ea32619465d2f77b29a0469500e99012523b91cc4141cd1/openpyxl-3.1.5.tar.gz", hash = "sha256:cf0e3cf56142039133628b5acffe8ef0c12bc902d2aadd3e0fe5878dc08d1050", size = 186464, upload-time = "2024-06-28T14:03:44.161Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c0/da/977ded879c29cbd04de313843e76868e6e13408a94ed6b987245dc7c8506/openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2", size = 250910, upload-time = "2024-06-28T14:03:41.161Z" },
]
[[package]]
name = "pillow"
version = "12.1.0"
@@ -730,6 +751,7 @@ dependencies = [
{ name = "dishka" },
{ name = "httpx" },
{ name = "json5" },
{ name = "openpyxl" },
{ name = "pycryptodome" },
{ name = "pydantic" },
{ name = "qrcode", extra = ["pil"] },
@@ -753,6 +775,7 @@ requires-dist = [
{ name = "dishka", specifier = ">=1.7.2" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "json5", specifier = ">=0.13.0" },
{ name = "openpyxl", specifier = ">=3.1.0" },
{ name = "pycryptodome", specifier = ">=3.23.0" },
{ name = "pydantic", specifier = ">=2.10.5" },
{ name = "qrcode", extras = ["pil"], specifier = ">=8.2" },