mirror of
https://github.com/koloideal/Argenta.git
synced 2026-06-10 10:05:28 +03:00
better perf
This commit is contained in:
+9
-5
@@ -1,10 +1,14 @@
|
||||
from argenta import Command, Response, Router
|
||||
from argenta import Command, Response, Router, App, Orchestrator
|
||||
from argenta.command import InputCommand
|
||||
|
||||
router = Router()
|
||||
orchestrator = Orchestrator()
|
||||
|
||||
@router.command(Command('heLLo'))
|
||||
def handler(_res: Response) -> None:
|
||||
print("Hello World!")
|
||||
@router.command(Command('test'))
|
||||
def test(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
|
||||
print('test command')
|
||||
|
||||
router.finds_appropriate_handler(InputCommand('HellO'))
|
||||
app = App(override_system_messages=True, print_func=print)
|
||||
app.include_router(router)
|
||||
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}'))
|
||||
orchestrator.start_polling(app)
|
||||
@@ -48,7 +48,7 @@ class AutoCompleter:
|
||||
else:
|
||||
return None
|
||||
|
||||
def initial_setup(self, all_commands: list[str]) -> None:
|
||||
def initial_setup(self, all_commands: set[str]) -> None:
|
||||
"""
|
||||
Private. Initial setup function
|
||||
:param all_commands: Registered commands for adding them to the autocomplete history
|
||||
@@ -69,7 +69,7 @@ class AutoCompleter:
|
||||
readline.set_completer_delims(readline.get_completer_delims().replace(" ", ""))
|
||||
readline.parse_and_bind(f"{self.autocomplete_button}: complete")
|
||||
|
||||
def exit_setup(self, all_commands: list[str], ignore_command_register: bool) -> None:
|
||||
def exit_setup(self, all_commands: set[str]) -> None:
|
||||
"""
|
||||
Private. Exit setup function
|
||||
:return: None
|
||||
@@ -80,22 +80,12 @@ class AutoCompleter:
|
||||
raw_history = history_file.read()
|
||||
pretty_history: list[str] = []
|
||||
for line in set(raw_history.strip().split("\n")):
|
||||
if _is_command_exist(
|
||||
line.split()[0], all_commands, ignore_command_register
|
||||
):
|
||||
if line.split()[0] in all_commands:
|
||||
pretty_history.append(line)
|
||||
with open(self.history_filename, "w") as history_file:
|
||||
_ = history_file.write("\n".join(pretty_history))
|
||||
|
||||
|
||||
def _is_command_exist(
|
||||
command: str, existing_commands: list[str], ignore_command_register: bool
|
||||
) -> bool:
|
||||
if ignore_command_register:
|
||||
return command.lower() in existing_commands
|
||||
return command in existing_commands
|
||||
|
||||
|
||||
def _get_history_items() -> list[str] | list[Never]:
|
||||
"""
|
||||
Private. Returns a list of all commands entered by the user
|
||||
|
||||
+11
-64
@@ -40,7 +40,6 @@ class BaseApp:
|
||||
farewell_message: str,
|
||||
exit_command: Command,
|
||||
system_router_title: str,
|
||||
ignore_command_register: bool,
|
||||
dividing_line: StaticDividingLine | DynamicDividingLine,
|
||||
repeat_command_groups_printing: bool,
|
||||
override_system_messages: bool,
|
||||
@@ -51,8 +50,7 @@ class BaseApp:
|
||||
self._print_func: Printer = print_func
|
||||
self._exit_command: Command = exit_command
|
||||
self._dividing_line: StaticDividingLine | DynamicDividingLine = dividing_line
|
||||
self._ignore_command_register: bool = ignore_command_register
|
||||
self._repeat_command_groups_printing_description: bool = repeat_command_groups_printing
|
||||
self._repeat_command_groups_printing: bool = repeat_command_groups_printing
|
||||
self._override_system_messages: bool = override_system_messages
|
||||
self._autocompleter: AutoCompleter = autocompleter
|
||||
self.system_router: Router = Router(title=system_router_title)
|
||||
@@ -66,15 +64,6 @@ class BaseApp:
|
||||
self.registered_routers: RegisteredRouters = RegisteredRouters()
|
||||
self._messages_on_startup: list[str] = []
|
||||
|
||||
self._matching_lower_triggers_with_routers: dict[str, Router] = {}
|
||||
self._matching_default_triggers_with_routers: dict[str, Router] = {}
|
||||
|
||||
self._current_matching_triggers_with_routers: dict[str, Router] = (
|
||||
self._matching_lower_triggers_with_routers
|
||||
if self._ignore_command_register
|
||||
else self._matching_default_triggers_with_routers
|
||||
)
|
||||
|
||||
self._incorrect_input_syntax_handler: NonStandardBehaviorHandler[str] = (
|
||||
lambda _: print_func(f"Incorrect flag syntax: {_}")
|
||||
)
|
||||
@@ -217,37 +206,12 @@ class BaseApp:
|
||||
"""
|
||||
trigger = command.trigger
|
||||
exit_trigger = self._exit_command.trigger
|
||||
if self._ignore_command_register:
|
||||
if trigger.lower() == exit_trigger.lower():
|
||||
return True
|
||||
elif trigger.lower() in [x.lower() for x in self._exit_command.aliases]:
|
||||
return True
|
||||
else:
|
||||
if trigger == exit_trigger:
|
||||
return True
|
||||
elif trigger in self._exit_command.aliases:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _is_unknown_command(self, command: InputCommand) -> bool:
|
||||
"""
|
||||
Private. Checks if the given command is an unknown command
|
||||
:param command: command to check
|
||||
:return: is it an unknown command or not as bool
|
||||
"""
|
||||
input_command_trigger = command.trigger
|
||||
if self._ignore_command_register:
|
||||
if input_command_trigger.lower() in list(
|
||||
self._current_matching_triggers_with_routers.keys()
|
||||
):
|
||||
return False
|
||||
else:
|
||||
if input_command_trigger in list(
|
||||
self._current_matching_triggers_with_routers.keys()
|
||||
):
|
||||
return False
|
||||
return True
|
||||
|
||||
def _error_handler(self, error: InputCommandException, raw_command: str) -> None:
|
||||
"""
|
||||
Private. Handles parsing errors of the entered command
|
||||
@@ -297,7 +261,7 @@ class BaseApp:
|
||||
all_aliases.update(router_entity.aliases)
|
||||
|
||||
def _most_similar_command(self, unknown_command: str) -> str | None:
|
||||
all_commands = list(self._current_matching_triggers_with_routers.keys())
|
||||
all_commands = self.registered_routers.get_triggers()
|
||||
|
||||
matches_startswith_unknown_command: Matches = sorted(
|
||||
cmd for cmd in all_commands if cmd.startswith(unknown_command)
|
||||
@@ -368,20 +332,7 @@ class BaseApp:
|
||||
self._setup_system_router()
|
||||
self._validate_routers_for_collisions()
|
||||
|
||||
for router_entity in self.registered_routers:
|
||||
router_triggers = router_entity.triggers
|
||||
router_aliases = router_entity.aliases
|
||||
combined = router_triggers | router_aliases
|
||||
|
||||
for trigger in combined:
|
||||
self._matching_default_triggers_with_routers[trigger] = router_entity
|
||||
self._matching_lower_triggers_with_routers[trigger.lower()] = (
|
||||
router_entity
|
||||
)
|
||||
|
||||
self._autocompleter.initial_setup(
|
||||
list(self._current_matching_triggers_with_routers.keys())
|
||||
)
|
||||
self._autocompleter.initial_setup(self.registered_routers.get_triggers())
|
||||
|
||||
if not self._override_system_messages:
|
||||
self._setup_default_view()
|
||||
@@ -392,13 +343,14 @@ class BaseApp:
|
||||
self._print_func(message)
|
||||
if self._messages_on_startup:
|
||||
print("\n")
|
||||
if not self._repeat_command_groups_printing_description:
|
||||
if not self._repeat_command_groups_printing:
|
||||
self._print_command_group_description()
|
||||
|
||||
def _process_exist_and_valid_command(self, input_command: InputCommand) -> None:
|
||||
processing_router = self._current_matching_triggers_with_routers[
|
||||
input_command.trigger.lower()
|
||||
]
|
||||
processing_router = self.registered_routers.get_router_by_trigger(input_command.trigger.lower())
|
||||
|
||||
if not processing_router:
|
||||
raise RuntimeError(f"Router for '{input_command.trigger}' not found. Panic!")
|
||||
|
||||
if processing_router.disable_redirect_stdout:
|
||||
dividing_line_unit_part: str = self._dividing_line.get_unit_part()
|
||||
@@ -452,7 +404,6 @@ class App(BaseApp):
|
||||
:param farewell_message: displayed at the end of the app
|
||||
:param exit_command: the entity of the command that will be terminated when entered
|
||||
:param system_router_title: system router title
|
||||
:param ignore_command_register: whether to ignore the case of the entered commands
|
||||
:param dividing_line: the entity of the dividing line
|
||||
:param repeat_command_groups_printing: whether to repeat the available commands and their description
|
||||
:param override_system_messages: whether to redefine the default formatting of system messages
|
||||
@@ -466,7 +417,6 @@ class App(BaseApp):
|
||||
farewell_message=farewell_message,
|
||||
exit_command=exit_command,
|
||||
system_router_title=system_router_title,
|
||||
ignore_command_register=ignore_command_register,
|
||||
dividing_line=dividing_line,
|
||||
repeat_command_groups_printing=repeat_command_groups_printing,
|
||||
override_system_messages=override_system_messages,
|
||||
@@ -481,7 +431,7 @@ class App(BaseApp):
|
||||
"""
|
||||
self._pre_cycle_setup()
|
||||
while True:
|
||||
if self._repeat_command_groups_printing_description:
|
||||
if self._repeat_command_groups_printing:
|
||||
self._print_command_group_description()
|
||||
|
||||
raw_command: str = Console().input(self._prompt)
|
||||
@@ -497,13 +447,10 @@ class App(BaseApp):
|
||||
|
||||
if self._is_exit_command(input_command):
|
||||
self.system_router.finds_appropriate_handler(input_command)
|
||||
self._autocompleter.exit_setup(
|
||||
list(self._current_matching_triggers_with_routers.keys()),
|
||||
self._ignore_command_register,
|
||||
)
|
||||
self._autocompleter.exit_setup(self.registered_routers.get_triggers())
|
||||
return
|
||||
|
||||
if self._is_unknown_command(input_command):
|
||||
if self.registered_routers.get_router_by_trigger(input_command.trigger.lower()):
|
||||
with redirect_stdout(io.StringIO()) as stdout:
|
||||
self._unknown_command_handler(input_command)
|
||||
stdout_res: str = stdout.getvalue()
|
||||
|
||||
@@ -6,15 +6,14 @@ from argenta.router import Router
|
||||
|
||||
|
||||
class RegisteredRouters:
|
||||
def __init__(self, registered_routers: list[Router] | None = None) -> None:
|
||||
def __init__(self) -> None:
|
||||
"""
|
||||
Private. Combines registered routers
|
||||
:param registered_routers: list of the registered routers
|
||||
:return: None
|
||||
"""
|
||||
self.registered_routers: list[Router] = registered_routers if registered_routers else []
|
||||
|
||||
self._matching_lower_triggers_with_routers
|
||||
self.registered_routers: list[Router] = []
|
||||
self._paired_trigger_router: dict[str, Router] = {}
|
||||
|
||||
def add_registered_router(self, router: Router, /) -> None:
|
||||
"""
|
||||
@@ -23,6 +22,14 @@ class RegisteredRouters:
|
||||
:return: None
|
||||
"""
|
||||
self.registered_routers.append(router)
|
||||
for trigger in (router.aliases | router.triggers):
|
||||
self._paired_trigger_router[trigger] = router
|
||||
|
||||
def get_router_by_trigger(self, trigger: str) -> Router | None:
|
||||
return self._paired_trigger_router.get(trigger)
|
||||
|
||||
def get_triggers(self) -> set[str]:
|
||||
return set(self._paired_trigger_router.keys())
|
||||
|
||||
def __iter__(self) -> Iterator[Router]:
|
||||
return iter(self.registered_routers)
|
||||
|
||||
@@ -104,7 +104,7 @@ class Router:
|
||||
command_handler = self.command_handlers.get_command_handler_by_trigger(input_command_name)
|
||||
|
||||
if not command_handler:
|
||||
raise RuntimeError(f"Handler for '{input_command.trigger}' command not found!")
|
||||
raise RuntimeError(f"Handler for '{input_command.trigger}' command not found. Panic!")
|
||||
else:
|
||||
self.process_input_command(input_command_flags, command_handler)
|
||||
|
||||
|
||||
@@ -7,13 +7,12 @@ from pytest_mock import MockerFixture
|
||||
|
||||
from argenta.app.autocompleter.entity import (
|
||||
AutoCompleter,
|
||||
_get_history_items,
|
||||
_is_command_exist,
|
||||
_get_history_items
|
||||
)
|
||||
|
||||
|
||||
HISTORY_FILE: str = "test_history.txt"
|
||||
COMMANDS: list[str] = ["start", "stop", "status"]
|
||||
COMMANDS: set[str] = {"start", "stop", "status"}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
@@ -119,7 +118,7 @@ def test_exit_setup_writes_and_filters_duplicate_commands(fs: FakeFilesystem, mo
|
||||
fs.create_file(HISTORY_FILE, contents=raw_history_content) # pyright: ignore[reportUnknownMemberType]
|
||||
|
||||
completer: AutoCompleter = AutoCompleter(history_filename=HISTORY_FILE)
|
||||
completer.exit_setup(all_commands=["start", "stop"], ignore_command_register=False)
|
||||
completer.exit_setup(all_commands={"start", "stop"})
|
||||
|
||||
mock_readline.write_history_file.assert_called_once_with(HISTORY_FILE)
|
||||
|
||||
@@ -131,7 +130,7 @@ def test_exit_setup_writes_and_filters_duplicate_commands(fs: FakeFilesystem, mo
|
||||
|
||||
def test_exit_setup_skips_writing_when_no_history_filename(mock_readline: Any) -> None:
|
||||
completer: AutoCompleter = AutoCompleter(history_filename=None)
|
||||
completer.exit_setup(all_commands=COMMANDS, ignore_command_register=False)
|
||||
completer.exit_setup(all_commands=COMMANDS)
|
||||
mock_readline.write_history_file.assert_not_called()
|
||||
|
||||
|
||||
@@ -182,22 +181,6 @@ def test_complete_inserts_common_prefix_for_multiple_matches(mock_readline: Any)
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_is_command_exist_checks_case_sensitive_when_enabled() -> None:
|
||||
existing: list[str] = ["start", "stop", "status"]
|
||||
|
||||
assert _is_command_exist("start", existing, ignore_command_register=False) is True
|
||||
assert _is_command_exist("START", existing, ignore_command_register=False) is False
|
||||
assert _is_command_exist("unknown", existing, ignore_command_register=False) is False
|
||||
|
||||
|
||||
def test_is_command_exist_checks_case_insensitive_when_enabled() -> None:
|
||||
existing: list[str] = ["start", "stop", "status"]
|
||||
|
||||
assert _is_command_exist("start", existing, ignore_command_register=True) is True
|
||||
assert _is_command_exist("START", existing, ignore_command_register=True) is True
|
||||
assert _is_command_exist("unknown", existing, ignore_command_register=True) is False
|
||||
|
||||
|
||||
def test_get_history_items_returns_empty_list_initially(mock_readline: Any) -> None:
|
||||
assert _get_history_items() == []
|
||||
|
||||
|
||||
@@ -208,7 +208,7 @@ def test_finds_appropriate_handler_executes_handler_by_alias(capsys: CaptureFixt
|
||||
|
||||
assert "Hello World!" in output.out
|
||||
|
||||
def test_finds_appropriate_handler_executes_handler_by_alias_with_differrent_register(capsys: CaptureFixture[str]) -> None:
|
||||
def test_finds_appropriate_handler_executes_handler_by_alias_case_insensitive(capsys: CaptureFixture[str]) -> None:
|
||||
router = Router()
|
||||
|
||||
@router.command(Command('hello', aliases={'hI'}))
|
||||
@@ -222,7 +222,7 @@ def test_finds_appropriate_handler_executes_handler_by_alias_with_differrent_reg
|
||||
assert "Hello World!" in output.out
|
||||
|
||||
|
||||
def test_finds_appropriate_handler_executes_handler_by_trigger_with_differrent_register(capsys: CaptureFixture[str]) -> None:
|
||||
def test_finds_appropriate_handler_executes_handler_by_trigger_case_insensitive(capsys: CaptureFixture[str]) -> None:
|
||||
router = Router()
|
||||
|
||||
@router.command(Command('heLLo'))
|
||||
@@ -305,3 +305,95 @@ def test_validate_command_raises_error_for_alias_collision_case_insensitive() ->
|
||||
@router.command(Command('world', aliases={'hI'}))
|
||||
def handler2(_res: Response) -> None:
|
||||
pass
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Tests for RegisteredRouters
|
||||
# ============================================================================
|
||||
|
||||
|
||||
def test_registered_routers_get_router_by_trigger() -> None:
|
||||
from argenta.app.registered_routers.entity import RegisteredRouters
|
||||
|
||||
registered_routers = RegisteredRouters()
|
||||
router = Router()
|
||||
|
||||
@router.command('hello')
|
||||
def handler(_res: Response) -> None:
|
||||
pass
|
||||
|
||||
registered_routers.add_registered_router(router)
|
||||
|
||||
assert registered_routers.get_router_by_trigger('hello') == router
|
||||
|
||||
|
||||
def test_registered_routers_get_router_by_alias() -> None:
|
||||
from argenta.app.registered_routers.entity import RegisteredRouters
|
||||
|
||||
registered_routers = RegisteredRouters()
|
||||
router = Router()
|
||||
|
||||
@router.command(Command('hello', aliases={'hi'}))
|
||||
def handler(_res: Response) -> None:
|
||||
pass
|
||||
|
||||
registered_routers.add_registered_router(router)
|
||||
|
||||
assert registered_routers.get_router_by_trigger('hi') == router
|
||||
|
||||
|
||||
def test_registered_routers_get_router_case_insensitive() -> None:
|
||||
from argenta.app.registered_routers.entity import RegisteredRouters
|
||||
|
||||
registered_routers = RegisteredRouters()
|
||||
router = Router()
|
||||
|
||||
@router.command(Command('HeLLo'))
|
||||
def handler(_res: Response) -> None:
|
||||
pass
|
||||
|
||||
registered_routers.add_registered_router(router)
|
||||
|
||||
# Trigger stored in lowercase, should match regardless of case
|
||||
assert registered_routers.get_router_by_trigger('hello') == router
|
||||
assert registered_routers.get_router_by_trigger('HELLO') is None # Exact match required in dict
|
||||
|
||||
|
||||
def test_registered_routers_get_triggers_returns_all_triggers_and_aliases() -> None:
|
||||
from argenta.app.registered_routers.entity import RegisteredRouters
|
||||
|
||||
registered_routers = RegisteredRouters()
|
||||
router1 = Router()
|
||||
router2 = Router()
|
||||
|
||||
@router1.command(Command('hello', aliases={'hi'}))
|
||||
def handler1(_res: Response) -> None:
|
||||
pass
|
||||
|
||||
@router2.command(Command('world', aliases={'w'}))
|
||||
def handler2(_res: Response) -> None:
|
||||
pass
|
||||
|
||||
registered_routers.add_registered_router(router1)
|
||||
registered_routers.add_registered_router(router2)
|
||||
|
||||
triggers = registered_routers.get_triggers()
|
||||
assert 'hello' in triggers
|
||||
assert 'hi' in triggers
|
||||
assert 'world' in triggers
|
||||
assert 'w' in triggers
|
||||
|
||||
|
||||
def test_registered_routers_returns_none_for_unknown_trigger() -> None:
|
||||
from argenta.app.registered_routers.entity import RegisteredRouters
|
||||
|
||||
registered_routers = RegisteredRouters()
|
||||
router = Router()
|
||||
|
||||
@router.command('hello')
|
||||
def handler(_res: Response) -> None:
|
||||
pass
|
||||
|
||||
registered_routers.add_registered_router(router)
|
||||
|
||||
assert registered_routers.get_router_by_trigger('unknown') is None
|
||||
|
||||
Reference in New Issue
Block a user