diff --git a/.github/workflows/mypy.yml b/.github/workflows/mypy.yml new file mode 100644 index 0000000..a79b4c5 --- /dev/null +++ b/.github/workflows/mypy.yml @@ -0,0 +1,30 @@ +name: mypy + +on: + push: + branches: [ "kolo" ] + pull_request: + branches: [ "kolo" ] + +permissions: + contents: read + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v3 + with: + python-version: "3.13" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install mypy + + - name: Run type checker + run: mypy -p argenta --strict diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 5b5cdbe..13a8c4e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -24,8 +24,8 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install poetry - poetry install + pip install uv + uv sync --group dev - name: Run tests - run: poetry run python -m unittest discover + run: uv run python -m unittest discover diff --git a/.gitignore b/.gitignore index c263dbe..07c7336 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,5 @@ uv.lock *.hist* build source +*cache diff --git a/metrics_tests/get_time_of_pre_cycle_setup.py b/metrics_tests/get_time_of_pre_cycle_setup.py deleted file mode 100644 index 439093c..0000000 --- a/metrics_tests/get_time_of_pre_cycle_setup.py +++ /dev/null @@ -1,36 +0,0 @@ -from argenta.command import Command -from argenta.metrics import get_time_of_pre_cycle_setup -from argenta.response import Response -from argenta.router import Router -from argenta.app import App - - - -class TimeOfPreCycleSetup: - @staticmethod - def commands_with_two_aliases(num_of_commands: int): - router = Router() - - for i in range(num_of_commands): - @router.command(Command(f'cmd{i}', aliases=[f'cdr{i}', f'prt{i}'])) - def handler(response: Response): - pass - - app = App() - app.include_router(router) - - return get_time_of_pre_cycle_setup(app) - - @staticmethod - def commands_with_one_aliases(num_of_commands: int): - router = Router() - - for i in range(num_of_commands): - @router.command(Command(f'cmd{i}', aliases=[f'cdr{i}'])) - def handler(response: Response): - pass - - app = App() - app.include_router(router) - - return get_time_of_pre_cycle_setup(app) diff --git a/metrics_tests/time_of_precycle_setup.py b/metrics_tests/time_of_precycle_setup.py new file mode 100644 index 0000000..a750432 --- /dev/null +++ b/metrics_tests/time_of_precycle_setup.py @@ -0,0 +1,33 @@ +from argenta.command import Command +from argenta.metrics import get_time_of_pre_cycle_setup +from argenta.response import Response +from argenta.router import Router +from argenta.app import App + + + +def commands_with_two_aliases(num_of_commands: int): + router = Router() + + for i in range(num_of_commands): + @router.command(Command(f'cmd{i}', aliases=[f'cdr{i}', f'prt{i}'])) + def handler(response: Response): # pyright: ignore[reportUnusedFunction, reportUnusedParameter] + pass + + app = App() + app.include_router(router) + + return get_time_of_pre_cycle_setup(app) + +def commands_with_one_aliases(num_of_commands: int): + router = Router() + + for i in range(num_of_commands): + @router.command(Command(f'cmd{i}', aliases=[f'cdr{i}'])) + def handler(response: Response): # pyright: ignore[reportUnusedFunction, reportUnusedParameter] + pass + + app = App() + app.include_router(router) + + return get_time_of_pre_cycle_setup(app) diff --git a/mock/local_test.py b/mock/local_test.py index 95fcef8..e672a5a 100644 --- a/mock/local_test.py +++ b/mock/local_test.py @@ -1,18 +1,5 @@ -from argenta.app import App -from argenta.command import Command -from argenta.orchestrator import Orchestrator -from argenta.router import Router +import argparse - -router = Router() -orchestrator = Orchestrator() - -@router.command(Command('test')) -def test(response): - print('test command') - -app = App(ignore_command_register=True, - override_system_messages=True, - print_func=print) -app.include_router(router) -orchestrator.start_polling(app) \ No newline at end of file +parser = argparse.ArgumentParser(prog='myprogram') +_ = parser.add_argument('--foo', help='foo of the %(prog)s program') +parser.print_help() \ No newline at end of file diff --git a/mock/mock_app/main.py b/mock/mock_app/main.py index e51ca8c..67cc55f 100644 --- a/mock/mock_app/main.py +++ b/mock/mock_app/main.py @@ -1,25 +1,22 @@ from mock.mock_app.routers import work_router -from argenta.app import App -from argenta.app.defaults import PredefinedMessages -from argenta.app.dividing_line import DynamicDividingLine -from argenta.app.autocompleter import AutoCompleter -from argenta.orchestrator import Orchestrator -from argenta.orchestrator.argparser import ArgParser -from argenta.orchestrator.argparser.arguments import BooleanArgument +from argenta import App, Orchestrator +from argenta.app import PredefinedMessages, DynamicDividingLine, AutoCompleter +from argenta.orchestrator import ArgParser +from argenta.orchestrator.argparser import BooleanArgument arg_parser = ArgParser(processed_args=[BooleanArgument("repeat")]) app: App = App( dividing_line=DynamicDividingLine(), autocompleter=AutoCompleter(), - repeat_command_groups=False, ) orchestrator: Orchestrator = Orchestrator(arg_parser) def main(): app.include_router(work_router) + print(f"\n\n{orchestrator.get_input_args()}") app.add_message_on_startup(PredefinedMessages.USAGE) app.add_message_on_startup(PredefinedMessages.AUTOCOMPLETE) diff --git a/mock/mock_app/routers.py b/mock/mock_app/routers.py index 14e72ef..030fe51 100644 --- a/mock/mock_app/routers.py +++ b/mock/mock_app/routers.py @@ -1,39 +1,25 @@ -from rich.console import Console - -from argenta.command import Command -from argenta.command.flag.defaults import PredefinedFlags -from argenta.command.flag import Flags, Flag, PossibleValues +from argenta.command import Command, PredefinedFlags, Flags, Flag, PossibleValues from argenta.response import Response -from argenta.router import Router +from argenta import Router -work_router: Router = Router(title="Work points:", disable_redirect_stdout=True) +work_router: Router = Router(title="Work points:") -console = Console() - -flag = Flag('csdv', possible_values=PossibleValues.DISABLE) +flag = Flag('csdv', possible_values=PossibleValues.NEITHER) @work_router.command( - Command( - "get", - "Get Help", + Command("get", + description="Get Help", aliases=["help", "Get_help"], - flags=Flags(PredefinedFlags.PORT, PredefinedFlags.HOST), - ) -) + flags=Flags([PredefinedFlags.PORT, + PredefinedFlags.HOST]))) def command_help(response: Response): - case = input("test > ") - print(case) print(response.status) - print(response.undefined_flags.get_flags()) - print(response.valid_flags.get_flags()) - print(response.invalid_value_flags.get_flags()) + print(response.input_flags.flags) @work_router.command("run") def command_start_solving(response: Response): print(response.status) - print(response.undefined_flags.get_flags()) - print(response.valid_flags.get_flags()) - print(response.invalid_value_flags.get_flags()) + print(response.input_flags.flags) diff --git a/pyproject.toml b/pyproject.toml index 2064854..702b66d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "argenta" -version = "1.0.7" +version = "1.1.1rc0" description = "Python library for building modular CLI applications" authors = [{ name = "kolo", email = "kolo.is.main@gmail.com" }] requires-python = ">=3.8" @@ -28,6 +28,9 @@ build-backend = "hatchling.build" [dependency-groups] dev = [ - "psutil>=7.0.0", + "mypy>=1.14.1", + "pytest>=8.3.2", + "ruff>=0.12.12", + "wemake-python-styleguide>=0.17.0", ] diff --git a/src/argenta/__init__.py b/src/argenta/__init__.py index e69de29..6498ca5 100644 --- a/src/argenta/__init__.py +++ b/src/argenta/__init__.py @@ -0,0 +1,10 @@ +__all__ = [ + 'App', + 'Orchestrator', + 'Router', +] + + +from argenta.app import App +from argenta.orchestrator import Orchestrator +from argenta.router import Router diff --git a/src/argenta/app/__init__.py b/src/argenta/app/__init__.py index b2876f6..7803dd8 100644 --- a/src/argenta/app/__init__.py +++ b/src/argenta/app/__init__.py @@ -1,3 +1,12 @@ -__all__ = ["App"] +__all__ = [ + "App", + "PredefinedMessages", + "DynamicDividingLine", + "StaticDividingLine", + "AutoCompleter" +] from argenta.app.models import App +from argenta.app.defaults import PredefinedMessages +from argenta.app.dividing_line.models import DynamicDividingLine, StaticDividingLine +from argenta.app.autocompleter.entity import AutoCompleter diff --git a/src/argenta/app/autocompleter/entity.py b/src/argenta/app/autocompleter/entity.py index 185d4cc..e90de55 100644 --- a/src/argenta/app/autocompleter/entity.py +++ b/src/argenta/app/autocompleter/entity.py @@ -13,10 +13,10 @@ class AutoCompleter: :param autocomplete_button: the button for auto-completion :return: None """ - self.history_filename = history_filename - self.autocomplete_button = autocomplete_button + self.history_filename: str | None = history_filename + self.autocomplete_button: str = autocomplete_button - def _complete(self, text, state) -> str | None: + def _complete(self, text: str, state: int) -> str | None: """ Private. Auto-completion function :param text: part of the command being entered @@ -24,7 +24,7 @@ class AutoCompleter: :return: the desired candidate as str or None """ matches: list[str] = sorted( - cmd for cmd in self.get_history_items() if cmd.startswith(text) + cmd for cmd in _get_history_items() if cmd.startswith(text) ) if len(matches) > 1: common_prefix = matches[0] @@ -38,7 +38,7 @@ class AutoCompleter: i += 1 common_prefix = common_prefix[:i] if state == 0: - readline.insert_text(common_prefix[len(text) :]) + readline.insert_text(common_prefix[len(text) :]) readline.redisplay() return None elif len(matches) == 1: @@ -54,10 +54,10 @@ class AutoCompleter: """ if self.history_filename: if os.path.exists(self.history_filename): - readline.read_history_file(self.history_filename) + readline.read_history_file(self.history_filename) else: for line in all_commands: - readline.add_history(line) + readline.add_history(line) readline.set_completer(self._complete) readline.set_completer_delims(readline.get_completer_delims().replace(" ", "")) @@ -69,7 +69,7 @@ class AutoCompleter: :return: None """ if self.history_filename: - readline.write_history_file(self.history_filename) + readline.write_history_file(self.history_filename) with open(self.history_filename, "r") as history_file: raw_history = history_file.read() pretty_history: list[str] = [] @@ -77,15 +77,14 @@ class AutoCompleter: if line.split()[0] in all_commands: pretty_history.append(line) with open(self.history_filename, "w") as history_file: - history_file.write("\n".join(pretty_history)) + _ = history_file.write("\n".join(pretty_history)) - @staticmethod - def get_history_items() -> list[str] | list[Never]: - """ - Private. Returns a list of all commands entered by the user - :return: all commands entered by the user as list[str] | list[Never] - """ - return [ - readline.get_history_item(i) - for i in range(1, readline.get_current_history_length() + 1) - ] +def _get_history_items() -> list[str] | list[Never]: + """ + Private. Returns a list of all commands entered by the user + :return: all commands entered by the user as list[str] | list[Never] + """ + return [ + readline.get_history_item(i) + for i in range(1, readline.get_current_history_length() + 1) + ] diff --git a/src/argenta/app/defaults.py b/src/argenta/app/defaults.py index 6e819de..2908cef 100644 --- a/src/argenta/app/defaults.py +++ b/src/argenta/app/defaults.py @@ -5,7 +5,6 @@ class PredefinedMessages(StrEnum): """ Public. A dataclass with predetermined messages for quick use """ - USAGE = "[b dim]Usage[/b dim]: [i] <[green]flags[/green]>[/i]" HELP = "[b dim]Help[/b dim]: [i][/i] [b red]--help[/b red]" AUTOCOMPLETE = "[b dim]Autocomplete[/b dim]: [i][/i] [bold]" diff --git a/src/argenta/app/dividing_line/models.py b/src/argenta/app/dividing_line/models.py index 2afeb2e..d54279e 100644 --- a/src/argenta/app/dividing_line/models.py +++ b/src/argenta/app/dividing_line/models.py @@ -8,7 +8,7 @@ class BaseDividingLine(ABC): :param unit_part: the single part of the dividing line :return: None """ - self._unit_part = unit_part + self._unit_part: str = unit_part def get_unit_part(self) -> str: """ @@ -22,7 +22,7 @@ class BaseDividingLine(ABC): class StaticDividingLine(BaseDividingLine): - def __init__(self, unit_part: str = "-", length: int = 25) -> None: + def __init__(self, unit_part: str = "-", *, length: int = 25) -> None: """ Public. The static dividing line :param unit_part: the single part of the dividing line @@ -30,9 +30,9 @@ class StaticDividingLine(BaseDividingLine): :return: None """ super().__init__(unit_part) - self.length = length + self.length: int = length - def get_full_static_line(self, is_override: bool) -> str: + def get_full_static_line(self, *, is_override: bool) -> str: """ Private. Returns the full line of the dividing line :param is_override: has the default text layout been redefined @@ -53,7 +53,7 @@ class DynamicDividingLine(BaseDividingLine): """ super().__init__(unit_part) - def get_full_dynamic_line(self, length: int, is_override: bool) -> str: + def get_full_dynamic_line(self, *, length: int, is_override: bool) -> str: """ Private. Returns the full line of the dividing line :param length: the length of the dividing line diff --git a/src/argenta/app/models.py b/src/argenta/app/models.py index 65920fb..6c7da55 100644 --- a/src/argenta/app/models.py +++ b/src/argenta/app/models.py @@ -1,29 +1,37 @@ -from typing import Callable -from rich.console import Console -from rich.markup import escape -from art import text2art -from contextlib import redirect_stdout import io import re +from contextlib import redirect_stdout +from typing import Never, TypeAlias + +from art import text2art # pyright: ignore[reportMissingTypeStubs, reportUnknownVariableType] +from rich.console import Console +from rich.markup import escape -from argenta.command.models import Command, InputCommand -from argenta.router import Router -from argenta.router.defaults import system_router from argenta.app.autocompleter import AutoCompleter -from argenta.app.dividing_line.models import StaticDividingLine, DynamicDividingLine -from argenta.command.exceptions import ( - UnprocessedInputFlagException, - RepeatedInputFlagsException, - EmptyInputCommandException, - BaseInputCommandException, +from argenta.app.dividing_line.models import DynamicDividingLine, StaticDividingLine +from argenta.app.protocols import ( + DescriptionMessageGenerator, + EmptyCommandHandler, + NonStandardBehaviorHandler, + Printer, ) from argenta.app.registered_routers.entity import RegisteredRouters +from argenta.command.exceptions import ( + EmptyInputCommandException, + InputCommandException, + RepeatedInputFlagsException, + UnprocessedInputFlagException, +) +from argenta.command.models import Command, InputCommand from argenta.response import Response +from argenta.router import Router +from argenta.router.defaults import system_router +Matches: TypeAlias = list[str] | list[Never] class BaseApp: - def __init__(self, prompt: str, + def __init__(self, *, prompt: str, initial_message: str, farewell_message: str, exit_command: Command, @@ -33,57 +41,44 @@ class BaseApp: repeat_command_groups: bool, override_system_messages: bool, autocompleter: AutoCompleter, - print_func: Callable[[str], None]) -> None: - self._prompt = prompt - self._print_func = print_func - self._exit_command = exit_command - self._system_router_title = system_router_title - self._dividing_line = dividing_line - self._ignore_command_register = ignore_command_register - self._repeat_command_groups_description = repeat_command_groups - self._override_system_messages = override_system_messages - self._autocompleter = autocompleter + print_func: Printer) -> None: + self._prompt: str = prompt + self._print_func: Printer = print_func + self._exit_command: Command = exit_command + self._system_router_title: str | None = system_router_title + self._dividing_line: StaticDividingLine | DynamicDividingLine = dividing_line + self._ignore_command_register: bool = ignore_command_register + self._repeat_command_groups_description: bool = repeat_command_groups + self._override_system_messages: bool = override_system_messages + self._autocompleter: AutoCompleter = autocompleter - self._farewell_message = farewell_message - self._initial_message = initial_message + self._farewell_message: str = farewell_message + self._initial_message: str = initial_message - self._description_message_gen: Callable[[str, str], str] = lambda command, description: f"{command} *=*=* {description}" + self._description_message_gen: DescriptionMessageGenerator = lambda command, description: f"{command} *=*=* {description}" self._registered_routers: RegisteredRouters = RegisteredRouters() self._messages_on_startup: list[str] = [] self._matching_lower_triggers_with_routers: dict[str, Router] = {} self._matching_default_triggers_with_routers: dict[str, Router] = {} - if self._ignore_command_register: - self._current_matching_triggers_with_routers: dict[str, Router] = self._matching_lower_triggers_with_routers - else: - self._current_matching_triggers_with_routers: dict[str, Router] = self._matching_default_triggers_with_routers + self._current_matching_triggers_with_routers: dict[str, Router] = self._matching_lower_triggers_with_routers if self._ignore_command_register else self._matching_default_triggers_with_routers - self._incorrect_input_syntax_handler: Callable[[str], None] = ( - lambda raw_command: print_func(f"Incorrect flag syntax: {raw_command}") - ) - self._repeated_input_flags_handler: Callable[[str], None] = ( - lambda raw_command: print_func(f"Repeated input flags: {raw_command}") - ) - self._empty_input_command_handler: Callable[[], None] = lambda: print_func( - "Empty input command" - ) - self._unknown_command_handler: Callable[[InputCommand], None] = ( - lambda command: print_func(f"Unknown command: {command.get_trigger()}") - ) - self._exit_command_handler: Callable[[Response], None] = ( - lambda response: print_func(self._farewell_message) - ) + self._incorrect_input_syntax_handler: NonStandardBehaviorHandler[str] = lambda _: print_func(f"Incorrect flag syntax: {_}") + self._repeated_input_flags_handler: NonStandardBehaviorHandler[str] = lambda _: print_func(f"Repeated input flags: {_}") + self._empty_input_command_handler: EmptyCommandHandler = lambda: print_func("Empty input command") + self._unknown_command_handler: NonStandardBehaviorHandler[InputCommand] = lambda _: print_func(f"Unknown command: {_.trigger}") + self._exit_command_handler: NonStandardBehaviorHandler[Response] = lambda _: print_func(self._farewell_message) - def set_description_message_pattern(self, _: Callable[[str, str], str]) -> None: + def set_description_message_pattern(self, _: DescriptionMessageGenerator, /) -> None: """ Public. Sets the output pattern of the available commands :param _: output pattern of the available commands :return: None """ - self._description_message_gen: Callable[[str, str], str] = _ + self._description_message_gen = _ - def set_incorrect_input_syntax_handler(self, _: Callable[[str], None]) -> None: + def set_incorrect_input_syntax_handler(self, _: NonStandardBehaviorHandler[str], /) -> None: """ Public. Sets the handler for incorrect flags when entering a command :param _: handler for incorrect flags when entering a command @@ -91,7 +86,7 @@ class BaseApp: """ self._incorrect_input_syntax_handler = _ - def set_repeated_input_flags_handler(self, _: Callable[[str], None]) -> None: + def set_repeated_input_flags_handler(self, _: NonStandardBehaviorHandler[str], /) -> None: """ Public. Sets the handler for repeated flags when entering a command :param _: handler for repeated flags when entering a command @@ -99,7 +94,7 @@ class BaseApp: """ self._repeated_input_flags_handler = _ - def set_unknown_command_handler(self, _: Callable[[str], None]) -> None: + def set_unknown_command_handler(self, _: NonStandardBehaviorHandler[InputCommand], /) -> None: """ Public. Sets the handler for unknown commands when entering a command :param _: handler for unknown commands when entering a command @@ -107,7 +102,7 @@ class BaseApp: """ self._unknown_command_handler = _ - def set_empty_command_handler(self, _: Callable[[], None]) -> None: + def set_empty_command_handler(self, _: EmptyCommandHandler, /) -> None: """ Public. Sets the handler for empty commands when entering a command :param _: handler for empty commands when entering a command @@ -115,7 +110,7 @@ class BaseApp: """ self._empty_input_command_handler = _ - def set_exit_command_handler(self, _: Callable[[], None]) -> None: + def set_exit_command_handler(self, _: NonStandardBehaviorHandler[Response], /) -> None: """ Public. Sets the handler for exit command when entering a command :param _: handler for exit command when entering a command @@ -131,11 +126,12 @@ class BaseApp: for registered_router in self._registered_routers: if registered_router.title: self._print_func(registered_router.title) - for command_handler in registered_router.get_command_handlers(): + for command_handler in registered_router.command_handlers: + handled_command = command_handler.handled_command self._print_func( self._description_message_gen( - command_handler.get_handled_command().get_trigger(), - command_handler.get_handled_command().get_description(), + handled_command.trigger, + handled_command.description, ) ) self._print_func("") @@ -146,16 +142,7 @@ class BaseApp: :param text: framed text :return: None """ - if isinstance(self._dividing_line, StaticDividingLine): - self._print_func( - self._dividing_line.get_full_static_line(self._override_system_messages) - ) - print(text.strip("\n")) - self._print_func( - self._dividing_line.get_full_static_line(self._override_system_messages) - ) - - elif isinstance(self._dividing_line, DynamicDividingLine): + if isinstance(self._dividing_line, DynamicDividingLine): clear_text = re.sub(r"\u001b\[[0-9;]*m", "", text) max_length_line = max([len(line) for line in clear_text.split("\n")]) max_length_line = ( @@ -168,15 +155,27 @@ class BaseApp: self._print_func( self._dividing_line.get_full_dynamic_line( - max_length_line, self._override_system_messages + length=max_length_line, is_override=self._override_system_messages ) ) print(text.strip("\n")) self._print_func( self._dividing_line.get_full_dynamic_line( - max_length_line, self._override_system_messages + length=max_length_line, is_override=self._override_system_messages ) ) + + elif isinstance(self._dividing_line, StaticDividingLine): # pyright: ignore[reportUnnecessaryIsInstance] + self._print_func( + self._dividing_line.get_full_static_line(is_override=self._override_system_messages) + ) + print(text.strip("\n")) + self._print_func( + self._dividing_line.get_full_static_line(is_override=self._override_system_messages) + ) + + else: + raise NotImplementedError def _is_exit_command(self, command: InputCommand) -> bool: """ @@ -184,20 +183,21 @@ class BaseApp: :param command: command to check :return: is it an exit command or not as bool """ + trigger = command.trigger + exit_trigger = self._exit_command.trigger if self._ignore_command_register: if ( - command.get_trigger().lower() - == self._exit_command.get_trigger().lower() + trigger.lower() == exit_trigger.lower() ): return True - elif command.get_trigger().lower() in [ - x.lower() for x in self._exit_command.get_aliases() + elif trigger.lower() in [ + x.lower() for x in self._exit_command.aliases ]: return True else: - if command.get_trigger() == self._exit_command.get_trigger(): + if trigger == exit_trigger: return True - elif command.get_trigger() in self._exit_command.get_aliases(): + elif trigger in self._exit_command.aliases: return True return False @@ -207,7 +207,7 @@ class BaseApp: :param command: command to check :return: is it an unknown command or not as bool """ - input_command_trigger = command.get_trigger() + input_command_trigger = command.trigger if self._ignore_command_register: if input_command_trigger.lower() in list(self._current_matching_triggers_with_routers.keys()): return False @@ -217,7 +217,7 @@ class BaseApp: return True def _error_handler( - self, error: BaseInputCommandException, raw_command: str + self, error: InputCommandException, raw_command: str ) -> None: """ Private. Handles parsing errors of the entered command @@ -240,23 +240,25 @@ class BaseApp: system_router.title = self._system_router_title @system_router.command(self._exit_command) - def exit_command(response: Response) -> None: + def _(response: Response) -> None: self._exit_command_handler(response) - if system_router not in self._registered_routers.get_registered_routers(): - system_router.set_command_register_ignore(self._ignore_command_register) + if system_router not in self._registered_routers.registered_routers: + system_router.command_register_ignore = self._ignore_command_register self._registered_routers.add_registered_router(system_router) def _most_similar_command(self, unknown_command: str) -> str | None: all_commands = list(self._current_matching_triggers_with_routers.keys()) - - matches: list[str] | list = sorted( + + matches_startswith_unknown_command: Matches = sorted( cmd for cmd in all_commands if cmd.startswith(unknown_command) ) - if not matches: - matches: list[str] | list = sorted( - cmd for cmd in all_commands if unknown_command.startswith(cmd) - ) + matches_startswith_cmd: Matches = sorted( + cmd for cmd in all_commands if unknown_command.startswith(cmd) + ) + + matches: Matches = matches_startswith_unknown_command or matches_startswith_cmd + if len(matches) == 1: return matches[0] elif len(matches) > 1: @@ -272,9 +274,9 @@ class BaseApp: self._prompt = f"[italic dim bold]{self._prompt}" self._initial_message = ("\n" + f"[bold red]{text2art(self._initial_message, font='tarty1')}" + "\n") self._farewell_message = ( - "[bold red]\n\n" - + text2art(self._farewell_message, font="chanky") - + "\n[/bold red]\n" + "[bold red]\n\n" + + str(text2art(self._farewell_message, font="chanky")) + # pyright: ignore[reportUnknownArgumentType] + "\n[/bold red]\n" + "[red i]github.com/koloideal/Argenta[/red i] | [red bold i]made by kolo[/red bold i]\n" ) self._description_message_gen = lambda command, description: ( @@ -287,7 +289,7 @@ class BaseApp: self._empty_input_command_handler = lambda: self._print_func("[red bold]Empty input command") def unknown_command_handler(command: InputCommand) -> None: - cmd_trg: str = command.get_trigger() + cmd_trg: str = command.trigger mst_sim_cmd: str | None = self._most_similar_command(cmd_trg) first_part_of_text = f"[red]Unknown command:[/red] [blue]{escape(cmd_trg)}[/blue]" second_part_of_text = ( @@ -299,7 +301,7 @@ class BaseApp: self._unknown_command_handler = unknown_command_handler - def pre_cycle_setup(self) -> None: + def _pre_cycle_setup(self) -> None: """ Private. Configures various aspects of the application before the start of the cycle :return: None @@ -307,8 +309,8 @@ class BaseApp: self._setup_system_router() for router_entity in self._registered_routers: - router_triggers = router_entity.get_triggers() - router_aliases = router_entity.get_aliases() + router_triggers = router_entity.triggers + router_aliases = router_entity.aliases combined = router_triggers + router_aliases for trigger in combined: @@ -337,20 +339,28 @@ class BaseApp: self._print_command_group_description() +AVAILABLE_DIVIDING_LINES: TypeAlias = StaticDividingLine | DynamicDividingLine +DEFAULT_DIVIDING_LINE: StaticDividingLine = StaticDividingLine() + +DEFAULT_PRINT_FUNC: Printer = Console().print +DEFAULT_AUTOCOMPLETER: AutoCompleter = AutoCompleter() +DEFAULT_EXIT_COMMAND: Command = Command("Q", description="Exit command") + + class App(BaseApp): def __init__( - self, - prompt: str = "What do you want to do?\n", + self, *, + prompt: str = "What do you want to do?\n\n", initial_message: str = "Argenta\n", farewell_message: str = "\nSee you\n", - exit_command: Command = Command("Q", "Exit command"), + exit_command: Command = DEFAULT_EXIT_COMMAND, system_router_title: str | None = "System points:", ignore_command_register: bool = True, - dividing_line: StaticDividingLine | DynamicDividingLine = StaticDividingLine(), + dividing_line: AVAILABLE_DIVIDING_LINES = DEFAULT_DIVIDING_LINE, repeat_command_groups: bool = True, override_system_messages: bool = False, - autocompleter: AutoCompleter = AutoCompleter(), - print_func: Callable[[str], None] = Console().print, + autocompleter: AutoCompleter = DEFAULT_AUTOCOMPLETER, + print_func: Printer = DEFAULT_PRINT_FUNC, ) -> None: """ Public. The essence of the application itself. @@ -387,7 +397,7 @@ class App(BaseApp): Private. Starts the user input processing cycle :return: None """ - self.pre_cycle_setup() + self._pre_cycle_setup() while True: if self._repeat_command_groups_description: self._print_command_group_description() @@ -396,11 +406,11 @@ class App(BaseApp): try: input_command: InputCommand = InputCommand.parse(raw_command=raw_command) - except BaseInputCommandException as error: - with redirect_stdout(io.StringIO()) as f: + except InputCommandException as error: + with redirect_stdout(io.StringIO()) as stderr: self._error_handler(error, raw_command) - res: str = f.getvalue() - self._print_framed_text(res) + stderr_result: str = stderr.getvalue() + self._print_framed_text(stderr_result) continue if self._is_exit_command(input_command): @@ -409,29 +419,30 @@ class App(BaseApp): return if self._is_unknown_command(input_command): - with redirect_stdout(io.StringIO()) as f: + with redirect_stdout(io.StringIO()) as stdout: self._unknown_command_handler(input_command) - res: str = f.getvalue() - self._print_framed_text(res) + stdout_res: str = stdout.getvalue() + self._print_framed_text(stdout_res) continue - processing_router = self._current_matching_triggers_with_routers[input_command.get_trigger().lower()] + processing_router = self._current_matching_triggers_with_routers[input_command.trigger.lower()] if processing_router.disable_redirect_stdout: if isinstance(self._dividing_line, StaticDividingLine): - self._print_func(self._dividing_line.get_full_static_line(self._override_system_messages)) + self._print_func(self._dividing_line.get_full_static_line(is_override=self._override_system_messages)) processing_router.finds_appropriate_handler(input_command) - self._print_func(self._dividing_line.get_full_static_line(self._override_system_messages)) + self._print_func(self._dividing_line.get_full_static_line(is_override=self._override_system_messages)) else: - self._print_func(StaticDividingLine(self._dividing_line.get_unit_part()).get_full_static_line(self._override_system_messages)) + dividing_line_unit_part: str = self._dividing_line.get_unit_part() + self._print_func(StaticDividingLine(dividing_line_unit_part).get_full_static_line(is_override=self._override_system_messages)) processing_router.finds_appropriate_handler(input_command) - self._print_func(StaticDividingLine(self._dividing_line.get_unit_part()).get_full_static_line(self._override_system_messages)) + self._print_func(StaticDividingLine(dividing_line_unit_part).get_full_static_line(is_override=self._override_system_messages)) else: - with redirect_stdout(io.StringIO()) as f: + with redirect_stdout(io.StringIO()) as stdout: processing_router.finds_appropriate_handler(input_command) - res: str = f.getvalue() - if res: - self._print_framed_text(res) + stdout_result: str = stdout.getvalue() + if stdout_result: + self._print_framed_text(stdout_result) def include_router(self, router: Router) -> None: """ @@ -439,7 +450,7 @@ class App(BaseApp): :param router: registered router :return: None """ - router.set_command_register_ignore(self._ignore_command_register) + router.command_register_ignore = self._ignore_command_register self._registered_routers.add_registered_router(router) def include_routers(self, *routers: Router) -> None: diff --git a/src/argenta/app/protocols.py b/src/argenta/app/protocols.py new file mode 100644 index 0000000..e3f13fd --- /dev/null +++ b/src/argenta/app/protocols.py @@ -0,0 +1,22 @@ +from typing import Protocol, TypeVar + +T = TypeVar('T', contravariant=True) # noqa: WPS111 + + +class NonStandardBehaviorHandler(Protocol[T]): + def __call__(self, __param: T) -> None: + raise NotImplementedError + +class EmptyCommandHandler(Protocol): + def __call__(self) -> None: + raise NotImplementedError + + +class Printer(Protocol): + def __call__(self, __text: str) -> None: + raise NotImplementedError + + +class DescriptionMessageGenerator(Protocol): + def __call__(self, __first_param: str, __second_param: str) -> str: + raise NotImplementedError diff --git a/src/argenta/app/registered_routers/entity.py b/src/argenta/app/registered_routers/entity.py index 2070e2a..bce0a23 100644 --- a/src/argenta/app/registered_routers/entity.py +++ b/src/argenta/app/registered_routers/entity.py @@ -1,34 +1,27 @@ -from typing import Iterator +from typing import Iterator, Optional from argenta.router import Router class RegisteredRouters: - def __init__(self, registered_routers: list[Router] | None = None) -> None: + def __init__(self, registered_routers: Optional[list[Router]] = None) -> None: """ Private. Combines registered routers :param registered_routers: list of the registered routers :return: None """ - self._registered_routers = registered_routers if registered_routers else [] + self.registered_routers: list[Router] = registered_routers if registered_routers else [] - def get_registered_routers(self) -> list[Router]: - """ - Private. Returns the registered routers - :return: registered routers as list[Router] - """ - return self._registered_routers - - def add_registered_router(self, router: Router) -> None: + def add_registered_router(self, router: Router, /) -> None: """ Private. Adds a new registered router :param router: registered router :return: None """ - self._registered_routers.append(router) + self.registered_routers.append(router) def __iter__(self) -> Iterator[Router]: - return iter(self._registered_routers) + return iter(self.registered_routers) def __next__(self) -> Router: - return next(iter(self._registered_routers)) + return next(iter(self.registered_routers)) diff --git a/src/argenta/command/__init__.py b/src/argenta/command/__init__.py index 6531596..98310e5 100644 --- a/src/argenta/command/__init__.py +++ b/src/argenta/command/__init__.py @@ -1,3 +1,12 @@ -__all__ = ["Command"] +__all__ = [ + "Command", + "PossibleValues", + "PredefinedFlags", + "InputCommand", + "Flags", + "Flag" +] -from argenta.command.models import Command +from argenta.command.models import Command, InputCommand +from argenta.command.flag import defaults as PredefinedFlags +from argenta.command.flag import (Flag, Flags, PossibleValues) diff --git a/src/argenta/command/exceptions.py b/src/argenta/command/exceptions.py index aa75383..8e5a2ad 100644 --- a/src/argenta/command/exceptions.py +++ b/src/argenta/command/exceptions.py @@ -1,42 +1,49 @@ from argenta.command.flag.models import Flag, InputFlag +from abc import ABC, abstractmethod +from typing import override -class BaseInputCommandException(Exception): +class InputCommandException(ABC, Exception): """ Private. Base exception class for all exceptions raised when parse input command """ - - pass + @override + @abstractmethod + def __str__(self) -> str: + raise NotImplementedError -class UnprocessedInputFlagException(BaseInputCommandException): +class UnprocessedInputFlagException(InputCommandException): """ Private. Raised when an unprocessed input flag is detected """ - - def __str__(self): + @override + def __str__(self) -> str: return "Unprocessed Input Flags" -class RepeatedInputFlagsException(BaseInputCommandException): +class RepeatedInputFlagsException(InputCommandException): """ Private. Raised when repeated input flags are detected """ def __init__(self, flag: Flag | InputFlag): - self.flag = flag + self.flag: Flag | InputFlag = flag + super().__init__() - def __str__(self): + @override + def __str__(self) -> str: + string_entity: str = self.flag.string_entity return ( "Repeated Input Flags\n" - f"Duplicate flag was detected in the input: '{self.flag.get_string_entity()}'" + f"Duplicate flag was detected in the input: '{string_entity}'" ) -class EmptyInputCommandException(BaseInputCommandException): +class EmptyInputCommandException(InputCommandException): """ Private. Raised when an empty input command is detected """ - - def __str__(self): + @override + def __str__(self) -> str: return "Input Command is empty" diff --git a/src/argenta/command/flag/__init__.py b/src/argenta/command/flag/__init__.py index c714f5d..3095bd8 100644 --- a/src/argenta/command/flag/__init__.py +++ b/src/argenta/command/flag/__init__.py @@ -1,17 +1,11 @@ __all__ = [ "Flag", "InputFlag", - "UndefinedInputFlags", - "ValidInputFlags", - "InvalidValueInputFlags", - "Flags", "PossibleValues" + "Flags", + "PossibleValues", + "ValidationStatus" ] -from argenta.command.flag.models import Flag, InputFlag, PossibleValues -from argenta.command.flag.flags.models import ( - UndefinedInputFlags, - ValidInputFlags, - Flags, - InvalidValueInputFlags, -) +from argenta.command.flag.models import Flag, InputFlag, PossibleValues, ValidationStatus +from argenta.command.flag.flags.models import Flags diff --git a/src/argenta/command/flag/defaults.py b/src/argenta/command/flag/defaults.py index 391a7bd..2ffb780 100644 --- a/src/argenta/command/flag/defaults.py +++ b/src/argenta/command/flag/defaults.py @@ -1,32 +1,27 @@ -from dataclasses import dataclass +from typing import Literal from argenta.command.flag.models import Flag, PossibleValues -import re +import re +DEFAULT_PREFIX: Literal["-", "--", "---"] = "-" -@dataclass -class PredefinedFlags: - """ - Public. A dataclass with predefined flags and most frequently used flags for quick use - """ +HELP = Flag(name="help", possible_values=PossibleValues.NEITHER) +SHORT_HELP = Flag(name="H", prefix=DEFAULT_PREFIX, possible_values=PossibleValues.NEITHER) - HELP = Flag(name="help", possible_values=PossibleValues.DISABLE) - SHORT_HELP = Flag(name="H", prefix="-", possible_values=PossibleValues.DISABLE) +INFO = Flag(name="info", possible_values=PossibleValues.NEITHER) # noqa: WPS110 +SHORT_INFO = Flag(name="I", prefix=DEFAULT_PREFIX, possible_values=PossibleValues.NEITHER) - INFO = Flag(name="info", possible_values=PossibleValues.DISABLE) - SHORT_INFO = Flag(name="I", prefix="-", possible_values=PossibleValues.DISABLE) +ALL = Flag(name="all", possible_values=PossibleValues.NEITHER) +SHORT_ALL = Flag(name="A", prefix=DEFAULT_PREFIX, possible_values=PossibleValues.NEITHER) - ALL = Flag(name="all", possible_values=PossibleValues.DISABLE) - SHORT_ALL = Flag(name="A", prefix="-", possible_values=PossibleValues.DISABLE) +HOST = Flag( + name="host", possible_values=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$") +) +SHORT_HOST = Flag( + name="H", + prefix=DEFAULT_PREFIX, + possible_values=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"), +) - HOST = Flag( - name="host", possible_values=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$") - ) - SHORT_HOST = Flag( - name="H", - prefix="-", - possible_values=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"), - ) - - PORT = Flag(name="port", possible_values=re.compile(r"^\d{1,5}$")) - SHORT_PORT = Flag(name="P", prefix="-", possible_values=re.compile(r"^\d{1,5}$")) +PORT = Flag(name="port", possible_values=re.compile(r"^\d{1,5}$")) +SHORT_PORT = Flag(name="P", prefix=DEFAULT_PREFIX, possible_values=re.compile(r"^\d{1,5}$")) diff --git a/src/argenta/command/flag/flags/__init__.py b/src/argenta/command/flag/flags/__init__.py index c0d94ed..b355115 100644 --- a/src/argenta/command/flag/flags/__init__.py +++ b/src/argenta/command/flag/flags/__init__.py @@ -1,16 +1,10 @@ __all__ = [ "Flags", - "InputFlags", - "UndefinedInputFlags", - "InvalidValueInputFlags", - "ValidInputFlags", + "InputFlags" ] from argenta.command.flag.flags.models import ( Flags, - InputFlags, - UndefinedInputFlags, - InvalidValueInputFlags, - ValidInputFlags, + InputFlags ) diff --git a/src/argenta/command/flag/flags/models.py b/src/argenta/command/flag/flags/models.py index a340429..c80b21b 100644 --- a/src/argenta/command/flag/flags/models.py +++ b/src/argenta/command/flag/flags/models.py @@ -1,90 +1,106 @@ from argenta.command.flag.models import InputFlag, Flag -from typing import Generic, TypeVar +from typing import Generic, TypeVar, override +from collections.abc import Iterator FlagType = TypeVar("FlagType") class BaseFlags(Generic[FlagType]): - def __init__(self, *flags: FlagType): + def __init__(self, flags: list[FlagType] | None = None) -> None: """ Public. A model that combines the registered flags :param flags: the flags that will be registered :return: None """ - self._flags = flags if flags else [] + self.flags: list[FlagType] = flags if flags else [] - def get_flags(self) -> list[FlagType]: - """ - Public. Returns a list of flags - :return: list of flags as list[FlagType] - """ - return self._flags - - def add_flag(self, flag: FlagType): + def add_flag(self, flag: FlagType) -> None: """ Public. Adds a flag to the list of flags :param flag: flag to add :return: None """ - self._flags.append(flag) + self.flags.append(flag) - def add_flags(self, flags: list[FlagType]): + def add_flags(self, flags: list[FlagType]) -> None: """ Public. Adds a list of flags to the list of flags :param flags: list of flags to add :return: None """ - self._flags.extend(flags) + self.flags.extend(flags) - def get_flag(self, name: str) -> FlagType | None: + def __iter__(self) -> Iterator[FlagType]: + return iter(self.flags) + + def __next__(self) -> FlagType: + return next(iter(self)) + + def __getitem__(self, flag_index: int) -> FlagType: + return self.flags[flag_index] + + def __bool__(self) -> bool: + return bool(self.flags) + + +class Flags(BaseFlags[Flag]): + def get_flag_by_name(self, name: str) -> Flag | None: """ Public. Returns the flag entity by its name or None if not found :param name: the name of the flag to get :return: entity of the flag or None """ - if name in [flag.get_name() for flag in self._flags]: - return list(filter(lambda flag: flag.get_name() == name, self._flags))[0] - else: - return None + return next((flag for flag in self.flags if flag.name == name), None) + + @override + def __eq__(self, other: object) -> bool: + if not isinstance(other, Flags): + return NotImplemented - def __iter__(self): - return iter(self._flags) + if len(self.flags) != len(other.flags): + return False - def __next__(self): - return next(iter(self)) + flag_pairs: zip[tuple[Flag, Flag]] = zip(self.flags, other.flags) + return all(s_flag == o_flag for s_flag, o_flag in flag_pairs) - def __getitem__(self, item): - return self._flags[item] - - def __bool__(self): - return bool(self._flags) - - def __eq__(self, other): - if len(self.get_flags()) != len(other.get_flags()): + def __contains__(self, flag_to_check: object) -> bool: + if isinstance(flag_to_check, Flag): + for flag in self.flags: + if flag == flag_to_check: + return True return False else: - for flag, other_flag in zip(self.get_flags(), other.get_flags()): - if not flag == other_flag: - return False - return True - - -class Flags(BaseFlags[Flag]): - pass + raise TypeError class InputFlags(BaseFlags[InputFlag]): - pass + def get_flag_by_name(self, name: str) -> InputFlag | None: + """ + Public. Returns the flag entity by its name or None if not found + :param name: the name of the flag to get + :return: entity of the flag or None + """ + return next((flag for flag in self.flags if flag.name == name), None) + + @override + def __eq__(self, other: object) -> bool: + if not isinstance(other, InputFlags): + raise NotImplementedError + if len(self.flags) != len(other.flags): + return False -class ValidInputFlags(InputFlags): - pass + paired_flags: zip[tuple[InputFlag, InputFlag]] = zip(self.flags, other.flags) + return all(my_flag == other_flag for my_flag, other_flag in paired_flags) -class UndefinedInputFlags(InputFlags): - pass + def __contains__(self, ingressable_item: object) -> bool: + if isinstance(ingressable_item, InputFlag): + for flag in self.flags: + if flag == ingressable_item: + return True + return False + else: + raise TypeError - -class InvalidValueInputFlags(InputFlags): - pass diff --git a/src/argenta/command/flag/models.py b/src/argenta/command/flag/models.py index 3f2ffb4..7f04992 100644 --- a/src/argenta/command/flag/models.py +++ b/src/argenta/command/flag/models.py @@ -1,57 +1,22 @@ from enum import Enum -from typing import Literal, Pattern - +from re import Pattern +from typing import Literal, override class PossibleValues(Enum): - DISABLE: Literal[False] = False - ALL: Literal[True] = True - - def __eq__(self, other: bool) -> bool: - return self.value == other + NEITHER = 'NEITHER' + ALL = 'ALL' -class BaseFlag: - def __init__(self, name: str, prefix: Literal["-", "--", "---"] = "--") -> None: - """ - Private. Base class for flags - :param name: the name of the flag - :param prefix: the prefix of the flag - :return: None - """ - self._name = name - self._prefix = prefix - - def get_string_entity(self) -> str: - """ - Public. Returns a string representation of the flag - :return: string representation of the flag as str - """ - string_entity: str = self._prefix + self._name - return string_entity - - def get_name(self) -> str: - """ - Public. Returns the name of the flag - :return: the name of the flag as str - """ - return self._name - - def get_prefix(self) -> str: - """ - Public. Returns the prefix of the flag - :return: the prefix of the flag as str - """ - return self._prefix - - def __eq__(self, other) -> bool: - return self.get_string_entity() == other.get_string_entity() +class ValidationStatus(Enum): + VALID = 'VALID' + INVALID = 'INVALID' + UNDEFINED = 'UNDEFINED' -class Flag(BaseFlag): +class Flag: def __init__( - self, - name: str, + self, name: str, *, prefix: Literal["-", "--", "---"] = "--", possible_values: list[str] | Pattern[str] | PossibleValues = PossibleValues.ALL, ) -> None: @@ -62,45 +27,58 @@ class Flag(BaseFlag): :param possible_values: The possible values of the flag, if False then the flag cannot have a value :return: None """ - super().__init__(name, prefix) - self.possible_values = possible_values + self.name: str = name + self.prefix: Literal["-", "--", "---"] = prefix + self.possible_values: list[str] | Pattern[str] | PossibleValues = possible_values - def validate_input_flag_value(self, input_flag_value: str | None): + def validate_input_flag_value(self, input_flag_value: str | None) -> bool: """ Private. Validates the input flag value :param input_flag_value: The input flag value to validate :return: whether the entered flag is valid as bool """ - if self.possible_values == PossibleValues.DISABLE: - if input_flag_value is None: - return True - else: - return False - elif isinstance(self.possible_values, Pattern): - if isinstance(input_flag_value, str): - is_valid = bool(self.possible_values.match(input_flag_value)) - if bool(is_valid): - return True - else: - return False - else: - return False + if self.possible_values == PossibleValues.NEITHER: + return input_flag_value is None - elif isinstance(self.possible_values, list): - if input_flag_value in self.possible_values: - return True - else: - return False + if isinstance(self.possible_values, Pattern): + return isinstance(input_flag_value, str) and bool(self.possible_values.match(input_flag_value)) + + if isinstance(self.possible_values, list): + return input_flag_value in self.possible_values + + return True + + @property + def string_entity(self) -> str: + """ + Public. Returns a string representation of the flag + :return: string representation of the flag as str + """ + string_entity: str = self.prefix + self.name + return string_entity + + @override + def __str__(self) -> str: + return self.string_entity + + @override + def __repr__(self) -> str: + return f'Flag' + + @override + def __eq__(self, other: object) -> bool: + if isinstance(other, Flag): + return self.string_entity == other.string_entity else: - return True + raise NotImplementedError -class InputFlag(BaseFlag): +class InputFlag: def __init__( - self, - name: str, - prefix: Literal["-", "--", "---"] = "--", - value: str | None = None, + self, name: str, *, + prefix: Literal['-', '--', '---'] = '--', + input_value: str | None, + status: ValidationStatus | None ): """ Public. The entity of the flag of the entered command @@ -109,26 +87,33 @@ class InputFlag(BaseFlag): :param value: the value of the input flag :return: None """ - super().__init__(name, prefix) - self._flag_value = value + self.name: str = name + self.prefix: Literal['-', '--', '---'] = prefix + self.input_value: str | None = input_value + self.status: ValidationStatus | None = status + + @property + def string_entity(self) -> str: + """ + Public. Returns a string representation of the flag + :return: string representation of the flag as str + """ + string_entity: str = self.prefix + self.name + return string_entity - def get_value(self) -> str | None: - """ - Public. Returns the value of the flag - :return: the value of the flag as str - """ - return self._flag_value + @override + def __str__(self) -> str: + return f'{self.string_entity} {self.input_value}' + + @override + def __repr__(self) -> str: + return f'InputFlag' - def set_value(self, value): - """ - Private. Sets the value of the flag - :param value: the fag value to set - :return: None - """ - self._flag_value = value - - def __eq__(self, other) -> bool: - return ( - self.get_string_entity() == other.get_string_entity() - and self.get_value() == other.get_value() - ) + @override + def __eq__(self, other: object) -> bool: + if isinstance(other, InputFlag): + return ( + self.name == other.name + ) + else: + raise NotImplementedError diff --git a/src/argenta/command/models.py b/src/argenta/command/models.py index bf9b6b7..6088a21 100644 --- a/src/argenta/command/models.py +++ b/src/argenta/command/models.py @@ -1,35 +1,28 @@ -from argenta.command.flag.models import Flag, InputFlag +from argenta.command.flag.models import Flag, InputFlag, ValidationStatus from argenta.command.flag.flags.models import InputFlags, Flags from argenta.command.exceptions import ( UnprocessedInputFlagException, RepeatedInputFlagsException, EmptyInputCommandException, ) -from typing import cast, Literal +from typing import Never, Self, cast, Literal -class BaseCommand: - def __init__(self, trigger: str) -> None: - """ - Private. Base class for all commands - :param trigger: A string trigger, which, when entered by the user, indicates that the input corresponds to the command - """ - self._trigger = trigger +ParseFlagsResult = tuple[InputFlags, str | None, str | None] +ParseResult = tuple[str, InputFlags] - def get_trigger(self) -> str: - """ - Public. Returns the trigger of the command - :return: the trigger of the command as str - """ - return self._trigger +MIN_FLAG_PREFIX: str = "-" +DEFAULT_WITHOUT_FLAGS: Flags = Flags() + +DEFAULT_WITHOUT_INPUT_FLAGS: InputFlags = InputFlags() -class Command(BaseCommand): +class Command: def __init__( self, - trigger: str, + trigger: str, *, description: str | None = None, - flags: Flag | Flags | None = None, + flags: Flag | Flags = DEFAULT_WITHOUT_FLAGS, aliases: list[str] | None = None, ): """ @@ -39,157 +32,120 @@ class Command(BaseCommand): :param flags: processed commands :param aliases: string synonyms for the main trigger """ - super().__init__(trigger) - self._registered_flags: Flags = ( - flags - if isinstance(flags, Flags) - else Flags(flags) - if isinstance(flags, Flag) - else Flags() - ) - self._description = "Very useful command" if not description else description - self._aliases = aliases if isinstance(aliases, list) else [] - - def get_registered_flags(self) -> Flags: - """ - Private. Returns the registered flags - :return: the registered flags as Flags - """ - return self._registered_flags - - def get_aliases(self) -> list[str] | list: - """ - Public. Returns the aliases of the command - :return: the aliases of the command as list[str] | list - """ - return self._aliases + self.registered_flags: Flags = flags if isinstance(flags, Flags) else Flags([flags]) + self.trigger: str = trigger + self.description: str = description if description else "Command without description" + self.aliases: list[str] = aliases if aliases else [] def validate_input_flag( self, flag: InputFlag - ) -> Literal["Undefined", "Valid", "Invalid"]: + ) -> ValidationStatus: """ Private. Validates the input flag :param flag: input flag for validation :return: is input flag valid as bool """ - registered_flags: Flags | None = self.get_registered_flags() - if registered_flags: - if isinstance(registered_flags, Flag): - if registered_flags.get_string_entity() == flag.get_string_entity(): - is_valid = registered_flags.validate_input_flag_value( - flag.get_value() - ) - if is_valid: - return "Valid" - else: - return "Invalid" + registered_flags: Flags = self.registered_flags + for registered_flag in registered_flags: + if registered_flag.string_entity == flag.string_entity: + is_valid = registered_flag.validate_input_flag_value(flag.input_value) + if is_valid: + return ValidationStatus.VALID else: - return "Undefined" - else: - for registered_flag in registered_flags: - if registered_flag.get_string_entity() == flag.get_string_entity(): - is_valid = registered_flag.validate_input_flag_value( - flag.get_value() - ) - - if is_valid: - return "Valid" - else: - return "Invalid" - return "Undefined" - return "Undefined" - - def get_description(self) -> str: - """ - Private. Returns the description of the command - :return: the description of the command as str - """ - return self._description + return ValidationStatus.INVALID + return ValidationStatus.UNDEFINED -class InputCommand(BaseCommand): - def __init__(self, trigger: str, input_flags: InputFlag | InputFlags | None = None): +class InputCommand: + def __init__(self, trigger: str, *, + input_flags: InputFlag | InputFlags = DEFAULT_WITHOUT_INPUT_FLAGS): """ Private. The model of the input command, after parsing :param trigger:the trigger of the command :param input_flags: the input flags :return: None """ - super().__init__(trigger) - self._input_flags: InputFlags = ( - input_flags - if isinstance(input_flags, InputFlags) - else InputFlags(input_flags) - if isinstance(input_flags, InputFlag) - else InputFlags() - ) + self.trigger: str = trigger + self.input_flags: InputFlags = input_flags if isinstance(input_flags, InputFlags) else InputFlags([input_flags]) - def _set_input_flags(self, input_flags: InputFlags) -> None: - """ - Private. Sets the input flags - :param input_flags: the input flags to set - :return: None - """ - self._input_flags = input_flags - - def get_input_flags(self) -> InputFlags: - """ - Private. Returns the input flags - :return: the input flags as InputFlags - """ - return self._input_flags - - @staticmethod - def parse(raw_command: str) -> "InputCommand": + @classmethod + def parse(cls, raw_command: str) -> Self: """ Private. Parse the raw input command :param raw_command: raw input command :return: model of the input command, after parsing as InputCommand """ - if not raw_command: + trigger, input_flags = CommandParser(raw_command).parse_raw_command() + + return cls(trigger=trigger, input_flags=input_flags) + + +class CommandParser: + def __init__(self, raw_command: str) -> None: + self.raw_command: str = raw_command + self._parsed_input_flags: InputFlags = InputFlags() + + def parse_raw_command(self) -> ParseResult: + if not self.raw_command: raise EmptyInputCommandException() - list_of_tokens = raw_command.split() - command = list_of_tokens.pop(0) + input_flags, crnt_flag_name, crnt_flag_val = self._parse_flags(self.raw_command.split()[1:]) - input_flags: InputFlags = InputFlags() - current_flag_name, current_flag_value = None, None - - for k, _ in enumerate(list_of_tokens): - if _.startswith("-"): - if len(_) < 2 or len(_[: _.rfind("-")]) > 3: - raise UnprocessedInputFlagException() - current_flag_name = _ - else: - if not current_flag_name or current_flag_value: - raise UnprocessedInputFlagException() - current_flag_value = _ - - if current_flag_name: - if not len(list_of_tokens) == k + 1: - if not list_of_tokens[k + 1].startswith("-"): - continue - - input_flag = InputFlag( - name=current_flag_name[current_flag_name.rfind("-") + 1 :], - prefix=cast( - Literal["-", "--", "---"], - current_flag_name[: current_flag_name.rfind("-") + 1], - ), - value=current_flag_value, - ) - - all_flags = [ - flag.get_string_entity() for flag in input_flags.get_flags() - ] - if input_flag.get_string_entity() not in all_flags: - input_flags.add_flag(input_flag) - else: - raise RepeatedInputFlagsException(input_flag) - - current_flag_name, current_flag_value = None, None - - if any([current_flag_name, current_flag_value]): + if any([crnt_flag_name, crnt_flag_val]): raise UnprocessedInputFlagException() else: - return InputCommand(trigger=command, input_flags=input_flags) + return (self.raw_command.split()[0], input_flags) + + def _parse_flags(self, _tokens: list[str] | list[Never]) -> ParseFlagsResult: + crnt_flg_name, crnt_flg_val = None, None + for index, token in enumerate(_tokens): + crnt_flg_name, crnt_flg_val = _parse_single_token(token, crnt_flg_name, crnt_flg_val) + + if not crnt_flg_name or self._is_next_token_value(index, _tokens): + continue + + input_flag = InputFlag( + name=crnt_flg_name[crnt_flg_name.rfind(MIN_FLAG_PREFIX) + 1:], + prefix=cast( + Literal["-", "--", "---"], + crnt_flg_name[:crnt_flg_name.rfind(MIN_FLAG_PREFIX) + 1], + ), + input_value=crnt_flg_val, + status=None + ) + + if input_flag in self._parsed_input_flags: + raise RepeatedInputFlagsException(input_flag) + + self._parsed_input_flags.add_flag(input_flag) + crnt_flg_name, crnt_flg_val = None, None + + return (self._parsed_input_flags, crnt_flg_name, crnt_flg_val) + + def _is_next_token_value(self, current_index: int, + _tokens: list[str] | list[Never]) -> bool: + next_index = current_index + 1 + if next_index >= len(_tokens): + return False + + next_token = _tokens[next_index] + return not next_token.startswith(MIN_FLAG_PREFIX) + +def _parse_single_token( + token: str, + crnt_flag_name: str | None, + crnt_flag_val: str | None +) -> tuple[str | None, str | None]: + if not token.startswith(MIN_FLAG_PREFIX): + if not crnt_flag_name or crnt_flag_val: + raise UnprocessedInputFlagException + return crnt_flag_name, token + + prefix = token[:token.rfind(MIN_FLAG_PREFIX)] + if len(token) < 2 or len(prefix) > 2: + raise UnprocessedInputFlagException + + new_flag_name = token + new_flag_value = None + + return new_flag_name, new_flag_value diff --git a/src/argenta/metrics/main.py b/src/argenta/metrics/main.py index bcff06b..cc68072 100644 --- a/src/argenta/metrics/main.py +++ b/src/argenta/metrics/main.py @@ -2,7 +2,7 @@ import io from contextlib import redirect_stdout from time import time -from argenta.app import App +from argenta import App def get_time_of_pre_cycle_setup(app: App) -> float: @@ -13,6 +13,6 @@ def get_time_of_pre_cycle_setup(app: App) -> float: """ start = time() with redirect_stdout(io.StringIO()): - app.pre_cycle_setup() + app._pre_cycle_setup() # pyright: ignore[reportPrivateUsage] end = time() return end - start diff --git a/src/argenta/orchestrator/__init__.py b/src/argenta/orchestrator/__init__.py index cc56f20..b658f94 100644 --- a/src/argenta/orchestrator/__init__.py +++ b/src/argenta/orchestrator/__init__.py @@ -1,4 +1,8 @@ -__all__ = ["Orchestrator"] +__all__ = [ + "Orchestrator", + "ArgParser" +] from argenta.orchestrator.entity import Orchestrator +from argenta.orchestrator.argparser.entity import ArgParser diff --git a/src/argenta/orchestrator/argparser/__init__.py b/src/argenta/orchestrator/argparser/__init__.py index 729710f..01d2f42 100644 --- a/src/argenta/orchestrator/argparser/__init__.py +++ b/src/argenta/orchestrator/argparser/__init__.py @@ -1,4 +1,12 @@ -__all__ = ["ArgParser"] +__all__ = [ + "ArgParser", + "PositionalArgument", + "OptionalArgument", + "BooleanArgument" +] from argenta.orchestrator.argparser.entity import ArgParser +from argenta.orchestrator.argparser.arguments import (BooleanArgument, + PositionalArgument, + OptionalArgument) diff --git a/src/argenta/orchestrator/argparser/arguments/models.py b/src/argenta/orchestrator/argparser/arguments/models.py index fa0b154..1a74312 100644 --- a/src/argenta/orchestrator/argparser/arguments/models.py +++ b/src/argenta/orchestrator/argparser/arguments/models.py @@ -1,19 +1,19 @@ from abc import ABC, abstractmethod -from typing import Literal +from typing import Literal, override class BaseArgument(ABC): """ Private. Base class for all arguments """ - + @property @abstractmethod - def get_string_entity(self) -> str: + def string_entity(self) -> str: """ Public. Returns the string representation of the argument :return: the string representation as a str """ - pass + raise NotImplementedError class PositionalArgument(BaseArgument): @@ -22,9 +22,11 @@ class PositionalArgument(BaseArgument): Public. Required argument at startup :param name: name of the argument, must not start with minus (-) """ - self.name = name + self.name: str = name - def get_string_entity(self): + @property + @override + def string_entity(self) -> str: return self.name @@ -35,10 +37,12 @@ class OptionalArgument(BaseArgument): :param name: name of the argument :param prefix: prefix of the argument """ - self.name = name - self.prefix = prefix + self.name: str = name + self.prefix: Literal["-", "--", "---"] = prefix - def get_string_entity(self): + @property + @override + def string_entity(self) -> str: return self.prefix + self.name @@ -49,8 +53,10 @@ class BooleanArgument(BaseArgument): :param name: name of the argument :param prefix: prefix of the argument """ - self.name = name - self.prefix = prefix + self.name: str = name + self.prefix: Literal["-", "--", "---"] = prefix - def get_string_entity(self): + @property + @override + def string_entity(self) -> str: return self.prefix + self.name diff --git a/src/argenta/orchestrator/argparser/entity.py b/src/argenta/orchestrator/argparser/entity.py index 3092250..5c3f29d 100644 --- a/src/argenta/orchestrator/argparser/entity.py +++ b/src/argenta/orchestrator/argparser/entity.py @@ -1,4 +1,4 @@ -from argparse import ArgumentParser +from argparse import ArgumentParser, Namespace from argenta.orchestrator.argparser.arguments.models import ( BooleanArgument, @@ -10,7 +10,7 @@ from argenta.orchestrator.argparser.arguments.models import ( class ArgParser: def __init__( self, - processed_args: list[PositionalArgument | OptionalArgument | BooleanArgument], + processed_args: list[PositionalArgument | OptionalArgument | BooleanArgument], *, name: str = "Argenta", description: str = "Argenta available arguments", epilog: str = "github.com/koloideal/Argenta | made by kolo", @@ -22,38 +22,18 @@ class ArgParser: :param epilog: the epilog of the ArgParse instance :param processed_args: registered and processed arguments """ - self.name = name - self.description = description - self.epilog = epilog + self._name: str = name + self._description: str = description + self._epilog: str = epilog - self.entity: ArgumentParser = ArgumentParser( - prog=name, description=description, epilog=epilog - ) - self.args: ( - list[PositionalArgument | OptionalArgument | BooleanArgument] | None - ) = processed_args + self._entity: ArgumentParser = ArgumentParser(prog=name, description=description, epilog=epilog) + self._processed_args: list[PositionalArgument | OptionalArgument | BooleanArgument] = processed_args + + for arg in processed_args: + if isinstance(arg, PositionalArgument) or isinstance(arg, OptionalArgument): + _ = self._entity.add_argument(arg.string_entity) + else: + _ = self._entity.add_argument(arg.string_entity, action="store_true") - def set_args( - self, *args: PositionalArgument | OptionalArgument | BooleanArgument - ) -> None: - """ - Public. Sets the arguments to be processed - :param args: processed arguments - :return: None - """ - self.args.extend(args) - - def register_args(self) -> None: - """ - Private. Registers initialized command line arguments - :return: None - """ - if not self.args: - return - for arg in self.args: - if type(arg) is PositionalArgument: - self.entity.add_argument(arg.get_string_entity()) - elif type(arg) is OptionalArgument: - self.entity.add_argument(arg.get_string_entity()) - elif type(arg) is BooleanArgument: - self.entity.add_argument(arg.get_string_entity(), action="store_true") + def parse_args(self) -> Namespace: + return self._entity.parse_args() diff --git a/src/argenta/orchestrator/entity.py b/src/argenta/orchestrator/entity.py index edbc952..ff73623 100644 --- a/src/argenta/orchestrator/entity.py +++ b/src/argenta/orchestrator/entity.py @@ -11,12 +11,9 @@ class Orchestrator: :param arg_parser: Cmd argument parser and configurator at startup :return: None """ - self.arg_parser: ArgParser | None = arg_parser - if arg_parser: - self.arg_parser.register_args() + self._arg_parser: ArgParser | None = arg_parser - @staticmethod - def start_polling(app: App) -> None: + def start_polling(self, app: App) -> None: """ Public. Starting the user input processing cycle :param app: a running application @@ -29,7 +26,7 @@ class Orchestrator: Public. Returns the arguments parsed :return: None """ - if self.arg_parser: - return self.arg_parser.entity.parse_args() + if self._arg_parser: + return self._arg_parser.parse_args() else: return None diff --git a/src/argenta/py.typed b/src/argenta/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/src/argenta/response/__init__.py b/src/argenta/response/__init__.py index 57fe5da..d7eb4a1 100644 --- a/src/argenta/response/__init__.py +++ b/src/argenta/response/__init__.py @@ -1,5 +1,5 @@ -__all__ = ["Response", "Status"] +__all__ = ["Response", "ResponseStatus"] from argenta.response.entity import Response -from argenta.response.status import Status +from argenta.response.status import ResponseStatus diff --git a/src/argenta/response/entity.py b/src/argenta/response/entity.py index b94eac0..8ecd209 100644 --- a/src/argenta/response/entity.py +++ b/src/argenta/response/entity.py @@ -1,29 +1,23 @@ -from argenta.response.status import Status -from argenta.command.flag.flags import ( - ValidInputFlags, - UndefinedInputFlags, - InvalidValueInputFlags, -) +from typing import Literal +from argenta.command.flag.flags.models import InputFlags +from argenta.response.status import ResponseStatus + + +EMPTY_INPUT_FLAGS: InputFlags = InputFlags() class Response: - __slots__ = ("status", "valid_flags", "undefined_flags", "invalid_value_flags") + __slots__: tuple[Literal['status', 'input_flags'], ...] = ("status", "input_flags") def __init__( self, - status: Status | None = None, - valid_flags: ValidInputFlags = ValidInputFlags(), - undefined_flags: UndefinedInputFlags = UndefinedInputFlags(), - invalid_value_flags: InvalidValueInputFlags = InvalidValueInputFlags(), + status: ResponseStatus, + input_flags: InputFlags = EMPTY_INPUT_FLAGS, ): """ Public. The entity of the user input sent to the handler :param status: the status of the response - :param valid_flags: valid input flags - :param undefined_flags: undefined input flags - :param invalid_value_flags: input flags with invalid values + :param input_flags: all input flags """ - self.status = status - self.valid_flags = valid_flags - self.undefined_flags = undefined_flags - self.invalid_value_flags = invalid_value_flags + self.status: ResponseStatus = status + self.input_flags: InputFlags = input_flags diff --git a/src/argenta/response/status.py b/src/argenta/response/status.py index 735e6f6..7c860fd 100644 --- a/src/argenta/response/status.py +++ b/src/argenta/response/status.py @@ -1,8 +1,19 @@ from enum import Enum -class Status(Enum): +class ResponseStatus(Enum): ALL_FLAGS_VALID = "ALL_FLAGS_VALID" UNDEFINED_FLAGS = "UNDEFINED_FLAGS" INVALID_VALUE_FLAGS = "INVALID_VALUE_FLAGS" UNDEFINED_AND_INVALID_FLAGS = "UNDEFINED_AND_INVALID_FLAGS" + + @classmethod + def from_flags(cls, *, has_invalid_value_flags: bool, has_undefined_flags: bool) -> 'ResponseStatus': + key = (has_invalid_value_flags, has_undefined_flags) + status_map: dict[tuple[bool, bool], ResponseStatus] = { + (True, True): cls.UNDEFINED_AND_INVALID_FLAGS, + (True, False): cls.INVALID_VALUE_FLAGS, + (False, True): cls.UNDEFINED_FLAGS, + (False, False): cls.ALL_FLAGS_VALID, + } + return status_map[key] diff --git a/src/argenta/router/__init__.py b/src/argenta/router/__init__.py index 829a82e..ecdb426 100644 --- a/src/argenta/router/__init__.py +++ b/src/argenta/router/__init__.py @@ -1,4 +1,4 @@ -__all__ = ["Router"] - - from argenta.router.entity import Router + + +__all__ = ["Router"] diff --git a/src/argenta/router/command_handler/entity.py b/src/argenta/router/command_handler/entity.py index 1b5519e..a9290fb 100644 --- a/src/argenta/router/command_handler/entity.py +++ b/src/argenta/router/command_handler/entity.py @@ -1,18 +1,19 @@ -from typing import Callable, Iterator +from collections.abc import Iterator +from typing import Callable from argenta.command import Command from argenta.response import Response class CommandHandler: - def __init__(self, handler: Callable[[Response], None], handled_command: Command): + def __init__(self, handler_as_func: Callable[[Response], None], handled_command: Command): """ Private. Entity of the model linking the handler and the command being processed :param handler: the handler being called :param handled_command: the command being processed """ - self._handler = handler - self._handled_command = handled_command + self.handler_as_func: Callable[[Response], None] = handler_as_func + self.handled_command: Command = handled_command def handling(self, response: Response) -> None: """ @@ -20,21 +21,7 @@ class CommandHandler: :param response: the entity of response: various groups of flags and status of response :return: None """ - self._handler(response) - - def get_handler(self) -> Callable[[Response], None]: - """ - Private. Returns the handler being called - :return: the handler being called as Callable[[Response], None] - """ - return self._handler - - def get_handled_command(self) -> Command: - """ - Private. Returns the command being processed - :return: the command being processed as Command - """ - return self._handled_command + self.handler_as_func(response) class CommandHandlers: @@ -43,14 +30,7 @@ class CommandHandlers: Private. The model that unites all CommandHandler of the routers :param command_handlers: list of CommandHandlers for register """ - self.command_handlers = command_handlers if command_handlers else [] - - def get_handlers(self) -> list[CommandHandler]: - """ - Private. Returns the list of CommandHandlers - :return: the list of CommandHandlers as list[CommandHandler] - """ - return self.command_handlers + self.command_handlers: list[CommandHandler] = command_handlers if command_handlers else [] def add_handler(self, command_handler: CommandHandler) -> None: """ diff --git a/src/argenta/router/entity.py b/src/argenta/router/entity.py index 04fd02b..34a9870 100644 --- a/src/argenta/router/entity.py +++ b/src/argenta/router/entity.py @@ -1,17 +1,14 @@ -from typing import Callable, Literal, Type +from typing import Callable, TypeAlias from inspect import getfullargspec, get_annotations, getsourcefile, getsourcelines from rich.console import Console -from argenta.command import Command -from argenta.command.models import InputCommand -from argenta.response import Response, Status +from argenta.command import Command, InputCommand +from argenta.command.flag import ValidationStatus +from argenta.response import Response, ResponseStatus from argenta.router.command_handler.entity import CommandHandlers, CommandHandler from argenta.command.flag.flags import ( Flags, - InputFlags, - UndefinedInputFlags, - ValidInputFlags, - InvalidValueInputFlags, + InputFlags ) from argenta.router.exceptions import ( RepeatedFlagNameException, @@ -21,9 +18,13 @@ from argenta.router.exceptions import ( ) +HandlerFunc: TypeAlias = Callable[[Response], None] + + class Router: def __init__( - self, title: str | None = "Awesome title", disable_redirect_stdout: bool = False + self, *, title: str | None = "Default title", + disable_redirect_stdout: bool = False ): """ Public. Directly configures and manages handlers @@ -35,13 +36,13 @@ class Router: which is ambiguous behavior and can lead to unexpected work :return: None """ - self.title = title - self.disable_redirect_stdout = disable_redirect_stdout + self.title: str | None = title + self.disable_redirect_stdout: bool = disable_redirect_stdout - self._command_handlers: CommandHandlers = CommandHandlers() - self._ignore_command_register: bool = False + self.command_handlers: CommandHandlers = CommandHandlers() + self.command_register_ignore: bool = False - def command(self, command: Command | str) -> Callable: + def command(self, command: Command | str) -> Callable[[HandlerFunc], HandlerFunc]: """ Public. Registers handler :param command: Registered command @@ -51,18 +52,16 @@ class Router: redefined_command = Command(command) else: redefined_command = command - self._validate_command(redefined_command) - def command_decorator(func): - Router._validate_func_args(func) - self._command_handlers.add_handler(CommandHandler(func, redefined_command)) + _validate_command(redefined_command) - def wrapper(*args, **kwargs): - return func(*args, **kwargs) + def decorator(func: HandlerFunc) -> HandlerFunc: + _validate_func_args(func) + self.command_handlers.add_handler(CommandHandler(func, redefined_command)) - return wrapper + return func - return command_decorator + return decorator def finds_appropriate_handler(self, input_command: InputCommand) -> None: """ @@ -70,14 +69,14 @@ class Router: :param input_command: input command as InputCommand :return: None """ - input_command_name: str = input_command.get_trigger() - input_command_flags: InputFlags = input_command.get_input_flags() + input_command_name: str = input_command.trigger + input_command_flags: InputFlags = input_command.input_flags - for command_handler in self._command_handlers: - handle_command = command_handler.get_handled_command() - if input_command_name.lower() == handle_command.get_trigger().lower(): + for command_handler in self.command_handlers: + handle_command = command_handler.handled_command + if input_command_name.lower() == handle_command.trigger.lower(): self.process_input_command(input_command_flags, command_handler) - if input_command_name.lower() in handle_command.get_aliases(): + if input_command_name.lower() in handle_command.aliases: self.process_input_command(input_command_flags, command_handler) def process_input_command( @@ -89,152 +88,125 @@ class Router: :param command_handler: command handler for input command as CommandHandler :return: None """ - handle_command = command_handler.get_handled_command() - response: Response = Response() - if handle_command.get_registered_flags().get_flags(): - if input_command_flags.get_flags(): - response: Response = self._structuring_input_flags( handle_command, input_command_flags ) + handle_command = command_handler.handled_command + if handle_command.registered_flags.flags: + if input_command_flags.flags: + response: Response = _structuring_input_flags(handle_command, input_command_flags) command_handler.handling(response) else: - response.status = Status.ALL_FLAGS_VALID + response = Response(ResponseStatus.ALL_FLAGS_VALID) command_handler.handling(response) else: - if input_command_flags.get_flags(): - response.status = Status.UNDEFINED_FLAGS - response.undefined_flags = UndefinedInputFlags() - response.undefined_flags.add_flags(input_command_flags.get_flags()) + if input_command_flags.flags: + undefined_flags = InputFlags() + for input_flag in input_command_flags: + input_flag.status = ValidationStatus.UNDEFINED + undefined_flags.add_flag(input_flag) + response = Response(ResponseStatus.UNDEFINED_FLAGS, input_flags=undefined_flags) command_handler.handling(response) else: - response.status = Status.ALL_FLAGS_VALID + response = Response(ResponseStatus.ALL_FLAGS_VALID) command_handler.handling(response) - @staticmethod - def _structuring_input_flags( - handled_command: Command, input_flags: InputFlags - ) -> Response: - """ - Private. Validates flags of input command - :param handled_command: entity of the handled command - :param input_flags: - :return: entity of response as Response - """ - valid_input_flags: ValidInputFlags = ValidInputFlags() - invalid_value_input_flags: InvalidValueInputFlags = InvalidValueInputFlags() - undefined_input_flags: UndefinedInputFlags = UndefinedInputFlags() - for flag in input_flags: - flag_status: Literal["Undefined", "Valid", "Invalid"] = ( - handled_command.validate_input_flag(flag) - ) - if flag_status == "Valid": - valid_input_flags.add_flag(flag) - elif flag_status == "Undefined": - undefined_input_flags.add_flag(flag) - elif flag_status == "Invalid": - invalid_value_input_flags.add_flag(flag) - - if ( - not invalid_value_input_flags.get_flags() - and not undefined_input_flags.get_flags() - ): - status = Status.ALL_FLAGS_VALID - elif ( - invalid_value_input_flags.get_flags() - and not undefined_input_flags.get_flags() - ): - status = Status.INVALID_VALUE_FLAGS - elif ( - not invalid_value_input_flags.get_flags() - and undefined_input_flags.get_flags() - ): - status = Status.UNDEFINED_FLAGS - else: - status = Status.UNDEFINED_AND_INVALID_FLAGS - - return Response( - invalid_value_flags=invalid_value_input_flags, - valid_flags=valid_input_flags, - status=status, - undefined_flags=undefined_input_flags, - ) - - @staticmethod - def _validate_command(command: Command) -> None: - """ - Private. Validates the command registered in handler - :param command: validated command - :return: None if command is valid else raise exception - """ - command_name: str = command.get_trigger() - if command_name.find(" ") != -1: - raise TriggerContainSpacesException() - flags: Flags = command.get_registered_flags() - if flags: - flags_name: list = [x.get_string_entity().lower() for x in flags] - if len(set(flags_name)) < len(flags_name): - raise RepeatedFlagNameException() - - @staticmethod - def _validate_func_args(func: Callable) -> None: - """ - Private. Validates the arguments of the handler - :param func: entity of the handler func - :return: None if func is valid else raise exception - """ - transferred_args = getfullargspec(func).args - if len(transferred_args) > 1: - raise TooManyTransferredArgsException() - elif len(transferred_args) == 0: - raise RequiredArgumentNotPassedException() - - transferred_arg: str = transferred_args[0] - func_annotations: dict[str, Type] = get_annotations(func) - - if arg_annotation := func_annotations.get(transferred_arg): - if arg_annotation is Response: - pass - else: - file_path: str | None = getsourcefile(func) - source_line: int = getsourcelines(func)[1] - fprint = Console().print - fprint( - f'\nFile "{file_path}", line {source_line}\n[b red]WARNING:[/b red] [i]The typehint ' - f"of argument([green]{transferred_arg}[/green]) passed to the handler is [/i][bold blue]{Response}[/bold blue]," - f" [i]but[/i] [bold blue]{arg_annotation}[/bold blue] [i]is specified[/i]", - highlight=False, - ) - - def set_command_register_ignore(self, _: bool) -> None: - """ - Private. Sets the router behavior on the input commands register - :param _: is command register ignore - :return: None - """ - self._ignore_command_register = _ - - def get_triggers(self) -> list[str]: + @property + def triggers(self) -> list[str]: """ Public. Gets registered triggers :return: registered in router triggers as list[str] """ all_triggers: list[str] = [] - for command_handler in self._command_handlers: - all_triggers.append(command_handler.get_handled_command().get_trigger()) + for command_handler in self.command_handlers: + all_triggers.append(command_handler.handled_command.trigger) return all_triggers - def get_aliases(self) -> list[str]: + @property + def aliases(self) -> list[str]: """ Public. Gets registered aliases :return: registered in router aliases as list[str] """ all_aliases: list[str] = [] - for command_handler in self._command_handlers: - if command_handler.get_handled_command().get_aliases(): - all_aliases.extend(command_handler.get_handled_command().get_aliases()) + for command_handler in self.command_handlers: + if command_handler.handled_command.aliases: + all_aliases.extend(command_handler.handled_command.aliases) return all_aliases - def get_command_handlers(self) -> CommandHandlers: - """ - Private. Gets registered command handlers - :return: registered command handlers as CommandHandlers - """ - return self._command_handlers + +class CommandDecorator: + def __init__(self, router_instance: Router, command: Command): + self.router: Router = router_instance + self.command: Command = command + + def __call__(self, handler_func: Callable[[Response], None]) -> Callable[[Response], None]: + _validate_func_args(handler_func) + self.router.command_handlers.add_handler(CommandHandler(handler_func, self.command)) + return handler_func + + +def _structuring_input_flags(handled_command: Command, + input_flags: InputFlags) -> Response: + """ + Private. Validates flags of input command + :param handled_command: entity of the handled command + :param input_flags: + :return: entity of response as Response + """ + invalid_value_flags, undefined_flags = False, False + + for flag in input_flags: + flag_status: ValidationStatus = (handled_command.validate_input_flag(flag)) + flag.status = flag_status + if flag_status == ValidationStatus.INVALID: + invalid_value_flags = True + elif flag_status == ValidationStatus.UNDEFINED: + undefined_flags = True + + status = ResponseStatus.from_flags(has_invalid_value_flags=invalid_value_flags, + has_undefined_flags=undefined_flags) + + return Response( + status=status, + input_flags=input_flags + ) + +def _validate_func_args(func: Callable[[Response], None]) -> None: + """ + Private. Validates the arguments of the handler + :param func: entity of the handler func + :return: None if func is valid else raise exception + """ + transferred_args = getfullargspec(func).args + if len(transferred_args) > 1: + raise TooManyTransferredArgsException() + elif len(transferred_args) == 0: + raise RequiredArgumentNotPassedException() + + transferred_arg: str = transferred_args[0] + func_annotations: dict[str, None] = get_annotations(func) + + arg_annotation = func_annotations.get(transferred_arg) + + if arg_annotation is not None: + if arg_annotation is not Response: + source_line: int = getsourcelines(func)[1] + Console().print( + f'\nFile "{getsourcefile(func)}", line {source_line}\n[b red]WARNING:[/b red] [i]The typehint ' + + f"of argument([green]{transferred_arg}[/green]) passed to the handler must be [/i][bold blue]{Response}[/bold blue]," + + f" [i]but[/i] [bold blue]{arg_annotation}[/bold blue] [i]is specified[/i]", + highlight=False, + ) + + +def _validate_command(command: Command) -> None: + """ + Private. Validates the command registered in handler + :param command: validated command + :return: None if command is valid else raise exception + """ + command_name: str = command.trigger + if command_name.find(" ") != -1: + raise TriggerContainSpacesException() + flags: Flags = command.registered_flags + flags_name: list[str] = [flag.string_entity.lower() for flag in flags] + if len(set(flags_name)) < len(flags_name): + raise RepeatedFlagNameException() diff --git a/src/argenta/router/exceptions.py b/src/argenta/router/exceptions.py index 406d3fe..fe45870 100644 --- a/src/argenta/router/exceptions.py +++ b/src/argenta/router/exceptions.py @@ -1,9 +1,12 @@ +from typing import override + + class RepeatedFlagNameException(Exception): """ Private. Raised when a repeated flag name is registered """ - - def __str__(self): + @override + def __str__(self) -> str: return "Repeated registered flag names in register command" @@ -11,8 +14,8 @@ class TooManyTransferredArgsException(Exception): """ Private. Raised when too many arguments are passed """ - - def __str__(self): + @override + def __str__(self) -> str: return "Too many transferred arguments" @@ -20,8 +23,8 @@ class RequiredArgumentNotPassedException(Exception): """ Private. Raised when a required argument is not passed """ - - def __str__(self): + @override + def __str__(self) -> str: return "Required argument not passed" @@ -29,6 +32,6 @@ class TriggerContainSpacesException(Exception): """ Private. Raised when there is a space in the trigger being registered """ - - def __str__(self): + @override + def __str__(self) -> str: return "Command trigger cannot contain spaces" diff --git a/tests/system_tests/test_system_handling_non_standard_behavior.py b/tests/system_tests/test_system_handling_non_standard_behavior.py index 4ea5682..67ba794 100644 --- a/tests/system_tests/test_system_handling_non_standard_behavior.py +++ b/tests/system_tests/test_system_handling_non_standard_behavior.py @@ -5,10 +5,10 @@ import io import re from argenta.app import App -from argenta.command import Command +from argenta.command import Command, PredefinedFlags +from argenta.command.flag.models import ValidationStatus from argenta.router import Router from argenta.command.flag.flags.models import Flags -from argenta.command.flag.defaults import PredefinedFlags from argenta.orchestrator import Orchestrator from argenta.response import Response @@ -22,13 +22,13 @@ class TestSystemHandlerNormalWork(TestCase): orchestrator = Orchestrator() @router.command(Command('test')) - def test(response: Response): + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print('test command') app = App(override_system_messages=True, print_func=print) app.include_router(router) - app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.get_trigger()}')) + app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}')) orchestrator.start_polling(app) output = mock_stdout.getvalue() @@ -43,14 +43,14 @@ class TestSystemHandlerNormalWork(TestCase): orchestrator = Orchestrator() @router.command(Command('test')) - def test(response: Response): + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print('test command') app = App(ignore_command_register=False, override_system_messages=True, print_func=print) app.include_router(router) - app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.get_trigger()}')) + app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}')) orchestrator.start_polling(app) output = mock_stdout.getvalue() @@ -65,8 +65,10 @@ class TestSystemHandlerNormalWork(TestCase): orchestrator = Orchestrator() @router.command(Command('test')) - def test(response: Response): - print(f'test command with undefined flag: {response.undefined_flags.get_flag('help').get_string_entity()}') + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] + undefined_flag = response.input_flags.get_flag_by_name('help') + if undefined_flag and undefined_flag.status == ValidationStatus.UNDEFINED: + print(f'test command with undefined flag: {undefined_flag.string_entity}') app = App(override_system_messages=True, print_func=print) @@ -85,9 +87,12 @@ class TestSystemHandlerNormalWork(TestCase): orchestrator = Orchestrator() @router.command(Command('test')) - def test(response: Response): - flag = response.undefined_flags.get_flag("port") - print(f'test command with undefined flag with value: {flag.get_string_entity()} {flag.get_value()}') + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] + undefined_flag = response.input_flags.get_flag_by_name("port") + if undefined_flag and undefined_flag.status == ValidationStatus.UNDEFINED: + print(f'test command with undefined flag with value: {undefined_flag.string_entity} {undefined_flag.input_value}') + else: + raise app = App(override_system_messages=True, print_func=print) @@ -104,12 +109,13 @@ class TestSystemHandlerNormalWork(TestCase): def test_input_correct_command_with_one_correct_flag_an_one_incorrect_flag(self, mock_stdout: _io.StringIO, magick_mock: MagicMock): router = Router() orchestrator = Orchestrator() - flags = Flags(PredefinedFlags.HOST) + flags = Flags([PredefinedFlags.HOST]) @router.command(Command('test', flags=flags)) - def test(response: Response): - flag = response.undefined_flags.get_flag("port") - print(f'connecting to host with flag: {flag.get_string_entity()} {flag.get_value()}') + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] + undefined_flag = response.input_flags.get_flag_by_name("port") + if undefined_flag and undefined_flag.status == ValidationStatus.UNDEFINED: + print(f'connecting to host with flag: {undefined_flag.string_entity} {undefined_flag.input_value}') app = App(override_system_messages=True, print_func=print) @@ -128,13 +134,13 @@ class TestSystemHandlerNormalWork(TestCase): orchestrator = Orchestrator() @router.command(Command('test')) - def test(response: Response): + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print(f'test command') app = App(override_system_messages=True, print_func=print) app.include_router(router) - app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.get_trigger()}')) + app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}')) orchestrator.start_polling(app) output = mock_stdout.getvalue() @@ -149,17 +155,17 @@ class TestSystemHandlerNormalWork(TestCase): orchestrator = Orchestrator() @router.command(Command('test')) - def test(response: Response): + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print(f'test command') @router.command(Command('more')) - def test(response: Response): + def test1(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print(f'more command') app = App(override_system_messages=True, print_func=print) app.include_router(router) - app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.get_trigger()}')) + app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}')) orchestrator.start_polling(app) output = mock_stdout.getvalue() @@ -174,7 +180,7 @@ class TestSystemHandlerNormalWork(TestCase): orchestrator = Orchestrator() @router.command(Command('test')) - def test(response: Response): + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print(f'test command') app = App(override_system_messages=True, @@ -195,7 +201,7 @@ class TestSystemHandlerNormalWork(TestCase): orchestrator = Orchestrator() @router.command(Command('test')) - def test(response: Response): + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print(f'test command') app = App(override_system_messages=True, @@ -216,7 +222,7 @@ class TestSystemHandlerNormalWork(TestCase): orchestrator = Orchestrator() @router.command(Command('test', flags=PredefinedFlags.PORT)) - def test(response: Response): + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print('test command') app = App(override_system_messages=True, @@ -227,4 +233,25 @@ class TestSystemHandlerNormalWork(TestCase): output = mock_stdout.getvalue() - self.assertIn("\nRepeated input flags: \"test --port 22 --port 33\"\n", output) + self.assertIn('Repeated input flags: "test --port 22 --port 33"', output) + + @patch("builtins.input", side_effect=["test --help", "q"]) + @patch("sys.stdout", new_callable=io.StringIO) + def test_input_correct_command_with_unregistered_flag3(self, mock_stdout: _io.StringIO, magick_mock: MagicMock): + router = Router() + orchestrator = Orchestrator() + + @router.command(Command('test')) + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] + undefined_flag = response.input_flags.get_flag_by_name('help') + if undefined_flag and undefined_flag.status == ValidationStatus.UNDEFINED: + print(f'test command with undefined flag: {undefined_flag.string_entity}') + + app = App(override_system_messages=True, + print_func=print) + app.include_router(router) + orchestrator.start_polling(app) + + output = mock_stdout.getvalue() + + self.assertIn('\ntest command with undefined flag: --help\n', output) diff --git a/tests/system_tests/test_system_handling_normal_behavior.py b/tests/system_tests/test_system_handling_normal_behavior.py index 88d2565..29fa790 100644 --- a/tests/system_tests/test_system_handling_normal_behavior.py +++ b/tests/system_tests/test_system_handling_normal_behavior.py @@ -5,13 +5,13 @@ import io import re from argenta.app import App -from argenta.command import Command +from argenta.command import Command, PredefinedFlags +from argenta.command.flag.models import PossibleValues, ValidationStatus from argenta.response import Response from argenta.router import Router from argenta.orchestrator import Orchestrator from argenta.command.flag import Flag from argenta.command.flag.flags import Flags -from argenta.command.flag.defaults import PredefinedFlags @@ -23,7 +23,7 @@ class TestSystemHandlerNormalWork(TestCase): orchestrator = Orchestrator() @router.command(Command('test')) - def test(response): + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print('test command') app = App(override_system_messages=True, @@ -43,7 +43,7 @@ class TestSystemHandlerNormalWork(TestCase): orchestrator = Orchestrator() @router.command(Command('test')) - def test(response): + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print('test command') app = App(ignore_command_register=True, @@ -62,11 +62,13 @@ class TestSystemHandlerNormalWork(TestCase): def test_input_correct_command_with_custom_flag(self, mock_stdout: _io.StringIO, magick_mock: MagicMock): router = Router() orchestrator = Orchestrator() - flag = Flag('help', '--', False) + flag = Flag('help', prefix='--', possible_values=PossibleValues.NEITHER) @router.command(Command('test', flags=flag)) - def test(response: Response): - print(f'\nhelp for {response.valid_flags.get_flag('help').get_name()} flag\n') + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] + valid_flag = response.input_flags.get_flag_by_name('help') + if valid_flag and valid_flag.status == ValidationStatus.VALID: + print(f'\nhelp for {valid_flag.name} flag\n') app = App(override_system_messages=True, print_func=print) @@ -82,12 +84,13 @@ class TestSystemHandlerNormalWork(TestCase): def test_input_correct_command_with_custom_flag2(self, mock_stdout: _io.StringIO, magick_mock: MagicMock): router = Router() orchestrator = Orchestrator() - flag = Flag('port', '--', re.compile(r'^\d{1,5}$')) + flag = Flag('port', prefix='--', possible_values=re.compile(r'^\d{1,5}$')) @router.command(Command('test', flags=flag)) - def test(response: Response): - input_flag = response.valid_flags.get_flag('port') - print(f'flag value for {input_flag.get_name()} flag : {input_flag.get_value()}') + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] + valid_flag = response.input_flags.get_flag_by_name('port') + if valid_flag and valid_flag.status == ValidationStatus.VALID: + print(f'flag value for {valid_flag.name} flag : {valid_flag.input_value}') app = App(override_system_messages=True, print_func=print) @@ -107,8 +110,10 @@ class TestSystemHandlerNormalWork(TestCase): flag = PredefinedFlags.SHORT_HELP @router.command(Command('test', flags=flag)) - def test(response: Response): - print(f'help for {response.valid_flags.get_flag('H').get_name()} flag') + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] + valid_flag = response.input_flags.get_flag_by_name('H') + if valid_flag and valid_flag.status == ValidationStatus.VALID: + print(f'help for {valid_flag.name} flag') app = App(override_system_messages=True, print_func=print) @@ -128,8 +133,9 @@ class TestSystemHandlerNormalWork(TestCase): flag = PredefinedFlags.INFO @router.command(Command('test', flags=flag)) - def test(response: Response): - if response.valid_flags.get_flag('info'): + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] + valid_flag = response.input_flags.get_flag_by_name('info') + if valid_flag and valid_flag.status == ValidationStatus.VALID: print('info about test command') app = App(override_system_messages=True, @@ -150,8 +156,10 @@ class TestSystemHandlerNormalWork(TestCase): flag = PredefinedFlags.HOST @router.command(Command('test', flags=flag)) - def test(response: Response): - print(f'connecting to host {response.valid_flags.get_flag('host').get_value()}') + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] + valid_flag = response.input_flags.get_flag_by_name('host') + if valid_flag and valid_flag.status == ValidationStatus.VALID: + print(f'connecting to host {valid_flag.input_value}') app = App(override_system_messages=True, print_func=print) @@ -168,12 +176,14 @@ class TestSystemHandlerNormalWork(TestCase): def test_input_correct_command_with_two_flags(self, mock_stdout: _io.StringIO, magick_mock: MagicMock): router = Router() orchestrator = Orchestrator() - flags = Flags(PredefinedFlags.HOST, PredefinedFlags.PORT) + flags = Flags([PredefinedFlags.HOST, PredefinedFlags.PORT]) @router.command(Command('test', flags=flags)) - def test(response: Response): - valid_flags = response.valid_flags - print(f'connecting to host {valid_flags.get_flag('host').get_value()} and port {valid_flags.get_flag('port').get_value()}') + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] + host_flag = response.input_flags.get_flag_by_name('host') + port_flag = response.input_flags.get_flag_by_name('port') + if (host_flag and host_flag.status == ValidationStatus.VALID) and (port_flag and port_flag.status == ValidationStatus.VALID): + print(f'connecting to host {host_flag.input_value} and port {port_flag.input_value}') app = App(override_system_messages=True, print_func=print) @@ -192,11 +202,11 @@ class TestSystemHandlerNormalWork(TestCase): orchestrator = Orchestrator() @router.command(Command('test')) - def test(response): + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print(f'test command') @router.command(Command('some')) - def test2(response): + def test2(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print(f'some command') app = App(override_system_messages=True, @@ -216,15 +226,15 @@ class TestSystemHandlerNormalWork(TestCase): orchestrator = Orchestrator() @router.command(Command('test')) - def test(response): + def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print(f'test command') @router.command(Command('some')) - def test(response): + def test1(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print(f'some command') @router.command(Command('more')) - def test(response): + def test2(response: Response) -> None: # pyright: ignore[reportUnusedFunction] print(f'more command') app = App(override_system_messages=True, diff --git a/tests/unit_tests/test_command.py b/tests/unit_tests/test_command.py index fa6badd..4577f77 100644 --- a/tests/unit_tests/test_command.py +++ b/tests/unit_tests/test_command.py @@ -1,6 +1,7 @@ from argenta.command.flag import Flag, InputFlag from argenta.command.flag.flags import Flags -from argenta.command.models import InputCommand, Command +from argenta.command.flag.models import PossibleValues +from argenta.command.models import InputCommand, Command, ValidationStatus from argenta.command.exceptions import (UnprocessedInputFlagException, RepeatedInputFlagsException, EmptyInputCommandException) @@ -11,7 +12,7 @@ import re class TestInputCommand(unittest.TestCase): def test_parse_correct_raw_command(self): - self.assertEqual(InputCommand.parse('ssh --host 192.168.0.3').get_trigger(), 'ssh') + self.assertEqual(InputCommand.parse('ssh --host 192.168.0.3').trigger, 'ssh') def test_parse_raw_command_without_flag_name_with_value(self): with self.assertRaises(UnprocessedInputFlagException): @@ -27,35 +28,35 @@ class TestInputCommand(unittest.TestCase): def test_validate_valid_input_flag1(self): command = Command('some', flags=Flag('test')) - self.assertEqual(command.validate_input_flag(InputFlag('test')), 'Valid') + self.assertEqual(command.validate_input_flag(InputFlag('test', input_value=None, status=None)), ValidationStatus.VALID) def test_validate_valid_input_flag2(self): - command = Command('some', flags=Flags(Flag('test'), Flag('more'))) - self.assertEqual(command.validate_input_flag(InputFlag('more')), 'Valid') + command = Command('some', flags=Flags([Flag('test'), Flag('more')])) + self.assertEqual(command.validate_input_flag(InputFlag('more', input_value=None, status=None)), ValidationStatus.VALID) def test_validate_undefined_input_flag1(self): command = Command('some', flags=Flag('test')) - self.assertEqual(command.validate_input_flag(InputFlag('more')), 'Undefined') + self.assertEqual(command.validate_input_flag(InputFlag('more', input_value=None, status=None)), ValidationStatus.UNDEFINED) def test_validate_undefined_input_flag2(self): - command = Command('some', flags=Flags(Flag('test'), Flag('more'))) - self.assertEqual(command.validate_input_flag(InputFlag('case')), 'Undefined') + command = Command('some', flags=Flags([Flag('test'), Flag('more')])) + self.assertEqual(command.validate_input_flag(InputFlag('case', input_value=None, status=None)), ValidationStatus.UNDEFINED) def test_validate_undefined_input_flag3(self): command = Command('some') - self.assertEqual(command.validate_input_flag(InputFlag('case')), 'Undefined') + self.assertEqual(command.validate_input_flag(InputFlag('case', input_value=None, status=None)), ValidationStatus.UNDEFINED) def test_invalid_input_flag1(self): - command = Command('some', flags=Flag('test', possible_values=False)) - self.assertEqual(command.validate_input_flag(InputFlag('test', value='example')), 'Invalid') + command = Command('some', flags=Flag('test', possible_values=PossibleValues.NEITHER)) + self.assertEqual(command.validate_input_flag(InputFlag('test', input_value='example', status=None)), ValidationStatus.INVALID) def test_invalid_input_flag2(self): command = Command('some', flags=Flag('test', possible_values=['some', 'case'])) - self.assertEqual(command.validate_input_flag(InputFlag('test', value='slay')), 'Invalid') + self.assertEqual(command.validate_input_flag(InputFlag('test', input_value='slay', status=None)), ValidationStatus.INVALID) def test_invalid_input_flag3(self): command = Command('some', flags=Flag('test', possible_values=re.compile(r'^ex\d{, 2}op$'))) - self.assertEqual(command.validate_input_flag(InputFlag('test', value='example')), 'Invalid') + self.assertEqual(command.validate_input_flag(InputFlag('test', input_value='example', status=None)), ValidationStatus.INVALID) def test_isinstance_parse_correct_raw_command(self): cmd = InputCommand.parse('ssh --host 192.168.0.3') diff --git a/tests/unit_tests/test_dividing_line.py b/tests/unit_tests/test_dividing_line.py index cb58f72..edd353d 100644 --- a/tests/unit_tests/test_dividing_line.py +++ b/tests/unit_tests/test_dividing_line.py @@ -6,11 +6,11 @@ import unittest class TestDividingLine(unittest.TestCase): def test_get_static_dividing_line_full_line(self): line = StaticDividingLine('-') - self.assertEqual(line.get_full_static_line(True).count('-'), 25) + self.assertEqual(line.get_full_static_line(is_override=True).count('-'), 25) def test_get_dynamic_dividing_line_full_line(self): line = DynamicDividingLine() - self.assertEqual(line.get_full_dynamic_line(20, True).count('-'), 20) + self.assertEqual(line.get_full_dynamic_line(length=20, is_override=True).count('-'), 20) def test_get_dividing_line_unit_part(self): line = StaticDividingLine('') diff --git a/tests/unit_tests/test_flag.py b/tests/unit_tests/test_flag.py index 6cb1dfd..5203201 100644 --- a/tests/unit_tests/test_flag.py +++ b/tests/unit_tests/test_flag.py @@ -7,35 +7,34 @@ import re class TestFlag(unittest.TestCase): def test_get_string_entity(self): - self.assertEqual(Flag(name='test').get_string_entity(), + self.assertEqual(Flag(name='test').string_entity, '--test') def test_get_string_entity2(self): self.assertEqual(Flag(name='test', - prefix='---').get_string_entity(), + prefix='---').string_entity, '---test') def test_get_flag_name(self): - self.assertEqual(Flag(name='test').get_name(), + self.assertEqual(Flag(name='test').name, 'test') def test_get_flag_prefix(self): - self.assertEqual(Flag(name='test').get_prefix(), + self.assertEqual(Flag(name='test').prefix, '--') def test_get_flag_prefix2(self): self.assertEqual(Flag(name='test', - prefix='--').get_prefix(), + prefix='--').prefix, '--') def test_get_flag_value_without_set(self): - self.assertEqual(InputFlag(name='test').get_value(), + self.assertEqual(InputFlag(name='test', input_value=None, status=None).input_value, None) def test_get_flag_value_with_set(self): - flag = InputFlag(name='test') - flag.set_value('example') - self.assertEqual(flag.get_value(), 'example') + flag = InputFlag(name='test', input_value='example', status=None) + self.assertEqual(flag.input_value, 'example') def test_validate_incorrect_flag_value_with_list_of_possible_flag_values(self): flag = Flag(name='test', possible_values=['1', '2', '3']) @@ -54,15 +53,15 @@ class TestFlag(unittest.TestCase): self.assertEqual(flag.validate_input_flag_value('192.168.9.8'), True) def test_validate_correct_empty_flag_value_without_possible_flag_values(self): - flag = Flag(name='test', possible_values=PossibleValues.DISABLE) + flag = Flag(name='test', possible_values=PossibleValues.NEITHER) self.assertEqual(flag.validate_input_flag_value(None), True) def test_validate_correct_empty_flag_value_with_possible_flag_values(self): - flag = Flag(name='test', possible_values=PossibleValues.DISABLE) + flag = Flag(name='test', possible_values=PossibleValues.NEITHER) self.assertEqual(flag.validate_input_flag_value(None), True) def test_validate_incorrect_random_flag_value_without_possible_flag_values(self): - flag = Flag(name='test', possible_values=PossibleValues.DISABLE) + flag = Flag(name='test', possible_values=PossibleValues.NEITHER) self.assertEqual(flag.validate_input_flag_value('random value'), False) def test_validate_correct_random_flag_value_with_possible_flag_values(self): @@ -70,21 +69,21 @@ class TestFlag(unittest.TestCase): self.assertEqual(flag.validate_input_flag_value('random value'), True) def test_get_input_flag1(self): - flag = InputFlag(name='test') - input_flags = InputFlags(flag) - self.assertEqual(input_flags.get_flag('test'), flag) + flag = InputFlag(name='test', input_value=None, status=None) + input_flags = InputFlags([flag]) + self.assertEqual(input_flags.get_flag_by_name('test'), flag) def test_get_input_flag2(self): - flag = InputFlag(name='test') - flag2 = InputFlag(name='some') - input_flags = InputFlags(flag, flag2) - self.assertEqual(input_flags.get_flag('some'), flag2) + flag = InputFlag(name='test', input_value=None, status=None) + flag2 = InputFlag(name='some', input_value=None, status=None) + input_flags = InputFlags([flag, flag2]) + self.assertEqual(input_flags.get_flag_by_name('some'), flag2) def test_get_undefined_input_flag(self): - flag = InputFlag(name='test') - flag2 = InputFlag(name='some') - input_flags = InputFlags(flag, flag2) - self.assertEqual(input_flags.get_flag('case'), None) + flag = InputFlag(name='test', input_value=None, status=None) + flag2 = InputFlag(name='some', input_value=None, status=None) + input_flags = InputFlags([flag, flag2]) + self.assertEqual(input_flags.get_flag_by_name('case'), None) def test_get_flags(self): flags = Flags() @@ -94,18 +93,18 @@ class TestFlag(unittest.TestCase): Flag('test3'), ] flags.add_flags(list_of_flags) - self.assertEqual(flags.get_flags(), + self.assertEqual(flags.flags, list_of_flags) def test_add_flag(self): flags = Flags() flags.add_flag(Flag('test')) - self.assertEqual(len(flags.get_flags()), 1) + self.assertEqual(len(flags.flags), 1) def test_add_flags(self): flags = Flags() flags.add_flags([Flag('test'), Flag('test2')]) - self.assertEqual(len(flags.get_flags()), 2) + self.assertEqual(len(flags.flags), 2) diff --git a/tests/unit_tests/test_router.py b/tests/unit_tests/test_router.py index 56d879f..4417812 100644 --- a/tests/unit_tests/test_router.py +++ b/tests/unit_tests/test_router.py @@ -1,7 +1,10 @@ from argenta.command.flag import InputFlag, Flag -from argenta.command.flag.flags import Flags, InputFlags, UndefinedInputFlags, InvalidValueInputFlags, ValidInputFlags +from argenta.command.flag.flags import Flags, InputFlags +from argenta.command.flag.models import PossibleValues, ValidationStatus +from argenta.response.entity import Response from argenta.router import Router from argenta.command import Command +from argenta.router.entity import _structuring_input_flags, _validate_command, _validate_func_args # pyright: ignore[reportPrivateUsage] from argenta.router.exceptions import (TriggerContainSpacesException, RepeatedFlagNameException, TooManyTransferredArgsException, @@ -13,106 +16,98 @@ import re class TestRouter(unittest.TestCase): def test_register_command_with_spaces_in_trigger(self): - router = Router() with self.assertRaises(TriggerContainSpacesException): - router._validate_command(Command(trigger='command with spaces')) + _validate_command(Command(trigger='command with spaces')) def test_register_command_with_repeated_flags(self): - router = Router() with self.assertRaises(RepeatedFlagNameException): - router._validate_command(Command(trigger='command', flags=Flags(Flag('test'), Flag('test')))) + _validate_command(Command(trigger='command', flags=Flags([Flag('test'), Flag('test')]))) def test_structuring_input_flags1(self): - router = Router() cmd = Command('cmd') - input_flags = InputFlags(InputFlag('ssh')) - self.assertEqual(router._structuring_input_flags(cmd, input_flags).undefined_flags, UndefinedInputFlags(InputFlag('ssh'))) + input_flags = InputFlags([InputFlag('ssh', input_value=None, status=None)]) + self.assertEqual(_structuring_input_flags(cmd, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value=None, status=ValidationStatus.UNDEFINED)])) def test_structuring_input_flags2(self): - router = Router() cmd = Command('cmd') - input_flags = InputFlags(InputFlag('ssh', value='some')) - self.assertEqual(router._structuring_input_flags(cmd, input_flags).undefined_flags, UndefinedInputFlags(InputFlag('ssh', value='some'))) + input_flags = InputFlags([InputFlag('ssh', input_value='some', status=None)]) + self.assertEqual(_structuring_input_flags(cmd, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='some', status=ValidationStatus.UNDEFINED)])) def test_structuring_input_flags3(self): - router = Router() cmd = Command('cmd', flags=Flag('port')) - input_flags = InputFlags(InputFlag('ssh', value='some2')) - self.assertEqual(router._structuring_input_flags(cmd, input_flags).undefined_flags, UndefinedInputFlags(InputFlag('ssh', value='some2'))) + input_flags = InputFlags([InputFlag('ssh', input_value='some2', status=None)]) + self.assertEqual(_structuring_input_flags(cmd, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='some2', status=ValidationStatus.UNDEFINED)])) def test_structuring_input_flags4(self): - router = Router() - command = Command('cmd', flags=Flag('ssh', possible_values=False)) - input_flags = InputFlags(InputFlag('ssh', value='some3')) - self.assertEqual(router._structuring_input_flags(command, input_flags).invalid_value_flags, InvalidValueInputFlags(InputFlag('ssh', value='some3'))) + command = Command('cmd', flags=Flag('ssh', possible_values=PossibleValues.NEITHER)) + input_flags = InputFlags([InputFlag('ssh', input_value='some3', status=None)]) + self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='some3', status=ValidationStatus.INVALID)])) def test_structuring_input_flags5(self): - router = Router() command = Command('cmd', flags=Flag('ssh', possible_values=re.compile(r'some[1-5]$'))) - input_flags = InputFlags(InputFlag('ssh', value='some40')) - self.assertEqual(router._structuring_input_flags(command, input_flags).invalid_value_flags, InvalidValueInputFlags(InputFlag('ssh', value='some40'))) + input_flags = InputFlags([InputFlag('ssh', input_value='some40', status=None)]) + self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='some40', status=ValidationStatus.INVALID)])) def test_structuring_input_flags6(self): - router = Router() command = Command('cmd', flags=Flag('ssh', possible_values=['example'])) - input_flags = InputFlags(InputFlag('ssh', value='example2')) - self.assertEqual(router._structuring_input_flags(command, input_flags).invalid_value_flags, InvalidValueInputFlags(InputFlag('ssh', value='example2'))) + input_flags = InputFlags([InputFlag('ssh', input_value='example2', status=None)]) + self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='example2', status=ValidationStatus.INVALID)])) def test_structuring_input_flags7(self): command = Command('cmd', flags=Flag('port')) - input_flags = InputFlags(InputFlag('port', value='some2')) - self.assertEqual(Router()._structuring_input_flags(command, input_flags).valid_flags, ValidInputFlags(InputFlag('port', value='some2'))) + input_flags = InputFlags([InputFlag('port', input_value='some2', status=None)]) + self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('port', input_value='some2', status=ValidationStatus.VALID)])) def test_structuring_input_flags8(self): command = Command('cmd', flags=Flag('port', possible_values=['some2', 'some3'])) - input_flags = InputFlags(InputFlag('port', value='some2')) - self.assertEqual(Router()._structuring_input_flags(command, input_flags).valid_flags, ValidInputFlags(InputFlag('port', value='some2'))) + input_flags = InputFlags([InputFlag('port', input_value='some2', status=None)]) + self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('port', input_value='some2', status=ValidationStatus.VALID)])) def test_structuring_input_flags9(self): command = Command('cmd', flags=Flag('ssh', possible_values=re.compile(r'more[1-5]$'))) - input_flags = InputFlags(InputFlag('ssh', value='more5')) - self.assertEqual(Router()._structuring_input_flags(command, input_flags).valid_flags, ValidInputFlags(InputFlag('ssh', value='more5'))) + input_flags = InputFlags([InputFlag('ssh', input_value='more5', status=None)]) + self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='more5', status=ValidationStatus.VALID)])) def test_structuring_input_flags10(self): - command = Command('cmd', flags=Flag('ssh', possible_values=False)) - input_flags = InputFlags(InputFlag('ssh')) - self.assertEqual(Router()._structuring_input_flags(command, input_flags).valid_flags, ValidInputFlags(InputFlag('ssh'))) + command = Command('cmd', flags=Flag('ssh', possible_values=PossibleValues.NEITHER)) + input_flags = InputFlags([InputFlag('ssh', input_value=None, status=None)]) + self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value=None, status=ValidationStatus.VALID)])) def test_validate_incorrect_func_args1(self): def handler(): pass with self.assertRaises(RequiredArgumentNotPassedException): - Router()._validate_func_args(handler) + _validate_func_args(handler) # pyright: ignore[reportArgumentType] def test_validate_incorrect_func_args2(self): - def handler(args, kwargs): + def handler(args, kwargs): # pyright: ignore[reportMissingParameterType, reportUnknownParameterType] pass with self.assertRaises(TooManyTransferredArgsException): - Router()._validate_func_args(handler) + _validate_func_args(handler) # pyright: ignore[reportArgumentType] def test_get_router_aliases(self): router = Router() @router.command(Command('some', aliases=['test', 'case'])) - def handler(response): + def handler(response: Response) -> None: # pyright: ignore[reportUnusedFunction] pass - self.assertListEqual(router.get_aliases(), ['test', 'case']) + self.assertListEqual(router.aliases, ['test', 'case']) def test_get_router_aliases2(self): router = Router() @router.command(Command('some', aliases=['test', 'case'])) - def handler(response): + def handler(response: Response): # pyright: ignore[reportUnusedFunction] pass @router.command(Command('ext', aliases=['more', 'foo'])) - def handler2(response): + def handler2(response: Response): # pyright: ignore[reportUnusedFunction] pass - self.assertListEqual(router.get_aliases(), ['test', 'case', 'more', 'foo']) + self.assertListEqual(router.aliases, ['test', 'case', 'more', 'foo']) def test_get_router_aliases3(self): router = Router() @router.command(Command('some')) - def handler(response): + def handler(response: Response): # pyright: ignore[reportUnusedFunction] pass - self.assertListEqual(router.get_aliases(), []) + self.assertListEqual(router.aliases, [])