mirror of
https://github.com/koloideal/Argenta.git
synced 2026-06-10 10:05:28 +03:00
perf
This commit is contained in:
@@ -6,7 +6,9 @@ from typing import Never
|
||||
|
||||
|
||||
class AutoCompleter:
|
||||
def __init__(self, history_filename: str | None = None, autocomplete_button: str = "tab") -> None:
|
||||
def __init__(
|
||||
self, history_filename: str | None = None, autocomplete_button: str = "tab"
|
||||
) -> None:
|
||||
"""
|
||||
Public. Configures and implements auto-completion of input command
|
||||
:param history_filename: the name of the file for saving the history of the autocompleter
|
||||
@@ -23,12 +25,18 @@ class AutoCompleter:
|
||||
:param state: the current cursor position is relative to the beginning of the line
|
||||
:return: the desired candidate as str or None
|
||||
"""
|
||||
matches: list[str] = sorted(cmd for cmd in _get_history_items() if cmd.startswith(text))
|
||||
matches: list[str] = sorted(
|
||||
cmd for cmd in _get_history_items() if cmd.startswith(text)
|
||||
)
|
||||
if len(matches) > 1:
|
||||
common_prefix = matches[0]
|
||||
for match in matches[1:]:
|
||||
i = 0
|
||||
while i < len(common_prefix) and i < len(match) and common_prefix[i] == match[i]:
|
||||
while (
|
||||
i < len(common_prefix)
|
||||
and i < len(match)
|
||||
and common_prefix[i] == match[i]
|
||||
):
|
||||
i += 1
|
||||
common_prefix = common_prefix[:i]
|
||||
if state == 0:
|
||||
@@ -72,13 +80,17 @@ class AutoCompleter:
|
||||
raw_history = history_file.read()
|
||||
pretty_history: list[str] = []
|
||||
for line in set(raw_history.strip().split("\n")):
|
||||
if _is_command_exist(line.split()[0], all_commands, ignore_command_register):
|
||||
if _is_command_exist(
|
||||
line.split()[0], all_commands, ignore_command_register
|
||||
):
|
||||
pretty_history.append(line)
|
||||
with open(self.history_filename, "w") as history_file:
|
||||
_ = history_file.write("\n".join(pretty_history))
|
||||
|
||||
|
||||
def _is_command_exist(command: str, existing_commands: list[str], ignore_command_register: bool) -> bool:
|
||||
def _is_command_exist(
|
||||
command: str, existing_commands: list[str], ignore_command_register: bool
|
||||
) -> bool:
|
||||
if ignore_command_register:
|
||||
return command.lower() in existing_commands
|
||||
return command in existing_commands
|
||||
@@ -89,4 +101,7 @@ def _get_history_items() -> list[str] | list[Never]:
|
||||
Private. Returns a list of all commands entered by the user
|
||||
:return: all commands entered by the user as list[str] | list[Never]
|
||||
"""
|
||||
return [readline.get_history_item(i) for i in range(1, readline.get_current_history_length() + 1)]
|
||||
return [
|
||||
readline.get_history_item(i)
|
||||
for i in range(1, readline.get_current_history_length() + 1)
|
||||
]
|
||||
|
||||
@@ -272,7 +272,6 @@ class BaseApp:
|
||||
def _(response: Response) -> None:
|
||||
self._exit_command_handler(response)
|
||||
|
||||
self.system_router.command_register_ignore = self._ignore_command_register
|
||||
self.registered_routers.add_registered_router(self.system_router)
|
||||
|
||||
def _validate_routers_for_collisions(self) -> None:
|
||||
@@ -519,7 +518,6 @@ class App(BaseApp):
|
||||
:param router: registered router
|
||||
:return: None
|
||||
"""
|
||||
router.command_register_ignore = self._ignore_command_register
|
||||
self.registered_routers.add_registered_router(router)
|
||||
|
||||
def include_routers(self, *routers: Router) -> None:
|
||||
|
||||
@@ -2,7 +2,6 @@ __all__ = ["NonStandardBehaviorHandler", "EmptyCommandHandler", "Printer", "Desc
|
||||
|
||||
from typing import Protocol, TypeVar
|
||||
|
||||
|
||||
T = TypeVar("T", contravariant=True) # noqa: WPS111
|
||||
|
||||
|
||||
|
||||
@@ -1,18 +1,20 @@
|
||||
__all__ = ["RegisteredRouters"]
|
||||
|
||||
from typing import Iterator, Optional
|
||||
from typing import Iterator
|
||||
|
||||
from argenta.router import Router
|
||||
|
||||
|
||||
class RegisteredRouters:
|
||||
def __init__(self, registered_routers: Optional[list[Router]] = None) -> None:
|
||||
def __init__(self, registered_routers: list[Router] | None = None) -> None:
|
||||
"""
|
||||
Private. Combines registered routers
|
||||
:param registered_routers: list of the registered routers
|
||||
:return: None
|
||||
"""
|
||||
self.registered_routers: list[Router] = registered_routers if registered_routers else []
|
||||
|
||||
self._matching_lower_triggers_with_routers
|
||||
|
||||
def add_registered_router(self, router: Router, /) -> None:
|
||||
"""
|
||||
|
||||
@@ -4,7 +4,6 @@ from enum import Enum
|
||||
from re import Pattern
|
||||
from typing import Literal, override
|
||||
|
||||
|
||||
PREFIX_TYPE = Literal["-", "--", "---"]
|
||||
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
__all__ = ["Command", "InputCommand"]
|
||||
|
||||
import shlex
|
||||
from typing import Never, Self, cast, Literal
|
||||
from typing import Literal, Never, Self, cast
|
||||
|
||||
from argenta.command.exceptions import (
|
||||
EmptyInputCommandException,
|
||||
@@ -38,30 +38,38 @@ class Command:
|
||||
:param flags: processed commands
|
||||
:param aliases: string synonyms for the main trigger
|
||||
"""
|
||||
self.registered_flags: Flags = flags if isinstance(flags, Flags) else Flags([flags])
|
||||
pretty_flags = flags if isinstance(flags, Flags) else Flags([flags])
|
||||
self.registered_flags: Flags = pretty_flags
|
||||
self.trigger: str = trigger
|
||||
self.description: str = description
|
||||
self.aliases: set[str] | set[Never] = aliases
|
||||
|
||||
self._paired_string_entity_flag: dict[str, Flag] = {
|
||||
flag.string_entity: flag for flag in pretty_flags
|
||||
}
|
||||
|
||||
def validate_input_flag(self, flag: InputFlag) -> ValidationStatus:
|
||||
"""
|
||||
Private. Validates the input flag
|
||||
:param flag: input flag for validation
|
||||
:return: is input flag valid as bool
|
||||
"""
|
||||
registered_flags: Flags = self.registered_flags
|
||||
for registered_flag in registered_flags:
|
||||
if registered_flag.string_entity == flag.string_entity:
|
||||
is_valid = registered_flag.validate_input_flag_value(flag.input_value)
|
||||
if is_valid:
|
||||
return ValidationStatus.VALID
|
||||
else:
|
||||
return ValidationStatus.INVALID
|
||||
if registered_flag := self._paired_string_entity_flag.get(flag.string_entity):
|
||||
is_valid = registered_flag.validate_input_flag_value(flag.input_value)
|
||||
if is_valid:
|
||||
return ValidationStatus.VALID
|
||||
else:
|
||||
return ValidationStatus.INVALID
|
||||
return ValidationStatus.UNDEFINED
|
||||
|
||||
|
||||
class InputCommand:
|
||||
def __init__(self, trigger: str, *, input_flags: InputFlag | InputFlags = DEFAULT_WITHOUT_INPUT_FLAGS):
|
||||
def __init__(
|
||||
self,
|
||||
trigger: str,
|
||||
*,
|
||||
input_flags: InputFlag | InputFlags = DEFAULT_WITHOUT_INPUT_FLAGS,
|
||||
):
|
||||
"""
|
||||
Private. The model of the input command, after parsing
|
||||
:param trigger:the trigger of the command
|
||||
@@ -70,7 +78,9 @@ class InputCommand:
|
||||
"""
|
||||
self.trigger: str = trigger
|
||||
self.input_flags: InputFlags = (
|
||||
input_flags if isinstance(input_flags, InputFlags) else InputFlags([input_flags])
|
||||
input_flags
|
||||
if isinstance(input_flags, InputFlags)
|
||||
else InputFlags([input_flags])
|
||||
)
|
||||
|
||||
@classmethod
|
||||
@@ -81,17 +91,17 @@ class InputCommand:
|
||||
:return: model of the input command, after parsing as InputCommand
|
||||
"""
|
||||
tokens = shlex.split(raw_command)
|
||||
|
||||
|
||||
if not tokens:
|
||||
raise EmptyInputCommandException
|
||||
|
||||
|
||||
command = tokens[0]
|
||||
flags: InputFlags = InputFlags()
|
||||
|
||||
|
||||
i = 1
|
||||
while i < len(tokens):
|
||||
token = tokens[i]
|
||||
|
||||
|
||||
if token.startswith("---"):
|
||||
prefix = "---"
|
||||
name = token[3:]
|
||||
@@ -103,24 +113,24 @@ class InputCommand:
|
||||
name = token[1:]
|
||||
else:
|
||||
raise UnprocessedInputFlagException
|
||||
|
||||
|
||||
if i + 1 < len(tokens) and not tokens[i + 1].startswith("-"):
|
||||
input_value = tokens[i + 1]
|
||||
i += 2
|
||||
else:
|
||||
input_value = ""
|
||||
i += 1
|
||||
|
||||
|
||||
input_flag = InputFlag(
|
||||
name=name,
|
||||
prefix=cast(PREFIX_TYPE, prefix), # pyright: ignore[reportUnnecessaryCast]
|
||||
input_value=input_value,
|
||||
status=None
|
||||
status=None,
|
||||
)
|
||||
|
||||
|
||||
if input_flag in flags:
|
||||
raise RepeatedInputFlagsException(input_flag)
|
||||
|
||||
|
||||
flags.add_flag(input_flag)
|
||||
|
||||
|
||||
return cls(command, input_flags=flags)
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
from argenta.metrics.main import get_time_of_pre_cycle_setup as get_time_of_pre_cycle_setup
|
||||
from argenta.metrics.main import \
|
||||
get_time_of_pre_cycle_setup as get_time_of_pre_cycle_setup
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from argenta.orchestrator.argparser.arguments.models import BooleanArgument as BooleanArgument
|
||||
from argenta.orchestrator.argparser.arguments.models import \
|
||||
BooleanArgument as BooleanArgument
|
||||
from argenta.orchestrator.argparser.arguments.models import InputArgument as InputArgument
|
||||
from argenta.orchestrator.argparser.arguments.models import ValueArgument as ValueArgument
|
||||
|
||||
@@ -7,12 +7,9 @@ import sys
|
||||
from argparse import ArgumentParser, Namespace
|
||||
from typing import Never, Self
|
||||
|
||||
from argenta.orchestrator.argparser.arguments.models import (
|
||||
BaseArgument,
|
||||
BooleanArgument,
|
||||
InputArgument,
|
||||
ValueArgument,
|
||||
)
|
||||
from argenta.orchestrator.argparser.arguments.models import (BaseArgument,
|
||||
BooleanArgument,
|
||||
InputArgument, ValueArgument)
|
||||
|
||||
|
||||
class ArgSpace:
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
__all__ = ["ResponseStatus"]
|
||||
|
||||
from enum import Enum
|
||||
from typing import Self
|
||||
|
||||
|
||||
class ResponseStatus(Enum):
|
||||
@@ -11,7 +10,7 @@ class ResponseStatus(Enum):
|
||||
UNDEFINED_AND_INVALID_FLAGS = "UNDEFINED_AND_INVALID_FLAGS"
|
||||
|
||||
@classmethod
|
||||
def from_flags(cls, *, has_invalid_value_flags: bool, has_undefined_flags: bool) -> Self:
|
||||
def from_flags(cls, *, has_invalid_value_flags: bool, has_undefined_flags: bool) -> "ResponseStatus":
|
||||
key = (has_invalid_value_flags, has_undefined_flags)
|
||||
status_map: dict[tuple[bool, bool], ResponseStatus] = {
|
||||
(True, True): cls.UNDEFINED_AND_INVALID_FLAGS,
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
__all__ = ["CommandHandler", "CommandHandlers"]
|
||||
|
||||
from collections.abc import Iterator
|
||||
from typing import Callable
|
||||
from typing import Callable, Never
|
||||
|
||||
from argenta.command import Command
|
||||
from argenta.response import Response
|
||||
|
||||
|
||||
HandlerFunc = Callable[..., None]
|
||||
|
||||
|
||||
@@ -30,7 +29,7 @@ class CommandHandler:
|
||||
|
||||
|
||||
class CommandHandlers:
|
||||
def __init__(self, command_handlers: tuple[CommandHandler] = tuple()):
|
||||
def __init__(self, command_handlers: tuple[CommandHandler] | tuple[Never, ...] = tuple()):
|
||||
"""
|
||||
Private. The model that unites all CommandHandler of the routers
|
||||
:param command_handlers: list of CommandHandlers for register
|
||||
@@ -49,7 +48,7 @@ class CommandHandlers:
|
||||
for alias in command_handler.handled_command.aliases:
|
||||
self.paired_command_handler_trigger[alias.lower()] = command_handler
|
||||
|
||||
def get_command_handler_by_trigger(self, trigger: str):
|
||||
def get_command_handler_by_trigger(self, trigger: str) -> CommandHandler | None:
|
||||
print(self.paired_command_handler_trigger)
|
||||
return self.paired_command_handler_trigger.get(trigger)
|
||||
|
||||
|
||||
@@ -7,16 +7,14 @@ from rich.console import Console
|
||||
|
||||
from argenta.command import Command, InputCommand
|
||||
from argenta.command.flag import ValidationStatus
|
||||
from argenta.command.flag.flags import Flags, InputFlags
|
||||
from argenta.command.flag.flags import InputFlags
|
||||
from argenta.response import Response, ResponseStatus
|
||||
from argenta.router.command_handler.entity import CommandHandler, CommandHandlers
|
||||
from argenta.router.exceptions import (
|
||||
RepeatedAliasNameException,
|
||||
RepeatedFlagNameException,
|
||||
RepeatedTriggerNameException,
|
||||
RequiredArgumentNotPassedException,
|
||||
TriggerContainSpacesException,
|
||||
)
|
||||
from argenta.router.exceptions import (RepeatedAliasNameException,
|
||||
RepeatedFlagNameException,
|
||||
RepeatedTriggerNameException,
|
||||
RequiredArgumentNotPassedException,
|
||||
TriggerContainSpacesException)
|
||||
|
||||
HandlerFunc: TypeAlias = Callable[..., None]
|
||||
|
||||
@@ -42,8 +40,6 @@ class Router:
|
||||
self.disable_redirect_stdout: bool = disable_redirect_stdout
|
||||
|
||||
self.command_handlers: CommandHandlers = CommandHandlers()
|
||||
self.command_register_ignore: bool = False
|
||||
|
||||
self.aliases: set[str] = set()
|
||||
self.triggers: set[str] = set()
|
||||
|
||||
@@ -87,12 +83,11 @@ class Router:
|
||||
if overlapping := (self.aliases | self.triggers) & set(map(lambda x: x.lower(), command.aliases)):
|
||||
raise RepeatedAliasNameException(overlapping)
|
||||
|
||||
flags: Flags = command.registered_flags
|
||||
flags_name: list[str] = [flag.string_entity.lower() for flag in flags]
|
||||
flags_name: list[str] = [flag.string_entity.lower() for flag in command.registered_flags]
|
||||
if len(set(flags_name)) < len(flags_name):
|
||||
raise RepeatedFlagNameException()
|
||||
|
||||
def _update_routing_keys(self, registered_command: Command):
|
||||
def _update_routing_keys(self, registered_command: Command) -> None:
|
||||
redefined_command_aliases_in_lower = set(map(lambda x: x.lower(), registered_command.aliases))
|
||||
self.aliases.update(redefined_command_aliases_in_lower)
|
||||
self.triggers.add(registered_command.trigger.lower())
|
||||
|
||||
Reference in New Issue
Block a user