mirror of
https://github.com/koloideal/Quizzi.git
synced 2026-06-10 10:25:28 +03:00
update
This commit is contained in:
@@ -0,0 +1,204 @@
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
|
||||
from quizzi.domain.schemas import QuestionType
|
||||
from quizzi.infrastructure.database.dao.test import TestDAO
|
||||
from quizzi.infrastructure.database.dao.user_answer import UserAnswerDAO
|
||||
from quizzi.infrastructure.database.repo.test import TestRepository
|
||||
from quizzi.infrastructure.database.repo.test_attempt import TestAttemptRepository
|
||||
from quizzi.infrastructure.utils.timezone import now_msk_naive
|
||||
|
||||
|
||||
@dataclass
|
||||
class AttemptStartResult:
|
||||
success: bool
|
||||
attempt_id: int | None = None
|
||||
questions: list[int] | None = None
|
||||
started_at: datetime | None = None
|
||||
error: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class AnswerResult:
|
||||
success: bool
|
||||
is_correct: bool = False
|
||||
error: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class TestResult:
|
||||
score: int
|
||||
correct_count: int
|
||||
total_questions: int
|
||||
is_passed: bool
|
||||
|
||||
|
||||
class TestAttemptService:
|
||||
def __init__(
|
||||
self,
|
||||
test_dao: TestDAO,
|
||||
test_repo: TestRepository,
|
||||
attempt_repo: TestAttemptRepository,
|
||||
answer_dao: UserAnswerDAO,
|
||||
) -> None:
|
||||
self._test_dao = test_dao
|
||||
self._test_repo = test_repo
|
||||
self._attempt_repo = attempt_repo
|
||||
self._answer_dao = answer_dao
|
||||
|
||||
async def start_attempt(self, user_id: int, test_id: int) -> AttemptStartResult:
|
||||
active_attempt = await self._attempt_repo.get_active_attempt(user_id, test_id)
|
||||
if active_attempt:
|
||||
await self._attempt_repo.attempt_dao.delete(active_attempt.id)
|
||||
|
||||
_, questions = await self._test_repo.get_test_with_questions(test_id)
|
||||
if not questions:
|
||||
return AttemptStartResult(success=False, error="❌ В тесте нет вопросов")
|
||||
|
||||
attempt = await self._attempt_repo.attempt_dao.create(user_id=user_id, test_id=test_id)
|
||||
started_at = now_msk_naive()
|
||||
|
||||
return AttemptStartResult(
|
||||
success=True,
|
||||
attempt_id=attempt.id,
|
||||
questions=[q.id for q in questions],
|
||||
started_at=started_at,
|
||||
)
|
||||
|
||||
async def cancel_attempt(self, attempt_id: int) -> bool:
|
||||
return await self._attempt_repo.attempt_dao.delete(attempt_id)
|
||||
|
||||
async def save_single_answer(
|
||||
self,
|
||||
attempt_id: int,
|
||||
question_id: int,
|
||||
selected_option_id: int,
|
||||
) -> AnswerResult:
|
||||
question, options = await self._test_repo.get_question_with_options(question_id)
|
||||
if not question:
|
||||
return AnswerResult(success=False, error="❌ Вопрос не найден")
|
||||
|
||||
correct_options = [opt for opt in options if opt.is_correct]
|
||||
is_correct = any(opt.id == selected_option_id for opt in correct_options)
|
||||
selected_text = next((opt.text for opt in options if opt.id == selected_option_id), "")
|
||||
|
||||
await self._answer_dao.create(
|
||||
attempt_id=attempt_id,
|
||||
question_id=question_id,
|
||||
selected_option_id=selected_option_id,
|
||||
text_answer=selected_text,
|
||||
is_correct=is_correct,
|
||||
)
|
||||
|
||||
return AnswerResult(success=True, is_correct=is_correct)
|
||||
|
||||
async def save_multiple_answer(
|
||||
self,
|
||||
attempt_id: int,
|
||||
question_id: int,
|
||||
selected_option_ids: list[int],
|
||||
) -> AnswerResult:
|
||||
question, options = await self._test_repo.get_question_with_options(question_id)
|
||||
if not question:
|
||||
return AnswerResult(success=False, error="❌ Вопрос не найден")
|
||||
|
||||
selected_texts = sorted([opt.text for opt in options if opt.id in selected_option_ids])
|
||||
correct_texts = sorted([opt.text for opt in options if opt.is_correct])
|
||||
is_correct = selected_texts == correct_texts
|
||||
|
||||
await self._answer_dao.create(
|
||||
attempt_id=attempt_id,
|
||||
question_id=question_id,
|
||||
text_answer="|".join(selected_texts),
|
||||
is_correct=is_correct,
|
||||
)
|
||||
|
||||
return AnswerResult(success=True, is_correct=is_correct)
|
||||
|
||||
async def save_text_answer(
|
||||
self,
|
||||
attempt_id: int,
|
||||
question_id: int,
|
||||
text_answer: str,
|
||||
) -> AnswerResult:
|
||||
question, options = await self._test_repo.get_question_with_options(question_id)
|
||||
if not question:
|
||||
return AnswerResult(success=False, error="❌ Вопрос не найден")
|
||||
|
||||
correct_options = [opt for opt in options if opt.is_correct]
|
||||
user_normalized = text_answer.lower().replace(" ", "")
|
||||
is_correct = any(opt.text.lower().replace(" ", "") == user_normalized for opt in correct_options)
|
||||
|
||||
await self._answer_dao.create(
|
||||
attempt_id=attempt_id,
|
||||
question_id=question_id,
|
||||
text_answer=text_answer,
|
||||
is_correct=is_correct,
|
||||
)
|
||||
|
||||
return AnswerResult(success=True, is_correct=is_correct)
|
||||
|
||||
async def finish_attempt(self, attempt_id: int, total_questions: int) -> TestResult:
|
||||
correct_count = await self._attempt_repo.calculate_attempt_score(attempt_id)
|
||||
score = round((correct_count / total_questions * 100)) if total_questions > 0 else 0
|
||||
is_passed = score >= 50
|
||||
|
||||
await self._attempt_repo.finish_attempt(attempt_id, score, is_passed)
|
||||
|
||||
return TestResult(
|
||||
score=score,
|
||||
correct_count=correct_count,
|
||||
total_questions=total_questions,
|
||||
is_passed=is_passed,
|
||||
)
|
||||
|
||||
async def finish_by_timeout(
|
||||
self,
|
||||
attempt_id: int,
|
||||
questions: list[int],
|
||||
user_answers: dict,
|
||||
) -> TestResult:
|
||||
answered_question_ids = set()
|
||||
answers = await self._attempt_repo.get_answers_for_attempt(attempt_id)
|
||||
for answer in answers:
|
||||
answered_question_ids.add(answer.question_id)
|
||||
|
||||
for question_id in questions:
|
||||
if question_id in answered_question_ids:
|
||||
continue
|
||||
|
||||
answer_data = user_answers.get(str(question_id))
|
||||
|
||||
if answer_data:
|
||||
question, options = await self._test_repo.get_question_with_options(question_id)
|
||||
if not question:
|
||||
continue
|
||||
|
||||
if answer_data["type"] == "single":
|
||||
await self.save_single_answer(attempt_id, question_id, answer_data["answer"])
|
||||
elif answer_data["type"] == "multiple":
|
||||
await self.save_multiple_answer(attempt_id, question_id, answer_data["answer"])
|
||||
else:
|
||||
await self._answer_dao.create(
|
||||
attempt_id=attempt_id,
|
||||
question_id=question_id,
|
||||
text_answer=None,
|
||||
is_correct=False,
|
||||
)
|
||||
|
||||
return await self.finish_attempt(attempt_id, len(questions))
|
||||
|
||||
async def get_question_state(self, question_type: str):
|
||||
from quizzi.application.bot.user_dialogs.states import UserTestSG
|
||||
|
||||
if question_type == QuestionType.SINGLE:
|
||||
return UserTestSG.question_single
|
||||
elif question_type == QuestionType.MULTIPLE:
|
||||
return UserTestSG.question_multiple
|
||||
else:
|
||||
return UserTestSG.question_input
|
||||
|
||||
async def get_next_question_state(self, question_id: int):
|
||||
question, _ = await self._test_repo.get_question_with_options(question_id)
|
||||
question_type = question.question_type if question else QuestionType.SINGLE
|
||||
return await self.get_question_state(question_type)
|
||||
Reference in New Issue
Block a user