mirror of
https://github.com/koloideal/Quizzi.git
synced 2026-06-10 10:25:28 +03:00
commit
This commit is contained in:
@@ -55,7 +55,7 @@ async def on_description_input(message: Message, _widget: MessageInput, manager:
|
||||
|
||||
|
||||
@inject
|
||||
async def on_password_input(message: Message, _widget: MessageInput, manager: DialogManager, group_dao: FromDishka[GroupDAO]):
|
||||
async def on_password_input(message: Message, _widget: MessageInput, manager: DialogManager, _group_dao: FromDishka[GroupDAO]):
|
||||
if not message.text:
|
||||
await message.answer("❌ Пароль не может быть пустым")
|
||||
return
|
||||
@@ -74,7 +74,7 @@ async def on_password_input(message: Message, _widget: MessageInput, manager: Di
|
||||
|
||||
|
||||
@inject
|
||||
async def on_skip_password(_callback: CallbackQuery, _button: Button, manager: DialogManager, group_dao: FromDishka[GroupDAO]):
|
||||
async def on_skip_password(_callback: CallbackQuery, _button: Button, manager: DialogManager, _group_dao: FromDishka[GroupDAO]):
|
||||
manager.dialog_data["password"] = None
|
||||
await manager.switch_to(CreateTestSG.input_attempts)
|
||||
|
||||
@@ -120,7 +120,7 @@ async def on_skip_expires(_callback: CallbackQuery, _button: Button, manager: Di
|
||||
|
||||
|
||||
@inject
|
||||
async def get_groups_for_test(dialog_manager: DialogManager, group_dao: FromDishka[GroupDAO], **_kwargs):
|
||||
async def get_groups_for_test(group_dao: FromDishka[GroupDAO], **_kwargs):
|
||||
groups = await group_dao.get_all()
|
||||
|
||||
return {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import asyncio
|
||||
import functools
|
||||
from datetime import date, datetime
|
||||
import logging
|
||||
|
||||
from aiogram import Bot
|
||||
from aiogram.enums import ContentType
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from aiogram import Router
|
||||
from aiogram.filters import Command, CommandStart, CommandObject
|
||||
@@ -93,7 +93,7 @@ async def validate_deeplink_test(
|
||||
if not test.is_active:
|
||||
return False, "❌ Тест деактивирован"
|
||||
|
||||
if test.expires_at and test.expires_at < datetime.utcnow():
|
||||
if test.expires_at and test.expires_at < datetime.now(timezone.utc):
|
||||
return False, "❌ Срок действия теста истек"
|
||||
|
||||
user = await user_dao.get_by_id(user_id)
|
||||
@@ -107,11 +107,11 @@ async def validate_deeplink_test(
|
||||
async def start_with_deeplink(
|
||||
message: Message,
|
||||
command: CommandObject,
|
||||
dialog_manager: DialogManager,
|
||||
user_dao: FromDishka[UserDAO],
|
||||
group_dao: FromDishka[GroupDAO],
|
||||
test_dao: FromDishka[TestDAO],
|
||||
config: FromDishka[Config],
|
||||
dialog_manager: DialogManager,
|
||||
) -> None:
|
||||
assert message.from_user is not None
|
||||
|
||||
@@ -156,9 +156,9 @@ async def start_with_deeplink(
|
||||
@router.message(CommandStart())
|
||||
async def start_handler(
|
||||
message: Message,
|
||||
dialog_manager: DialogManager,
|
||||
user_dao: FromDishka[UserDAO],
|
||||
group_dao: FromDishka[GroupDAO],
|
||||
dialog_manager: DialogManager
|
||||
) -> None:
|
||||
is_registered = await ensure_user_registered(
|
||||
user_dao, group_dao, message, dialog_manager
|
||||
@@ -169,12 +169,12 @@ async def start_handler(
|
||||
|
||||
|
||||
@router.message(Command("admin"))
|
||||
async def admin_command(message: Message, dialog_manager: DialogManager) -> None:
|
||||
async def admin_command(_message: Message, dialog_manager: DialogManager) -> None:
|
||||
await dialog_manager.start(AdminMenuSG.main, mode=StartMode.RESET_STACK)
|
||||
|
||||
|
||||
@router.message(Command("creator"))
|
||||
async def creator_command(message: Message, dialog_manager: DialogManager) -> None:
|
||||
async def creator_command(_message: Message, dialog_manager: DialogManager) -> None:
|
||||
await dialog_manager.start(CreatorMenuSG.main, mode=StartMode.RESET_STACK)
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
from datetime import datetime
|
||||
|
||||
from aiogram.types import CallbackQuery, Message
|
||||
from aiogram_dialog import Dialog, DialogManager, StartMode, Window
|
||||
from aiogram_dialog.widgets.input import MessageInput
|
||||
@@ -9,38 +7,23 @@ from dishka import FromDishka
|
||||
from dishka.integrations.aiogram_dialog import inject
|
||||
|
||||
from trudex.application.bot.user_dialogs.states import UserDeeplinkSG, UserMenuSG, UserTestSG
|
||||
from trudex.domain.schemas import Test, User
|
||||
from trudex.infrastructure.database.dao.test import TestDAO
|
||||
from trudex.infrastructure.database.models import QuestionType
|
||||
from trudex.infrastructure.database.repo.test import TestRepository
|
||||
from trudex.infrastructure.database.repo.test_attempt import TestAttemptRepository
|
||||
|
||||
|
||||
async def validate_test_access(test: Test | None, user: User | None) -> tuple[bool, str]:
|
||||
if not test:
|
||||
return False, "❌ Тест не найден"
|
||||
|
||||
if not test.is_active:
|
||||
return False, "❌ Тест деактивирован"
|
||||
|
||||
if test.expires_at and test.expires_at < datetime.utcnow():
|
||||
return False, "❌ Срок действия теста истек"
|
||||
|
||||
if test.for_group and user and user.group != test.for_group:
|
||||
return False, f"❌ Тест доступен только для группы {test.for_group}"
|
||||
|
||||
return True, ""
|
||||
|
||||
|
||||
@inject
|
||||
async def get_deeplink_test_data(
|
||||
dialog_manager: DialogManager,
|
||||
test_dao: FromDishka[TestDAO],
|
||||
test_repo: FromDishka[TestRepository],
|
||||
**_kwargs
|
||||
**_kwargs,
|
||||
):
|
||||
test_id = dialog_manager.start_data.get("test_id") if dialog_manager.start_data else None
|
||||
error = dialog_manager.start_data.get("error") if dialog_manager.start_data else None
|
||||
start_data = dialog_manager.start_data or {}
|
||||
assert isinstance(start_data, dict)
|
||||
test_id = start_data.get("test_id")
|
||||
error = start_data.get("error")
|
||||
|
||||
if error:
|
||||
return {"test_info": error, "can_start": False}
|
||||
@@ -78,7 +61,10 @@ async def on_start_deeplink_test(
|
||||
test_repo: FromDishka[TestRepository],
|
||||
attempt_repo: FromDishka[TestAttemptRepository],
|
||||
):
|
||||
assert _callback.from_user is not None
|
||||
|
||||
start_data = manager.start_data or {}
|
||||
assert isinstance(start_data, dict)
|
||||
test_id = start_data.get("test_id")
|
||||
user_id = _callback.from_user.id
|
||||
|
||||
@@ -156,7 +142,10 @@ async def on_deeplink_password_input(
|
||||
test_repo: FromDishka[TestRepository],
|
||||
attempt_repo: FromDishka[TestAttemptRepository],
|
||||
):
|
||||
assert message.from_user is not None
|
||||
|
||||
start_data = manager.start_data or {}
|
||||
assert isinstance(start_data, dict)
|
||||
test_id = start_data.get("test_id")
|
||||
|
||||
if not test_id:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from aiogram.types import CallbackQuery, Message
|
||||
from aiogram_dialog import Dialog, DialogManager, Window
|
||||
@@ -21,8 +21,9 @@ async def get_user_data(
|
||||
dialog_manager: DialogManager,
|
||||
user_dao: FromDishka[UserDAO],
|
||||
attempt_repo: FromDishka[TestAttemptRepository],
|
||||
**_kwargs
|
||||
**_kwargs,
|
||||
):
|
||||
assert dialog_manager.event.from_user is not None
|
||||
user_id = dialog_manager.event.from_user.id
|
||||
user = await user_dao.get_by_id(user_id)
|
||||
stats = await attempt_repo.get_user_stats(user_id)
|
||||
@@ -53,11 +54,11 @@ async def get_user_data(
|
||||
def can_edit_field(updated_at: datetime | None) -> bool:
|
||||
if updated_at is None:
|
||||
return True
|
||||
return datetime.utcnow() - updated_at >= timedelta(hours=24)
|
||||
return datetime.now(timezone.utc) - updated_at >= timedelta(hours=24)
|
||||
|
||||
|
||||
def get_remaining_time(updated_at: datetime) -> str:
|
||||
remaining = timedelta(hours=24) - (datetime.utcnow() - updated_at)
|
||||
remaining = timedelta(hours=24) - (datetime.now(timezone.utc) - updated_at)
|
||||
hours = int(remaining.total_seconds() // 3600)
|
||||
minutes = int((remaining.total_seconds() % 3600) // 60)
|
||||
return f"{hours}ч {minutes}м"
|
||||
@@ -68,14 +69,16 @@ async def on_edit_name_clicked(
|
||||
_callback: CallbackQuery,
|
||||
_button: Button,
|
||||
manager: DialogManager,
|
||||
user_dao: FromDishka[UserDAO]
|
||||
user_dao: FromDishka[UserDAO],
|
||||
):
|
||||
assert _callback.from_user is not None
|
||||
user = await user_dao.get_by_id(_callback.from_user.id)
|
||||
if not user:
|
||||
await _callback.answer("❌ Пользователь не найден")
|
||||
return
|
||||
|
||||
if not can_edit_field(user.name_updated_at):
|
||||
assert user.name_updated_at is not None
|
||||
remaining = get_remaining_time(user.name_updated_at)
|
||||
await _callback.answer(f"⏳ Изменить можно через {remaining}")
|
||||
return
|
||||
@@ -88,14 +91,16 @@ async def on_edit_group_clicked(
|
||||
_callback: CallbackQuery,
|
||||
_button: Button,
|
||||
manager: DialogManager,
|
||||
user_dao: FromDishka[UserDAO]
|
||||
user_dao: FromDishka[UserDAO],
|
||||
):
|
||||
assert _callback.from_user is not None
|
||||
user = await user_dao.get_by_id(_callback.from_user.id)
|
||||
if not user:
|
||||
await _callback.answer("❌ Пользователь не найден")
|
||||
return
|
||||
|
||||
if not can_edit_field(user.group_updated_at):
|
||||
assert user.group_updated_at is not None
|
||||
remaining = get_remaining_time(user.group_updated_at)
|
||||
await _callback.answer(f"⏳ Изменить можно через {remaining}")
|
||||
return
|
||||
@@ -120,14 +125,15 @@ async def on_name_input(
|
||||
message: Message,
|
||||
_widget: MessageInput,
|
||||
manager: DialogManager,
|
||||
user_dao: FromDishka[UserDAO]
|
||||
user_dao: FromDishka[UserDAO],
|
||||
):
|
||||
assert message.from_user is not None
|
||||
if not message.text or len(message.text.strip()) < 2:
|
||||
await message.answer("❌ Имя должно содержать минимум 2 символа")
|
||||
return
|
||||
|
||||
name = message.text.strip()[:128]
|
||||
await user_dao.update(message.from_user.id, name=name, name_updated_at=datetime.utcnow())
|
||||
await user_dao.update(message.from_user.id, name=name, name_updated_at=datetime.now(timezone.utc))
|
||||
await message.answer("✅ Имя обновлено")
|
||||
await manager.switch_to(UserMenuSG.main)
|
||||
|
||||
@@ -144,9 +150,10 @@ async def on_group_selected(
|
||||
_widget,
|
||||
manager: DialogManager,
|
||||
item_id: str,
|
||||
user_dao: FromDishka[UserDAO]
|
||||
user_dao: FromDishka[UserDAO],
|
||||
):
|
||||
await user_dao.update(_callback.from_user.id, group=int(item_id), group_updated_at=datetime.utcnow())
|
||||
assert _callback.from_user is not None
|
||||
await user_dao.update(_callback.from_user.id, group=int(item_id), group_updated_at=datetime.now(timezone.utc))
|
||||
await _callback.answer("✅ Группа обновлена")
|
||||
await manager.switch_to(UserMenuSG.main)
|
||||
|
||||
@@ -156,8 +163,9 @@ async def get_available_tests(
|
||||
dialog_manager: DialogManager,
|
||||
user_dao: FromDishka[UserDAO],
|
||||
test_repo: FromDishka[TestRepository],
|
||||
**_kwargs
|
||||
**_kwargs,
|
||||
):
|
||||
assert dialog_manager.event.from_user is not None
|
||||
user_id = dialog_manager.event.from_user.id
|
||||
user = await user_dao.get_by_id(user_id)
|
||||
|
||||
@@ -186,9 +194,9 @@ async def get_test_detail(
|
||||
dialog_manager: DialogManager,
|
||||
test_repo: FromDishka[TestRepository],
|
||||
attempt_repo: FromDishka[TestAttemptRepository],
|
||||
user_dao: FromDishka[UserDAO],
|
||||
**_kwargs
|
||||
**_kwargs,
|
||||
):
|
||||
assert dialog_manager.event.from_user is not None
|
||||
test_id = dialog_manager.dialog_data.get("selected_test_id")
|
||||
user_id = dialog_manager.event.from_user.id
|
||||
|
||||
@@ -200,7 +208,6 @@ async def get_test_detail(
|
||||
if not test:
|
||||
return {"test_info": "❌ Тест не найден"}
|
||||
|
||||
user = await user_dao.get_by_id(user_id)
|
||||
attempts = await attempt_repo.get_user_test_attempts(user_id, test_id)
|
||||
finished_attempts = [a for a in attempts if a.finished_at]
|
||||
|
||||
@@ -226,8 +233,9 @@ async def get_test_detail(
|
||||
async def get_my_results(
|
||||
dialog_manager: DialogManager,
|
||||
attempt_repo: FromDishka[TestAttemptRepository],
|
||||
**_kwargs
|
||||
**_kwargs,
|
||||
):
|
||||
assert dialog_manager.event.from_user is not None
|
||||
user_id = dialog_manager.event.from_user.id
|
||||
attempts_with_tests = await attempt_repo.get_finished_attempts_with_tests(user_id)
|
||||
|
||||
|
||||
@@ -16,7 +16,13 @@ from trudex.infrastructure.database.dao.user import UserDAO
|
||||
|
||||
|
||||
@inject
|
||||
async def on_name_input(message: Message, _widget: MessageInput, manager: DialogManager, user_dao: FromDishka[UserDAO]):
|
||||
async def on_name_input(
|
||||
message: Message,
|
||||
_widget: MessageInput,
|
||||
manager: DialogManager,
|
||||
user_dao: FromDishka[UserDAO],
|
||||
):
|
||||
assert message.from_user is not None
|
||||
if not message.text:
|
||||
await message.answer("❌ Имя и фамилия не могут быть пустыми")
|
||||
return
|
||||
@@ -31,7 +37,9 @@ async def on_name_input(message: Message, _widget: MessageInput, manager: Dialog
|
||||
return
|
||||
|
||||
start_data = manager.start_data or {}
|
||||
assert isinstance(start_data, dict)
|
||||
user_id = start_data.get("user_id")
|
||||
if user_id:
|
||||
await user_dao.update(user_id=user_id, name=name)
|
||||
|
||||
manager.dialog_data["name"] = name
|
||||
@@ -48,11 +56,20 @@ async def get_groups_for_registration(group_dao: FromDishka[GroupDAO], **_kwargs
|
||||
|
||||
|
||||
@inject
|
||||
async def on_group_selected(_callback: CallbackQuery, _widget, manager: DialogManager, item_id: str, user_dao: FromDishka[UserDAO]):
|
||||
async def on_group_selected(
|
||||
_callback: CallbackQuery,
|
||||
_widget,
|
||||
manager: DialogManager,
|
||||
item_id: str,
|
||||
user_dao: FromDishka[UserDAO],
|
||||
):
|
||||
assert _callback.from_user is not None
|
||||
start_data = manager.start_data or {}
|
||||
assert isinstance(start_data, dict)
|
||||
user_id = start_data.get("user_id")
|
||||
pending_test_id = start_data.get("pending_test_id")
|
||||
|
||||
if user_id:
|
||||
await user_dao.update(user_id=user_id, group=int(item_id))
|
||||
|
||||
if pending_test_id:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from aiogram.types import CallbackQuery, Message
|
||||
from aiogram_dialog import Dialog, DialogManager, StartMode, Window
|
||||
@@ -34,6 +34,7 @@ async def on_start_test(
|
||||
test_repo: FromDishka[TestRepository],
|
||||
attempt_repo: FromDishka[TestAttemptRepository],
|
||||
):
|
||||
assert _callback.from_user is not None
|
||||
test_id = manager.dialog_data.get("selected_test_id")
|
||||
user_id = _callback.from_user.id
|
||||
|
||||
@@ -50,7 +51,7 @@ async def on_start_test(
|
||||
await _callback.answer("❌ Тест деактивирован")
|
||||
return
|
||||
|
||||
if test.expires_at and test.expires_at < datetime.utcnow():
|
||||
if test.expires_at and test.expires_at < datetime.now(timezone.utc):
|
||||
await _callback.answer("❌ Срок действия теста истек")
|
||||
return
|
||||
|
||||
@@ -101,7 +102,9 @@ async def on_password_input(
|
||||
test_repo: FromDishka[TestRepository],
|
||||
attempt_repo: FromDishka[TestAttemptRepository],
|
||||
):
|
||||
assert message.from_user is not None
|
||||
start_data = manager.start_data or {}
|
||||
assert isinstance(start_data, dict)
|
||||
test_id = start_data.get("test_id")
|
||||
|
||||
if not test_id:
|
||||
@@ -146,6 +149,7 @@ async def on_cancel_test(
|
||||
attempt_repo: FromDishka[TestAttemptRepository],
|
||||
):
|
||||
start_data = manager.start_data or {}
|
||||
assert isinstance(start_data, dict)
|
||||
attempt_id = manager.dialog_data.get("attempt_id") or start_data.get("attempt_id")
|
||||
|
||||
if attempt_id:
|
||||
@@ -159,9 +163,10 @@ async def on_cancel_test(
|
||||
async def get_question_data(
|
||||
dialog_manager: DialogManager,
|
||||
test_repo: FromDishka[TestRepository],
|
||||
**_kwargs
|
||||
**_kwargs,
|
||||
):
|
||||
start_data = dialog_manager.start_data or {}
|
||||
assert isinstance(start_data, dict)
|
||||
|
||||
current_index = dialog_manager.dialog_data.get("current_question_index")
|
||||
if current_index is None:
|
||||
@@ -188,6 +193,7 @@ async def get_question_data(
|
||||
|
||||
async def on_single_answer_selected(_callback: CallbackQuery, _widget, manager: DialogManager, item_id: str):
|
||||
start_data = manager.start_data or {}
|
||||
assert isinstance(start_data, dict)
|
||||
current_index = manager.dialog_data.get("current_question_index")
|
||||
if current_index is None:
|
||||
current_index = start_data.get("current_question_index", 0)
|
||||
@@ -204,6 +210,7 @@ async def on_single_answer_selected(_callback: CallbackQuery, _widget, manager:
|
||||
|
||||
async def on_multiple_answer_changed(_event, widget, manager: DialogManager, _data: str):
|
||||
start_data = manager.start_data or {}
|
||||
assert isinstance(start_data, dict)
|
||||
current_index = manager.dialog_data.get("current_question_index")
|
||||
if current_index is None:
|
||||
current_index = start_data.get("current_question_index", 0)
|
||||
@@ -230,6 +237,7 @@ async def on_text_answer_input(
|
||||
answer_dao: FromDishka[UserAnswerDAO],
|
||||
):
|
||||
start_data = manager.start_data or {}
|
||||
assert isinstance(start_data, dict)
|
||||
current_index = manager.dialog_data.get("current_question_index")
|
||||
if current_index is None:
|
||||
current_index = start_data.get("current_question_index", 0)
|
||||
@@ -285,6 +293,7 @@ async def on_next_question(
|
||||
answer_dao: FromDishka[UserAnswerDAO],
|
||||
):
|
||||
start_data = manager.start_data or {}
|
||||
assert isinstance(start_data, dict)
|
||||
current_index = manager.dialog_data.get("current_question_index")
|
||||
if current_index is None:
|
||||
current_index = start_data.get("current_question_index", 0)
|
||||
@@ -409,9 +418,10 @@ async def get_detailed_results_data(
|
||||
dialog_manager: DialogManager,
|
||||
attempt_repo: FromDishka[TestAttemptRepository],
|
||||
test_repo: FromDishka[TestRepository],
|
||||
**_kwargs
|
||||
**_kwargs,
|
||||
):
|
||||
start_data = dialog_manager.start_data or {}
|
||||
assert isinstance(start_data, dict)
|
||||
attempt_id = dialog_manager.dialog_data.get("attempt_id") or start_data.get("attempt_id")
|
||||
|
||||
if not attempt_id:
|
||||
|
||||
Reference in New Issue
Block a user