This commit is contained in:
2026-01-04 02:15:17 +03:00
parent 0ebf915ec4
commit cd391254f1
5 changed files with 547 additions and 6 deletions
@@ -9,6 +9,7 @@ class AdminTemplatesSG(StatesGroup):
main = State()
export_list = State()
spec = State()
import_file = State()
class AdminUsersSG(StatesGroup):
@@ -1,13 +1,18 @@
import json
from aiogram.types import BufferedInputFile, CallbackQuery
from aiogram import Bot
from aiogram.types import BufferedInputFile, CallbackQuery, ContentType, Message
from aiogram_dialog import Dialog, DialogManager, StartMode, Window
from aiogram_dialog.widgets.input import MessageInput
from aiogram_dialog.widgets.kbd import Button, Row, ScrollingGroup, Select
from aiogram_dialog.widgets.text import Const, Format
from dishka import FromDishka
from dishka.integrations.aiogram_dialog import inject
from trudex.application.bot.admin_dialogs.states import AdminMenuSG, AdminTemplatesSG
from trudex.domain.test_parser import ParsedTest, TestParser
from trudex.infrastructure.database.dao.option import OptionDAO
from trudex.infrastructure.database.dao.question import QuestionDAO
from trudex.infrastructure.database.dao.test import TestDAO
from trudex.infrastructure.database.repo.test import TestRepository
@@ -164,8 +169,8 @@ async def on_export_clicked(_callback: CallbackQuery, _button: Button, manager:
await manager.switch_to(AdminTemplatesSG.export_list)
async def on_import_clicked(_callback: CallbackQuery, _button: Button, _manager: DialogManager) -> None:
await _callback.answer("🚧 В разработке", show_alert=True)
async def on_import_clicked(_callback: CallbackQuery, _button: Button, manager: DialogManager) -> None:
await manager.switch_to(AdminTemplatesSG.import_file)
async def on_spec_clicked(_callback: CallbackQuery, _button: Button, manager: DialogManager) -> None:
@@ -273,6 +278,98 @@ async def on_template_full(_callback: CallbackQuery, _button: Button, _manager:
await send_template(_callback, TEMPLATE_FULL, "full")
async def create_test_from_parsed(
parsed: ParsedTest,
test_dao: TestDAO,
question_dao: QuestionDAO,
option_dao: OptionDAO,
) -> int:
test = await test_dao.create(
title=parsed.title,
description=parsed.description,
password=parsed.password,
attempts=parsed.attempts,
expires_at=parsed.expires_at,
for_group=parsed.for_group,
)
for position, q in enumerate(parsed.questions):
question = await question_dao.create(
test_id=test.id,
text=q.text,
position=position,
question_type=q.question_type,
)
for opt in q.options:
await option_dao.create(
question_id=question.id,
text=opt.text,
is_correct=opt.is_correct,
)
return test.id
@inject
async def on_import_file(
message: Message,
_widget: MessageInput,
manager: DialogManager,
bot_inst: FromDishka[Bot],
test_dao: FromDishka[TestDAO],
question_dao: FromDishka[QuestionDAO],
option_dao: FromDishka[OptionDAO],
) -> None:
if not message.document:
await message.answer("❌ Отправьте JSON файл")
return
if message.document.file_size and message.document.file_size > 1024 * 1024:
await message.answer("❌ Файл слишком большой (максимум 1 МБ)")
return
file = await bot_inst.get_file(message.document.file_id)
if not file.file_path:
await message.answer("❌ Не удалось загрузить файл")
return
file_bytes = await bot_inst.download_file(file.file_path)
if not file_bytes:
await message.answer("❌ Не удалось загрузить файл")
return
try:
json_str = file_bytes.read().decode("utf-8")
except UnicodeDecodeError:
await message.answer("❌ Файл должен быть в кодировке UTF-8")
return
parser = TestParser()
result = parser.parse(json_str)
if isinstance(result, list):
error_lines = ["❌ <b>Ошибки валидации:</b>\n"]
for err in result[:10]:
path_str = f" (<code>{err.path}</code>)" if err.path else ""
error_lines.append(f"{err.message}{path_str}")
if len(result) > 10:
error_lines.append(f"\n... и ещё {len(result) - 10} ошибок")
await message.answer("\n".join(error_lines))
return
test_id = await create_test_from_parsed(result, test_dao, question_dao, option_dao)
await message.answer(
f"✅ <b>Тест импортирован!</b>\n\n"
f"📝 <b>Название:</b> {result.title}\n"
f"❓ <b>Вопросов:</b> {len(result.questions)}\n\n"
f"Тест создан в деактивированном состоянии."
)
await manager.switch_to(AdminTemplatesSG.main)
templates_dialog = Dialog(
Window(
Const(TEMPLATES_INFO),
@@ -315,4 +412,10 @@ templates_dialog = Dialog(
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_templates),
state=AdminTemplatesSG.spec,
),
Window(
Const("<b>📥 Импорт теста</b>\n\nОтправьте JSON файл с тестом.\n\n<i>Формат файла описан в разделе «Спецификация»</i>"),
MessageInput(on_import_file, content_types=[ContentType.DOCUMENT]),
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_templates),
state=AdminTemplatesSG.import_file,
),
)
@@ -9,6 +9,7 @@ class CreatorTemplatesSG(StatesGroup):
main = State()
export_list = State()
spec = State()
import_file = State()
class CreatorUsersSG(StatesGroup):
@@ -1,13 +1,18 @@
import json
from aiogram.types import BufferedInputFile, CallbackQuery
from aiogram import Bot
from aiogram.types import BufferedInputFile, CallbackQuery, ContentType, Message
from aiogram_dialog import Dialog, DialogManager, StartMode, Window
from aiogram_dialog.widgets.input import MessageInput
from aiogram_dialog.widgets.kbd import Button, Row, ScrollingGroup, Select
from aiogram_dialog.widgets.text import Const, Format
from dishka import FromDishka
from dishka.integrations.aiogram_dialog import inject
from trudex.application.bot.creator_dialogs.states import CreatorMenuSG, CreatorTemplatesSG
from trudex.domain.test_parser import ParsedTest, TestParser
from trudex.infrastructure.database.dao.option import OptionDAO
from trudex.infrastructure.database.dao.question import QuestionDAO
from trudex.infrastructure.database.dao.test import TestDAO
from trudex.infrastructure.database.repo.test import TestRepository
@@ -164,8 +169,8 @@ async def on_export_clicked(_callback: CallbackQuery, _button: Button, manager:
await manager.switch_to(CreatorTemplatesSG.export_list)
async def on_import_clicked(_callback: CallbackQuery, _button: Button, _manager: DialogManager) -> None:
await _callback.answer("🚧 В разработке", show_alert=True)
async def on_import_clicked(_callback: CallbackQuery, _button: Button, manager: DialogManager) -> None:
await manager.switch_to(CreatorTemplatesSG.import_file)
async def on_spec_clicked(_callback: CallbackQuery, _button: Button, manager: DialogManager) -> None:
@@ -273,6 +278,98 @@ async def on_template_full(_callback: CallbackQuery, _button: Button, _manager:
await send_template(_callback, TEMPLATE_FULL, "full")
async def create_test_from_parsed(
parsed: ParsedTest,
test_dao: TestDAO,
question_dao: QuestionDAO,
option_dao: OptionDAO,
) -> int:
test = await test_dao.create(
title=parsed.title,
description=parsed.description,
password=parsed.password,
attempts=parsed.attempts,
expires_at=parsed.expires_at,
for_group=parsed.for_group,
)
for position, q in enumerate(parsed.questions):
question = await question_dao.create(
test_id=test.id,
text=q.text,
position=position,
question_type=q.question_type,
)
for opt in q.options:
await option_dao.create(
question_id=question.id,
text=opt.text,
is_correct=opt.is_correct,
)
return test.id
@inject
async def on_import_file(
message: Message,
_widget: MessageInput,
manager: DialogManager,
bot_inst: FromDishka[Bot],
test_dao: FromDishka[TestDAO],
question_dao: FromDishka[QuestionDAO],
option_dao: FromDishka[OptionDAO],
) -> None:
if not message.document:
await message.answer("❌ Отправьте JSON файл")
return
if message.document.file_size and message.document.file_size > 1024 * 1024:
await message.answer("❌ Файл слишком большой (максимум 1 МБ)")
return
file = await bot_inst.get_file(message.document.file_id)
if not file.file_path:
await message.answer("❌ Не удалось загрузить файл")
return
file_bytes = await bot_inst.download_file(file.file_path)
if not file_bytes:
await message.answer("❌ Не удалось загрузить файл")
return
try:
json_str = file_bytes.read().decode("utf-8")
except UnicodeDecodeError:
await message.answer("❌ Файл должен быть в кодировке UTF-8")
return
parser = TestParser()
result = parser.parse(json_str)
if isinstance(result, list):
error_lines = ["❌ <b>Ошибки валидации:</b>\n"]
for err in result[:10]:
path_str = f" (<code>{err.path}</code>)" if err.path else ""
error_lines.append(f"{err.message}{path_str}")
if len(result) > 10:
error_lines.append(f"\n... и ещё {len(result) - 10} ошибок")
await message.answer("\n".join(error_lines))
return
test_id = await create_test_from_parsed(result, test_dao, question_dao, option_dao)
await message.answer(
f"✅ <b>Тест импортирован!</b>\n\n"
f"📝 <b>Название:</b> {result.title}\n"
f"❓ <b>Вопросов:</b> {len(result.questions)}\n\n"
f"Тест создан в деактивированном состоянии."
)
await manager.switch_to(CreatorTemplatesSG.main)
templates_dialog = Dialog(
Window(
Const(TEMPLATES_INFO),
@@ -315,4 +412,10 @@ templates_dialog = Dialog(
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_templates),
state=CreatorTemplatesSG.spec,
),
Window(
Const("<b>📥 Импорт теста</b>\n\nОтправьте JSON файл с тестом.\n\n<i>Формат файла описан в разделе «Спецификация»</i>"),
MessageInput(on_import_file, content_types=[ContentType.DOCUMENT]),
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_templates),
state=CreatorTemplatesSG.import_file,
),
)
+333
View File
@@ -0,0 +1,333 @@
import json
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ParsedOption:
text: str
is_correct: bool
@dataclass
class ParsedQuestion:
text: str
question_type: str
options: list[ParsedOption]
correct_answer: str | None = None
@dataclass
class ParsedTest:
title: str
description: str | None
password: str | None
attempts: int | None
expires_at: datetime | None
for_group: int | None
questions: list[ParsedQuestion]
@dataclass
class ParseError:
message: str
path: str | None = None
class TestParser:
VALID_QUESTION_TYPES = {"single", "multiple", "input"}
def parse(self, json_str: str) -> ParsedTest | list[ParseError]:
try:
data = json.loads(json_str)
except json.JSONDecodeError as e:
return [ParseError(f"Невалидный JSON: {e.msg}", path=None)]
if not isinstance(data, dict):
return [ParseError("JSON должен быть объектом", path=None)]
errors: list[ParseError] = []
title = self._parse_string(data, "title", required=True, max_length=255, errors=errors)
description = self._parse_string(data, "description", required=False, max_length=2000, errors=errors)
password = self._parse_string(data, "password", required=False, max_length=255, errors=errors)
attempts = self._parse_int(data, "attempts", required=False, min_val=1, max_val=100, errors=errors)
expires_at = self._parse_datetime(data, "expires_at", required=False, errors=errors)
for_group = self._parse_int(data, "for_group", required=False, errors=errors)
questions = self._parse_questions(data, errors)
if errors:
return errors
assert title is not None
return ParsedTest(
title=title,
description=description,
password=password,
attempts=attempts,
expires_at=expires_at,
for_group=for_group,
questions=questions,
)
def _parse_string(
self,
data: dict,
key: str,
required: bool,
max_length: int | None = None,
errors: list[ParseError] | None = None,
) -> str | None:
errors = errors or []
value = data.get(key)
if value is None:
if required:
errors.append(ParseError(f"Поле '{key}' обязательно", path=key))
return None
if not isinstance(value, str):
errors.append(ParseError(f"Поле '{key}' должно быть строкой", path=key))
return None
value = value.strip()
if not value and required:
errors.append(ParseError(f"Поле '{key}' не может быть пустым", path=key))
return None
if max_length and len(value) > max_length:
errors.append(ParseError(f"Поле '{key}' слишком длинное (максимум {max_length})", path=key))
return None
return value if value else None
def _parse_int(
self,
data: dict,
key: str,
required: bool,
min_val: int | None = None,
max_val: int | None = None,
errors: list[ParseError] | None = None,
) -> int | None:
errors = errors or []
value = data.get(key)
if value is None:
if required:
errors.append(ParseError(f"Поле '{key}' обязательно", path=key))
return None
if not isinstance(value, int) or isinstance(value, bool):
errors.append(ParseError(f"Поле '{key}' должно быть числом", path=key))
return None
if min_val is not None and value < min_val:
errors.append(ParseError(f"Поле '{key}' должно быть >= {min_val}", path=key))
return None
if max_val is not None and value > max_val:
errors.append(ParseError(f"Поле '{key}' должно быть <= {max_val}", path=key))
return None
return value
def _parse_datetime(
self,
data: dict,
key: str,
required: bool,
errors: list[ParseError] | None = None,
) -> datetime | None:
errors = errors or []
value = data.get(key)
if value is None:
if required:
errors.append(ParseError(f"Поле '{key}' обязательно", path=key))
return None
if not isinstance(value, str):
errors.append(ParseError(f"Поле '{key}' должно быть строкой в ISO формате", path=key))
return None
try:
return datetime.fromisoformat(value)
except ValueError:
errors.append(ParseError(f"Поле '{key}' должно быть в ISO формате (например 2026-12-31T23:59:59)", path=key))
return None
def _parse_questions(self, data: dict, errors: list[ParseError]) -> list[ParsedQuestion]:
questions_data = data.get("questions")
if questions_data is None:
errors.append(ParseError("Поле 'questions' обязательно", path="questions"))
return []
if not isinstance(questions_data, list):
errors.append(ParseError("Поле 'questions' должно быть массивом", path="questions"))
return []
if len(questions_data) == 0:
errors.append(ParseError("Тест должен содержать хотя бы один вопрос", path="questions"))
return []
questions: list[ParsedQuestion] = []
for i, q_data in enumerate(questions_data):
path = f"questions[{i}]"
if not isinstance(q_data, dict):
errors.append(ParseError("Вопрос должен быть объектом", path=path))
continue
question = self._parse_question(q_data, path, errors)
if question:
questions.append(question)
return questions
def _parse_question(self, data: dict, path: str, errors: list[ParseError]) -> ParsedQuestion | None:
text = data.get("text")
if not text or not isinstance(text, str):
errors.append(ParseError("Поле 'text' обязательно и должно быть строкой", path=f"{path}.text"))
return None
text = text.strip()
if not text:
errors.append(ParseError("Текст вопроса не может быть пустым", path=f"{path}.text"))
return None
if len(text) > 2000:
errors.append(ParseError("Текст вопроса слишком длинный (максимум 2000)", path=f"{path}.text"))
return None
question_type = data.get("question_type")
if not question_type or not isinstance(question_type, str):
errors.append(ParseError("Поле 'question_type' обязательно", path=f"{path}.question_type"))
return None
if question_type not in self.VALID_QUESTION_TYPES:
errors.append(ParseError(
f"Неизвестный тип вопроса '{question_type}'. Допустимые: single, multiple, input",
path=f"{path}.question_type"
))
return None
if question_type == "input":
return self._parse_input_question(data, path, text, errors)
else:
return self._parse_choice_question(data, path, text, question_type, errors)
def _parse_input_question(
self,
data: dict,
path: str,
text: str,
errors: list[ParseError],
) -> ParsedQuestion | None:
correct_answer = data.get("correct_answer")
if not correct_answer or not isinstance(correct_answer, str):
errors.append(ParseError(
"Для типа 'input' поле 'correct_answer' обязательно",
path=f"{path}.correct_answer"
))
return None
correct_answer = correct_answer.strip()
if not correct_answer:
errors.append(ParseError("Правильный ответ не может быть пустым", path=f"{path}.correct_answer"))
return None
if len(correct_answer) > 255:
errors.append(ParseError("Правильный ответ слишком длинный (максимум 255)", path=f"{path}.correct_answer"))
return None
return ParsedQuestion(
text=text,
question_type="input",
options=[ParsedOption(text=correct_answer, is_correct=True)],
correct_answer=correct_answer,
)
def _parse_choice_question(
self,
data: dict,
path: str,
text: str,
question_type: str,
errors: list[ParseError],
) -> ParsedQuestion | None:
options_data = data.get("options")
if not options_data or not isinstance(options_data, list):
errors.append(ParseError(
f"Для типа '{question_type}' поле 'options' обязательно и должно быть массивом",
path=f"{path}.options"
))
return None
if len(options_data) < 2:
errors.append(ParseError("Минимум 2 варианта ответа", path=f"{path}.options"))
return None
if len(options_data) > 10:
errors.append(ParseError("Максимум 10 вариантов ответа", path=f"{path}.options"))
return None
options: list[ParsedOption] = []
correct_count = 0
for j, opt_data in enumerate(options_data):
opt_path = f"{path}.options[{j}]"
if not isinstance(opt_data, dict):
errors.append(ParseError("Вариант ответа должен быть объектом", path=opt_path))
continue
opt_text = opt_data.get("text")
if not opt_text or not isinstance(opt_text, str):
errors.append(ParseError("Поле 'text' обязательно", path=f"{opt_path}.text"))
continue
opt_text = opt_text.strip()
if not opt_text:
errors.append(ParseError("Текст варианта не может быть пустым", path=f"{opt_path}.text"))
continue
if len(opt_text) > 255:
errors.append(ParseError("Текст варианта слишком длинный (максимум 255)", path=f"{opt_path}.text"))
continue
is_correct = opt_data.get("is_correct")
if not isinstance(is_correct, bool):
errors.append(ParseError("Поле 'is_correct' должно быть true или false", path=f"{opt_path}.is_correct"))
continue
if is_correct:
correct_count += 1
options.append(ParsedOption(text=opt_text, is_correct=is_correct))
if len(options) < 2:
return None
if correct_count == 0:
errors.append(ParseError("Должен быть хотя бы один правильный ответ", path=f"{path}.options"))
return None
if question_type == "single" and correct_count > 1:
errors.append(ParseError(
f"Для типа 'single' должен быть ровно один правильный ответ (найдено {correct_count})",
path=f"{path}.options"
))
return None
return ParsedQuestion(
text=text,
question_type=question_type,
options=options,
)