This commit is contained in:
2026-01-04 18:02:03 +03:00
parent 260171086b
commit f3c7f3d10a
9 changed files with 326 additions and 6 deletions
+1
View File
@@ -18,6 +18,7 @@ dependencies = [
"pydantic>=2.10.5", "pydantic>=2.10.5",
"qrcode[pil]>=8.2", "qrcode[pil]>=8.2",
"pycryptodome>=3.23.0", "pycryptodome>=3.23.0",
"json5>=0.13.0",
] ]
[dependency-groups] [dependency-groups]
@@ -9,3 +9,5 @@ class AdminUsersSG(StatesGroup):
users_list = State() users_list = State()
users_input = State() users_input = State()
user_detail = State() user_detail = State()
user_stats = State()
user_result_detail = State()
@@ -8,6 +8,9 @@ from dishka.integrations.aiogram_dialog import inject
from trudex.application.bot.admin_dialogs.states import AdminUsersSG from trudex.application.bot.admin_dialogs.states import AdminUsersSG
from trudex.infrastructure.database.dao.user import UserDAO from trudex.infrastructure.database.dao.user import UserDAO
from trudex.infrastructure.database.repo.test import TestRepository
from trudex.infrastructure.database.repo.test_attempt import TestAttemptRepository
from trudex.infrastructure.utils.timezone import to_msk
@inject @inject
@@ -84,6 +87,125 @@ async def on_back_to_main(_callback: CallbackQuery, _button: Button, manager: Di
await manager.done() await manager.done()
async def on_user_stats_clicked(_callback: CallbackQuery, _button: Button, manager: DialogManager):
await manager.switch_to(AdminUsersSG.user_stats)
@inject
async def get_user_stats_data(
dialog_manager: DialogManager,
user_dao: FromDishka[UserDAO],
attempt_repo: FromDishka[TestAttemptRepository],
**_kwargs,
):
user_id = dialog_manager.dialog_data.get("selected_user_id")
if not user_id:
return {"stats_info": "Пользователь не выбран", "results": [], "count": 0}
user = await user_dao.get_by_id(user_id)
if not user:
return {"stats_info": "Пользователь не найден", "results": [], "count": 0}
stats = await attempt_repo.get_user_stats(user_id)
attempts_with_tests = await attempt_repo.get_finished_attempts_with_tests(user_id)
name = user.name or user.first_name
if stats["total_attempts"] > 0:
accuracy_str = f"📊 Средняя точность: <b>{stats['avg_score']}%</b>"
tests_str = f"📝 Пройдено тестов: <b>{stats['total_attempts']}</b>"
else:
accuracy_str = "📊 Средняя точность: <b>—</b>"
tests_str = "📝 Пройдено тестов: <b>0</b>"
stats_info = (
f"<b>📊 Статистика: {name}</b>\n\n"
f"{tests_str}\n"
f"{accuracy_str}"
)
results = []
for attempt, test_title in attempts_with_tests:
status = "" if attempt.is_passed else ""
finished_at_msk = to_msk(attempt.finished_at)
date_str = finished_at_msk.strftime("%d.%m.%Y") if finished_at_msk else ""
results.append((f"{status} {test_title}{attempt.score}% ({date_str})", attempt.id))
return {
"stats_info": stats_info,
"results": results,
"count": len(results),
}
async def on_result_selected(_callback: CallbackQuery, _widget: Select, manager: DialogManager, item_id: str):
manager.dialog_data["selected_attempt_id"] = int(item_id)
await manager.switch_to(AdminUsersSG.user_result_detail)
@inject
async def get_user_result_detail(
dialog_manager: DialogManager,
attempt_repo: FromDishka[TestAttemptRepository],
test_repo: FromDishka[TestRepository],
**_kwargs
):
attempt_id = dialog_manager.dialog_data.get("selected_attempt_id")
if not attempt_id:
return {"result_info": "❌ Результат не найден"}
attempt, answers = await attempt_repo.get_attempt_with_answers(attempt_id)
if not attempt:
return {"result_info": "❌ Результат не найден"}
test, _ = await test_repo.get_test_with_questions(attempt.test_id)
test_title = test.title if test else "Неизвестный тест"
status = "✅ Пройден" if attempt.is_passed else "❌ Не пройден"
finished_at_msk = to_msk(attempt.finished_at)
date_str = finished_at_msk.strftime("%d.%m.%Y %H:%M") if finished_at_msk else ""
correct_count = sum(1 for a in answers if a.is_correct)
total_count = len(answers)
lines = [
f"<b>📝 {test_title}</b>\n",
f"📊 <b>Результат:</b> {attempt.score}%",
f"✏️ <b>Правильных ответов:</b> {correct_count} из {total_count}",
f"📅 <b>Дата:</b> {date_str}",
f"🏆 <b>Статус:</b> {status}",
"\n<b>📋 Ответы:</b>\n",
]
# Загружаем все вопросы за один запрос
question_ids = [answer.question_id for answer in answers]
questions_map = await test_repo.get_questions_with_options_by_ids(question_ids)
for i, answer in enumerate(answers, 1):
question_data = questions_map.get(answer.question_id)
if not question_data:
continue
question, options = question_data
correct_options = [opt for opt in options if opt.is_correct]
correct_texts = [opt.text for opt in correct_options]
status_icon = "" if answer.is_correct else ""
user_answer = answer.text_answer or ""
if "|" in user_answer:
user_answer = ", ".join(user_answer.split("|"))
lines.append(f"{status_icon} <b>Вопрос {i}</b>")
lines.append(f"<blockquote>{question.text}</blockquote>")
lines.append(f"👤 <i>Ответ:</i> {user_answer or ''}")
lines.append(f"✓ <i>Правильно:</i> {', '.join(correct_texts)}\n")
return {"result_info": "\n".join(lines)}
admin_users_dialog = Dialog( admin_users_dialog = Dialog(
Window( Window(
Format("<b>👥 Пользователи</b>\n\nВсего: {count}"), Format("<b>👥 Пользователи</b>\n\nВсего: {count}"),
@@ -114,8 +236,35 @@ admin_users_dialog = Dialog(
), ),
Window( Window(
Format("{user_info}"), Format("{user_info}"),
Column(
Button(Const("📊 Статистика"), id="stats", on_click=on_user_stats_clicked),
SwitchTo(Const("◀️ Назад"), id="back_to_list", state=AdminUsersSG.users_list), SwitchTo(Const("◀️ Назад"), id="back_to_list", state=AdminUsersSG.users_list),
),
state=AdminUsersSG.user_detail, state=AdminUsersSG.user_detail,
getter=get_user_detail_data, getter=get_user_detail_data,
), ),
Window(
Format("{stats_info}"),
ScrollingGroup(
Select(
Format("{item[0]}"),
id="result_select",
item_id_getter=lambda x: x[1],
items="results",
on_click=on_result_selected,
),
id="results_scroll",
width=1,
height=5,
),
SwitchTo(Const("◀️ Назад"), id="back_to_detail", state=AdminUsersSG.user_detail),
state=AdminUsersSG.user_stats,
getter=get_user_stats_data,
),
Window(
Format("{result_info}"),
SwitchTo(Const("◀️ Назад"), id="back_to_stats", state=AdminUsersSG.user_stats),
state=AdminUsersSG.user_result_detail,
getter=get_user_result_detail,
),
) )
@@ -9,5 +9,7 @@ class CreatorUsersSG(StatesGroup):
users_list = State() users_list = State()
users_input = State() users_input = State()
user_detail = State() user_detail = State()
user_stats = State()
user_result_detail = State()
make_admin_confirm = State() make_admin_confirm = State()
remove_admin_confirm = State() remove_admin_confirm = State()
@@ -11,9 +11,12 @@ from dishka.integrations.aiogram_dialog import inject
from trudex.application.bot.creator_dialogs.states import CreatorUsersSG from trudex.application.bot.creator_dialogs.states import CreatorUsersSG
from trudex.infrastructure.database.dao.user import UserDAO from trudex.infrastructure.database.dao.user import UserDAO
from trudex.infrastructure.database.repo.test import TestRepository
from trudex.infrastructure.database.repo.test_attempt import TestAttemptRepository
from trudex.infrastructure.database.repo.user import UserRepository from trudex.infrastructure.database.repo.user import UserRepository
from trudex.infrastructure.utils.bot_commands import setup_bot_commands from trudex.infrastructure.utils.bot_commands import setup_bot_commands
from trudex.infrastructure.utils.config import Config from trudex.infrastructure.utils.config import Config
from trudex.infrastructure.utils.timezone import to_msk
@inject @inject
@@ -208,6 +211,125 @@ async def on_back_to_main(_callback: CallbackQuery, _button: Button, manager: Di
await manager.done() await manager.done()
async def on_user_stats_clicked(_callback: CallbackQuery, _button: Button, manager: DialogManager):
await manager.switch_to(CreatorUsersSG.user_stats)
@inject
async def get_user_stats_data(
dialog_manager: DialogManager,
user_dao: FromDishka[UserDAO],
attempt_repo: FromDishka[TestAttemptRepository],
**_kwargs,
):
user_id = dialog_manager.dialog_data.get("selected_user_id")
if not user_id:
return {"stats_info": "Пользователь не выбран", "results": [], "count": 0}
user = await user_dao.get_by_id(user_id)
if not user:
return {"stats_info": "Пользователь не найден", "results": [], "count": 0}
stats = await attempt_repo.get_user_stats(user_id)
attempts_with_tests = await attempt_repo.get_finished_attempts_with_tests(user_id)
name = user.name or user.first_name
if stats["total_attempts"] > 0:
accuracy_str = f"📊 Средняя точность: <b>{stats['avg_score']}%</b>"
tests_str = f"📝 Пройдено тестов: <b>{stats['total_attempts']}</b>"
else:
accuracy_str = "📊 Средняя точность: <b>—</b>"
tests_str = "📝 Пройдено тестов: <b>0</b>"
stats_info = (
f"<b>📊 Статистика: {name}</b>\n\n"
f"{tests_str}\n"
f"{accuracy_str}"
)
results = []
for attempt, test_title in attempts_with_tests:
status = "" if attempt.is_passed else ""
finished_at_msk = to_msk(attempt.finished_at)
date_str = finished_at_msk.strftime("%d.%m.%Y") if finished_at_msk else ""
results.append((f"{status} {test_title}{attempt.score}% ({date_str})", attempt.id))
return {
"stats_info": stats_info,
"results": results,
"count": len(results),
}
async def on_result_selected(_callback: CallbackQuery, _widget: Select, manager: DialogManager, item_id: str):
manager.dialog_data["selected_attempt_id"] = int(item_id)
await manager.switch_to(CreatorUsersSG.user_result_detail)
@inject
async def get_user_result_detail(
dialog_manager: DialogManager,
attempt_repo: FromDishka[TestAttemptRepository],
test_repo: FromDishka[TestRepository],
**_kwargs
):
attempt_id = dialog_manager.dialog_data.get("selected_attempt_id")
if not attempt_id:
return {"result_info": "❌ Результат не найден"}
attempt, answers = await attempt_repo.get_attempt_with_answers(attempt_id)
if not attempt:
return {"result_info": "❌ Результат не найден"}
test, _ = await test_repo.get_test_with_questions(attempt.test_id)
test_title = test.title if test else "Неизвестный тест"
status = "✅ Пройден" if attempt.is_passed else "❌ Не пройден"
finished_at_msk = to_msk(attempt.finished_at)
date_str = finished_at_msk.strftime("%d.%m.%Y %H:%M") if finished_at_msk else ""
correct_count = sum(1 for a in answers if a.is_correct)
total_count = len(answers)
lines = [
f"<b>📝 {test_title}</b>\n",
f"📊 <b>Результат:</b> {attempt.score}%",
f"✏️ <b>Правильных ответов:</b> {correct_count} из {total_count}",
f"📅 <b>Дата:</b> {date_str}",
f"🏆 <b>Статус:</b> {status}",
"\n<b>📋 Ответы:</b>\n",
]
# Загружаем все вопросы за один запрос
question_ids = [answer.question_id for answer in answers]
questions_map = await test_repo.get_questions_with_options_by_ids(question_ids)
for i, answer in enumerate(answers, 1):
question_data = questions_map.get(answer.question_id)
if not question_data:
continue
question, options = question_data
correct_options = [opt for opt in options if opt.is_correct]
correct_texts = [opt.text for opt in correct_options]
status_icon = "" if answer.is_correct else ""
user_answer = answer.text_answer or ""
if "|" in user_answer:
user_answer = ", ".join(user_answer.split("|"))
lines.append(f"{status_icon} <b>Вопрос {i}</b>")
lines.append(f"<blockquote>{question.text}</blockquote>")
lines.append(f"👤 <i>Ответ:</i> {user_answer or ''}")
lines.append(f"✓ <i>Правильно:</i> {', '.join(correct_texts)}\n")
return {"result_info": "\n".join(lines)}
creator_users_dialog = Dialog( creator_users_dialog = Dialog(
Window( Window(
Format("<b>👥 Пользователи</b>\n\nВсего: {count}"), Format("<b>👥 Пользователи</b>\n\nВсего: {count}"),
@@ -239,6 +361,7 @@ creator_users_dialog = Dialog(
Window( Window(
Format("{user_info}"), Format("{user_info}"),
Column( Column(
Button(Const("📊 Статистика"), id="stats", on_click=on_user_stats_clicked),
Button(Const("👑 Сделать администратором"), id="make_admin", on_click=on_make_admin_clicked, when="show_make_admin"), Button(Const("👑 Сделать администратором"), id="make_admin", on_click=on_make_admin_clicked, when="show_make_admin"),
Button(Const("🚫 Снять администратора"), id="remove_admin", on_click=on_remove_admin_clicked, when="show_remove_admin"), Button(Const("🚫 Снять администратора"), id="remove_admin", on_click=on_remove_admin_clicked, when="show_remove_admin"),
SwitchTo(Const("◀️ Назад"), id="back_to_list", state=CreatorUsersSG.users_list), SwitchTo(Const("◀️ Назад"), id="back_to_list", state=CreatorUsersSG.users_list),
@@ -246,6 +369,30 @@ creator_users_dialog = Dialog(
state=CreatorUsersSG.user_detail, state=CreatorUsersSG.user_detail,
getter=get_user_detail_data, getter=get_user_detail_data,
), ),
Window(
Format("{stats_info}"),
ScrollingGroup(
Select(
Format("{item[0]}"),
id="result_select",
item_id_getter=lambda x: x[1],
items="results",
on_click=on_result_selected,
),
id="results_scroll",
width=1,
height=5,
),
SwitchTo(Const("◀️ Назад"), id="back_to_detail", state=CreatorUsersSG.user_detail),
state=CreatorUsersSG.user_stats,
getter=get_user_stats_data,
),
Window(
Format("{result_info}"),
SwitchTo(Const("◀️ Назад"), id="back_to_stats", state=CreatorUsersSG.user_stats),
state=CreatorUsersSG.user_result_detail,
getter=get_user_result_detail,
),
Window( Window(
Format("{confirm_text}"), Format("{confirm_text}"),
Row( Row(
@@ -12,6 +12,7 @@ from dishka.integrations.aiogram_dialog import inject
from trudex.application.bot.shared_dialogs.states import SharedTemplatesSG, SharedTestsSG from trudex.application.bot.shared_dialogs.states import SharedTemplatesSG, SharedTestsSG
from trudex.domain.schemas import QuestionType from trudex.domain.schemas import QuestionType
from trudex.domain.test_parser import ParsedTest, TestParser from trudex.domain.test_parser import ParsedTest, TestParser
from trudex.infrastructure.database.dao.group import GroupDAO
from trudex.infrastructure.database.dao.option import OptionDAO from trudex.infrastructure.database.dao.option import OptionDAO
from trudex.infrastructure.database.dao.question import QuestionDAO from trudex.infrastructure.database.dao.question import QuestionDAO
from trudex.infrastructure.database.dao.test import TestDAO from trudex.infrastructure.database.dao.test import TestDAO
@@ -330,6 +331,7 @@ async def on_import_file(
test_dao: FromDishka[TestDAO], test_dao: FromDishka[TestDAO],
question_dao: FromDishka[QuestionDAO], question_dao: FromDishka[QuestionDAO],
option_dao: FromDishka[OptionDAO], option_dao: FromDishka[OptionDAO],
group_dao: FromDishka[GroupDAO],
) -> None: ) -> None:
if not message.document: if not message.document:
await message.answer("❌ Отправьте JSON файл") await message.answer("❌ Отправьте JSON файл")
@@ -373,6 +375,13 @@ async def on_import_file(
await progress_msg.edit_text("\n".join(error_lines)) await progress_msg.edit_text("\n".join(error_lines))
return return
# Проверяем существование группы
if result.for_group is not None:
group = await group_dao.get_by_number(result.for_group)
if not group:
await progress_msg.edit_text(f"❌ Группа {result.for_group} не существует")
return
await create_test_from_parsed(result, test_dao, question_dao, option_dao) await create_test_from_parsed(result, test_dao, question_dao, option_dao)
await progress_msg.edit_text( await progress_msg.edit_text(
+4 -4
View File
@@ -1,4 +1,4 @@
import json import json5
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
@@ -39,9 +39,9 @@ class TestParser:
def parse(self, json_str: str) -> ParsedTest | list[ParseError]: def parse(self, json_str: str) -> ParsedTest | list[ParseError]:
try: try:
data = json.loads(json_str) data = json5.loads(json_str)
except json.JSONDecodeError as e: except ValueError as e:
return [ParseError(f"Невалидный JSON: {e.msg}", path=None)] return [ParseError(f"Невалидный JSON: {e}", path=None)]
if not isinstance(data, dict): if not isinstance(data, dict):
return [ParseError("JSON должен быть объектом", path=None)] return [ParseError("JSON должен быть объектом", path=None)]
@@ -10,7 +10,6 @@ from trudex.domain.schemas import QuestionType
class Base(DeclarativeBase): class Base(DeclarativeBase):
pass pass
@final @final
class User(Base): class User(Base):
__tablename__ = "users" __tablename__ = "users"
Generated
+11
View File
@@ -380,6 +380,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/62/a1/3d680cbfd5f4b8f15abc1d571870c5fc3e594bb582bc3b64ea099db13e56/jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67", size = 134899, upload-time = "2025-03-05T20:05:00.369Z" }, { url = "https://files.pythonhosted.org/packages/62/a1/3d680cbfd5f4b8f15abc1d571870c5fc3e594bb582bc3b64ea099db13e56/jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67", size = 134899, upload-time = "2025-03-05T20:05:00.369Z" },
] ]
[[package]]
name = "json5"
version = "0.13.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/77/e8/a3f261a66e4663f22700bc8a17c08cb83e91fbf086726e7a228398968981/json5-0.13.0.tar.gz", hash = "sha256:b1edf8d487721c0bf64d83c28e91280781f6e21f4a797d3261c7c828d4c165bf", size = 52441, upload-time = "2026-01-01T19:42:14.99Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d7/9e/038522f50ceb7e74f1f991bf1b699f24b0c2bbe7c390dd36ad69f4582258/json5-0.13.0-py3-none-any.whl", hash = "sha256:9a08e1dd65f6a4d4c6fa82d216cf2477349ec2346a38fd70cc11d2557499fbcc", size = 36163, upload-time = "2026-01-01T19:42:13.962Z" },
]
[[package]] [[package]]
name = "magic-filter" name = "magic-filter"
version = "1.0.12" version = "1.0.12"
@@ -773,6 +782,7 @@ dependencies = [
{ name = "asyncpg" }, { name = "asyncpg" },
{ name = "dishka" }, { name = "dishka" },
{ name = "httpx" }, { name = "httpx" },
{ name = "json5" },
{ name = "pycryptodome" }, { name = "pycryptodome" },
{ name = "pydantic" }, { name = "pydantic" },
{ name = "qrcode", extra = ["pil"] }, { name = "qrcode", extra = ["pil"] },
@@ -795,6 +805,7 @@ requires-dist = [
{ name = "asyncpg", specifier = ">=0.31.0" }, { name = "asyncpg", specifier = ">=0.31.0" },
{ name = "dishka", specifier = ">=1.7.2" }, { name = "dishka", specifier = ">=1.7.2" },
{ name = "httpx", specifier = ">=0.28.1" }, { name = "httpx", specifier = ">=0.28.1" },
{ name = "json5", specifier = ">=0.13.0" },
{ name = "pycryptodome", specifier = ">=3.23.0" }, { name = "pycryptodome", specifier = ">=3.23.0" },
{ name = "pydantic", specifier = ">=2.10.5" }, { name = "pydantic", specifier = ">=2.10.5" },
{ name = "qrcode", extras = ["pil"], specifier = ">=8.2" }, { name = "qrcode", extras = ["pil"], specifier = ">=8.2" },