This commit is contained in:
2026-01-29 02:22:48 +03:00
parent 9aa99352fe
commit 107530a4b1
7 changed files with 209 additions and 145 deletions
+1 -1
View File
@@ -2,7 +2,7 @@ from argenta import App, Orchestrator
from .handlers import router
app = App(initial_message="metrics")
app = App(initial_message="metrics", override_system_messages=False)
orchestrator = Orchestrator()
+1 -2
View File
@@ -6,8 +6,7 @@ from argenta.app.dividing_line.models import DynamicDividingLine
from mock.mock_app.routers import work_router
app: App = App(
dividing_line=StaticDividingLine('~'),
prompt=HTML('\n<blink>>>></blink>')
dividing_line=StaticDividingLine('~')
)
orchestrator: Orchestrator = Orchestrator()
+1 -1
View File
@@ -42,7 +42,7 @@ metrics = [
]
[tool.ruff]
line-length=90
line-length=100
[tool.pyright]
typeCheckingMode = "strict"
+51 -130
View File
@@ -5,12 +5,10 @@ import re
from contextlib import redirect_stdout
from typing import Callable, Never, TypeAlias
from art import text2art
from prompt_toolkit import HTML
from rich.console import Console
from rich.markup import escape
from argenta.app.autocompleter import AutoCompleter
from argenta.app.view_renderer.entity import PlainRenderer, RichRenderer, ViewRenderer
from argenta.app.dividing_line.models import DynamicDividingLine, StaticDividingLine
from argenta.app.protocols import (
DescriptionMessageGenerator,
@@ -30,7 +28,6 @@ from argenta.response import Response
from argenta.router import Router
Matches: TypeAlias = list[str] | list[Never]
_ANSI_ESCAPE_RE: re.Pattern[str] = re.compile(r"\u001b\[[0-9;]*m")
@@ -38,7 +35,7 @@ class BaseApp:
def __init__(
self,
*,
prompt: str | HTML,
prompt: str,
initial_message: str,
farewell_message: str,
exit_command: Command,
@@ -49,41 +46,38 @@ class BaseApp:
autocompleter: AutoCompleter,
print_func: Printer,
) -> None:
self._prompt: str | HTML = prompt
self._prompt: str = prompt
self._print_func: Printer = print_func
self._exit_command: Command = exit_command
self._dividing_line: StaticDividingLine | DynamicDividingLine | None = dividing_line
self._repeat_command_groups_printing: bool = repeat_command_groups_printing
self._override_system_messages: bool = override_system_messages
self._autocompleter: AutoCompleter = autocompleter
self.system_router: Router = Router(title=system_router_title)
self._farewell_message: str = farewell_message
self._initial_message: str = initial_message
self._system_router: Router = Router(title=system_router_title)
self._stdout_buffer: io.StringIO = io.StringIO()
self._description_message_gen: DescriptionMessageGenerator = (
lambda command, description: f"{command} *=*=* {description}"
)
self.registered_routers: RegisteredRouters = RegisteredRouters()
self._messages_on_startup: list[str] = []
self._incorrect_input_syntax_handler: NonStandardBehaviorHandler[str] = (
lambda _: print_func(f"Incorrect flag syntax: {_}")
)
self._repeated_input_flags_handler: NonStandardBehaviorHandler[str] = (
lambda _: print_func(f"Repeated input flags: {_}")
)
self._empty_input_command_handler: EmptyCommandHandler = lambda: print_func(
"Empty input command"
)
self._unknown_command_handler: NonStandardBehaviorHandler[InputCommand] = (
lambda _: print_func(f"Unknown command: {_.trigger}")
)
self._exit_command_handler: NonStandardBehaviorHandler[Response] = (
lambda _: print_func(self._farewell_message)
)
if self._override_system_messages:
self.view: ViewRenderer = PlainRenderer(
print_func=self._print_func,
most_similar_command_getter=self._most_similar_command
)
else:
self.view: ViewRenderer = RichRenderer(
print_func=self._print_func,
most_similar_command_getter=self._most_similar_command
)
self._initial_message: str = self.view.render_initial_message(initial_message)
self._farewell_message: str = self.view.render_farewell_message(farewell_message)
self._description_message_gen: DescriptionMessageGenerator = self.view.generate_formatted_description_message_gen()
self._incorrect_input_syntax_handler: NonStandardBehaviorHandler[str] = self.view.generate_formatted_incorrect_input_syntax_handler()
self._repeated_input_flags_handler: NonStandardBehaviorHandler[str] = self.view.generate_formatted_repeated_input_flags_handler()
self._empty_input_command_handler: EmptyCommandHandler = self.view.generate_formatted_empty_input_command_handler()
self._unknown_command_handler: NonStandardBehaviorHandler[InputCommand] = self.view.generate_formatted_unknown_command_handler()
self._exit_command_handler: NonStandardBehaviorHandler[Response] = self.view.generate_formatted_exit_command_handler(self._farewell_message)
def set_description_message_pattern(self, _: DescriptionMessageGenerator, /) -> None:
"""
@@ -93,9 +87,7 @@ class BaseApp:
"""
self._description_message_gen = _
def set_incorrect_input_syntax_handler(
self, _: NonStandardBehaviorHandler[str], /
) -> None:
def set_incorrect_input_syntax_handler(self, _: NonStandardBehaviorHandler[str], /) -> None:
"""
Public. Sets the handler for incorrect flags when entering a command
:param _: handler for incorrect flags when entering a command
@@ -103,9 +95,7 @@ class BaseApp:
"""
self._incorrect_input_syntax_handler = _
def set_repeated_input_flags_handler(
self, _: NonStandardBehaviorHandler[str], /
) -> None:
def set_repeated_input_flags_handler(self, _: NonStandardBehaviorHandler[str], /) -> None:
"""
Public. Sets the handler for repeated flags when entering a command
:param _: handler for repeated flags when entering a command
@@ -113,9 +103,7 @@ class BaseApp:
"""
self._repeated_input_flags_handler = _
def set_unknown_command_handler(
self, _: NonStandardBehaviorHandler[InputCommand], /
) -> None:
def set_unknown_command_handler(self, _: NonStandardBehaviorHandler[InputCommand], /) -> None:
"""
Public. Sets the handler for unknown commands when entering a command
:param _: handler for unknown commands when entering a command
@@ -131,9 +119,7 @@ class BaseApp:
"""
self._empty_input_command_handler = _
def set_exit_command_handler(
self, _: NonStandardBehaviorHandler[Response], /
) -> None:
def set_exit_command_handler(self, _: NonStandardBehaviorHandler[Response], /) -> None:
"""
Public. Sets the handler for exit command when entering a command
:param _: handler for exit command when entering a command
@@ -147,7 +133,7 @@ class BaseApp:
:return: None
"""
for registered_router in self.registered_routers:
self._print_func('\n'+registered_router.title)
self._print_func("\n" + registered_router.title)
for command_handler in registered_router.command_handlers:
handled_command = command_handler.handled_command
self._print_func(
@@ -165,17 +151,9 @@ class BaseApp:
"""
match self._dividing_line:
case StaticDividingLine() as dividing_line:
self._print_func(
dividing_line.get_full_static_line(
is_override=self._override_system_messages
)
)
self._print_func(dividing_line.get_full_static_line(is_override=self._override_system_messages))
print(text.strip("\n"))
self._print_func(
dividing_line.get_full_static_line(
is_override=self._override_system_messages
)
)
self._print_func(dividing_line.get_full_static_line(is_override=self._override_system_messages))
case DynamicDividingLine() as dividing_line:
self._print_func(
StaticDividingLine(dividing_line.get_unit_part()).get_full_static_line(
@@ -189,9 +167,9 @@ class BaseApp:
)
)
case None:
print('\n' + text.strip("\n") + '\n')
print("\n" + text.strip("\n") + "\n")
case _:
raise NotImplementedError(f'Dividing line with type {self._dividing_line} is not implemented')
raise NotImplementedError(f"Dividing line with type {self._dividing_line} is not implemented")
def _is_exit_command(self, command: InputCommand) -> bool:
"""
@@ -243,12 +221,11 @@ class BaseApp:
Private. Sets up system router
:return: None
"""
@self.system_router.command(self._exit_command)
@self._system_router.command(self._exit_command)
def _(response: Response) -> None:
self._exit_command_handler(response)
self.registered_routers.add_registered_router(self.system_router)
self.registered_routers.add_registered_router(self._system_router)
def _validate_routers_for_collisions(self) -> None:
"""
@@ -279,9 +256,7 @@ class BaseApp:
matches_startswith_unknown_command: Matches = sorted(
cmd for cmd in all_commands if cmd.startswith(unknown_command)
)
matches_startswith_cmd: Matches = sorted(
cmd for cmd in all_commands if unknown_command.startswith(cmd)
)
matches_startswith_cmd: Matches = sorted(cmd for cmd in all_commands if unknown_command.startswith(cmd))
matches: Matches = matches_startswith_unknown_command or matches_startswith_cmd
@@ -292,52 +267,6 @@ class BaseApp:
else:
return None
def _setup_default_view(self) -> None:
"""
Private. Sets up default app view
:return: None
"""
if isinstance(self._prompt, str):
self._prompt = f"\n<gray><b>{self._prompt}</b></gray>"
self._initial_message = (
"\n" + f"[bold red]{text2art(self._initial_message, font='tarty1')}"
)
self._farewell_message = (
"[bold red]\n\n"
+ str(text2art(self._farewell_message, font="chanky")) # pyright: ignore[reportUnknownArgumentType]
+ "\n[/bold red]\n"
+ "[red i]github.com/koloideal/Argenta[/red i] | [red bold i]made by kolo[/red bold i]\n"
)
self._description_message_gen = lambda command, description: (
f"[bold red]{escape('[' + command + ']')}[/bold red] "
f"[blue dim]*=*=*[/blue dim] "
f"[bold yellow italic]{escape(description)}"
)
self._incorrect_input_syntax_handler = lambda raw_command: self._print_func(
f"[red bold]Incorrect flag syntax: {escape(raw_command)}"
)
self._repeated_input_flags_handler = lambda raw_command: self._print_func(
f"[red bold]Repeated input flags: {escape(raw_command)}"
)
self._empty_input_command_handler = lambda: self._print_func(
"[red bold]Empty input command"
)
def unknown_command_handler(command: InputCommand) -> None:
cmd_trg: str = command.trigger
mst_sim_cmd: str | None = self._most_similar_command(cmd_trg)
first_part_of_text = (
f"[red]Unknown command:[/red] [blue]{escape(cmd_trg)}[/blue]"
)
second_part_of_text = (
("[red], most similar:[/red] " + ("[blue]" + mst_sim_cmd + "[/blue]"))
if mst_sim_cmd
else ""
)
self._print_func(first_part_of_text + second_part_of_text)
self._unknown_command_handler = unknown_command_handler
def _pre_cycle_setup(self) -> None:
"""
Private. Configures various aspects of the application before the start of the cycle
@@ -347,8 +276,8 @@ class BaseApp:
self._validate_routers_for_collisions()
self._autocompleter.initial_setup(self.registered_routers.get_triggers())
self._print_func(self._initial_message)
self._print_func('\n'.join(self._messages_on_startup))
if self._messages_on_startup:
self._print_func()
if not self._repeat_command_groups_printing:
self._print_command_group_description()
@@ -361,17 +290,11 @@ class BaseApp:
match (self._dividing_line, processing_router.disable_redirect_stdout):
case (DynamicDividingLine(), False):
stdout_result = self._capture_stdout(
lambda: processing_router.finds_appropriate_handler(input_command)
)
stdout_result = self._capture_stdout(lambda: processing_router.finds_appropriate_handler(input_command))
clear_text = _ANSI_ESCAPE_RE.sub("", stdout_result)
max_length_line = max([len(line) for line in clear_text.split("\n")])
max_length_line = (
max_length_line
if 10 <= max_length_line <= 100
else 100
if max_length_line > 100
else 10
max_length_line if 10 <= max_length_line <= 100 else 100 if max_length_line > 100 else 10
)
self._print_func(
@@ -400,7 +323,7 @@ class BaseApp:
case (None, bool()):
processing_router.finds_appropriate_handler(input_command)
case _:
raise NotImplementedError(f'Dividing line with type {self._dividing_line} is not implemented')
raise NotImplementedError(f"Dividing line with type {self._dividing_line} is not implemented")
AVAILABLE_DIVIDING_LINES: TypeAlias = StaticDividingLine | DynamicDividingLine
@@ -412,9 +335,9 @@ class App(BaseApp):
def __init__(
self,
*,
prompt: str | HTML = ">>> ",
initial_message: str = "Argenta\n",
farewell_message: str = "\nSee you\n",
prompt: str = ">>> ",
initial_message: str = "Argenta",
farewell_message: str = "See you",
exit_command: Command = DEFAULT_EXIT_COMMAND,
system_router_title: str = "System points:",
dividing_line: AVAILABLE_DIVIDING_LINES | None = None,
@@ -450,39 +373,37 @@ class App(BaseApp):
autocompleter=autocompleter or AutoCompleter(),
print_func=print_func,
)
if not self._override_system_messages:
self._setup_default_view()
def run_polling(self) -> None:
"""
Private. Starts the user input processing cycle
:return: None
"""
self._print_func(self._initial_message)
self._pre_cycle_setup()
while True:
if self._repeat_command_groups_printing:
self._print_command_group_description()
raw_command: str = self._autocompleter.prompt(self._prompt)
print() # post-prompt gap
print() # pre-prompt gap
raw_command: str = self._autocompleter.prompt(self.view.render_prompt(self._prompt))
print() # post-prompt gap
try:
input_command: InputCommand = InputCommand.parse(raw_command=raw_command)
except InputCommandException as error: # noqa F841
except InputCommandException as error: # noqa F841
stderr_result = self._capture_stdout(
lambda: self._error_handler(error, raw_command) # noqa F821
lambda: self._error_handler(error, raw_command) # noqa F821
)
self._print_static_framed_text(stderr_result)
continue
if self._is_exit_command(input_command):
self.system_router.finds_appropriate_handler(input_command)
self._system_router.finds_appropriate_handler(input_command)
return
if self._is_unknown_command(input_command):
stdout_res = self._capture_stdout(
lambda: self._unknown_command_handler(input_command)
)
stdout_res = self._capture_stdout(lambda: self._unknown_command_handler(input_command))
self._print_static_framed_text(stdout_res)
continue
+15 -3
View File
@@ -1,10 +1,17 @@
__all__ = ["NonStandardBehaviorHandler", "EmptyCommandHandler", "Printer", "DescriptionMessageGenerator", "HandlerFunc"]
__all__ = [
"NonStandardBehaviorHandler",
"EmptyCommandHandler",
"MostSimilarCommandGetter",
"Printer",
"DescriptionMessageGenerator",
"HandlerFunc",
]
from typing import Any, Protocol, TypeVar
from typing import Any, ParamSpec, Protocol, TypeVar
from argenta.response import Response
T = TypeVar("T", contravariant=True)
P = ParamSpec("P")
class NonStandardBehaviorHandler(Protocol[T]):
@@ -22,6 +29,11 @@ class Printer(Protocol):
raise NotImplementedError
class MostSimilarCommandGetter(Protocol):
def __call__(self, _unknown_trigger: str, /) -> str | None:
raise NotImplementedError
class DescriptionMessageGenerator(Protocol):
def __call__(self, _command: str, _description: str, /) -> str:
raise NotImplementedError
+132
View File
@@ -0,0 +1,132 @@
from typing import Protocol, Iterable
from art import text2art
from rich.markup import escape
from argenta.response.entity import Response
from argenta.command.models import InputCommand
from argenta.app.protocols import (
DescriptionMessageGenerator,
EmptyCommandHandler,
MostSimilarCommandGetter,
NonStandardBehaviorHandler,
Printer,
)
class ConcatenateRenderer:
def render_concatenated_messages_on_startup(self, messages: Iterable[str]) -> str:
return "\n".join(messages)
class ViewRenderer(Protocol):
def render_prompt(self, text: str) -> str: ...
def render_initial_message(self, text: str) -> str: ...
def render_farewell_message(self, text: str) -> str: ...
def generate_formatted_description_message_gen(self) -> DescriptionMessageGenerator: ...
def generate_formatted_incorrect_input_syntax_handler(self) -> NonStandardBehaviorHandler[str]: ...
def generate_formatted_repeated_input_flags_handler(self) -> NonStandardBehaviorHandler[str]: ...
def generate_formatted_empty_input_command_handler(self) -> EmptyCommandHandler: ...
def generate_formatted_unknown_command_handler(self) -> NonStandardBehaviorHandler[InputCommand]: ...
def generate_formatted_exit_command_handler(self, farewell_message: str) -> NonStandardBehaviorHandler[Response]: ...
class RichRenderer(ViewRenderer):
def __init__(self, print_func: Printer, most_similar_command_getter: MostSimilarCommandGetter) -> None:
self._print_func = print_func
self._most_similar_command_getter = most_similar_command_getter
def render_prompt(self, text: str) -> str:
return f"<gray><b>{text}</b></gray>"
def render_initial_message(self, text: str) -> str:
return f"[bold red]{text2art(text, font='tarty1')}[/bold red]"
def render_farewell_message(self, text: str) -> str:
return (
"[bold red]"
+ str(text2art(text, font="chanky"))
+ "[/bold red]\n"
+ "[red i]https://github.com/koloideal/Argenta[/red i] | [red bold i]made by kolo[/red bold i]"
)
def generate_formatted_description_message_gen(self) -> DescriptionMessageGenerator:
return lambda command, description: (
f"[bold red]{escape('[' + command + ']')}[/bold red] "
f"[blue dim]*=*=*[/blue dim] "
f"[bold yellow italic]{escape(description)}[/bold yellow italic]"
)
def generate_formatted_incorrect_input_syntax_handler(self) -> NonStandardBehaviorHandler[str]:
return lambda raw_command: self._print_func(f"[red bold]Incorrect flag syntax: {escape(raw_command)}[/red bold]")
def generate_formatted_repeated_input_flags_handler(self) -> NonStandardBehaviorHandler[str]:
return lambda raw_command: self._print_func(f"[red bold]Repeated input flags: {escape(raw_command)}[/red bold]")
def generate_formatted_empty_input_command_handler(self) -> EmptyCommandHandler:
return lambda: self._print_func("[red bold]Empty input command[/red bold]")
def generate_formatted_unknown_command_handler(self) -> NonStandardBehaviorHandler[InputCommand]:
def unknown_command_handler(command: InputCommand) -> None:
cmd_trg: str = command.trigger
mst_sim_cmd: str | None = self._most_similar_command_getter(cmd_trg)
self._print_func(
f"[red]Unknown command:[/red][blue]{escape(cmd_trg)}[/blue][red]" +
(f", most similar:[/red][blue]{mst_sim_cmd}[/blue]"
if mst_sim_cmd
else "")
)
return unknown_command_handler
def generate_formatted_exit_command_handler(self, farewell_message: str) -> NonStandardBehaviorHandler[Response]:
return lambda _: self._print_func(farewell_message)
class PlainRenderer(ViewRenderer):
def __init__(self, print_func: Printer, most_similar_command_getter: MostSimilarCommandGetter) -> None:
self._print_func = print_func
self._most_similar_command_getter = most_similar_command_getter
def render_prompt(self, text: str) -> str:
return text
def render_initial_message(self, text: str) -> str:
return text
def render_farewell_message(self, text: str) -> str:
return f"{text} | https://github.com/koloideal/Argenta | made by kolo"
def generate_formatted_description_message_gen(self) -> DescriptionMessageGenerator:
return lambda command, description: f"{command} *=*=* {description}"
def generate_formatted_incorrect_input_syntax_handler(self) -> NonStandardBehaviorHandler[str]:
return lambda raw_command: self._print_func(f"Incorrect flag syntax: {escape(raw_command)}")
def generate_formatted_repeated_input_flags_handler(self) -> NonStandardBehaviorHandler[str]:
return lambda raw_command: self._print_func(f"Repeated input flags: {escape(raw_command)}")
def generate_formatted_empty_input_command_handler(self) -> EmptyCommandHandler:
return lambda: self._print_func("Empty input command")
def generate_formatted_unknown_command_handler(self) -> NonStandardBehaviorHandler[InputCommand]:
def unknown_command_handler(command: InputCommand) -> None:
cmd_trg: str = command.trigger
mst_sim_cmd: str | None = self._most_similar_command_getter(cmd_trg)
self._print_func(
f"Unknown command: {escape(cmd_trg)}"
+ (f", most similar:{mst_sim_cmd}" if mst_sim_cmd else "")
)
return unknown_command_handler
def generate_formatted_exit_command_handler(self, farewell_message: str) -> NonStandardBehaviorHandler[Response]:
return lambda _: self._print_func(farewell_message)