mirror of
https://github.com/koloideal/Argenta.git
synced 2026-06-10 10:05:28 +03:00
working
This commit is contained in:
+7
-5
@@ -1,6 +1,7 @@
|
||||
from rich.console import Console
|
||||
|
||||
from argenta.app import App
|
||||
from argenta.app.autocompleter import AutoCompleter
|
||||
from argenta.router import Router
|
||||
from argenta.command import Command
|
||||
from argenta.response import Response
|
||||
@@ -54,7 +55,7 @@ network_router = Router("Сетевые операции", disable_redirect_stdo
|
||||
def ping_host(response: Response):
|
||||
host = input("Введите имя хоста: ")
|
||||
print(f"Пингую {host}...")
|
||||
subprocess.run(["ping", "-c", "4", host])
|
||||
subprocess.run(["ping", host])
|
||||
|
||||
@network_router.command(Command("ip", "Показать IP-адреса"))
|
||||
def show_ip(response: Response):
|
||||
@@ -78,12 +79,13 @@ def delete_user(response: Response):
|
||||
# Создаем приложение и регистрируем маршрутизаторы
|
||||
app = App(
|
||||
prompt="MyApp> ",
|
||||
initial_message="Сложное тестовое приложение Argenta!",
|
||||
farewell_message="До свидания!",
|
||||
dividing_line=StaticDividingLine("*"),
|
||||
initial_message="Caser",
|
||||
farewell_message="Pokeda",
|
||||
dividing_line=DynamicDividingLine("*"),
|
||||
repeat_command_groups=False,
|
||||
ignore_command_register=True,
|
||||
override_system_messages=True,
|
||||
override_system_messages=False,
|
||||
autocompleter=AutoCompleter('.history')
|
||||
)
|
||||
|
||||
app.include_routers(file_router, system_router, network_router, user_router)
|
||||
|
||||
+38
-95
@@ -21,10 +21,9 @@ from argenta.app.registered_routers.entity import RegisteredRouters
|
||||
from argenta.response import Response
|
||||
|
||||
|
||||
|
||||
class BaseApp:
|
||||
def __init__(
|
||||
self,
|
||||
prompt: str,
|
||||
def __init__(self, prompt: str,
|
||||
initial_message: str,
|
||||
farewell_message: str,
|
||||
exit_command: Command,
|
||||
@@ -34,8 +33,7 @@ class BaseApp:
|
||||
repeat_command_groups: bool,
|
||||
override_system_messages: bool,
|
||||
autocompleter: AutoCompleter,
|
||||
print_func: Callable[[str], None],
|
||||
) -> None:
|
||||
print_func: Callable[[str], None]) -> None:
|
||||
self._prompt = prompt
|
||||
self._print_func = print_func
|
||||
self._exit_command = exit_command
|
||||
@@ -49,14 +47,17 @@ class BaseApp:
|
||||
self._farewell_message = farewell_message
|
||||
self._initial_message = initial_message
|
||||
|
||||
self._description_message_gen: Callable[[str, str], str] = (
|
||||
lambda command, description: f"{command} *=*=* {description}"
|
||||
)
|
||||
self._description_message_gen: Callable[[str, str], str] = lambda command, description: f"{command} *=*=* {description}"
|
||||
self._registered_routers: RegisteredRouters = RegisteredRouters()
|
||||
self._messages_on_startup: list[str] = []
|
||||
|
||||
self._all_registered_triggers_in_lower_case: list[str] = []
|
||||
self._all_registered_triggers_in_default_case: list[str] = []
|
||||
self._matching_lower_triggers_with_routers: dict[str, Router] = {}
|
||||
self._matching_default_triggers_with_routers: dict[str, Router] = {}
|
||||
|
||||
if self._ignore_command_register:
|
||||
self._current_matching_triggers_with_routers: dict[str, Router] = self._matching_lower_triggers_with_routers
|
||||
else:
|
||||
self._current_matching_triggers_with_routers: dict[str, Router] = self._matching_default_triggers_with_routers
|
||||
|
||||
self._incorrect_input_syntax_handler: Callable[[str], None] = (
|
||||
lambda raw_command: print_func(f"Incorrect flag syntax: {raw_command}")
|
||||
@@ -208,13 +209,10 @@ class BaseApp:
|
||||
"""
|
||||
input_command_trigger = command.get_trigger()
|
||||
if self._ignore_command_register:
|
||||
if (
|
||||
input_command_trigger.lower()
|
||||
in self._all_registered_triggers_in_lower_case
|
||||
):
|
||||
if input_command_trigger.lower() in list(self._current_matching_triggers_with_routers.keys()):
|
||||
return False
|
||||
else:
|
||||
if input_command_trigger in self._all_registered_triggers_in_default_case:
|
||||
if input_command_trigger in list(self._current_matching_triggers_with_routers.keys()):
|
||||
return False
|
||||
return True
|
||||
|
||||
@@ -250,11 +248,8 @@ class BaseApp:
|
||||
self._registered_routers.add_registered_router(system_router)
|
||||
|
||||
def _most_similar_command(self, unknown_command: str) -> str | None:
|
||||
all_commands = (
|
||||
self._all_registered_triggers_in_lower_case
|
||||
if self._ignore_command_register
|
||||
else self._all_registered_triggers_in_default_case
|
||||
)
|
||||
all_commands = list(self._current_matching_triggers_with_routers.keys())
|
||||
|
||||
matches: list[str] | list = sorted(
|
||||
cmd for cmd in all_commands if cmd.startswith(unknown_command)
|
||||
)
|
||||
@@ -275,9 +270,7 @@ class BaseApp:
|
||||
:return: None
|
||||
"""
|
||||
self._prompt = f"[italic dim bold]{self._prompt}"
|
||||
self._initial_message = (
|
||||
"\n" + f"[bold red]{text2art(self._initial_message, font='tarty1')}" + "\n"
|
||||
)
|
||||
self._initial_message = ("\n" + f"[bold red]{text2art(self._initial_message, font='tarty1')}" + "\n")
|
||||
self._farewell_message = (
|
||||
"[bold red]\n\n"
|
||||
+ text2art(self._farewell_message, font="chanky")
|
||||
@@ -289,22 +282,14 @@ class BaseApp:
|
||||
f"[blue dim]*=*=*[/blue dim] "
|
||||
f"[bold yellow italic]{escape(description)}"
|
||||
)
|
||||
self._incorrect_input_syntax_handler = lambda raw_command: self._print_func(
|
||||
f"[red bold]Incorrect flag syntax: {escape(raw_command)}"
|
||||
)
|
||||
self._repeated_input_flags_handler = lambda raw_command: self._print_func(
|
||||
f"[red bold]Repeated input flags: {escape(raw_command)}"
|
||||
)
|
||||
self._empty_input_command_handler = lambda: self._print_func(
|
||||
"[red bold]Empty input command"
|
||||
)
|
||||
self._incorrect_input_syntax_handler = lambda raw_command: self._print_func(f"[red bold]Incorrect flag syntax: {escape(raw_command)}")
|
||||
self._repeated_input_flags_handler = lambda raw_command: self._print_func(f"[red bold]Repeated input flags: {escape(raw_command)}")
|
||||
self._empty_input_command_handler = lambda: self._print_func("[red bold]Empty input command")
|
||||
|
||||
def unknown_command_handler(command: InputCommand) -> None:
|
||||
cmd_trg: str = command.get_trigger()
|
||||
mst_sim_cmd: str | None = self._most_similar_command(cmd_trg)
|
||||
first_part_of_text = (
|
||||
f"[red]Unknown command:[/red] [blue]{escape(cmd_trg)}[/blue]"
|
||||
)
|
||||
first_part_of_text = f"[red]Unknown command:[/red] [blue]{escape(cmd_trg)}[/blue]"
|
||||
second_part_of_text = (
|
||||
("[red], most similar:[/red] " + ("[blue]" + mst_sim_cmd + "[/blue]"))
|
||||
if mst_sim_cmd
|
||||
@@ -326,30 +311,16 @@ class BaseApp:
|
||||
router_aliases = router_entity.get_aliases()
|
||||
combined = router_triggers + router_aliases
|
||||
|
||||
self._all_registered_triggers_in_default_case.extend(combined)
|
||||
for trigger in combined:
|
||||
self._matching_default_triggers_with_routers[trigger] = router_entity
|
||||
self._matching_lower_triggers_with_routers[trigger.lower()] = router_entity
|
||||
|
||||
self._all_registered_triggers_in_lower_case.extend(
|
||||
x.lower() for x in combined
|
||||
)
|
||||
self._autocompleter.initial_setup(list(self._current_matching_triggers_with_routers.keys()))
|
||||
|
||||
self._autocompleter.initial_setup(self._all_registered_triggers_in_lower_case)
|
||||
|
||||
if self._ignore_command_register:
|
||||
seen = {}
|
||||
for item in self._all_registered_triggers_in_lower_case:
|
||||
for item in list(self._current_matching_triggers_with_routers.keys()):
|
||||
if item in seen:
|
||||
Console().print(
|
||||
f"\n[b red]WARNING:[/b red] Overlapping trigger or alias: [b blue]{item}[/b blue]"
|
||||
)
|
||||
else:
|
||||
seen[item] = True
|
||||
else:
|
||||
seen = {}
|
||||
for item in self._all_registered_triggers_in_default_case:
|
||||
if item in seen:
|
||||
Console().print(
|
||||
f"\n[b red]WARNING:[/b red] Overlapping trigger or alias: [b blue]{item}[/b blue]"
|
||||
)
|
||||
Console().print(f"\n[b red]WARNING:[/b red] Overlapping trigger or alias: [b blue]{item}[/b blue]")
|
||||
else:
|
||||
seen[item] = True
|
||||
|
||||
@@ -424,9 +395,7 @@ class App(BaseApp):
|
||||
raw_command: str = Console().input(self._prompt)
|
||||
|
||||
try:
|
||||
input_command: InputCommand = InputCommand.parse(
|
||||
raw_command=raw_command
|
||||
)
|
||||
input_command: InputCommand = InputCommand.parse(raw_command=raw_command)
|
||||
except BaseInputCommandException as error:
|
||||
with redirect_stdout(io.StringIO()) as f:
|
||||
self._error_handler(error, raw_command)
|
||||
@@ -436,14 +405,7 @@ class App(BaseApp):
|
||||
|
||||
if self._is_exit_command(input_command):
|
||||
system_router.finds_appropriate_handler(input_command)
|
||||
if self._ignore_command_register:
|
||||
self._autocompleter.exit_setup(
|
||||
self._all_registered_triggers_in_lower_case
|
||||
)
|
||||
else:
|
||||
self._autocompleter.exit_setup(
|
||||
self._all_registered_triggers_in_default_case
|
||||
)
|
||||
self._autocompleter.exit_setup(list(self._current_matching_triggers_with_routers.keys()))
|
||||
return
|
||||
|
||||
if self._is_unknown_command(input_command):
|
||||
@@ -453,41 +415,22 @@ class App(BaseApp):
|
||||
self._print_framed_text(res)
|
||||
continue
|
||||
|
||||
for registered_router in self._registered_routers:
|
||||
if registered_router.disable_redirect_stdout:
|
||||
processing_router = self._current_matching_triggers_with_routers[input_command.get_trigger()]
|
||||
|
||||
if processing_router.disable_redirect_stdout:
|
||||
if isinstance(self._dividing_line, StaticDividingLine):
|
||||
print('k')
|
||||
self._print_func(
|
||||
self._dividing_line.get_full_static_line(
|
||||
self._override_system_messages
|
||||
)
|
||||
)
|
||||
registered_router.finds_appropriate_handler(input_command)
|
||||
self._print_func(
|
||||
self._dividing_line.get_full_static_line(
|
||||
self._override_system_messages
|
||||
)
|
||||
)
|
||||
self._print_func(self._dividing_line.get_full_static_line(self._override_system_messages))
|
||||
processing_router.finds_appropriate_handler(input_command)
|
||||
self._print_func(self._dividing_line.get_full_static_line(self._override_system_messages))
|
||||
else:
|
||||
print('j')
|
||||
self._print_func(
|
||||
StaticDividingLine(
|
||||
self._dividing_line.get_unit_part()
|
||||
).get_full_static_line(self._override_system_messages)
|
||||
)
|
||||
registered_router.finds_appropriate_handler(input_command)
|
||||
self._print_func(
|
||||
StaticDividingLine(
|
||||
self._dividing_line.get_unit_part()
|
||||
).get_full_static_line(self._override_system_messages)
|
||||
)
|
||||
self._print_func(StaticDividingLine(self._dividing_line.get_unit_part()).get_full_static_line(self._override_system_messages))
|
||||
processing_router.finds_appropriate_handler(input_command)
|
||||
self._print_func(StaticDividingLine(self._dividing_line.get_unit_part()).get_full_static_line(self._override_system_messages))
|
||||
else:
|
||||
print('m')
|
||||
with redirect_stdout(io.StringIO()) as f:
|
||||
registered_router.finds_appropriate_handler(input_command)
|
||||
processing_router.finds_appropriate_handler(input_command)
|
||||
res: str = f.getvalue()
|
||||
if res:
|
||||
print('w')
|
||||
self._print_framed_text(res)
|
||||
|
||||
def include_router(self, router: Router) -> None:
|
||||
|
||||
@@ -93,9 +93,7 @@ class Router:
|
||||
response: Response = Response()
|
||||
if handle_command.get_registered_flags().get_flags():
|
||||
if input_command_flags.get_flags():
|
||||
response: Response = self._structuring_input_flags(
|
||||
handle_command, input_command_flags
|
||||
)
|
||||
response: Response = self._structuring_input_flags( handle_command, input_command_flags )
|
||||
command_handler.handling(response)
|
||||
else:
|
||||
response.status = Status.ALL_FLAGS_VALID
|
||||
|
||||
Reference in New Issue
Block a user