From 56189be6abe1e44b4ef21cd0e7b76092860cf4c9 Mon Sep 17 00:00:00 2001 From: kolo Date: Tue, 9 Dec 2025 12:02:26 +0300 Subject: [PATCH] better perf --- mock/local_test.py | 14 ++- src/argenta/app/autocompleter/entity.py | 16 +--- src/argenta/app/models.py | 83 +++-------------- src/argenta/app/registered_routers/entity.py | 17 +++- src/argenta/router/entity.py | 2 +- tests/unit_tests/test_autocompleter.py | 25 +---- tests/unit_tests/test_router.py | 96 +++++++++++++++++++- 7 files changed, 138 insertions(+), 115 deletions(-) diff --git a/mock/local_test.py b/mock/local_test.py index 0aaef4e..c34d3b5 100644 --- a/mock/local_test.py +++ b/mock/local_test.py @@ -1,10 +1,14 @@ -from argenta import Command, Response, Router +from argenta import Command, Response, Router, App, Orchestrator from argenta.command import InputCommand router = Router() +orchestrator = Orchestrator() -@router.command(Command('heLLo')) -def handler(_res: Response) -> None: - print("Hello World!") +@router.command(Command('test')) +def test(_response: Response) -> None: # pyright: ignore[reportUnusedFunction] + print('test command') -router.finds_appropriate_handler(InputCommand('HellO')) +app = App(override_system_messages=True, print_func=print) +app.include_router(router) +app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}')) +orchestrator.start_polling(app) \ No newline at end of file diff --git a/src/argenta/app/autocompleter/entity.py b/src/argenta/app/autocompleter/entity.py index 22aa6e9..a50bad0 100644 --- a/src/argenta/app/autocompleter/entity.py +++ b/src/argenta/app/autocompleter/entity.py @@ -48,7 +48,7 @@ class AutoCompleter: else: return None - def initial_setup(self, all_commands: list[str]) -> None: + def initial_setup(self, all_commands: set[str]) -> None: """ Private. Initial setup function :param all_commands: Registered commands for adding them to the autocomplete history @@ -69,7 +69,7 @@ class AutoCompleter: readline.set_completer_delims(readline.get_completer_delims().replace(" ", "")) readline.parse_and_bind(f"{self.autocomplete_button}: complete") - def exit_setup(self, all_commands: list[str], ignore_command_register: bool) -> None: + def exit_setup(self, all_commands: set[str]) -> None: """ Private. Exit setup function :return: None @@ -80,22 +80,12 @@ class AutoCompleter: raw_history = history_file.read() pretty_history: list[str] = [] for line in set(raw_history.strip().split("\n")): - if _is_command_exist( - line.split()[0], all_commands, ignore_command_register - ): + if line.split()[0] in all_commands: pretty_history.append(line) with open(self.history_filename, "w") as history_file: _ = history_file.write("\n".join(pretty_history)) -def _is_command_exist( - command: str, existing_commands: list[str], ignore_command_register: bool -) -> bool: - if ignore_command_register: - return command.lower() in existing_commands - return command in existing_commands - - def _get_history_items() -> list[str] | list[Never]: """ Private. Returns a list of all commands entered by the user diff --git a/src/argenta/app/models.py b/src/argenta/app/models.py index a4d4214..b82019b 100644 --- a/src/argenta/app/models.py +++ b/src/argenta/app/models.py @@ -40,7 +40,6 @@ class BaseApp: farewell_message: str, exit_command: Command, system_router_title: str, - ignore_command_register: bool, dividing_line: StaticDividingLine | DynamicDividingLine, repeat_command_groups_printing: bool, override_system_messages: bool, @@ -51,8 +50,7 @@ class BaseApp: self._print_func: Printer = print_func self._exit_command: Command = exit_command self._dividing_line: StaticDividingLine | DynamicDividingLine = dividing_line - self._ignore_command_register: bool = ignore_command_register - self._repeat_command_groups_printing_description: bool = repeat_command_groups_printing + self._repeat_command_groups_printing: bool = repeat_command_groups_printing self._override_system_messages: bool = override_system_messages self._autocompleter: AutoCompleter = autocompleter self.system_router: Router = Router(title=system_router_title) @@ -66,15 +64,6 @@ class BaseApp: self.registered_routers: RegisteredRouters = RegisteredRouters() self._messages_on_startup: list[str] = [] - self._matching_lower_triggers_with_routers: dict[str, Router] = {} - self._matching_default_triggers_with_routers: dict[str, Router] = {} - - self._current_matching_triggers_with_routers: dict[str, Router] = ( - self._matching_lower_triggers_with_routers - if self._ignore_command_register - else self._matching_default_triggers_with_routers - ) - self._incorrect_input_syntax_handler: NonStandardBehaviorHandler[str] = ( lambda _: print_func(f"Incorrect flag syntax: {_}") ) @@ -217,37 +206,12 @@ class BaseApp: """ trigger = command.trigger exit_trigger = self._exit_command.trigger - if self._ignore_command_register: - if trigger.lower() == exit_trigger.lower(): - return True - elif trigger.lower() in [x.lower() for x in self._exit_command.aliases]: - return True - else: - if trigger == exit_trigger: - return True - elif trigger in self._exit_command.aliases: - return True + if trigger.lower() == exit_trigger.lower(): + return True + elif trigger.lower() in [x.lower() for x in self._exit_command.aliases]: + return True return False - def _is_unknown_command(self, command: InputCommand) -> bool: - """ - Private. Checks if the given command is an unknown command - :param command: command to check - :return: is it an unknown command or not as bool - """ - input_command_trigger = command.trigger - if self._ignore_command_register: - if input_command_trigger.lower() in list( - self._current_matching_triggers_with_routers.keys() - ): - return False - else: - if input_command_trigger in list( - self._current_matching_triggers_with_routers.keys() - ): - return False - return True - def _error_handler(self, error: InputCommandException, raw_command: str) -> None: """ Private. Handles parsing errors of the entered command @@ -297,7 +261,7 @@ class BaseApp: all_aliases.update(router_entity.aliases) def _most_similar_command(self, unknown_command: str) -> str | None: - all_commands = list(self._current_matching_triggers_with_routers.keys()) + all_commands = self.registered_routers.get_triggers() matches_startswith_unknown_command: Matches = sorted( cmd for cmd in all_commands if cmd.startswith(unknown_command) @@ -368,20 +332,7 @@ class BaseApp: self._setup_system_router() self._validate_routers_for_collisions() - for router_entity in self.registered_routers: - router_triggers = router_entity.triggers - router_aliases = router_entity.aliases - combined = router_triggers | router_aliases - - for trigger in combined: - self._matching_default_triggers_with_routers[trigger] = router_entity - self._matching_lower_triggers_with_routers[trigger.lower()] = ( - router_entity - ) - - self._autocompleter.initial_setup( - list(self._current_matching_triggers_with_routers.keys()) - ) + self._autocompleter.initial_setup(self.registered_routers.get_triggers()) if not self._override_system_messages: self._setup_default_view() @@ -392,13 +343,14 @@ class BaseApp: self._print_func(message) if self._messages_on_startup: print("\n") - if not self._repeat_command_groups_printing_description: + if not self._repeat_command_groups_printing: self._print_command_group_description() def _process_exist_and_valid_command(self, input_command: InputCommand) -> None: - processing_router = self._current_matching_triggers_with_routers[ - input_command.trigger.lower() - ] + processing_router = self.registered_routers.get_router_by_trigger(input_command.trigger.lower()) + + if not processing_router: + raise RuntimeError(f"Router for '{input_command.trigger}' not found. Panic!") if processing_router.disable_redirect_stdout: dividing_line_unit_part: str = self._dividing_line.get_unit_part() @@ -452,7 +404,6 @@ class App(BaseApp): :param farewell_message: displayed at the end of the app :param exit_command: the entity of the command that will be terminated when entered :param system_router_title: system router title - :param ignore_command_register: whether to ignore the case of the entered commands :param dividing_line: the entity of the dividing line :param repeat_command_groups_printing: whether to repeat the available commands and their description :param override_system_messages: whether to redefine the default formatting of system messages @@ -466,7 +417,6 @@ class App(BaseApp): farewell_message=farewell_message, exit_command=exit_command, system_router_title=system_router_title, - ignore_command_register=ignore_command_register, dividing_line=dividing_line, repeat_command_groups_printing=repeat_command_groups_printing, override_system_messages=override_system_messages, @@ -481,7 +431,7 @@ class App(BaseApp): """ self._pre_cycle_setup() while True: - if self._repeat_command_groups_printing_description: + if self._repeat_command_groups_printing: self._print_command_group_description() raw_command: str = Console().input(self._prompt) @@ -497,13 +447,10 @@ class App(BaseApp): if self._is_exit_command(input_command): self.system_router.finds_appropriate_handler(input_command) - self._autocompleter.exit_setup( - list(self._current_matching_triggers_with_routers.keys()), - self._ignore_command_register, - ) + self._autocompleter.exit_setup(self.registered_routers.get_triggers()) return - if self._is_unknown_command(input_command): + if self.registered_routers.get_router_by_trigger(input_command.trigger.lower()): with redirect_stdout(io.StringIO()) as stdout: self._unknown_command_handler(input_command) stdout_res: str = stdout.getvalue() diff --git a/src/argenta/app/registered_routers/entity.py b/src/argenta/app/registered_routers/entity.py index 7ffa841..e670228 100644 --- a/src/argenta/app/registered_routers/entity.py +++ b/src/argenta/app/registered_routers/entity.py @@ -6,15 +6,14 @@ from argenta.router import Router class RegisteredRouters: - def __init__(self, registered_routers: list[Router] | None = None) -> None: + def __init__(self) -> None: """ Private. Combines registered routers :param registered_routers: list of the registered routers :return: None """ - self.registered_routers: list[Router] = registered_routers if registered_routers else [] - - self._matching_lower_triggers_with_routers + self.registered_routers: list[Router] = [] + self._paired_trigger_router: dict[str, Router] = {} def add_registered_router(self, router: Router, /) -> None: """ @@ -23,6 +22,14 @@ class RegisteredRouters: :return: None """ self.registered_routers.append(router) - + for trigger in (router.aliases | router.triggers): + self._paired_trigger_router[trigger] = router + + def get_router_by_trigger(self, trigger: str) -> Router | None: + return self._paired_trigger_router.get(trigger) + + def get_triggers(self) -> set[str]: + return set(self._paired_trigger_router.keys()) + def __iter__(self) -> Iterator[Router]: return iter(self.registered_routers) diff --git a/src/argenta/router/entity.py b/src/argenta/router/entity.py index 29df179..18b1134 100644 --- a/src/argenta/router/entity.py +++ b/src/argenta/router/entity.py @@ -104,7 +104,7 @@ class Router: command_handler = self.command_handlers.get_command_handler_by_trigger(input_command_name) if not command_handler: - raise RuntimeError(f"Handler for '{input_command.trigger}' command not found!") + raise RuntimeError(f"Handler for '{input_command.trigger}' command not found. Panic!") else: self.process_input_command(input_command_flags, command_handler) diff --git a/tests/unit_tests/test_autocompleter.py b/tests/unit_tests/test_autocompleter.py index 904c460..5b1c9c7 100644 --- a/tests/unit_tests/test_autocompleter.py +++ b/tests/unit_tests/test_autocompleter.py @@ -7,13 +7,12 @@ from pytest_mock import MockerFixture from argenta.app.autocompleter.entity import ( AutoCompleter, - _get_history_items, - _is_command_exist, + _get_history_items ) HISTORY_FILE: str = "test_history.txt" -COMMANDS: list[str] = ["start", "stop", "status"] +COMMANDS: set[str] = {"start", "stop", "status"} # ============================================================================ @@ -119,7 +118,7 @@ def test_exit_setup_writes_and_filters_duplicate_commands(fs: FakeFilesystem, mo fs.create_file(HISTORY_FILE, contents=raw_history_content) # pyright: ignore[reportUnknownMemberType] completer: AutoCompleter = AutoCompleter(history_filename=HISTORY_FILE) - completer.exit_setup(all_commands=["start", "stop"], ignore_command_register=False) + completer.exit_setup(all_commands={"start", "stop"}) mock_readline.write_history_file.assert_called_once_with(HISTORY_FILE) @@ -131,7 +130,7 @@ def test_exit_setup_writes_and_filters_duplicate_commands(fs: FakeFilesystem, mo def test_exit_setup_skips_writing_when_no_history_filename(mock_readline: Any) -> None: completer: AutoCompleter = AutoCompleter(history_filename=None) - completer.exit_setup(all_commands=COMMANDS, ignore_command_register=False) + completer.exit_setup(all_commands=COMMANDS) mock_readline.write_history_file.assert_not_called() @@ -182,22 +181,6 @@ def test_complete_inserts_common_prefix_for_multiple_matches(mock_readline: Any) # ============================================================================ -def test_is_command_exist_checks_case_sensitive_when_enabled() -> None: - existing: list[str] = ["start", "stop", "status"] - - assert _is_command_exist("start", existing, ignore_command_register=False) is True - assert _is_command_exist("START", existing, ignore_command_register=False) is False - assert _is_command_exist("unknown", existing, ignore_command_register=False) is False - - -def test_is_command_exist_checks_case_insensitive_when_enabled() -> None: - existing: list[str] = ["start", "stop", "status"] - - assert _is_command_exist("start", existing, ignore_command_register=True) is True - assert _is_command_exist("START", existing, ignore_command_register=True) is True - assert _is_command_exist("unknown", existing, ignore_command_register=True) is False - - def test_get_history_items_returns_empty_list_initially(mock_readline: Any) -> None: assert _get_history_items() == [] diff --git a/tests/unit_tests/test_router.py b/tests/unit_tests/test_router.py index 347338a..21bc6d3 100644 --- a/tests/unit_tests/test_router.py +++ b/tests/unit_tests/test_router.py @@ -208,7 +208,7 @@ def test_finds_appropriate_handler_executes_handler_by_alias(capsys: CaptureFixt assert "Hello World!" in output.out -def test_finds_appropriate_handler_executes_handler_by_alias_with_differrent_register(capsys: CaptureFixture[str]) -> None: +def test_finds_appropriate_handler_executes_handler_by_alias_case_insensitive(capsys: CaptureFixture[str]) -> None: router = Router() @router.command(Command('hello', aliases={'hI'})) @@ -222,7 +222,7 @@ def test_finds_appropriate_handler_executes_handler_by_alias_with_differrent_reg assert "Hello World!" in output.out -def test_finds_appropriate_handler_executes_handler_by_trigger_with_differrent_register(capsys: CaptureFixture[str]) -> None: +def test_finds_appropriate_handler_executes_handler_by_trigger_case_insensitive(capsys: CaptureFixture[str]) -> None: router = Router() @router.command(Command('heLLo')) @@ -305,3 +305,95 @@ def test_validate_command_raises_error_for_alias_collision_case_insensitive() -> @router.command(Command('world', aliases={'hI'})) def handler2(_res: Response) -> None: pass + + +# ============================================================================ +# Tests for RegisteredRouters +# ============================================================================ + + +def test_registered_routers_get_router_by_trigger() -> None: + from argenta.app.registered_routers.entity import RegisteredRouters + + registered_routers = RegisteredRouters() + router = Router() + + @router.command('hello') + def handler(_res: Response) -> None: + pass + + registered_routers.add_registered_router(router) + + assert registered_routers.get_router_by_trigger('hello') == router + + +def test_registered_routers_get_router_by_alias() -> None: + from argenta.app.registered_routers.entity import RegisteredRouters + + registered_routers = RegisteredRouters() + router = Router() + + @router.command(Command('hello', aliases={'hi'})) + def handler(_res: Response) -> None: + pass + + registered_routers.add_registered_router(router) + + assert registered_routers.get_router_by_trigger('hi') == router + + +def test_registered_routers_get_router_case_insensitive() -> None: + from argenta.app.registered_routers.entity import RegisteredRouters + + registered_routers = RegisteredRouters() + router = Router() + + @router.command(Command('HeLLo')) + def handler(_res: Response) -> None: + pass + + registered_routers.add_registered_router(router) + + # Trigger stored in lowercase, should match regardless of case + assert registered_routers.get_router_by_trigger('hello') == router + assert registered_routers.get_router_by_trigger('HELLO') is None # Exact match required in dict + + +def test_registered_routers_get_triggers_returns_all_triggers_and_aliases() -> None: + from argenta.app.registered_routers.entity import RegisteredRouters + + registered_routers = RegisteredRouters() + router1 = Router() + router2 = Router() + + @router1.command(Command('hello', aliases={'hi'})) + def handler1(_res: Response) -> None: + pass + + @router2.command(Command('world', aliases={'w'})) + def handler2(_res: Response) -> None: + pass + + registered_routers.add_registered_router(router1) + registered_routers.add_registered_router(router2) + + triggers = registered_routers.get_triggers() + assert 'hello' in triggers + assert 'hi' in triggers + assert 'world' in triggers + assert 'w' in triggers + + +def test_registered_routers_returns_none_for_unknown_trigger() -> None: + from argenta.app.registered_routers.entity import RegisteredRouters + + registered_routers = RegisteredRouters() + router = Router() + + @router.command('hello') + def handler(_res: Response) -> None: + pass + + registered_routers.add_registered_router(router) + + assert registered_routers.get_router_by_trigger('unknown') is None