mirror of
https://github.com/koloideal/Argenta.git
synced 2026-06-10 18:15:28 +03:00
impl di in handlers with support custom provider
This commit is contained in:
+141
-70
@@ -3,7 +3,6 @@ import re
|
||||
from contextlib import redirect_stdout
|
||||
from typing import Never, TypeAlias
|
||||
|
||||
from argenta.orchestrator.argparser.entity import ArgSpace
|
||||
from art import text2art # pyright: ignore[reportMissingTypeStubs, reportUnknownVariableType]
|
||||
from rich.console import Console
|
||||
from rich.markup import escape
|
||||
@@ -32,18 +31,21 @@ Matches: TypeAlias = list[str] | list[Never]
|
||||
|
||||
|
||||
class BaseApp:
|
||||
def __init__(self, *, prompt: str,
|
||||
initial_message: str,
|
||||
farewell_message: str,
|
||||
exit_command: Command,
|
||||
system_router_title: str | None,
|
||||
ignore_command_register: bool,
|
||||
dividing_line: StaticDividingLine | DynamicDividingLine,
|
||||
repeat_command_groups: bool,
|
||||
override_system_messages: bool,
|
||||
autocompleter: AutoCompleter,
|
||||
print_func: Printer,
|
||||
argspace: ArgSpace | None = None) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
prompt: str,
|
||||
initial_message: str,
|
||||
farewell_message: str,
|
||||
exit_command: Command,
|
||||
system_router_title: str | None,
|
||||
ignore_command_register: bool,
|
||||
dividing_line: StaticDividingLine | DynamicDividingLine,
|
||||
repeat_command_groups: bool,
|
||||
override_system_messages: bool,
|
||||
autocompleter: AutoCompleter,
|
||||
print_func: Printer,
|
||||
) -> None:
|
||||
self._prompt: str = prompt
|
||||
self._print_func: Printer = print_func
|
||||
self._exit_command: Command = exit_command
|
||||
@@ -53,27 +55,44 @@ class BaseApp:
|
||||
self._repeat_command_groups_description: bool = repeat_command_groups
|
||||
self._override_system_messages: bool = override_system_messages
|
||||
self._autocompleter: AutoCompleter = autocompleter
|
||||
self._argspace: ArgSpace | None = argspace
|
||||
|
||||
self._farewell_message: str = farewell_message
|
||||
self._initial_message: str = initial_message
|
||||
|
||||
self._description_message_gen: DescriptionMessageGenerator = lambda command, description: f"{command} *=*=* {description}"
|
||||
self._registered_routers: RegisteredRouters = RegisteredRouters()
|
||||
self._description_message_gen: DescriptionMessageGenerator = (
|
||||
lambda command, description: f"{command} *=*=* {description}"
|
||||
)
|
||||
self.registered_routers: RegisteredRouters = RegisteredRouters()
|
||||
self._messages_on_startup: list[str] = []
|
||||
|
||||
self._matching_lower_triggers_with_routers: dict[str, Router] = {}
|
||||
self._matching_default_triggers_with_routers: dict[str, Router] = {}
|
||||
|
||||
self._current_matching_triggers_with_routers: dict[str, Router] = self._matching_lower_triggers_with_routers if self._ignore_command_register else self._matching_default_triggers_with_routers
|
||||
self._current_matching_triggers_with_routers: dict[str, Router] = (
|
||||
self._matching_lower_triggers_with_routers
|
||||
if self._ignore_command_register
|
||||
else self._matching_default_triggers_with_routers
|
||||
)
|
||||
|
||||
self._incorrect_input_syntax_handler: NonStandardBehaviorHandler[str] = lambda _: print_func(f"Incorrect flag syntax: {_}")
|
||||
self._repeated_input_flags_handler: NonStandardBehaviorHandler[str] = lambda _: print_func(f"Repeated input flags: {_}")
|
||||
self._empty_input_command_handler: EmptyCommandHandler = lambda: print_func("Empty input command")
|
||||
self._unknown_command_handler: NonStandardBehaviorHandler[InputCommand] = lambda _: print_func(f"Unknown command: {_.trigger}")
|
||||
self._exit_command_handler: NonStandardBehaviorHandler[Response] = lambda _: print_func(self._farewell_message)
|
||||
self._incorrect_input_syntax_handler: NonStandardBehaviorHandler[str] = (
|
||||
lambda _: print_func(f"Incorrect flag syntax: {_}")
|
||||
)
|
||||
self._repeated_input_flags_handler: NonStandardBehaviorHandler[str] = (
|
||||
lambda _: print_func(f"Repeated input flags: {_}")
|
||||
)
|
||||
self._empty_input_command_handler: EmptyCommandHandler = lambda: print_func(
|
||||
"Empty input command"
|
||||
)
|
||||
self._unknown_command_handler: NonStandardBehaviorHandler[InputCommand] = (
|
||||
lambda _: print_func(f"Unknown command: {_.trigger}")
|
||||
)
|
||||
self._exit_command_handler: NonStandardBehaviorHandler[Response] = (
|
||||
lambda _: print_func(self._farewell_message)
|
||||
)
|
||||
|
||||
def set_description_message_pattern(self, _: DescriptionMessageGenerator, /) -> None:
|
||||
def set_description_message_pattern(
|
||||
self, _: DescriptionMessageGenerator, /
|
||||
) -> None:
|
||||
"""
|
||||
Public. Sets the output pattern of the available commands
|
||||
:param _: output pattern of the available commands
|
||||
@@ -81,7 +100,9 @@ class BaseApp:
|
||||
"""
|
||||
self._description_message_gen = _
|
||||
|
||||
def set_incorrect_input_syntax_handler(self, _: NonStandardBehaviorHandler[str], /) -> None:
|
||||
def set_incorrect_input_syntax_handler(
|
||||
self, _: NonStandardBehaviorHandler[str], /
|
||||
) -> None:
|
||||
"""
|
||||
Public. Sets the handler for incorrect flags when entering a command
|
||||
:param _: handler for incorrect flags when entering a command
|
||||
@@ -89,7 +110,9 @@ class BaseApp:
|
||||
"""
|
||||
self._incorrect_input_syntax_handler = _
|
||||
|
||||
def set_repeated_input_flags_handler(self, _: NonStandardBehaviorHandler[str], /) -> None:
|
||||
def set_repeated_input_flags_handler(
|
||||
self, _: NonStandardBehaviorHandler[str], /
|
||||
) -> None:
|
||||
"""
|
||||
Public. Sets the handler for repeated flags when entering a command
|
||||
:param _: handler for repeated flags when entering a command
|
||||
@@ -97,7 +120,9 @@ class BaseApp:
|
||||
"""
|
||||
self._repeated_input_flags_handler = _
|
||||
|
||||
def set_unknown_command_handler(self, _: NonStandardBehaviorHandler[InputCommand], /) -> None:
|
||||
def set_unknown_command_handler(
|
||||
self, _: NonStandardBehaviorHandler[InputCommand], /
|
||||
) -> None:
|
||||
"""
|
||||
Public. Sets the handler for unknown commands when entering a command
|
||||
:param _: handler for unknown commands when entering a command
|
||||
@@ -113,7 +138,9 @@ class BaseApp:
|
||||
"""
|
||||
self._empty_input_command_handler = _
|
||||
|
||||
def set_exit_command_handler(self, _: NonStandardBehaviorHandler[Response], /) -> None:
|
||||
def set_exit_command_handler(
|
||||
self, _: NonStandardBehaviorHandler[Response], /
|
||||
) -> None:
|
||||
"""
|
||||
Public. Sets the handler for exit command when entering a command
|
||||
:param _: handler for exit command when entering a command
|
||||
@@ -126,7 +153,7 @@ class BaseApp:
|
||||
Private. Prints the description of the available commands
|
||||
:return: None
|
||||
"""
|
||||
for registered_router in self._registered_routers:
|
||||
for registered_router in self.registered_routers:
|
||||
if registered_router.title:
|
||||
self._print_func(registered_router.title)
|
||||
for command_handler in registered_router.command_handlers:
|
||||
@@ -167,14 +194,18 @@ class BaseApp:
|
||||
length=max_length_line, is_override=self._override_system_messages
|
||||
)
|
||||
)
|
||||
|
||||
elif isinstance(self._dividing_line, StaticDividingLine): # pyright: ignore[reportUnnecessaryIsInstance]
|
||||
|
||||
elif isinstance(self._dividing_line, StaticDividingLine): # pyright: ignore[reportUnnecessaryIsInstance]
|
||||
self._print_func(
|
||||
self._dividing_line.get_full_static_line(is_override=self._override_system_messages)
|
||||
self._dividing_line.get_full_static_line(
|
||||
is_override=self._override_system_messages
|
||||
)
|
||||
)
|
||||
print(text.strip("\n"))
|
||||
self._print_func(
|
||||
self._dividing_line.get_full_static_line(is_override=self._override_system_messages)
|
||||
self._dividing_line.get_full_static_line(
|
||||
is_override=self._override_system_messages
|
||||
)
|
||||
)
|
||||
|
||||
else:
|
||||
@@ -189,13 +220,9 @@ class BaseApp:
|
||||
trigger = command.trigger
|
||||
exit_trigger = self._exit_command.trigger
|
||||
if self._ignore_command_register:
|
||||
if (
|
||||
trigger.lower() == exit_trigger.lower()
|
||||
):
|
||||
if trigger.lower() == exit_trigger.lower():
|
||||
return True
|
||||
elif trigger.lower() in [
|
||||
x.lower() for x in self._exit_command.aliases
|
||||
]:
|
||||
elif trigger.lower() in [x.lower() for x in self._exit_command.aliases]:
|
||||
return True
|
||||
else:
|
||||
if trigger == exit_trigger:
|
||||
@@ -212,16 +239,18 @@ class BaseApp:
|
||||
"""
|
||||
input_command_trigger = command.trigger
|
||||
if self._ignore_command_register:
|
||||
if input_command_trigger.lower() in list(self._current_matching_triggers_with_routers.keys()):
|
||||
if input_command_trigger.lower() in list(
|
||||
self._current_matching_triggers_with_routers.keys()
|
||||
):
|
||||
return False
|
||||
else:
|
||||
if input_command_trigger in list(self._current_matching_triggers_with_routers.keys()):
|
||||
if input_command_trigger in list(
|
||||
self._current_matching_triggers_with_routers.keys()
|
||||
):
|
||||
return False
|
||||
return True
|
||||
|
||||
def _error_handler(
|
||||
self, error: InputCommandException, raw_command: str
|
||||
) -> None:
|
||||
def _error_handler(self, error: InputCommandException, raw_command: str) -> None:
|
||||
"""
|
||||
Private. Handles parsing errors of the entered command
|
||||
:param error: error being handled
|
||||
@@ -246,13 +275,13 @@ class BaseApp:
|
||||
def _(response: Response) -> None:
|
||||
self._exit_command_handler(response)
|
||||
|
||||
if system_router not in self._registered_routers.registered_routers:
|
||||
if system_router not in self.registered_routers.registered_routers:
|
||||
system_router.command_register_ignore = self._ignore_command_register
|
||||
self._registered_routers.add_registered_router(system_router)
|
||||
self.registered_routers.add_registered_router(system_router)
|
||||
|
||||
def _most_similar_command(self, unknown_command: str) -> str | None:
|
||||
all_commands = list(self._current_matching_triggers_with_routers.keys())
|
||||
|
||||
|
||||
matches_startswith_unknown_command: Matches = sorted(
|
||||
cmd for cmd in all_commands if cmd.startswith(unknown_command)
|
||||
)
|
||||
@@ -275,26 +304,36 @@ class BaseApp:
|
||||
:return: None
|
||||
"""
|
||||
self._prompt = f"[italic dim bold]{self._prompt}"
|
||||
self._initial_message = ("\n" + f"[bold red]{text2art(self._initial_message, font='tarty1')}" + "\n")
|
||||
self._initial_message = (
|
||||
"\n" + f"[bold red]{text2art(self._initial_message, font='tarty1')}" + "\n"
|
||||
)
|
||||
self._farewell_message = (
|
||||
"[bold red]\n\n" +
|
||||
str(text2art(self._farewell_message, font="chanky")) + # pyright: ignore[reportUnknownArgumentType]
|
||||
"\n[/bold red]\n" +
|
||||
"[red i]github.com/koloideal/Argenta[/red i] | [red bold i]made by kolo[/red bold i]\n"
|
||||
"[bold red]\n\n"
|
||||
+ str(text2art(self._farewell_message, font="chanky")) # pyright: ignore[reportUnknownArgumentType]
|
||||
+ "\n[/bold red]\n"
|
||||
+ "[red i]github.com/koloideal/Argenta[/red i] | [red bold i]made by kolo[/red bold i]\n"
|
||||
)
|
||||
self._description_message_gen = lambda command, description: (
|
||||
f"[bold red]{escape('[' + command + ']')}[/bold red] "
|
||||
f"[blue dim]*=*=*[/blue dim] "
|
||||
f"[bold yellow italic]{escape(description)}"
|
||||
)
|
||||
self._incorrect_input_syntax_handler = lambda raw_command: self._print_func(f"[red bold]Incorrect flag syntax: {escape(raw_command)}")
|
||||
self._repeated_input_flags_handler = lambda raw_command: self._print_func(f"[red bold]Repeated input flags: {escape(raw_command)}")
|
||||
self._empty_input_command_handler = lambda: self._print_func("[red bold]Empty input command")
|
||||
self._incorrect_input_syntax_handler = lambda raw_command: self._print_func(
|
||||
f"[red bold]Incorrect flag syntax: {escape(raw_command)}"
|
||||
)
|
||||
self._repeated_input_flags_handler = lambda raw_command: self._print_func(
|
||||
f"[red bold]Repeated input flags: {escape(raw_command)}"
|
||||
)
|
||||
self._empty_input_command_handler = lambda: self._print_func(
|
||||
"[red bold]Empty input command"
|
||||
)
|
||||
|
||||
def unknown_command_handler(command: InputCommand) -> None:
|
||||
cmd_trg: str = command.trigger
|
||||
mst_sim_cmd: str | None = self._most_similar_command(cmd_trg)
|
||||
first_part_of_text = f"[red]Unknown command:[/red] [blue]{escape(cmd_trg)}[/blue]"
|
||||
first_part_of_text = (
|
||||
f"[red]Unknown command:[/red] [blue]{escape(cmd_trg)}[/blue]"
|
||||
)
|
||||
second_part_of_text = (
|
||||
("[red], most similar:[/red] " + ("[blue]" + mst_sim_cmd + "[/blue]"))
|
||||
if mst_sim_cmd
|
||||
@@ -311,21 +350,27 @@ class BaseApp:
|
||||
"""
|
||||
self._setup_system_router()
|
||||
|
||||
for router_entity in self._registered_routers:
|
||||
for router_entity in self.registered_routers:
|
||||
router_triggers = router_entity.triggers
|
||||
router_aliases = router_entity.aliases
|
||||
combined = router_triggers + router_aliases
|
||||
|
||||
for trigger in combined:
|
||||
self._matching_default_triggers_with_routers[trigger] = router_entity
|
||||
self._matching_lower_triggers_with_routers[trigger.lower()] = router_entity
|
||||
self._matching_lower_triggers_with_routers[trigger.lower()] = (
|
||||
router_entity
|
||||
)
|
||||
|
||||
self._autocompleter.initial_setup(list(self._current_matching_triggers_with_routers.keys()))
|
||||
self._autocompleter.initial_setup(
|
||||
list(self._current_matching_triggers_with_routers.keys())
|
||||
)
|
||||
|
||||
seen = {}
|
||||
for item in list(self._current_matching_triggers_with_routers.keys()):
|
||||
if item in seen:
|
||||
Console().print(f"\n[b red]WARNING:[/b red] Overlapping trigger or alias: [b blue]{item}[/b blue]")
|
||||
Console().print(
|
||||
f"\n[b red]WARNING:[/b red] Overlapping trigger or alias: [b blue]{item}[/b blue]"
|
||||
)
|
||||
else:
|
||||
seen[item] = True
|
||||
|
||||
@@ -352,7 +397,8 @@ DEFAULT_EXIT_COMMAND: Command = Command("Q", description="Exit command")
|
||||
|
||||
class App(BaseApp):
|
||||
def __init__(
|
||||
self, *,
|
||||
self,
|
||||
*,
|
||||
prompt: str = "What do you want to do?\n\n",
|
||||
initial_message: str = "Argenta\n",
|
||||
farewell_message: str = "\nSee you\n",
|
||||
@@ -395,12 +441,11 @@ class App(BaseApp):
|
||||
print_func=print_func,
|
||||
)
|
||||
|
||||
def run_polling(self, argspace: ArgSpace | None) -> None:
|
||||
def run_polling(self) -> None:
|
||||
"""
|
||||
Private. Starts the user input processing cycle
|
||||
:return: None
|
||||
"""
|
||||
self._argspace = argspace
|
||||
self._pre_cycle_setup()
|
||||
while True:
|
||||
if self._repeat_command_groups_description:
|
||||
@@ -409,7 +454,9 @@ class App(BaseApp):
|
||||
raw_command: str = Console().input(self._prompt)
|
||||
|
||||
try:
|
||||
input_command: InputCommand = InputCommand.parse(raw_command=raw_command)
|
||||
input_command: InputCommand = InputCommand.parse(
|
||||
raw_command=raw_command
|
||||
)
|
||||
except InputCommandException as error:
|
||||
with redirect_stdout(io.StringIO()) as stderr:
|
||||
self._error_handler(error, raw_command)
|
||||
@@ -419,7 +466,9 @@ class App(BaseApp):
|
||||
|
||||
if self._is_exit_command(input_command):
|
||||
system_router.finds_appropriate_handler(input_command)
|
||||
self._autocompleter.exit_setup(list(self._current_matching_triggers_with_routers.keys()))
|
||||
self._autocompleter.exit_setup(
|
||||
list(self._current_matching_triggers_with_routers.keys())
|
||||
)
|
||||
return
|
||||
|
||||
if self._is_unknown_command(input_command):
|
||||
@@ -429,18 +478,40 @@ class App(BaseApp):
|
||||
self._print_framed_text(stdout_res)
|
||||
continue
|
||||
|
||||
processing_router = self._current_matching_triggers_with_routers[input_command.trigger.lower()]
|
||||
processing_router = self._current_matching_triggers_with_routers[
|
||||
input_command.trigger.lower()
|
||||
]
|
||||
|
||||
if processing_router.disable_redirect_stdout:
|
||||
if isinstance(self._dividing_line, StaticDividingLine):
|
||||
self._print_func(self._dividing_line.get_full_static_line(is_override=self._override_system_messages))
|
||||
self._print_func(
|
||||
self._dividing_line.get_full_static_line(
|
||||
is_override=self._override_system_messages
|
||||
)
|
||||
)
|
||||
processing_router.finds_appropriate_handler(input_command)
|
||||
self._print_func(self._dividing_line.get_full_static_line(is_override=self._override_system_messages))
|
||||
self._print_func(
|
||||
self._dividing_line.get_full_static_line(
|
||||
is_override=self._override_system_messages
|
||||
)
|
||||
)
|
||||
else:
|
||||
dividing_line_unit_part: str = self._dividing_line.get_unit_part()
|
||||
self._print_func(StaticDividingLine(dividing_line_unit_part).get_full_static_line(is_override=self._override_system_messages))
|
||||
self._print_func(
|
||||
StaticDividingLine(
|
||||
dividing_line_unit_part
|
||||
).get_full_static_line(
|
||||
is_override=self._override_system_messages
|
||||
)
|
||||
)
|
||||
processing_router.finds_appropriate_handler(input_command)
|
||||
self._print_func(StaticDividingLine(dividing_line_unit_part).get_full_static_line(is_override=self._override_system_messages))
|
||||
self._print_func(
|
||||
StaticDividingLine(
|
||||
dividing_line_unit_part
|
||||
).get_full_static_line(
|
||||
is_override=self._override_system_messages
|
||||
)
|
||||
)
|
||||
else:
|
||||
with redirect_stdout(io.StringIO()) as stdout:
|
||||
processing_router.finds_appropriate_handler(input_command)
|
||||
@@ -455,7 +526,7 @@ class App(BaseApp):
|
||||
:return: None
|
||||
"""
|
||||
router.command_register_ignore = self._ignore_command_register
|
||||
self._registered_routers.add_registered_router(router)
|
||||
self.registered_routers.add_registered_router(router)
|
||||
|
||||
def include_routers(self, *routers: Router) -> None:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user