From 107530a4b1469872f38970dc94780049d1c27b79 Mon Sep 17 00:00:00 2001 From: kolo Date: Thu, 29 Jan 2026 02:22:48 +0300 Subject: [PATCH] benchs --- metrics/__main__.py | 2 +- mock/mock_app/main.py | 3 +- pyproject.toml | 2 +- src/argenta/app/models.py | 195 +++++++--------------- src/argenta/app/protocols.py | 20 ++- src/argenta/app/view_renderer/__init__.py | 0 src/argenta/app/view_renderer/entity.py | 132 +++++++++++++++ 7 files changed, 209 insertions(+), 145 deletions(-) create mode 100644 src/argenta/app/view_renderer/__init__.py create mode 100644 src/argenta/app/view_renderer/entity.py diff --git a/metrics/__main__.py b/metrics/__main__.py index 247a174..48a38af 100644 --- a/metrics/__main__.py +++ b/metrics/__main__.py @@ -2,7 +2,7 @@ from argenta import App, Orchestrator from .handlers import router -app = App(initial_message="metrics") +app = App(initial_message="metrics", override_system_messages=False) orchestrator = Orchestrator() diff --git a/mock/mock_app/main.py b/mock/mock_app/main.py index 745baf9..65a5623 100644 --- a/mock/mock_app/main.py +++ b/mock/mock_app/main.py @@ -6,8 +6,7 @@ from argenta.app.dividing_line.models import DynamicDividingLine from mock.mock_app.routers import work_router app: App = App( - dividing_line=StaticDividingLine('~'), - prompt=HTML('\n>>>') + dividing_line=StaticDividingLine('~') ) orchestrator: Orchestrator = Orchestrator() diff --git a/pyproject.toml b/pyproject.toml index f3e09d4..b179e94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ metrics = [ ] [tool.ruff] -line-length=90 +line-length=100 [tool.pyright] typeCheckingMode = "strict" diff --git a/src/argenta/app/models.py b/src/argenta/app/models.py index 14acad5..29842db 100644 --- a/src/argenta/app/models.py +++ b/src/argenta/app/models.py @@ -5,12 +5,10 @@ import re from contextlib import redirect_stdout from typing import Callable, Never, TypeAlias -from art import text2art -from prompt_toolkit import HTML from rich.console import Console -from rich.markup import escape from argenta.app.autocompleter import AutoCompleter +from argenta.app.view_renderer.entity import PlainRenderer, RichRenderer, ViewRenderer from argenta.app.dividing_line.models import DynamicDividingLine, StaticDividingLine from argenta.app.protocols import ( DescriptionMessageGenerator, @@ -30,7 +28,6 @@ from argenta.response import Response from argenta.router import Router Matches: TypeAlias = list[str] | list[Never] - _ANSI_ESCAPE_RE: re.Pattern[str] = re.compile(r"\u001b\[[0-9;]*m") @@ -38,7 +35,7 @@ class BaseApp: def __init__( self, *, - prompt: str | HTML, + prompt: str, initial_message: str, farewell_message: str, exit_command: Command, @@ -49,41 +46,38 @@ class BaseApp: autocompleter: AutoCompleter, print_func: Printer, ) -> None: - self._prompt: str | HTML = prompt + self._prompt: str = prompt self._print_func: Printer = print_func self._exit_command: Command = exit_command self._dividing_line: StaticDividingLine | DynamicDividingLine | None = dividing_line self._repeat_command_groups_printing: bool = repeat_command_groups_printing self._override_system_messages: bool = override_system_messages self._autocompleter: AutoCompleter = autocompleter - self.system_router: Router = Router(title=system_router_title) - - self._farewell_message: str = farewell_message - self._initial_message: str = initial_message + self._system_router: Router = Router(title=system_router_title) self._stdout_buffer: io.StringIO = io.StringIO() - - self._description_message_gen: DescriptionMessageGenerator = ( - lambda command, description: f"{command} *=*=* {description}" - ) self.registered_routers: RegisteredRouters = RegisteredRouters() self._messages_on_startup: list[str] = [] - self._incorrect_input_syntax_handler: NonStandardBehaviorHandler[str] = ( - lambda _: print_func(f"Incorrect flag syntax: {_}") - ) - self._repeated_input_flags_handler: NonStandardBehaviorHandler[str] = ( - lambda _: print_func(f"Repeated input flags: {_}") - ) - self._empty_input_command_handler: EmptyCommandHandler = lambda: print_func( - "Empty input command" - ) - self._unknown_command_handler: NonStandardBehaviorHandler[InputCommand] = ( - lambda _: print_func(f"Unknown command: {_.trigger}") - ) - self._exit_command_handler: NonStandardBehaviorHandler[Response] = ( - lambda _: print_func(self._farewell_message) - ) + if self._override_system_messages: + self.view: ViewRenderer = PlainRenderer( + print_func=self._print_func, + most_similar_command_getter=self._most_similar_command + ) + else: + self.view: ViewRenderer = RichRenderer( + print_func=self._print_func, + most_similar_command_getter=self._most_similar_command + ) + + self._initial_message: str = self.view.render_initial_message(initial_message) + self._farewell_message: str = self.view.render_farewell_message(farewell_message) + self._description_message_gen: DescriptionMessageGenerator = self.view.generate_formatted_description_message_gen() + self._incorrect_input_syntax_handler: NonStandardBehaviorHandler[str] = self.view.generate_formatted_incorrect_input_syntax_handler() + self._repeated_input_flags_handler: NonStandardBehaviorHandler[str] = self.view.generate_formatted_repeated_input_flags_handler() + self._empty_input_command_handler: EmptyCommandHandler = self.view.generate_formatted_empty_input_command_handler() + self._unknown_command_handler: NonStandardBehaviorHandler[InputCommand] = self.view.generate_formatted_unknown_command_handler() + self._exit_command_handler: NonStandardBehaviorHandler[Response] = self.view.generate_formatted_exit_command_handler(self._farewell_message) def set_description_message_pattern(self, _: DescriptionMessageGenerator, /) -> None: """ @@ -93,9 +87,7 @@ class BaseApp: """ self._description_message_gen = _ - def set_incorrect_input_syntax_handler( - self, _: NonStandardBehaviorHandler[str], / - ) -> None: + def set_incorrect_input_syntax_handler(self, _: NonStandardBehaviorHandler[str], /) -> None: """ Public. Sets the handler for incorrect flags when entering a command :param _: handler for incorrect flags when entering a command @@ -103,9 +95,7 @@ class BaseApp: """ self._incorrect_input_syntax_handler = _ - def set_repeated_input_flags_handler( - self, _: NonStandardBehaviorHandler[str], / - ) -> None: + def set_repeated_input_flags_handler(self, _: NonStandardBehaviorHandler[str], /) -> None: """ Public. Sets the handler for repeated flags when entering a command :param _: handler for repeated flags when entering a command @@ -113,9 +103,7 @@ class BaseApp: """ self._repeated_input_flags_handler = _ - def set_unknown_command_handler( - self, _: NonStandardBehaviorHandler[InputCommand], / - ) -> None: + def set_unknown_command_handler(self, _: NonStandardBehaviorHandler[InputCommand], /) -> None: """ Public. Sets the handler for unknown commands when entering a command :param _: handler for unknown commands when entering a command @@ -131,9 +119,7 @@ class BaseApp: """ self._empty_input_command_handler = _ - def set_exit_command_handler( - self, _: NonStandardBehaviorHandler[Response], / - ) -> None: + def set_exit_command_handler(self, _: NonStandardBehaviorHandler[Response], /) -> None: """ Public. Sets the handler for exit command when entering a command :param _: handler for exit command when entering a command @@ -147,7 +133,7 @@ class BaseApp: :return: None """ for registered_router in self.registered_routers: - self._print_func('\n'+registered_router.title) + self._print_func("\n" + registered_router.title) for command_handler in registered_router.command_handlers: handled_command = command_handler.handled_command self._print_func( @@ -165,17 +151,9 @@ class BaseApp: """ match self._dividing_line: case StaticDividingLine() as dividing_line: - self._print_func( - dividing_line.get_full_static_line( - is_override=self._override_system_messages - ) - ) + self._print_func(dividing_line.get_full_static_line(is_override=self._override_system_messages)) print(text.strip("\n")) - self._print_func( - dividing_line.get_full_static_line( - is_override=self._override_system_messages - ) - ) + self._print_func(dividing_line.get_full_static_line(is_override=self._override_system_messages)) case DynamicDividingLine() as dividing_line: self._print_func( StaticDividingLine(dividing_line.get_unit_part()).get_full_static_line( @@ -189,9 +167,9 @@ class BaseApp: ) ) case None: - print('\n' + text.strip("\n") + '\n') + print("\n" + text.strip("\n") + "\n") case _: - raise NotImplementedError(f'Dividing line with type {self._dividing_line} is not implemented') + raise NotImplementedError(f"Dividing line with type {self._dividing_line} is not implemented") def _is_exit_command(self, command: InputCommand) -> bool: """ @@ -206,7 +184,7 @@ class BaseApp: elif trigger.lower() in [x.lower() for x in self._exit_command.aliases]: return True return False - + def _is_unknown_command(self, input_command: InputCommand) -> bool: if not self.registered_routers.get_router_by_trigger(input_command.trigger.lower()): return True @@ -243,33 +221,32 @@ class BaseApp: Private. Sets up system router :return: None """ - - @self.system_router.command(self._exit_command) + @self._system_router.command(self._exit_command) def _(response: Response) -> None: self._exit_command_handler(response) - self.registered_routers.add_registered_router(self.system_router) - + self.registered_routers.add_registered_router(self._system_router) + def _validate_routers_for_collisions(self) -> None: """ Private. Validates that there are no trigger/alias collisions between routers :return: None :raises: RepeatedTriggerNameException or RepeatedAliasNameException if collision detected """ - + all_triggers: set[str] = set() all_aliases: set[str] = set() - + for router_entity in self.registered_routers: union_units: set[str] = all_triggers | all_aliases trigger_collisions: set[str] = union_units & router_entity.triggers if trigger_collisions: raise RepeatedTriggerNameException() - + alias_collisions: set[str] = union_units & router_entity.aliases if alias_collisions: raise RepeatedAliasNameException(alias_collisions) - + all_triggers.update(router_entity.triggers) all_aliases.update(router_entity.aliases) @@ -279,9 +256,7 @@ class BaseApp: matches_startswith_unknown_command: Matches = sorted( cmd for cmd in all_commands if cmd.startswith(unknown_command) ) - matches_startswith_cmd: Matches = sorted( - cmd for cmd in all_commands if unknown_command.startswith(cmd) - ) + matches_startswith_cmd: Matches = sorted(cmd for cmd in all_commands if unknown_command.startswith(cmd)) matches: Matches = matches_startswith_unknown_command or matches_startswith_cmd @@ -292,52 +267,6 @@ class BaseApp: else: return None - def _setup_default_view(self) -> None: - """ - Private. Sets up default app view - :return: None - """ - if isinstance(self._prompt, str): - self._prompt = f"\n{self._prompt}" - self._initial_message = ( - "\n" + f"[bold red]{text2art(self._initial_message, font='tarty1')}" - ) - self._farewell_message = ( - "[bold red]\n\n" - + str(text2art(self._farewell_message, font="chanky")) # pyright: ignore[reportUnknownArgumentType] - + "\n[/bold red]\n" - + "[red i]github.com/koloideal/Argenta[/red i] | [red bold i]made by kolo[/red bold i]\n" - ) - self._description_message_gen = lambda command, description: ( - f"[bold red]{escape('[' + command + ']')}[/bold red] " - f"[blue dim]*=*=*[/blue dim] " - f"[bold yellow italic]{escape(description)}" - ) - self._incorrect_input_syntax_handler = lambda raw_command: self._print_func( - f"[red bold]Incorrect flag syntax: {escape(raw_command)}" - ) - self._repeated_input_flags_handler = lambda raw_command: self._print_func( - f"[red bold]Repeated input flags: {escape(raw_command)}" - ) - self._empty_input_command_handler = lambda: self._print_func( - "[red bold]Empty input command" - ) - - def unknown_command_handler(command: InputCommand) -> None: - cmd_trg: str = command.trigger - mst_sim_cmd: str | None = self._most_similar_command(cmd_trg) - first_part_of_text = ( - f"[red]Unknown command:[/red] [blue]{escape(cmd_trg)}[/blue]" - ) - second_part_of_text = ( - ("[red], most similar:[/red] " + ("[blue]" + mst_sim_cmd + "[/blue]")) - if mst_sim_cmd - else "" - ) - self._print_func(first_part_of_text + second_part_of_text) - - self._unknown_command_handler = unknown_command_handler - def _pre_cycle_setup(self) -> None: """ Private. Configures various aspects of the application before the start of the cycle @@ -347,31 +276,25 @@ class BaseApp: self._validate_routers_for_collisions() self._autocompleter.initial_setup(self.registered_routers.get_triggers()) - self._print_func(self._initial_message) - self._print_func('\n'.join(self._messages_on_startup)) + if self._messages_on_startup: + self._print_func() if not self._repeat_command_groups_printing: self._print_command_group_description() def _process_exist_and_valid_command(self, input_command: InputCommand) -> None: processing_router = self.registered_routers.get_router_by_trigger(input_command.trigger.lower()) - + if not processing_router: raise RuntimeError(f"Router for '{input_command.trigger}' not found. Panic!") match (self._dividing_line, processing_router.disable_redirect_stdout): case (DynamicDividingLine(), False): - stdout_result = self._capture_stdout( - lambda: processing_router.finds_appropriate_handler(input_command) - ) + stdout_result = self._capture_stdout(lambda: processing_router.finds_appropriate_handler(input_command)) clear_text = _ANSI_ESCAPE_RE.sub("", stdout_result) max_length_line = max([len(line) for line in clear_text.split("\n")]) max_length_line = ( - max_length_line - if 10 <= max_length_line <= 100 - else 100 - if max_length_line > 100 - else 10 + max_length_line if 10 <= max_length_line <= 100 else 100 if max_length_line > 100 else 10 ) self._print_func( @@ -400,7 +323,7 @@ class BaseApp: case (None, bool()): processing_router.finds_appropriate_handler(input_command) case _: - raise NotImplementedError(f'Dividing line with type {self._dividing_line} is not implemented') + raise NotImplementedError(f"Dividing line with type {self._dividing_line} is not implemented") AVAILABLE_DIVIDING_LINES: TypeAlias = StaticDividingLine | DynamicDividingLine @@ -412,9 +335,9 @@ class App(BaseApp): def __init__( self, *, - prompt: str | HTML = ">>> ", - initial_message: str = "Argenta\n", - farewell_message: str = "\nSee you\n", + prompt: str = ">>> ", + initial_message: str = "Argenta", + farewell_message: str = "See you", exit_command: Command = DEFAULT_EXIT_COMMAND, system_router_title: str = "System points:", dividing_line: AVAILABLE_DIVIDING_LINES | None = None, @@ -450,39 +373,37 @@ class App(BaseApp): autocompleter=autocompleter or AutoCompleter(), print_func=print_func, ) - if not self._override_system_messages: - self._setup_default_view() def run_polling(self) -> None: """ Private. Starts the user input processing cycle :return: None """ + self._print_func(self._initial_message) self._pre_cycle_setup() while True: if self._repeat_command_groups_printing: self._print_command_group_description() - raw_command: str = self._autocompleter.prompt(self._prompt) - print() # post-prompt gap + print() # pre-prompt gap + raw_command: str = self._autocompleter.prompt(self.view.render_prompt(self._prompt)) + print() # post-prompt gap try: input_command: InputCommand = InputCommand.parse(raw_command=raw_command) - except InputCommandException as error: # noqa F841 + except InputCommandException as error: # noqa F841 stderr_result = self._capture_stdout( - lambda: self._error_handler(error, raw_command) # noqa F821 + lambda: self._error_handler(error, raw_command) # noqa F821 ) self._print_static_framed_text(stderr_result) continue if self._is_exit_command(input_command): - self.system_router.finds_appropriate_handler(input_command) + self._system_router.finds_appropriate_handler(input_command) return if self._is_unknown_command(input_command): - stdout_res = self._capture_stdout( - lambda: self._unknown_command_handler(input_command) - ) + stdout_res = self._capture_stdout(lambda: self._unknown_command_handler(input_command)) self._print_static_framed_text(stdout_res) continue diff --git a/src/argenta/app/protocols.py b/src/argenta/app/protocols.py index d68614d..985c8e0 100644 --- a/src/argenta/app/protocols.py +++ b/src/argenta/app/protocols.py @@ -1,10 +1,17 @@ -__all__ = ["NonStandardBehaviorHandler", "EmptyCommandHandler", "Printer", "DescriptionMessageGenerator", "HandlerFunc"] +__all__ = [ + "NonStandardBehaviorHandler", + "EmptyCommandHandler", + "MostSimilarCommandGetter", + "Printer", + "DescriptionMessageGenerator", + "HandlerFunc", +] + +from typing import Any, Protocol, TypeVar -from typing import Any, ParamSpec, Protocol, TypeVar from argenta.response import Response -T = TypeVar("T", contravariant=True) -P = ParamSpec("P") +T = TypeVar("T", contravariant=True) class NonStandardBehaviorHandler(Protocol[T]): @@ -22,6 +29,11 @@ class Printer(Protocol): raise NotImplementedError +class MostSimilarCommandGetter(Protocol): + def __call__(self, _unknown_trigger: str, /) -> str | None: + raise NotImplementedError + + class DescriptionMessageGenerator(Protocol): def __call__(self, _command: str, _description: str, /) -> str: raise NotImplementedError diff --git a/src/argenta/app/view_renderer/__init__.py b/src/argenta/app/view_renderer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/argenta/app/view_renderer/entity.py b/src/argenta/app/view_renderer/entity.py new file mode 100644 index 0000000..5633186 --- /dev/null +++ b/src/argenta/app/view_renderer/entity.py @@ -0,0 +1,132 @@ +from typing import Protocol, Iterable + +from art import text2art +from rich.markup import escape + +from argenta.response.entity import Response +from argenta.command.models import InputCommand +from argenta.app.protocols import ( + DescriptionMessageGenerator, + EmptyCommandHandler, + MostSimilarCommandGetter, + NonStandardBehaviorHandler, + Printer, +) + + +class ConcatenateRenderer: + def render_concatenated_messages_on_startup(self, messages: Iterable[str]) -> str: + return "\n".join(messages) + + +class ViewRenderer(Protocol): + def render_prompt(self, text: str) -> str: ... + + def render_initial_message(self, text: str) -> str: ... + + def render_farewell_message(self, text: str) -> str: ... + + def generate_formatted_description_message_gen(self) -> DescriptionMessageGenerator: ... + + def generate_formatted_incorrect_input_syntax_handler(self) -> NonStandardBehaviorHandler[str]: ... + + def generate_formatted_repeated_input_flags_handler(self) -> NonStandardBehaviorHandler[str]: ... + + def generate_formatted_empty_input_command_handler(self) -> EmptyCommandHandler: ... + + def generate_formatted_unknown_command_handler(self) -> NonStandardBehaviorHandler[InputCommand]: ... + + def generate_formatted_exit_command_handler(self, farewell_message: str) -> NonStandardBehaviorHandler[Response]: ... + + +class RichRenderer(ViewRenderer): + def __init__(self, print_func: Printer, most_similar_command_getter: MostSimilarCommandGetter) -> None: + self._print_func = print_func + self._most_similar_command_getter = most_similar_command_getter + + def render_prompt(self, text: str) -> str: + return f"{text}" + + def render_initial_message(self, text: str) -> str: + return f"[bold red]{text2art(text, font='tarty1')}[/bold red]" + + def render_farewell_message(self, text: str) -> str: + return ( + "[bold red]" + + str(text2art(text, font="chanky")) + + "[/bold red]\n" + + "[red i]https://github.com/koloideal/Argenta[/red i] | [red bold i]made by kolo[/red bold i]" + ) + + def generate_formatted_description_message_gen(self) -> DescriptionMessageGenerator: + return lambda command, description: ( + f"[bold red]{escape('[' + command + ']')}[/bold red] " + f"[blue dim]*=*=*[/blue dim] " + f"[bold yellow italic]{escape(description)}[/bold yellow italic]" + ) + + def generate_formatted_incorrect_input_syntax_handler(self) -> NonStandardBehaviorHandler[str]: + return lambda raw_command: self._print_func(f"[red bold]Incorrect flag syntax: {escape(raw_command)}[/red bold]") + + def generate_formatted_repeated_input_flags_handler(self) -> NonStandardBehaviorHandler[str]: + return lambda raw_command: self._print_func(f"[red bold]Repeated input flags: {escape(raw_command)}[/red bold]") + + def generate_formatted_empty_input_command_handler(self) -> EmptyCommandHandler: + return lambda: self._print_func("[red bold]Empty input command[/red bold]") + + def generate_formatted_unknown_command_handler(self) -> NonStandardBehaviorHandler[InputCommand]: + def unknown_command_handler(command: InputCommand) -> None: + cmd_trg: str = command.trigger + mst_sim_cmd: str | None = self._most_similar_command_getter(cmd_trg) + self._print_func( + f"[red]Unknown command:[/red][blue]{escape(cmd_trg)}[/blue][red]" + + (f", most similar:[/red][blue]{mst_sim_cmd}[/blue]" + if mst_sim_cmd + else "") + ) + + return unknown_command_handler + + def generate_formatted_exit_command_handler(self, farewell_message: str) -> NonStandardBehaviorHandler[Response]: + return lambda _: self._print_func(farewell_message) + + +class PlainRenderer(ViewRenderer): + def __init__(self, print_func: Printer, most_similar_command_getter: MostSimilarCommandGetter) -> None: + self._print_func = print_func + self._most_similar_command_getter = most_similar_command_getter + + def render_prompt(self, text: str) -> str: + return text + + def render_initial_message(self, text: str) -> str: + return text + + def render_farewell_message(self, text: str) -> str: + return f"{text} | https://github.com/koloideal/Argenta | made by kolo" + + def generate_formatted_description_message_gen(self) -> DescriptionMessageGenerator: + return lambda command, description: f"{command} *=*=* {description}" + + def generate_formatted_incorrect_input_syntax_handler(self) -> NonStandardBehaviorHandler[str]: + return lambda raw_command: self._print_func(f"Incorrect flag syntax: {escape(raw_command)}") + + def generate_formatted_repeated_input_flags_handler(self) -> NonStandardBehaviorHandler[str]: + return lambda raw_command: self._print_func(f"Repeated input flags: {escape(raw_command)}") + + def generate_formatted_empty_input_command_handler(self) -> EmptyCommandHandler: + return lambda: self._print_func("Empty input command") + + def generate_formatted_unknown_command_handler(self) -> NonStandardBehaviorHandler[InputCommand]: + def unknown_command_handler(command: InputCommand) -> None: + cmd_trg: str = command.trigger + mst_sim_cmd: str | None = self._most_similar_command_getter(cmd_trg) + self._print_func( + f"Unknown command: {escape(cmd_trg)}" + + (f", most similar:{mst_sim_cmd}" if mst_sim_cmd else "") + ) + + return unknown_command_handler + + def generate_formatted_exit_command_handler(self, farewell_message: str) -> NonStandardBehaviorHandler[Response]: + return lambda _: self._print_func(farewell_message)