mirror of
https://github.com/koloideal/Quizzi.git
synced 2026-06-10 18:35:28 +03:00
commit
This commit is contained in:
@@ -49,7 +49,7 @@ async def on_broadcast_confirm(_callback: CallbackQuery, _button: Button, manage
|
||||
|
||||
async def on_broadcast_cancel(_callback: CallbackQuery, _button: Button, manager: DialogManager):
|
||||
await _callback.answer("Рассылка отменена")
|
||||
await manager.done()
|
||||
await manager.start(AdminMenuSG.main, mode=StartMode.RESET_STACK)
|
||||
|
||||
|
||||
async def on_back_to_main(_callback: CallbackQuery, _button: Button, manager: DialogManager):
|
||||
|
||||
@@ -3,13 +3,11 @@ import functools
|
||||
from datetime import date, datetime
|
||||
|
||||
from aiogram import Bot
|
||||
from aiogram.enums import ContentType
|
||||
from aiogram.types import BufferedInputFile, CallbackQuery, Message
|
||||
from aiogram_dialog import Dialog, DialogManager, StartMode, Window
|
||||
from aiogram_dialog.widgets.input import MessageInput
|
||||
from aiogram_dialog.widgets.kbd import (Button, Calendar, Column, Row,
|
||||
from aiogram_dialog.widgets.kbd import (Button, Calendar, Column,
|
||||
ScrollingGroup, Select)
|
||||
from aiogram_dialog.widgets.media import DynamicMedia
|
||||
from aiogram_dialog.widgets.text import Const, Format
|
||||
from dishka import FromDishka
|
||||
from dishka.integrations.aiogram_dialog import inject
|
||||
@@ -21,7 +19,7 @@ from trudex.infrastructure.database.dao.test import TestDAO
|
||||
from trudex.infrastructure.database.repo.test import TestRepository
|
||||
from trudex.infrastructure.utils.config import Config
|
||||
from trudex.infrastructure.utils.qr_generator import generate_qr_bytes
|
||||
from trudex.infrastructure.utils.test_id_to_hash import generate_alpha_id
|
||||
from trudex.infrastructure.utils.test_id_to_hash import encode_id
|
||||
|
||||
|
||||
@inject
|
||||
@@ -111,54 +109,40 @@ async def on_back_to_list(_callback: CallbackQuery, _button: Button, manager: Di
|
||||
await manager.switch_to(AdminTestsSG.tests_list)
|
||||
|
||||
|
||||
async def on_share_test(_callback: CallbackQuery, _button: Button, manager: DialogManager):
|
||||
await manager.switch_to(AdminTestsSG.share_test)
|
||||
async def on_statistics(_callback: CallbackQuery, _button: Button, _manager: DialogManager):
|
||||
await _callback.answer("🚧 В разработке")
|
||||
|
||||
|
||||
@inject
|
||||
async def get_share_data(dialog_manager: DialogManager, config: FromDishka[Config], bot: FromDishka[Bot], **_kwargs):
|
||||
test_id = dialog_manager.dialog_data.get("selected_test_id")
|
||||
async def on_share_test(_callback: CallbackQuery, _button: Button, manager: DialogManager, config: FromDishka[Config], bot_inst: FromDishka[Bot]):
|
||||
test_id = manager.dialog_data.get("selected_test_id")
|
||||
|
||||
if not test_id:
|
||||
return {
|
||||
"share_link": "Ошибка: тест не найден"
|
||||
}
|
||||
await _callback.answer("Ошибка: тест не найден")
|
||||
return
|
||||
|
||||
# Генерируем хэш и ссылку
|
||||
test_hash = generate_alpha_id(
|
||||
test_hash = encode_id(
|
||||
test_id,
|
||||
config.security.test_hash_salt,
|
||||
config.security.test_hash_length
|
||||
config.security.encode_key,
|
||||
config.security.encoded_string_length
|
||||
)
|
||||
|
||||
bot_info = await bot.get_me()
|
||||
bot_info = await bot_inst.get_me()
|
||||
bot_username = bot_info.username or "your_bot"
|
||||
share_link = f"https://t.me/{bot_username}?start={test_hash}"
|
||||
|
||||
# Генерируем QR-код в отдельном потоке
|
||||
loop = asyncio.get_running_loop()
|
||||
qr_bytes = await loop.run_in_executor(
|
||||
None,
|
||||
functools.partial(generate_qr_bytes, share_link)
|
||||
)
|
||||
|
||||
# Сохраняем в dialog_data для использования в media selector
|
||||
dialog_manager.dialog_data["qr_bytes"] = qr_bytes
|
||||
|
||||
return {
|
||||
"share_link": share_link,
|
||||
}
|
||||
|
||||
assert _callback.message is not None
|
||||
|
||||
async def qr_media_selector(data: dict, widget, manager: DialogManager):
|
||||
"""Селектор для получения QR-кода из dialog_data"""
|
||||
qr_bytes = manager.dialog_data.get("qr_bytes")
|
||||
if not qr_bytes:
|
||||
return None
|
||||
return {
|
||||
"type": ContentType.PHOTO,
|
||||
"media": BufferedInputFile(qr_bytes, filename="qr.png")
|
||||
}
|
||||
await _callback.message.answer_photo(
|
||||
photo=BufferedInputFile(qr_bytes, filename="qr.png"),
|
||||
caption=f"<b>🔗 Поделиться тестом</b>\n\n📎 <b>Ссылка на тест:</b>\n<code>{share_link}</code>\n\n💡 Отправьте эту ссылку или QR-код пользователям для прохождения теста"
|
||||
)
|
||||
|
||||
|
||||
async def on_edit_menu(_callback: CallbackQuery, _button: Button, manager: DialogManager):
|
||||
@@ -362,6 +346,7 @@ tests_dialog = Dialog(
|
||||
id="toggle_active",
|
||||
on_click=on_toggle_active
|
||||
),
|
||||
Button(Const("📊 Статистика"), id="statistics", on_click=on_statistics),
|
||||
Button(Const("🔗 Поделиться"), id="share", on_click=on_share_test),
|
||||
Button(Const("✏️ Изменить"), id="edit_menu", on_click=on_edit_menu),
|
||||
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_list),
|
||||
@@ -428,11 +413,4 @@ tests_dialog = Dialog(
|
||||
),
|
||||
state=AdminTestsSG.edit_expires,
|
||||
),
|
||||
Window(
|
||||
Format("<b>🔗 Поделиться тестом</b>\n\n📎 <b>Ссылка на тест:</b>\n<code>{share_link}</code>\n\n💡 Отправьте эту ссылку или QR-код пользователям для прохождения теста"),
|
||||
DynamicMedia(selector=qr_media_selector),
|
||||
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_list),
|
||||
state=AdminTestsSG.share_test,
|
||||
getter=get_share_data,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
from aiogram.types import CallbackQuery, Message
|
||||
from aiogram_dialog import Dialog, DialogManager, Window
|
||||
from aiogram_dialog import Dialog, DialogManager, StartMode, Window
|
||||
from aiogram_dialog.widgets.input import MessageInput
|
||||
from aiogram_dialog.widgets.kbd import Button, Cancel, Row
|
||||
from aiogram_dialog.widgets.kbd import Button, Row
|
||||
from aiogram_dialog.widgets.text import Const
|
||||
from dishka import FromDishka
|
||||
from dishka.integrations.aiogram_dialog import inject
|
||||
|
||||
from trudex.application.bot.creator_dialogs.states import CreatorBroadcastSG
|
||||
from trudex.application.bot.creator_dialogs.states import (CreatorBroadcastSG,
|
||||
CreatorMenuSG)
|
||||
from trudex.infrastructure.database.dao.user import UserDAO
|
||||
from trudex.infrastructure.utils.broadcast import broadcast_message
|
||||
|
||||
@@ -48,7 +49,7 @@ async def on_broadcast_confirm(_callback: CallbackQuery, _button: Button, manage
|
||||
|
||||
async def on_broadcast_cancel(_callback: CallbackQuery, _button: Button, manager: DialogManager):
|
||||
await _callback.answer("Рассылка отменена")
|
||||
await manager.done()
|
||||
await manager.start(CreatorMenuSG.main, mode=StartMode.RESET_STACK)
|
||||
|
||||
|
||||
async def on_back_to_main(_callback: CallbackQuery, _button: Button, manager: DialogManager):
|
||||
|
||||
@@ -24,7 +24,7 @@ from trudex.infrastructure.database.dao.test import TestDAO
|
||||
from trudex.infrastructure.database.repo.test import TestRepository
|
||||
from trudex.infrastructure.utils.config import Config
|
||||
from trudex.infrastructure.utils.qr_generator import generate_qr_bytes
|
||||
from trudex.infrastructure.utils.test_id_to_hash import generate_alpha_id
|
||||
from trudex.infrastructure.utils.test_id_to_hash import encode_id
|
||||
|
||||
|
||||
@inject
|
||||
@@ -114,6 +114,10 @@ async def on_back_to_list(_callback: CallbackQuery, _button: Button, manager: Di
|
||||
await manager.switch_to(CreatorTestsSG.tests_list)
|
||||
|
||||
|
||||
async def on_statistics(_callback: CallbackQuery, _button: Button, _manager: DialogManager):
|
||||
await _callback.answer("🚧 В разработке")
|
||||
|
||||
|
||||
@inject
|
||||
async def on_share_test(_callback: CallbackQuery, _button: Button, manager: DialogManager, config: FromDishka[Config], bot_inst: FromDishka[Bot]):
|
||||
test_id = manager.dialog_data.get("selected_test_id")
|
||||
@@ -123,10 +127,10 @@ async def on_share_test(_callback: CallbackQuery, _button: Button, manager: Dial
|
||||
"share_link": "Ошибка: тест не найден"
|
||||
}
|
||||
|
||||
test_hash = generate_alpha_id(
|
||||
test_hash = encode_id(
|
||||
test_id,
|
||||
config.security.test_hash_salt,
|
||||
config.security.test_hash_length
|
||||
config.security.encode_key,
|
||||
config.security.encoded_string_length
|
||||
)
|
||||
|
||||
bot_info = await bot_inst.get_me()
|
||||
@@ -348,6 +352,7 @@ tests_dialog = Dialog(
|
||||
id="toggle_active",
|
||||
on_click=on_toggle_active
|
||||
),
|
||||
Button(Const("📊 Статистика"), id="statistics", on_click=on_statistics),
|
||||
Button(Const("🔗 Поделиться"), id="share", on_click=on_share_test),
|
||||
Button(Const("✏️ Изменить"), id="edit_menu", on_click=on_edit_menu),
|
||||
Button(Const("◀️ Назад"), id="back", on_click=on_back_to_list),
|
||||
|
||||
@@ -12,8 +12,8 @@ class BotConfig:
|
||||
|
||||
@dataclass
|
||||
class SecurityConfig:
|
||||
test_hash_salt: str
|
||||
test_hash_length: int = 16
|
||||
encode_key: str
|
||||
encoded_string_length: int = 8
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -57,7 +57,7 @@ class Config:
|
||||
database=str(db_data["database"])
|
||||
),
|
||||
security=SecurityConfig(
|
||||
test_hash_salt=str(security_data["test_hash_salt"]),
|
||||
test_hash_length=int(security_data.get("test_hash_length", 16))
|
||||
encode_key=str(security_data["encode_key"]),
|
||||
encoded_string_length=int(security_data.get("encoded_string_length", 8))
|
||||
)
|
||||
)
|
||||
|
||||
@@ -1,25 +1,70 @@
|
||||
import hashlib
|
||||
import hmac
|
||||
import hashlib
|
||||
import string
|
||||
|
||||
|
||||
def generate_alpha_id(n: int, secret_key: str, length: int = 16) -> str:
|
||||
data = str(n).encode('utf-8')
|
||||
key = secret_key.encode('utf-8')
|
||||
ALPHABET = string.ascii_letters
|
||||
|
||||
def _feistel_round(val: int, key: bytes, rounds: int) -> int:
|
||||
msg = f"{val}:{rounds}".encode()
|
||||
h = hmac.new(key, msg, hashlib.sha256).digest()
|
||||
return int.from_bytes(h[:4], 'big')
|
||||
|
||||
def permute_id(n: int, key_str: str, bits: int) -> int:
|
||||
key = key_str.encode()
|
||||
split = bits // 2
|
||||
mask = (1 << split) - 1
|
||||
|
||||
digest = hmac.new(key, data, hashlib.sha256).digest()
|
||||
num = int.from_bytes(digest, byteorder='big')
|
||||
left = (n >> split) & mask
|
||||
right = n & mask
|
||||
|
||||
alphabet = string.ascii_letters
|
||||
result = []
|
||||
|
||||
while num > 0:
|
||||
num, rem = divmod(num, 52)
|
||||
result.append(alphabet[rem])
|
||||
for i in range(6):
|
||||
new_left = right
|
||||
f_val = _feistel_round(right, key, i)
|
||||
new_right = left ^ (f_val & mask)
|
||||
left, right = new_left, new_right
|
||||
|
||||
encoded = "".join(result)
|
||||
return (left << split) | right
|
||||
|
||||
def unpermute_id(n: int, key_str: str, bits: int) -> int:
|
||||
key = key_str.encode()
|
||||
split = bits // 2
|
||||
mask = (1 << split) - 1
|
||||
|
||||
if len(encoded) < length:
|
||||
encoded = encoded.ljust(length, alphabet[0])
|
||||
left = (n >> split) & mask
|
||||
right = n & mask
|
||||
|
||||
for i in reversed(range(6)):
|
||||
new_right = left
|
||||
f_val = _feistel_round(left, key, i)
|
||||
new_left = right ^ (f_val & mask)
|
||||
left, right = new_left, new_right
|
||||
|
||||
return encoded[:length]
|
||||
return (left << split) | right
|
||||
|
||||
|
||||
def encode_id(n: int, key: str, length: int = 8) -> str:
|
||||
bits = length * 5
|
||||
if length >= 8: bits = 44
|
||||
elif length == 7: bits = 38
|
||||
|
||||
permuted = permute_id(n, key, bits=bits)
|
||||
|
||||
chars = []
|
||||
for _ in range(length):
|
||||
permuted, rem = divmod(permuted, 52)
|
||||
chars.append(ALPHABET[rem])
|
||||
|
||||
return "".join(chars)
|
||||
|
||||
def decode_id(s: str, key: str) -> int:
|
||||
num = 0
|
||||
for char in reversed(s):
|
||||
num = num * 52 + ALPHABET.index(char)
|
||||
|
||||
length = len(s)
|
||||
bits = length * 5
|
||||
if length >= 8: bits = 44
|
||||
elif length == 7: bits = 38
|
||||
|
||||
return unpermute_id(num, key, bits=bits)
|
||||
|
||||
Reference in New Issue
Block a user