Merge pull request #6 from koloideal/better-tests

100% codebase test coverage
This commit is contained in:
kolo
2025-12-07 01:55:04 +03:00
committed by GitHub
31 changed files with 2749 additions and 1300 deletions
+2
View File
@@ -318,3 +318,5 @@ http-client.private.env.json
# Apifox Helper cache
.idea/.cache/.Apifox_Helper
.idea/ApifoxUploaderProjectSetting.xml
.zed
+1 -1
View File
@@ -30,7 +30,7 @@ App
system_router_title: str | None = "System points:",
ignore_command_register: bool = True,
dividing_line: AVAILABLE_DIVIDING_LINES = DEFAULT_DIVIDING_LINE,
repeat_command_groups_printing: bool = True,
repeat_command_groups_printing: bool = False,
override_system_messages: bool = False,
autocompleter: AutoCompleter = DEFAULT_AUTOCOMPLETER,
print_func: Printer = DEFAULT_PRINT_FUNC) -> None
+4
View File
@@ -13,6 +13,10 @@ tests:
tests-cov:
python -m pytest --cov=argenta tests
# Запустить тесты с отчетом о покрытии с html репортом
tests-cov-html:
python -m pytest --cov=argenta tests --cov-report=html
# Отформатировать код (Ruff + isort)
format:
python -m ruff format ./src
+19 -2
View File
@@ -1,2 +1,19 @@
import sys
print(sys.version_info >= (3, 13))
from argenta import App, DataBridge, Response, Router
from argenta.di import FromDishka
from argenta.di.integration import setup_dishka, _auto_inject_handlers
from argenta.di.providers import SystemProvider
from dishka import make_container
container = make_container()
Response.patch_by_container(container)
app = App()
router = Router()
@router.command('command')
def handler(res: Response, data_bridge: FromDishka[DataBridge]):
print(data_bridge)
_auto_inject_handlers(app)
_auto_inject_handlers(app)
+13
View File
@@ -50,6 +50,19 @@ line-length=90
[tool.pyright]
typeCheckingMode = "strict"
[[tool.pyright.executionEnvironments]]
root = "tests/"
reportPrivateUsage = false
reportUnusedFunction = false
[tool.coverage.run]
branch = true
omit = [
"src/argenta/app/protocols.py",
"src/argenta/*/exceptions.py",
"src/argenta/metrics/*"
]
[tool.mypy]
disable_error_code = "import-untyped"
+29 -39
View File
@@ -19,7 +19,6 @@ from argenta.app.protocols import (
)
from argenta.app.registered_routers.entity import RegisteredRouters
from argenta.command.exceptions import (
EmptyInputCommandException,
InputCommandException,
RepeatedInputFlagsException,
UnprocessedInputFlagException,
@@ -40,7 +39,7 @@ class BaseApp:
initial_message: str,
farewell_message: str,
exit_command: Command,
system_router_title: str | None,
system_router_title: str,
ignore_command_register: bool,
dividing_line: StaticDividingLine | DynamicDividingLine,
repeat_command_groups_printing: bool,
@@ -51,7 +50,7 @@ class BaseApp:
self._prompt: str = prompt
self._print_func: Printer = print_func
self._exit_command: Command = exit_command
self._system_router_title: str | None = system_router_title
self._system_router_title: str = system_router_title
self._dividing_line: StaticDividingLine | DynamicDividingLine = dividing_line
self._ignore_command_register: bool = ignore_command_register
self._repeat_command_groups_printing_description: bool = repeat_command_groups_printing
@@ -144,7 +143,6 @@ class BaseApp:
:return: None
"""
for registered_router in self.registered_routers:
if registered_router.title:
self._print_func(registered_router.title)
for command_handler in registered_router.command_handlers:
handled_command = command_handler.handled_command
@@ -239,7 +237,7 @@ class BaseApp:
self._incorrect_input_syntax_handler(raw_command)
elif isinstance(error, RepeatedInputFlagsException):
self._repeated_input_flags_handler(raw_command)
elif isinstance(error, EmptyInputCommandException):
else:
self._empty_input_command_handler()
def _setup_system_router(self) -> None:
@@ -253,7 +251,6 @@ class BaseApp:
def _(response: Response) -> None:
self._exit_command_handler(response)
if system_router not in self.registered_routers.registered_routers:
system_router.command_register_ignore = self._ignore_command_register
self.registered_routers.add_registered_router(system_router)
@@ -323,7 +320,7 @@ class BaseApp:
for router_entity in self.registered_routers:
router_triggers = router_entity.triggers
router_aliases = router_entity.aliases
combined = router_triggers + router_aliases
combined = router_triggers | router_aliases
for trigger in combined:
self._matching_default_triggers_with_routers[trigger] = router_entity
@@ -331,15 +328,6 @@ class BaseApp:
self._autocompleter.initial_setup(list(self._current_matching_triggers_with_routers.keys()))
seen = {}
for item in list(self._current_matching_triggers_with_routers.keys()):
if item in seen:
Console().print(
f"\n[b red]WARNING:[/b red] Overlapping trigger or alias: [b blue]{item}[/b blue]"
)
else:
seen[item] = True
if not self._override_system_messages:
self._setup_default_view()
@@ -352,6 +340,28 @@ class BaseApp:
if not self._repeat_command_groups_printing_description:
self._print_command_group_description()
def _process_exist_and_valid_command(self, input_command: InputCommand) -> None:
processing_router = self._current_matching_triggers_with_routers[input_command.trigger.lower()]
if processing_router.disable_redirect_stdout:
dividing_line_unit_part: str = self._dividing_line.get_unit_part()
self._print_func(
StaticDividingLine(dividing_line_unit_part).get_full_static_line(
is_override=self._override_system_messages
)
)
processing_router.finds_appropriate_handler(input_command)
self._print_func(
StaticDividingLine(dividing_line_unit_part).get_full_static_line(
is_override=self._override_system_messages
)
)
else:
with redirect_stdout(io.StringIO()) as stdout:
processing_router.finds_appropriate_handler(input_command)
stdout_result: str = stdout.getvalue()
self._print_framed_text(stdout_result)
AVAILABLE_DIVIDING_LINES: TypeAlias = StaticDividingLine | DynamicDividingLine
DEFAULT_DIVIDING_LINE: StaticDividingLine = StaticDividingLine()
@@ -369,10 +379,10 @@ class App(BaseApp):
initial_message: str = "Argenta\n",
farewell_message: str = "\nSee you\n",
exit_command: Command = DEFAULT_EXIT_COMMAND,
system_router_title: str | None = "System points:",
system_router_title: str = "System points:",
ignore_command_register: bool = True,
dividing_line: AVAILABLE_DIVIDING_LINES = DEFAULT_DIVIDING_LINE,
repeat_command_groups_printing: bool = True,
repeat_command_groups_printing: bool = False,
override_system_messages: bool = False,
autocompleter: AutoCompleter = DEFAULT_AUTOCOMPLETER,
print_func: Printer = DEFAULT_PRINT_FUNC,
@@ -442,27 +452,7 @@ class App(BaseApp):
self._print_framed_text(stdout_res)
continue
processing_router = self._current_matching_triggers_with_routers[input_command.trigger.lower()]
if processing_router.disable_redirect_stdout:
dividing_line_unit_part: str = self._dividing_line.get_unit_part()
self._print_func(
StaticDividingLine(dividing_line_unit_part).get_full_static_line(
is_override=self._override_system_messages
)
)
processing_router.finds_appropriate_handler(input_command)
self._print_func(
StaticDividingLine(dividing_line_unit_part).get_full_static_line(
is_override=self._override_system_messages
)
)
else:
with redirect_stdout(io.StringIO()) as stdout:
processing_router.finds_appropriate_handler(input_command)
stdout_result: str = stdout.getvalue()
if stdout_result:
self._print_framed_text(stdout_result)
self._process_exist_and_valid_command(input_command)
def include_router(self, router: Router) -> None:
"""
+4 -3
View File
@@ -2,11 +2,12 @@ __all__ = ["NonStandardBehaviorHandler", "EmptyCommandHandler", "Printer", "Desc
from typing import Protocol, TypeVar
T = TypeVar("T", contravariant=True) # noqa: WPS111
class NonStandardBehaviorHandler(Protocol[T]):
def __call__(self, __param: T) -> None:
def __call__(self, _param: T, /) -> None:
raise NotImplementedError
@@ -16,10 +17,10 @@ class EmptyCommandHandler(Protocol):
class Printer(Protocol):
def __call__(self, __text: str) -> None:
def __call__(self, _text: str, /) -> None:
raise NotImplementedError
class DescriptionMessageGenerator(Protocol):
def __call__(self, __first_param: str, __second_param: str) -> str:
def __call__(self, _command: str, _description: str, /) -> str:
raise NotImplementedError
@@ -24,6 +24,3 @@ class RegisteredRouters:
def __iter__(self) -> Iterator[Router]:
return iter(self.registered_routers)
def __next__(self) -> Router:
return next(iter(self.registered_routers))
+2 -5
View File
@@ -39,9 +39,6 @@ class BaseFlags(Generic[FlagType]):
def __iter__(self) -> Iterator[FlagType]:
return iter(self.flags)
def __next__(self) -> FlagType:
return next(iter(self))
def __getitem__(self, flag_index: int) -> FlagType:
return self.flags[flag_index]
@@ -61,7 +58,7 @@ class Flags(BaseFlags[Flag]):
@override
def __eq__(self, other: object) -> bool:
if not isinstance(other, Flags):
return NotImplemented
return False
if len(self.flags) != len(other.flags):
return False
@@ -91,7 +88,7 @@ class InputFlags(BaseFlags[InputFlag]):
@override
def __eq__(self, other: object) -> bool:
if not isinstance(other, InputFlags):
raise NotImplementedError
return False
if len(self.flags) != len(other.flags):
return False
+3 -7
View File
@@ -44,20 +44,16 @@ class Flag:
:param input_flag_value: The input flag value to validate
:return: whether the entered flag is valid as bool
"""
if isinstance(self.possible_values, PossibleValues):
if self.possible_values == PossibleValues.NEITHER:
return input_flag_value == ''
if self.possible_values == PossibleValues.ALL:
return input_flag_value != ''
if isinstance(self.possible_values, Pattern):
return bool(self.possible_values.match(input_flag_value))
if isinstance(self.possible_values, list):
return input_flag_value in self.possible_values
return False
@property
def string_entity(self) -> str:
"""
@@ -88,9 +84,9 @@ class InputFlag:
self,
name: str,
*,
prefix: PREFIX_TYPE = "--",
input_value: str,
status: ValidationStatus | None,
prefix: PREFIX_TYPE = "--",
status: ValidationStatus | None = None,
):
"""
Public. The entity of the flag of the entered command
+3 -6
View File
@@ -17,7 +17,7 @@ ParseResult = tuple[str, InputFlags]
MIN_FLAG_PREFIX: str = "-"
PREFIX_TYPE = Literal["-", "--", "---"]
DEFAULT_WITHOUT_FLAGS: Flags = Flags()
DEFAULT_WITHOUT_ALIASES: list[Never] = []
DEFAULT_WITHOUT_ALIASES: set[Never] = set()
DEFAULT_WITHOUT_INPUT_FLAGS: InputFlags = InputFlags()
@@ -29,7 +29,7 @@ class Command:
*,
description: str = "Some useful command",
flags: Flag | Flags = DEFAULT_WITHOUT_FLAGS,
aliases: list[str] | list[Never] = DEFAULT_WITHOUT_ALIASES,
aliases: set[str] | set[Never] = DEFAULT_WITHOUT_ALIASES,
):
"""
Public. The command that can and should be registered in the Router
@@ -41,7 +41,7 @@ class Command:
self.registered_flags: Flags = flags if isinstance(flags, Flags) else Flags([flags])
self.trigger: str = trigger
self.description: str = description
self.aliases: list[str] | list[Never] = aliases
self.aliases: set[str] | set[Never] = aliases
def validate_input_flag(self, flag: InputFlag) -> ValidationStatus:
"""
@@ -104,9 +104,6 @@ class InputCommand:
else:
raise UnprocessedInputFlagException
if not name:
raise UnprocessedInputFlagException
if i + 1 < len(tokens) and not tokens[i + 1].startswith("-"):
input_value = tokens[i + 1]
i += 2
+3 -3
View File
@@ -20,16 +20,16 @@ def inject(func: Callable[..., T]) -> Callable[..., T]:
def setup_dishka(app: App, container: Container, *, auto_inject: bool = False) -> None:
Response.patch_by_container(container)
if auto_inject:
_auto_inject_handlers(app)
Response.patch_by_container(container)
def _get_container_from_response(args: tuple[Any, ...], kwargs: dict[str, Any]) -> Container:
for arg in args:
if isinstance(arg, Response):
if hasattr(arg, "_dishka_container"):
return arg._dishka_container # pyright: ignore[reportPrivateUsage]
if hasattr(arg, "__dishka_container__"):
return arg.__dishka_container__ # pyright: ignore[reportPrivateUsage]
break
raise RuntimeError("dishka container not found in Response")
+1 -1
View File
@@ -94,7 +94,7 @@ class ArgParser:
namespace=self._core.parse_args(), processed_args=self.processed_args
)
def _register_args(self, processed_args: list[ValueArgument | BooleanArgument]) -> None:
def _register_args(self, processed_args: list[ValueArgument | BooleanArgument]) -> None: # pragma: no cover
if sys.version_info >= (3, 13):
for arg in processed_args:
if isinstance(arg, BooleanArgument):
+1 -1
View File
@@ -26,7 +26,7 @@ class Orchestrator:
self._custom_providers: list[Provider] = custom_providers
self._auto_inject_handlers: bool = auto_inject_handlers
self._arg_parser._parse_args()
self._arg_parser._parse_args() # pyright: ignore[reportPrivateUsage]
def start_polling(self, app: App) -> None:
"""
+2 -2
View File
@@ -9,7 +9,7 @@ EMPTY_INPUT_FLAGS: InputFlags = InputFlags()
class Response:
_dishka_container: Container
__dishka_container__: Container
def __init__(
self,
@@ -26,4 +26,4 @@ class Response:
@classmethod
def patch_by_container(cls, container: Container) -> None:
cls._dishka_container = container
cls.__dishka_container__ = container
@@ -44,6 +44,3 @@ class CommandHandlers:
def __iter__(self) -> Iterator[CommandHandler]:
return iter(self.command_handlers)
def __next__(self) -> CommandHandler:
return next(iter(self.command_handlers))
+26 -51
View File
@@ -22,7 +22,7 @@ HandlerFunc: TypeAlias = Callable[..., None]
class Router:
def __init__(
self,
title: str | None = "Default title",
title: str = "Default title",
*,
disable_redirect_stdout: bool = False,
):
@@ -36,12 +36,15 @@ class Router:
which is ambiguous behavior and can lead to unexpected work
:return: None
"""
self.title: str | None = title
self.title: str = title
self.disable_redirect_stdout: bool = disable_redirect_stdout
self.command_handlers: CommandHandlers = CommandHandlers()
self.command_register_ignore: bool = False
self.aliases: set[str] = set()
self.triggers: set[str] = set()
def command(self, command: Command | str) -> Callable[[HandlerFunc], HandlerFunc]:
"""
Public. Registers handler
@@ -53,7 +56,13 @@ class Router:
else:
redefined_command = command
_validate_command(redefined_command)
self._validate_command(redefined_command)
if overlapping := (self.aliases | self.triggers) & redefined_command.aliases:
Console().print(f"\n[b red]WARNING:[/b red] Overlapping trigger or alias: [b blue]{overlapping}[/b blue]")
self.aliases.update(redefined_command.aliases)
self.triggers.add(redefined_command.trigger)
def decorator(func: HandlerFunc) -> HandlerFunc:
_validate_func_args(func)
@@ -62,6 +71,20 @@ class Router:
return decorator
def _validate_command(self, command: Command) -> None:
"""
Private. Validates the command registered in handler
:param command: validated command
:return: None if command is valid else raise exception
"""
command_name: str = command.trigger
if command_name.find(" ") != -1:
raise TriggerContainSpacesException()
flags: Flags = command.registered_flags
flags_name: list[str] = [flag.string_entity.lower() for flag in flags]
if len(set(flags_name)) < len(flags_name):
raise RepeatedFlagNameException()
def finds_appropriate_handler(self, input_command: InputCommand) -> None:
"""
Private. Finds the appropriate handler for given input command and passes control to it
@@ -105,40 +128,6 @@ class Router:
response = Response(ResponseStatus.ALL_FLAGS_VALID)
command_handler.handling(response)
@property
def triggers(self) -> list[str]:
"""
Public. Gets registered triggers
:return: registered in router triggers as list[str]
"""
all_triggers: list[str] = []
for command_handler in self.command_handlers:
all_triggers.append(command_handler.handled_command.trigger)
return all_triggers
@property
def aliases(self) -> list[str]:
"""
Public. Gets registered aliases
:return: registered in router aliases as list[str]
"""
all_aliases: list[str] = []
for command_handler in self.command_handlers:
if command_handler.handled_command.aliases:
all_aliases.extend(command_handler.handled_command.aliases)
return all_aliases
class CommandDecorator:
def __init__(self, router_instance: Router, command: Command):
self.router: Router = router_instance
self.command: Command = command
def __call__(self, handler_func: Callable[..., None]) -> Callable[..., None]:
_validate_func_args(handler_func)
self.router.command_handlers.add_handler(CommandHandler(handler_func, self.command))
return handler_func
def _structuring_input_flags(handled_command: Command, input_flags: InputFlags) -> Response:
"""
@@ -189,17 +178,3 @@ def _validate_func_args(func: Callable[..., None]) -> None:
highlight=False,
)
def _validate_command(command: Command) -> None:
"""
Private. Validates the command registered in handler
:param command: validated command
:return: None if command is valid else raise exception
"""
command_name: str = command.trigger
if command_name.find(" ") != -1:
raise TriggerContainSpacesException()
flags: Flags = command.registered_flags
flags_name: list[str] = [flag.string_entity.lower() for flag in flags]
if len(set(flags_name)) < len(flags_name):
raise RepeatedFlagNameException()
+1 -1
View File
@@ -20,7 +20,7 @@ class RequiredArgumentNotPassedException(Exception):
@override
def __str__(self) -> str:
return "Required argument not passed"
return "Required argument with type Response not passed"
class TriggerContainSpacesException(Exception):
@@ -1,10 +1,8 @@
import io
import re
import sys
from unittest import TestCase
from unittest.mock import MagicMock, patch
from collections.abc import Iterator
import _io
import pytest
from argenta import App, Orchestrator, Router
from argenta.command import Command, PredefinedFlags
@@ -13,61 +11,143 @@ from argenta.command.flag.models import ValidationStatus
from argenta.response import Response
class PatchedArgvTestCase(TestCase):
def setUp(self):
super().setUp()
self.patcher = patch.object(sys, 'argv', ['program.py'])
self.mock_argv = self.patcher.start()
self.addCleanup(self.patcher.stop)
@pytest.fixture(autouse=True)
def patch_argv(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(sys, 'argv', ['program.py'])
class TestSystemHandlerNormalWork(PatchedArgvTestCase):
@patch("builtins.input", side_effect=["help", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_incorrect_command(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
def _mock_input(inputs: Iterator[str]) -> str:
return next(inputs)
# ============================================================================
# Tests for empty input handling
# ============================================================================
def test_empty_input_triggers_empty_command_handler(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
def test(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
app = App(override_system_messages=True,
print_func=print)
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}'))
app.set_empty_command_handler(lambda: print('Empty input command'))
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn("\nUnknown command: help\n", output)
assert "\nEmpty input command\n" in output
@patch("builtins.input", side_effect=["TeSt", "Q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_incorrect_command2(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
# ============================================================================
# Tests for unknown command handling
# ============================================================================
def test_unknown_command_triggers_unknown_command_handler(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["help", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
def test(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
app = App(ignore_command_register=False,
override_system_messages=True,
print_func=print)
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}'))
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn('\nUnknown command: TeSt\n', output)
assert "\nUnknown command: help\n" in output
@patch("builtins.input", side_effect=["test --help", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_unregistered_flag(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
def test_case_sensitive_command_triggers_unknown_command_handler(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["TeSt", "Q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
app = App(ignore_command_register=False, override_system_messages=True, print_func=print)
app.include_router(router)
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}'))
orchestrator.start_polling(app)
output = capsys.readouterr().out
assert '\nUnknown command: TeSt\n' in output
def test_mixed_valid_and_unknown_commands_handled_correctly(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test", "some", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}'))
orchestrator.start_polling(app)
output = capsys.readouterr().out
assert re.search(r'\ntest command\n(.|\n)*\nUnknown command: some', output)
def test_multiple_commands_with_unknown_command_in_between(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test", "some", "more", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
@router.command(Command('more'))
def test1(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('more command')
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}'))
orchestrator.start_polling(app)
output = capsys.readouterr().out
assert re.search(r'\ntest command\n(.|\n)*\nUnknown command: some\n(.|\n)*\nmore command', output)
# ============================================================================
# Tests for unregistered flag handling
# ============================================================================
def test_unregistered_flag_without_value_is_accessible(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test --help", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
@@ -77,19 +157,19 @@ class TestSystemHandlerNormalWork(PatchedArgvTestCase):
if undefined_flag and undefined_flag.status == ValidationStatus.UNDEFINED:
print(f'test command with undefined flag: {undefined_flag.string_entity}')
app = App(override_system_messages=True,
print_func=print)
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn('\ntest command with undefined flag: --help\n', output)
assert '\ntest command with undefined flag: --help\n' in output
@patch("builtins.input", side_effect=["test --port 22", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_unregistered_flag2(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
def test_unregistered_flag_with_value_is_accessible(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test --port 22", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
@@ -101,19 +181,19 @@ class TestSystemHandlerNormalWork(PatchedArgvTestCase):
else:
raise
app = App(override_system_messages=True,
print_func=print)
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn('\ntest command with undefined flag with value: --port 22\n', output)
assert '\ntest command with undefined flag with value: --port 22\n' in output
@patch("builtins.input", side_effect=["test --host 192.168.32.1 --port 132", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_one_correct_flag_an_one_incorrect_flag(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
def test_registered_and_unregistered_flags_coexist(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test --host 192.168.32.1 --port 132", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
flags = Flags([PredefinedFlags.HOST])
@@ -124,141 +204,62 @@ class TestSystemHandlerNormalWork(PatchedArgvTestCase):
if undefined_flag and undefined_flag.status == ValidationStatus.UNDEFINED:
print(f'connecting to host with flag: {undefined_flag.string_entity} {undefined_flag.input_value}')
app = App(override_system_messages=True,
print_func=print)
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn('\nconnecting to host with flag: --port 132\n', output)
assert '\nconnecting to host with flag: --port 132\n' in output
@patch("builtins.input", side_effect=["test", "some", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_one_correct_command_and_one_incorrect_command(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
# ============================================================================
# Tests for incorrect flag syntax handling
# ============================================================================
def test_flag_without_value_triggers_incorrect_syntax_handler(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test 535 --port", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'test command')
def test(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}'))
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
self.assertRegex(output, re.compile(r'\ntest command\n(.|\n)*\nUnknown command: some'))
@patch("builtins.input", side_effect=["test", "some", "more", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_two_correct_commands_and_one_incorrect_command(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'test command')
@router.command(Command('more'))
def test1(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'more command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}'))
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
self.assertRegex(output, re.compile(r'\ntest command\n(.|\n)*\nUnknown command: some\n(.|\n)*\nmore command'))
@patch("builtins.input", side_effect=["test 535 --port", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_incorrect_flag(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'test command')
app = App(override_system_messages=True,
print_func=print)
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
app.set_incorrect_input_syntax_handler(lambda command: print(f'Incorrect flag syntax: "{command}"'))
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn("\nIncorrect flag syntax: \"test 535 --port\"\n", output)
assert "\nIncorrect flag syntax: \"test 535 --port\"\n" in output
@patch("builtins.input", side_effect=["", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_empty_command(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'test command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.set_empty_command_handler(lambda: print('Empty input command'))
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
self.assertIn("\nEmpty input command\n", output)
# ============================================================================
# Tests for repeated flag handling
# ============================================================================
@patch("builtins.input", side_effect=["test --port 22 --port 33", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_repeated_flags(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
def test_repeated_flags_trigger_repeated_flags_handler(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test --port 22 --port 33", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test', flags=PredefinedFlags.PORT))
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
def test(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
app = App(override_system_messages=True,
print_func=print)
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
app.set_repeated_input_flags_handler(lambda command: print(f'Repeated input flags: "{command}"'))
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn('Repeated input flags: "test --port 22 --port 33"', output)
@patch("builtins.input", side_effect=["test --help", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_unregistered_flag3(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
undefined_flag = response.input_flags.get_flag_by_name('help')
if undefined_flag and undefined_flag.status == ValidationStatus.UNDEFINED:
print(f'test command with undefined flag: {undefined_flag.string_entity}')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
self.assertIn('\ntest command with undefined flag: --help\n', output)
assert 'Repeated input flags: "test --port 22 --port 33"' in output
@@ -1,10 +1,8 @@
import io
import re
import sys
from unittest import TestCase
from unittest.mock import MagicMock, patch
from collections.abc import Iterator
import _io
import pytest
from argenta import App, Orchestrator, Router
from argenta.command import Command, PredefinedFlags
@@ -14,59 +12,121 @@ from argenta.command.flag.models import PossibleValues, ValidationStatus
from argenta.response import Response
class PatchedArgvTestCase(TestCase):
def setUp(self):
super().setUp()
self.patcher = patch.object(sys, 'argv', ['program.py'])
self.mock_argv = self.patcher.start()
self.addCleanup(self.patcher.stop)
@pytest.fixture(autouse=True)
def patch_argv(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(sys, 'argv', ['program.py'])
class TestSystemHandlerNormalWork(PatchedArgvTestCase):
@patch("builtins.input", side_effect=["test", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
def _mock_input(inputs: Iterator[str]) -> str:
return next(inputs)
# ============================================================================
# Tests for basic command execution
# ============================================================================
def test_simple_command_executes_successfully(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
def test(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
app = App(override_system_messages=True,
print_func=print)
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn('\ntest command\n', output)
assert '\ntest command\n' in output
@patch("builtins.input", side_effect=["TeSt", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command2(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
def test_case_insensitive_command_executes_when_enabled(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["TeSt", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
def test(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
app = App(ignore_command_register=True,
override_system_messages=True,
print_func=print)
app = App(ignore_command_register=True, override_system_messages=True, print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn('\ntest command\n', output)
assert '\ntest command\n' in output
@patch("builtins.input", side_effect=["test --help", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_custom_flag(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
def test_two_commands_execute_sequentially(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test", "some", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
@router.command(Command('some'))
def test2(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('some command')
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = capsys.readouterr().out
assert re.search(r'\ntest command\n(.|\n)*\nsome command\n', output)
def test_three_commands_execute_sequentially(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test", "some", "more", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
@router.command(Command('some'))
def test1(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('some command')
@router.command(Command('more'))
def test2(_response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('more command')
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = capsys.readouterr().out
assert re.search(r'\ntest command\n(.|\n)*\nsome command\n(.|\n)*\nmore command', output)
# ============================================================================
# Tests for custom flag handling
# ============================================================================
def test_custom_flag_without_value_is_recognized(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test --help", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
flag = Flag('help', prefix='--', possible_values=PossibleValues.NEITHER)
@@ -77,18 +137,19 @@ class TestSystemHandlerNormalWork(PatchedArgvTestCase):
if valid_flag and valid_flag.status == ValidationStatus.VALID:
print(f'\nhelp for {valid_flag.name} flag\n')
app = App(override_system_messages=True,
print_func=print)
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn('\nhelp for help flag\n', output)
assert '\nhelp for help flag\n' in output
def test_custom_flag_with_regex_validation_accepts_valid_value(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test --port 22", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
@patch("builtins.input", side_effect=["test --port 22", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_custom_flag2(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
flag = Flag('port', prefix='--', possible_values=re.compile(r'^\d{1,5}$'))
@@ -99,19 +160,24 @@ class TestSystemHandlerNormalWork(PatchedArgvTestCase):
if valid_flag and valid_flag.status == ValidationStatus.VALID:
print(f'flag value for {valid_flag.name} flag : {valid_flag.input_value}')
app = App(override_system_messages=True,
print_func=print)
app = App(override_system_messages=True, repeat_command_groups_printing=True, print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn('\nflag value for port flag : 22\n', output)
assert '\nflag value for port flag : 22\n' in output
@patch("builtins.input", side_effect=["test -H", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_default_flag(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
# ============================================================================
# Tests for predefined flag handling
# ============================================================================
def test_predefined_short_help_flag_is_recognized(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test -H", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
flag = PredefinedFlags.SHORT_HELP
@@ -122,19 +188,19 @@ class TestSystemHandlerNormalWork(PatchedArgvTestCase):
if valid_flag and valid_flag.status == ValidationStatus.VALID:
print(f'help for {valid_flag.name} flag')
app = App(override_system_messages=True,
print_func=print)
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn('\nhelp for H flag\n', output)
assert '\nhelp for H flag\n' in output
@patch("builtins.input", side_effect=["test --info", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_default_flag2(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
def test_predefined_info_flag_is_recognized(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test --info", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
flag = PredefinedFlags.INFO
@@ -145,19 +211,19 @@ class TestSystemHandlerNormalWork(PatchedArgvTestCase):
if valid_flag and valid_flag.status == ValidationStatus.VALID:
print('info about test command')
app = App(override_system_messages=True,
print_func=print)
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn('\ninfo about test command\n', output)
assert '\ninfo about test command\n' in output
@patch("builtins.input", side_effect=["test --host 192.168.0.1", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_default_flag3(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
def test_predefined_host_flag_with_value_is_recognized(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test --host 192.168.0.1", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
flag = PredefinedFlags.HOST
@@ -168,19 +234,24 @@ class TestSystemHandlerNormalWork(PatchedArgvTestCase):
if valid_flag and valid_flag.status == ValidationStatus.VALID:
print(f'connecting to host {valid_flag.input_value}')
app = App(override_system_messages=True,
print_func=print)
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn('\nconnecting to host 192.168.0.1\n', output)
assert '\nconnecting to host 192.168.0.1\n' in output
@patch("builtins.input", side_effect=["test --host 192.168.32.1 --port 132", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_two_flags(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
# ============================================================================
# Tests for multiple flag handling
# ============================================================================
def test_two_predefined_flags_are_recognized_together(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
inputs = iter(["test --host 192.168.32.1 --port 132", "q"])
monkeypatch.setattr('builtins.input', lambda _prompt="": _mock_input(inputs))
router = Router()
orchestrator = Orchestrator()
flags = Flags([PredefinedFlags.HOST, PredefinedFlags.PORT])
@@ -192,63 +263,10 @@ class TestSystemHandlerNormalWork(PatchedArgvTestCase):
if (host_flag and host_flag.status == ValidationStatus.VALID) and (port_flag and port_flag.status == ValidationStatus.VALID):
print(f'connecting to host {host_flag.input_value} and port {port_flag.input_value}')
app = App(override_system_messages=True,
print_func=print)
app = App(override_system_messages=True, print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
output = capsys.readouterr().out
self.assertIn('\nconnecting to host 192.168.32.1 and port 132\n', output)
@patch("builtins.input", side_effect=["test", "some", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_two_correct_command(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'test command')
@router.command(Command('some'))
def test2(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'some command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
self.assertRegex(output, re.compile(r'\ntest command\n(.|\n)*\nsome command\n'))
@patch("builtins.input", side_effect=["test", "some", "more", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_three_correct_command(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'test command')
@router.command(Command('some'))
def test1(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'some command')
@router.command(Command('more'))
def test2(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'more command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
self.assertRegex(output, re.compile(r'\ntest command\n(.|\n)*\nsome command\n(.|\n)*\nmore command'))
assert '\nconnecting to host 192.168.32.1 and port 132\n' in output
+592 -26
View File
@@ -1,74 +1,640 @@
import unittest
import pytest
from pytest import CaptureFixture
from argenta.app import App
from argenta.app.dividing_line import DynamicDividingLine, StaticDividingLine
from argenta.app.protocols import DescriptionMessageGenerator, NonStandardBehaviorHandler
from argenta.command.models import Command, InputCommand
from argenta.response import Response
from argenta.response.status import ResponseStatus
from argenta.router import Router
class MyTestCase(unittest.TestCase):
def test_is_exit_command1(self):
app = App()
self.assertEqual(app._is_exit_command(InputCommand('q')), True)
# ============================================================================
# Tests for exit command detection
# ============================================================================
def test_is_exit_command5(self):
app = App()
self.assertEqual(app._is_exit_command(InputCommand('Q')), True)
def test_is_exit_command2(self):
def test_default_exit_command_lowercase_q_is_recognized() -> None:
app = App()
assert app._is_exit_command(InputCommand('q')) is True
def test_default_exit_command_uppercase_q_is_recognized() -> None:
app = App()
assert app._is_exit_command(InputCommand('Q')) is True
def test_exit_command_not_recognized_when_case_sensitivity_enabled() -> None:
app = App(ignore_command_register=False)
self.assertEqual(app._is_exit_command(InputCommand('q')), False)
assert app._is_exit_command(InputCommand('q')) is False
def test_is_exit_command3(self):
def test_custom_exit_command_is_recognized() -> None:
app = App(exit_command=Command('quit'))
self.assertEqual(app._is_exit_command(InputCommand('quit')), True)
assert app._is_exit_command(InputCommand('quit')) is True
def test_is_exit_command4(self):
def test_custom_exit_command_case_insensitive_by_default() -> None:
app = App(exit_command=Command('quit'))
self.assertEqual(app._is_exit_command(InputCommand('qUIt')), True)
assert app._is_exit_command(InputCommand('qUIt')) is True
def test_is_exit_command6(self):
app = App(ignore_command_register=False,
exit_command=Command('quit'))
self.assertEqual(app._is_exit_command(InputCommand('qUIt')), False)
def test_is_unknown_command1(self):
def test_custom_exit_command_case_sensitive_when_enabled() -> None:
app = App(ignore_command_register=False, exit_command=Command('quit'))
assert app._is_exit_command(InputCommand('qUIt')) is False
def test_exit_command_alias_is_recognized() -> None:
app = App(exit_command=Command('q', aliases={'exit'}))
assert app._is_exit_command(InputCommand('exit')) is True
def test_exit_command_alias_case_sensitive_when_enabled() -> None:
app = App(exit_command=Command('q', aliases={'exit'}), ignore_command_register=False)
assert app._is_exit_command(InputCommand('exit')) is True
def test_non_exit_command_is_not_recognized() -> None:
app = App(exit_command=Command('q', aliases={'exit'}))
assert app._is_exit_command(InputCommand('quit')) is False
def test_non_exit_command_with_wrong_case_is_not_recognized() -> None:
app = App(exit_command=Command('q', aliases={'exit'}), ignore_command_register=False)
assert app._is_exit_command(InputCommand('Exit')) is False
# ============================================================================
# Tests for unknown command detection
# ============================================================================
def test_registered_command_is_not_unknown() -> None:
app = App()
app.set_unknown_command_handler(lambda command: None)
app._current_matching_triggers_with_routers = {'fr': Router(), 'tr': Router(), 'de': Router()}
self.assertEqual(app._is_unknown_command(InputCommand('fr')), False)
assert app._is_unknown_command(InputCommand('fr')) is False
def test_is_unknown_command2(self):
def test_unregistered_command_is_unknown() -> None:
app = App()
app.set_unknown_command_handler(lambda command: None)
app._current_matching_triggers_with_routers = {'fr': Router(), 'tr': Router(), 'de': Router()}
self.assertEqual(app._is_unknown_command(InputCommand('cr')), True)
assert app._is_unknown_command(InputCommand('cr')) is True
def test_is_unknown_command3(self):
def test_command_with_wrong_case_is_unknown_when_case_sensitivity_enabled() -> None:
app = App(ignore_command_register=False)
app.set_unknown_command_handler(lambda command: None)
app._current_matching_triggers_with_routers = {'Pr': Router(), 'tW': Router(), 'deQW': Router()}
self.assertEqual(app._is_unknown_command(InputCommand('pr')), True)
assert app._is_unknown_command(InputCommand('pr')) is True
def test_is_unknown_command4(self):
def test_command_with_exact_case_is_not_unknown_when_case_sensitivity_enabled() -> None:
app = App(ignore_command_register=False)
app.set_unknown_command_handler(lambda command: None)
app._current_matching_triggers_with_routers = {'Pr': Router(), 'tW': Router(), 'deQW': Router()}
self.assertEqual(app._is_unknown_command(InputCommand('tW')), False)
assert app._is_unknown_command(InputCommand('tW')) is False
# ============================================================================
# Tests for similar command suggestions
# ============================================================================
def test_most_similar_command_finds_closest_match() -> None:
app = App(override_system_messages=True)
router = Router()
@router.command(Command('port', aliases={'p'}))
def handler(_res: Response) -> None:
pass
@router.command(Command('host', aliases={'h'}))
def handler2(_res: Response) -> None:
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('por') == 'port'
def test_most_similar_command_prefers_shorter_match() -> None:
app = App(override_system_messages=True)
router = Router()
@router.command(Command('command'))
def handler(_res: Response) -> None:
pass
@router.command(Command('command_other'))
def handler2(_res: Response) -> None:
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('com') == 'command'
def test_most_similar_command_finds_longer_match_when_closer() -> None:
app = App(override_system_messages=True)
router = Router()
@router.command(Command('command'))
def handler(_res: Response) -> None:
pass
@router.command(Command('command_other'))
def handler2(_res: Response) -> None:
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('command_') == 'command_other'
def test_most_similar_command_returns_none_for_no_match() -> None:
app = App(override_system_messages=True)
router = Router()
@router.command(Command('command'))
def handler(_res: Response) -> None:
pass
@router.command(Command('command_other'))
def handler2(_res: Response) -> None:
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('nonexists') is None
def test_most_similar_command_matches_aliases() -> None:
app = App(override_system_messages=True)
router = Router()
@router.command(Command('command', aliases={'other_name'}))
def handler(_res: Response) -> None:
pass
@router.command(Command('command_other', aliases={'more_name'}))
def handler2(_res: Response) -> None:
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('othe') == 'other_name'
# ============================================================================
# Tests for router registration
# ============================================================================
def test_include_routers_registers_multiple_routers() -> None:
app = App()
router = Router()
router2 = Router()
app.include_routers(router, router2)
assert app.registered_routers.registered_routers == [router, router2]
def test_overlapping_aliases_prints_warning(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True)
router = Router()
@router.command(Command('test', aliases={'alias'}))
def handler(_res: Response) -> None:
pass
@router.command(Command('test2', aliases={'alias'}))
def handler2(_res: Response) -> None:
pass
app.include_routers(router)
app._pre_cycle_setup()
captured = capsys.readouterr()
assert "Overlapping" in captured.out
# ============================================================================
# Tests for startup messages
# ============================================================================
def test_add_message_on_startup_stores_message() -> None:
app = App()
app.add_message_on_startup('Some message')
assert app._messages_on_startup == ['Some message']
def test_pre_cycle_setup_prints_startup_messages(capsys: CaptureFixture[str]) -> None:
app = App()
app.add_message_on_startup('some message')
app._pre_cycle_setup()
stdout = capsys.readouterr()
assert 'some message' in stdout.out
# ============================================================================
# Tests for framed text printing
# ============================================================================
def test_print_framed_text_with_static_dividing_line(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True, dividing_line=StaticDividingLine(length=5))
app._print_framed_text('test')
captured = capsys.readouterr()
assert '\n-----\n\ntest\n\n-----\n' in captured.out
def test_print_framed_text_with_dynamic_dividing_line_short_text(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True, dividing_line=DynamicDividingLine())
app._print_framed_text('some long test')
captured = capsys.readouterr()
assert '\n--------------\n\nsome long test\n\n--------------\n' in captured.out
def test_print_framed_text_with_dynamic_dividing_line_long_text(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True, dividing_line=DynamicDividingLine())
app._print_framed_text('test as test as test')
captured = capsys.readouterr()
assert '\n' + '-'*20 + '\n\ntest as test as test\n\n' + '-'*20 + '\n' in captured.out
def test_print_framed_text_with_unsupported_dividing_line_raises_error() -> None:
class OtherDividingLine:
pass
app = App(override_system_messages=True, dividing_line=OtherDividingLine()) # pyright: ignore[reportArgumentType]
with pytest.raises(NotImplementedError):
app._print_framed_text('some long test')
# ============================================================================
# Tests for handler configuration
# ============================================================================
def test_set_description_message_pattern_stores_generator() -> None:
app = App()
descr_gen: DescriptionMessageGenerator = lambda command, description: command + '-+-' + description
app.set_description_message_pattern(descr_gen)
assert app._description_message_gen is descr_gen
def test_set_exit_command_handler_stores_handler() -> None:
app = App()
handler: NonStandardBehaviorHandler[Response] = lambda response: print('goodbye')
app.set_exit_command_handler(handler)
assert app._exit_command_handler is handler
# ============================================================================
# Tests for default view setup
# ============================================================================
def test_setup_default_view_formats_prompt() -> None:
app = App(prompt='>>')
app._setup_default_view()
assert app._prompt == '[italic dim bold]>>'
def test_setup_default_view_sets_default_unknown_command_handler() -> None:
app = App()
app._setup_default_view()
assert app._unknown_command_handler(InputCommand('nonexists')) is None
# ============================================================================
# Tests for command processing
# ============================================================================
def test_process_command_with_router_with_disabled_stdout_redirect(capsys: CaptureFixture[str]) -> None:
app = App(repeat_command_groups_printing=True)
router = Router(disable_redirect_stdout=True)
@router.command('command')
def handler(_res: Response) -> None:
print("Hello!")
app.include_router(router)
app._pre_cycle_setup()
app._process_exist_and_valid_command(InputCommand('command'))
stdout = capsys.readouterr()
assert 'Hello!' in stdout.out
# ============================================================================
# Tests for handler setters and execution
# ============================================================================
def test_set_unknown_command_handler_stores_handler() -> None:
app = App()
call_tracker = {'called': False}
def custom_handler(_command: InputCommand) -> None:
call_tracker['called'] = True
app.set_unknown_command_handler(custom_handler)
app._unknown_command_handler(InputCommand('test'))
assert call_tracker['called']
def test_set_exit_handler_stores_handler() -> None:
app = App()
call_tracker = {'called': False}
def custom_handler(_response: Response) -> None:
call_tracker['called'] = True
app.set_exit_command_handler(custom_handler)
app._exit_command_handler(Response(ResponseStatus.ALL_FLAGS_VALID))
assert call_tracker['called']
def test_set_empty_command_handler_stores_handler() -> None:
app = App()
call_tracker = {'called': False}
def custom_handler() -> None:
call_tracker['called'] = True
app.set_empty_command_handler(custom_handler)
app._empty_input_command_handler()
assert call_tracker['called']
def test_set_incorrect_input_syntax_handler_stores_handler() -> None:
app = App()
call_tracker = {'called': False}
def custom_handler(_command: str) -> None:
call_tracker['called'] = True
app.set_incorrect_input_syntax_handler(custom_handler)
app._incorrect_input_syntax_handler('test --flag')
assert call_tracker['called']
def test_set_repeated_input_flags_handler_stores_handler() -> None:
app = App()
call_tracker = {'called': False}
def custom_handler(_command: str) -> None:
call_tracker['called'] = True
app.set_repeated_input_flags_handler(custom_handler)
app._repeated_input_flags_handler('test --flag --flag')
assert call_tracker['called']
# ============================================================================
# Tests for handler execution with output
# ============================================================================
def test_unknown_command_handler_prints_custom_message(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True)
def custom_handler(command: InputCommand) -> None:
print(f'Command not found: {command.trigger}')
app.set_unknown_command_handler(custom_handler)
app._unknown_command_handler(InputCommand('unknown'))
output = capsys.readouterr()
assert 'Command not found: unknown' in output.out
def test_exit_command_handler_prints_custom_message(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True)
def custom_handler(_response: Response) -> None:
print('Goodbye!')
app.set_exit_command_handler(custom_handler)
app._exit_command_handler(Response(ResponseStatus.ALL_FLAGS_VALID))
output = capsys.readouterr()
assert 'Goodbye!' in output.out
def test_empty_command_handler_prints_custom_message(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True)
def custom_handler() -> None:
print('Please enter a command')
app.set_empty_command_handler(custom_handler)
app._empty_input_command_handler()
output = capsys.readouterr()
assert 'Please enter a command' in output.out
def test_incorrect_syntax_handler_prints_custom_message(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True)
def custom_handler(command: str) -> None:
print(f'Syntax error in: {command}')
app.set_incorrect_input_syntax_handler(custom_handler)
app._incorrect_input_syntax_handler('test --flag')
output = capsys.readouterr()
assert 'Syntax error in: test --flag' in output.out
def test_repeated_flags_handler_prints_custom_message(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True)
def custom_handler(command: str) -> None:
print(f'Duplicate flags in: {command}')
app.set_repeated_input_flags_handler(custom_handler)
app._repeated_input_flags_handler('test --flag --flag')
output = capsys.readouterr()
assert 'Duplicate flags in: test --flag --flag' in output.out
# ============================================================================
# Tests for default handler behavior
# ============================================================================
def test_default_unknown_command_handler_prints_message(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True)
app._unknown_command_handler(InputCommand('unknown'))
output = capsys.readouterr()
assert 'Unknown command: unknown' in output.out
def test_default_empty_command_handler_prints_message(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True)
app._empty_input_command_handler()
output = capsys.readouterr()
assert 'Empty input command' in output.out
def test_default_incorrect_syntax_handler_prints_message(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True)
app._incorrect_input_syntax_handler('test --flag')
output = capsys.readouterr()
assert 'Incorrect flag syntax: test --flag' in output.out
def test_default_repeated_flags_handler_prints_message(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True)
app._repeated_input_flags_handler('test --flag --flag')
output = capsys.readouterr()
assert 'Repeated input flags: test --flag --flag' in output.out
# ============================================================================
# Tests for handler chaining and multiple calls
# ============================================================================
def test_handler_can_be_replaced_multiple_times() -> None:
app = App()
call_tracker = {'count': 0}
def handler1(_command: InputCommand) -> None:
call_tracker['count'] += 1
def handler2(_command: InputCommand) -> None:
call_tracker['count'] += 10
app.set_unknown_command_handler(handler1)
app._unknown_command_handler(InputCommand('test'))
assert call_tracker['count'] == 1
app.set_unknown_command_handler(handler2)
app._unknown_command_handler(InputCommand('test'))
assert call_tracker['count'] == 11
def test_handler_receives_correct_parameters() -> None:
app = App()
received_data = {'trigger': None}
def custom_handler(command: InputCommand) -> None:
received_data['trigger'] = command.trigger
app.set_unknown_command_handler(custom_handler)
app._unknown_command_handler(InputCommand('mycommand'))
assert received_data['trigger'] == 'mycommand'
def test_exit_handler_receives_response_object() -> None:
app = App()
received_data = {'response': None}
def custom_handler(response: Response) -> None:
received_data['response'] = response
app.set_exit_command_handler(custom_handler)
test_response = Response(ResponseStatus.ALL_FLAGS_VALID)
app._exit_command_handler(test_response)
assert received_data['response'] is test_response
# ============================================================================
# Tests for handler integration with routers
# ============================================================================
def test_app_with_router_and_custom_unknown_handler(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True)
router = Router()
@router.command(Command('test'))
def handler(_res: Response) -> None:
print('test executed')
app.include_router(router)
def custom_unknown_handler(command: InputCommand) -> None:
print(f'Not found: {command.trigger}')
app.set_unknown_command_handler(custom_unknown_handler)
# Test that unknown command uses custom handler
assert app._is_unknown_command(InputCommand('unknown'))
app._unknown_command_handler(InputCommand('unknown'))
output = capsys.readouterr()
assert 'Not found: unknown' in output.out
def test_app_handlers_work_with_multiple_routers() -> None:
app = App(override_system_messages=True)
router1 = Router()
router2 = Router()
@router1.command(Command('cmd1'))
def handler1(_res: Response) -> None:
pass
@router2.command(Command('cmd2'))
def handler2(_res: Response) -> None:
pass
app.include_routers(router1, router2)
app._pre_cycle_setup()
call_tracker = {'called': False}
def custom_handler(_command: InputCommand) -> None:
call_tracker['called'] = True
app.set_unknown_command_handler(custom_handler)
# Both commands should be known
assert not app._is_unknown_command(InputCommand('cmd1'))
assert not app._is_unknown_command(InputCommand('cmd2'))
# Unknown command should trigger handler
assert app._is_unknown_command(InputCommand('unknown'))
app._unknown_command_handler(InputCommand('unknown'))
assert call_tracker['called']
+181 -115
View File
@@ -1,22 +1,29 @@
from argparse import Namespace
import sys
from unittest.mock import call
from argparse import Namespace
from typing import TYPE_CHECKING
import pytest
from pytest_mock import MockerFixture
from argenta.orchestrator.argparser.arguments.models import (BaseArgument,
from argenta.orchestrator.argparser.arguments.models import (
BaseArgument,
BooleanArgument,
InputArgument,
ValueArgument)
ValueArgument,
)
from argenta.orchestrator.argparser.entity import ArgParser, ArgSpace
if TYPE_CHECKING:
from pytest_mock.plugin import MockType
class TestArgumentCreation:
"""Tests for the creation and attribute validation of argument model classes."""
def test_value_argument_creation(self):
"""Ensures ValueArgument instances are created with correct attributes."""
arg = ValueArgument(
# ============================================================================
# Tests for argument model creation
# ============================================================================
def test_value_argument_stores_all_properties() -> None:
arg: ValueArgument = ValueArgument(
name="test_arg",
prefix="--",
help="A test argument.",
@@ -35,9 +42,9 @@ class TestArgumentCreation:
assert arg.action == "store"
assert arg.string_entity == "--test_arg"
def test_boolean_argument_creation(self):
"""Ensures BooleanArgument instances are created with correct attributes."""
arg = BooleanArgument(
def test_boolean_argument_stores_all_properties() -> None:
arg: BooleanArgument = BooleanArgument(
name="verbose", prefix="-", help="Enable verbose mode.", is_deprecated=True
)
assert arg.name == "verbose"
@@ -47,9 +54,9 @@ class TestArgumentCreation:
assert arg.action == "store_true"
assert arg.string_entity == "-verbose"
def test_input_argument_creation(self):
"""Ensures InputArgument instances are created with correct attributes."""
arg = InputArgument(
def test_input_argument_stores_all_properties() -> None:
arg: InputArgument = InputArgument(
name="file", value="/path/to/file", founder_class=ValueArgument
)
assert arg.name == "file"
@@ -57,71 +64,90 @@ class TestArgumentCreation:
assert arg.founder_class is ValueArgument
class TestArgSpace:
"""Tests for the ArgSpace class, which holds parsed argument values."""
def test_input_argument_str_representation() -> None:
arg = InputArgument('host', value='192.168.0.0', founder_class=ValueArgument)
assert str(arg) == 'InputArgument(host=192.168.0.0)'
def test_input_argument_repr_representation() -> None:
arg = InputArgument('host', value='192.168.0.0', founder_class=ValueArgument)
assert repr(arg) == "InputArgument<name=host, value=192.168.0.0, founder_class=ValueArgument>"
# ============================================================================
# Fixtures for ArgSpace tests
# ============================================================================
@pytest.fixture
def mock_arguments(self) -> list[InputArgument]:
"""Provides a list of mock InputArgument objects for testing."""
def mock_arguments() -> list[InputArgument]:
return [
InputArgument(name="arg1", value="val1", founder_class=ValueArgument),
InputArgument(name="arg2", value=True, founder_class=BooleanArgument),
InputArgument(name="arg3", value="val3", founder_class=ValueArgument),
]
@pytest.fixture
def arg_space(self, mock_arguments: list[InputArgument]) -> ArgSpace:
"""Provides a pre-populated ArgSpace instance."""
def arg_space(mock_arguments: list[InputArgument]) -> ArgSpace:
return ArgSpace(all_arguments=mock_arguments)
def test_initialization(self, arg_space: ArgSpace, mock_arguments: list[InputArgument]):
"""Tests if ArgSpace is initialized correctly with a list of arguments."""
# ============================================================================
# Tests for ArgSpace initialization and basic operations
# ============================================================================
def test_argspace_initializes_with_arguments(arg_space: ArgSpace, mock_arguments: list[InputArgument]) -> None:
assert len(arg_space.all_arguments) == 3
assert arg_space.all_arguments == mock_arguments
def test_get_by_name(self, arg_space: ArgSpace, mock_arguments: list[InputArgument]):
"""Tests retrieving an argument by its name."""
found_arg = arg_space.get_by_name("arg1")
def test_argspace_get_by_name_finds_existing_argument(arg_space: ArgSpace, mock_arguments: list[InputArgument]) -> None:
found_arg: InputArgument | None = arg_space.get_by_name("arg1")
assert found_arg is not None
assert found_arg == mock_arguments[0]
def test_get_by_name_not_found(self, arg_space: ArgSpace):
"""Tests that get_by_name returns None for a non-existent argument."""
found_arg = arg_space.get_by_name("non_existent_arg")
def test_argspace_get_by_name_returns_none_for_missing_argument(arg_space: ArgSpace) -> None:
found_arg: InputArgument | None = arg_space.get_by_name("non_existent_arg")
assert found_arg is None
def test_get_by_type(self, arg_space: ArgSpace, mock_arguments: list[InputArgument]):
"""Tests retrieving arguments based on their founder class type."""
def test_argspace_get_by_type_filters_value_arguments(arg_space: ArgSpace, mock_arguments: list[InputArgument]) -> None:
value_args = arg_space.get_by_type(ValueArgument)
assert len(value_args) == 2
assert mock_arguments[0] in value_args
assert mock_arguments[2] in value_args
def test_argspace_get_by_type_filters_boolean_arguments(arg_space: ArgSpace, mock_arguments: list[InputArgument]) -> None:
bool_args = arg_space.get_by_type(BooleanArgument)
assert len(bool_args) == 1
assert mock_arguments[1] in bool_args
def test_get_by_type_not_found(self, arg_space: ArgSpace):
"""Tests that get_by_type returns an empty list for an unused argument type."""
def test_argspace_get_by_type_returns_empty_list_for_unknown_type(arg_space: ArgSpace) -> None:
class OtherArgument(BaseArgument):
pass
other_args = arg_space.get_by_type(OtherArgument)
other_args = arg_space.get_by_type(OtherArgument) # pyright: ignore[reportAssignmentType]
assert other_args == []
def test_from_namespace(self):
"""Tests the class method for creating an ArgSpace from an argparse.Namespace."""
namespace = Namespace(config="config.json", debug=True, verbose=False)
processed_args = [
def test_argspace_from_namespace_creates_argspace_from_parsed_namespace() -> None:
namespace: Namespace = Namespace(config="config.json", debug=True, verbose=False)
processed_args: list[ValueArgument | BooleanArgument] = [
ValueArgument(name="config", prefix="--"),
BooleanArgument(name="debug", prefix="-"),
BooleanArgument(name="verbose", prefix="-"),
]
arg_space = ArgSpace.from_namespace(namespace, processed_args)
arg_space: ArgSpace = ArgSpace.from_namespace(namespace, processed_args)
assert len(arg_space.all_arguments) == 3
config_arg = arg_space.get_by_name('config')
debug_arg = arg_space.get_by_name('debug')
config_arg: InputArgument | None = arg_space.get_by_name('config')
debug_arg: InputArgument | None = arg_space.get_by_name('debug')
assert config_arg is not None
assert config_arg.value == "config.json"
@@ -132,27 +158,39 @@ class TestArgSpace:
assert debug_arg.founder_class is BooleanArgument
class TestArgParser:
"""Tests for the ArgParser class, which orchestrates argument parsing."""
# ============================================================================
# Fixtures for ArgParser tests
# ============================================================================
@pytest.fixture
def value_arg(self) -> ValueArgument:
"""Provides a sample ValueArgument."""
return ValueArgument(name="config", help="Path to config file", default="dev.json", is_required=False, possible_values=["dev.json", "prod.json"])
def value_arg() -> ValueArgument:
return ValueArgument(
name="config",
help="Path to config file",
default="dev.json",
is_required=False,
possible_values=["dev.json", "prod.json"],
)
@pytest.fixture
def bool_arg(self) -> BooleanArgument:
"""Provides a sample BooleanArgument."""
def bool_arg() -> BooleanArgument:
return BooleanArgument(name="debug", help="Enable debug mode")
@pytest.fixture
def processed_args(self, value_arg: ValueArgument, bool_arg: BooleanArgument) -> list:
"""Provides a list of processed arguments."""
def processed_args(value_arg: ValueArgument, bool_arg: BooleanArgument) -> list[ValueArgument | BooleanArgument]:
return [value_arg, bool_arg]
def test_initialization(self, processed_args: list):
"""Tests that the ArgParser constructor correctly assigns attributes."""
parser = ArgParser(
# ============================================================================
# Tests for ArgParser initialization
# ============================================================================
def test_argparser_initializes_with_all_properties(processed_args: list[ValueArgument | BooleanArgument]) -> None:
parser: ArgParser = ArgParser(
processed_args=processed_args,
name="TestApp",
description="A test application.",
@@ -165,80 +203,108 @@ class TestArgParser:
assert isinstance(parser.parsed_argspace, ArgSpace)
assert parser.parsed_argspace.all_arguments == []
# ============================================================================
# Tests for ArgParser argument registration (Python version specific)
# ============================================================================
@pytest.mark.skipif(sys.version_info < (3, 13), reason="requires python3.13 or higher")
def test_register_args(self, mocker, value_arg: ValueArgument, bool_arg: BooleanArgument):
"""Tests that arguments are correctly registered with the underlying ArgumentParser."""
mock_add_argument = mocker.patch("argparse.ArgumentParser.add_argument")
def test_argparser_registers_arguments_with_deprecated_flag_py313(
mocker: MockerFixture, value_arg: ValueArgument, bool_arg: BooleanArgument
) -> None:
mock_add_argument: MockType = mocker.patch("argparse.ArgumentParser.add_argument")
parser = ArgParser(processed_args=[value_arg, bool_arg])
_parser: ArgParser = ArgParser(processed_args=[value_arg, bool_arg])
expected_calls = [
# Call for the ValueArgument
call(
value_arg.string_entity,
action=value_arg.action,
help=value_arg.help,
default=value_arg.default,
choices=value_arg.possible_values,
required=value_arg.is_required,
deprecated=value_arg.is_deprecated
),
# Call for the BooleanArgument
call(
bool_arg.string_entity,
action=bool_arg.action,
help=bool_arg.help,
deprecated=bool_arg.is_deprecated
)
]
mock_add_argument.assert_has_calls(expected_calls, any_order=True)
# ArgParser may add additional arguments (like help), so check at least 2
assert mock_add_argument.call_count >= 2
@pytest.mark.skipif(sys.version_info > (3, 12), reason='for more latest python version has been other test')
def test_register_args(self, mocker, value_arg: ValueArgument, bool_arg: BooleanArgument):
"""Tests that arguments are correctly registered with the underlying ArgumentParser."""
mock_add_argument = mocker.patch("argparse.ArgumentParser.add_argument")
# Check that value_arg was registered correctly
value_arg_call = None
bool_arg_call = None
parser = ArgParser(processed_args=[value_arg, bool_arg])
for call_args in mock_add_argument.call_args_list:
args, kwargs = call_args
if len(args) > 0 and args[0] == value_arg.string_entity:
value_arg_call = (args, kwargs)
elif len(args) > 0 and args[0] == bool_arg.string_entity:
bool_arg_call = (args, kwargs)
expected_calls = [
# Call for the ValueArgument
call(
value_arg.string_entity,
action=value_arg.action,
help=value_arg.help,
default=value_arg.default,
choices=value_arg.possible_values,
required=value_arg.is_required
),
# Call for the BooleanArgument
call(
bool_arg.string_entity,
action=bool_arg.action,
help=bool_arg.help
)
]
mock_add_argument.assert_has_calls(expected_calls, any_order=True)
assert value_arg_call is not None, "value_arg was not registered"
_, value_kwargs = value_arg_call
assert value_kwargs['action'] == value_arg.action
assert value_kwargs['help'] == value_arg.help
assert value_kwargs['default'] == value_arg.default
assert value_kwargs['choices'] == value_arg.possible_values
assert value_kwargs['required'] == value_arg.is_required
assert value_kwargs['deprecated'] == value_arg.is_deprecated
def test_parse_args_populates_argspace(self, mocker, processed_args: list[ValueArgument | BooleanArgument]):
"""Tests that _parse_args correctly calls the parser and populates the ArgSpace."""
# 1. Mock the return value of the internal argparse instance
mock_namespace = Namespace(config='config.json', debug=True)
mocker.patch(
'argparse.ArgumentParser.parse_args',
return_value=mock_namespace
)
assert bool_arg_call is not None, "bool_arg was not registered"
_, bool_kwargs = bool_arg_call
assert bool_kwargs['action'] == bool_arg.action
assert bool_kwargs['help'] == bool_arg.help
assert bool_kwargs['deprecated'] == bool_arg.is_deprecated
# 2. Initialize the parser and call the method under test
parser = ArgParser(processed_args=processed_args)
parser._parse_args() # Test the private method that contains the logic
# 3. Assert the results
arg_space = parser.parsed_argspace
@pytest.mark.skipif(sys.version_info > (3, 12), reason="for more latest python version has been other test")
def test_argparser_registers_arguments_without_deprecated_flag_py312(
mocker: MockerFixture, value_arg: ValueArgument, bool_arg: BooleanArgument
) -> None:
mock_add_argument: MockType = mocker.patch("argparse.ArgumentParser.add_argument")
_parser: ArgParser = ArgParser(processed_args=[value_arg, bool_arg])
# ArgParser may add additional arguments (like help), so check at least 2
assert mock_add_argument.call_count >= 2
# Check that value_arg was registered correctly
value_arg_call = None
bool_arg_call = None
for call_args in mock_add_argument.call_args_list:
args, kwargs = call_args
if len(args) > 0 and args[0] == value_arg.string_entity:
value_arg_call = (args, kwargs)
elif len(args) > 0 and args[0] == bool_arg.string_entity:
bool_arg_call = (args, kwargs)
assert value_arg_call is not None, "value_arg was not registered"
_, value_kwargs = value_arg_call
assert value_kwargs['action'] == value_arg.action
assert value_kwargs['help'] == value_arg.help
assert value_kwargs['default'] == value_arg.default
assert value_kwargs['choices'] == value_arg.possible_values
assert value_kwargs['required'] == value_arg.is_required
assert 'deprecated' not in value_kwargs
assert bool_arg_call is not None, "bool_arg was not registered"
_, bool_kwargs = bool_arg_call
assert bool_kwargs['action'] == bool_arg.action
assert bool_kwargs['help'] == bool_arg.help
assert 'deprecated' not in bool_kwargs
# ============================================================================
# Tests for ArgParser argument parsing
# ============================================================================
def test_argparser_parse_args_populates_argspace_correctly(
mocker: MockerFixture, processed_args: list[ValueArgument | BooleanArgument]
) -> None:
mock_namespace: Namespace = Namespace(config='config.json', debug=True)
mocker.patch('argparse.ArgumentParser.parse_args', return_value=mock_namespace)
parser: ArgParser = ArgParser(processed_args=processed_args)
parser._parse_args()
arg_space: ArgSpace = parser.parsed_argspace
assert isinstance(arg_space, ArgSpace)
assert len(arg_space.all_arguments) == 2
config_arg = arg_space.get_by_name('config')
debug_arg = arg_space.get_by_name('debug')
config_arg: InputArgument | None = arg_space.get_by_name('config')
debug_arg: InputArgument | None = arg_space.get_by_name('debug')
assert config_arg is not None
assert config_arg.value == 'config.json'
+112 -101
View File
@@ -1,25 +1,34 @@
import os
from unittest.mock import MagicMock, call, patch
from typing import Any
import pytest
from pyfakefs.fake_filesystem import FakeFilesystem
from pytest_mock import MockerFixture
from argenta.app.autocompleter.entity import (
AutoCompleter,
_get_history_items,
_is_command_exist,
)
HISTORY_FILE: str = "test_history.txt"
COMMANDS: list[str] = ["start", "stop", "status"]
# ============================================================================
# Fixtures
# ============================================================================
# Since readline is not available on all platforms (e.g., Windows) for testing,
# it is mocked for all tests.
readline_mock = MagicMock()
# We patch the module where it's imported, not where it's defined.
@pytest.fixture
def mock_readline():
"""Fixture to provide a mock of the `readline` module."""
with patch('argenta.app.autocompleter.entity.readline', readline_mock) as mock:
# This nested state simulates readline's internal history list.
_history = []
def mock_readline(mocker: MockerFixture) -> Any:
_history: list[str] = []
def add_history(item: str) -> None:
_history.append(item)
def get_history_item(index: int) -> str | None:
# readline history is 1-based.
if 1 <= index <= len(_history):
return _history[index - 1]
return None
@@ -30,168 +39,170 @@ def mock_readline():
def clear_history() -> None:
_history.clear()
# Reset all mocks and the internal history before each test.
mock.reset_mock()
mock: Any = mocker.MagicMock() # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
mocker.patch('argenta.app.autocompleter.entity.readline', mock) # pyright: ignore[reportUnknownArgumentType]
mock.reset_mock() # pyright: ignore[reportUnknownMemberType]
clear_history()
# Apply side effects to mock functions to simulate real behavior.
mock.add_history.side_effect = add_history
mock.get_history_item.side_effect = get_history_item
mock.get_current_history_length.side_effect = get_current_history_length
mock.add_history.side_effect = add_history # pyright: ignore[reportUnknownMemberType]
mock.get_history_item.side_effect = get_history_item # pyright: ignore[reportUnknownMemberType]
mock.get_current_history_length.side_effect = get_current_history_length # pyright: ignore[reportUnknownMemberType]
mock.get_completer_delims.return_value = " " # pyright: ignore[reportUnknownMemberType]
# Provide a default return value for functions that are read from.
mock.get_completer_delims.return_value = " "
yield mock
# We import the class under test after setting up the patch context if needed,
# or ensure patches target the correct import location.
from argenta.app.autocompleter.entity import (AutoCompleter,
_get_history_items,
_is_command_exist)
return mock # pyright: ignore[reportReturnType, reportUnknownVariableType]
class TestAutoCompleter:
"""Test suite for the AutoCompleter class."""
HISTORY_FILE = "test_history.txt"
COMMANDS = ["start", "stop", "status"]
# ============================================================================
# Tests for AutoCompleter initialization
# ============================================================================
def test_initialization(self):
"""Tests that the constructor correctly assigns attributes."""
completer = AutoCompleter(history_filename=self.HISTORY_FILE, autocomplete_button="tab")
assert completer.history_filename == self.HISTORY_FILE
def test_autocompleter_initializes_with_history_file_and_button() -> None:
completer: AutoCompleter = AutoCompleter(history_filename=HISTORY_FILE, autocomplete_button="tab")
assert completer.history_filename == HISTORY_FILE
assert completer.autocomplete_button == "tab"
def test_initial_setup_if_history_file_does_not_exist(self, fs, mock_readline):
"""Tests initial setup creates history from commands when the history file is absent."""
# Ensure the file does not exist in the fake filesystem.
if os.path.exists(self.HISTORY_FILE):
os.remove(self.HISTORY_FILE)
completer = AutoCompleter(history_filename=self.HISTORY_FILE)
completer.initial_setup(self.COMMANDS)
# ============================================================================
# Tests for initial setup
# ============================================================================
def test_initial_setup_creates_history_when_file_does_not_exist(fs: FakeFilesystem, mock_readline: Any) -> None:
if os.path.exists(HISTORY_FILE):
os.remove(HISTORY_FILE)
completer: AutoCompleter = AutoCompleter(history_filename=HISTORY_FILE)
completer.initial_setup(COMMANDS)
mock_readline.read_history_file.assert_not_called()
expected_calls = [call(cmd) for cmd in self.COMMANDS]
mock_readline.add_history.assert_has_calls(expected_calls, any_order=True)
assert mock_readline.add_history.call_count == len(self.COMMANDS)
assert mock_readline.add_history.call_count == len(COMMANDS)
mock_readline.set_completer.assert_called_with(completer._complete)
mock_readline.parse_and_bind.assert_called_with("tab: complete")
def test_initial_setup_if_history_file_exists(self, fs, mock_readline):
"""Tests initial setup reads from an existing history file."""
fs.create_file(self.HISTORY_FILE, contents="previous_command\n")
completer = AutoCompleter(history_filename=self.HISTORY_FILE)
completer.initial_setup(self.COMMANDS)
def test_initial_setup_reads_existing_history_file(fs: FakeFilesystem, mock_readline: Any) -> None:
fs.create_file(HISTORY_FILE, contents="previous_command\n") # pyright: ignore[reportUnknownMemberType]
mock_readline.read_history_file.assert_called_once_with(self.HISTORY_FILE)
completer: AutoCompleter = AutoCompleter(history_filename=HISTORY_FILE)
completer.initial_setup(COMMANDS)
mock_readline.read_history_file.assert_called_once_with(HISTORY_FILE)
mock_readline.add_history.assert_not_called()
mock_readline.set_completer.assert_called_once()
mock_readline.parse_and_bind.assert_called_once()
def test_initial_setup_with_no_history_filename(self, mock_readline):
"""Tests initial setup when no history filename is provided."""
completer = AutoCompleter(history_filename=None)
completer.initial_setup(self.COMMANDS)
def test_initial_setup_works_without_history_filename(mock_readline: Any) -> None:
completer: AutoCompleter = AutoCompleter(history_filename=None)
completer.initial_setup(COMMANDS)
mock_readline.read_history_file.assert_not_called()
expected_calls = [call(cmd) for cmd in self.COMMANDS]
mock_readline.add_history.assert_has_calls(expected_calls, any_order=True)
assert mock_readline.add_history.call_count == len(COMMANDS)
def test_exit_setup_writes_and_filters_history(self, fs, mock_readline):
"""Tests that exit_setup writes a filtered and unique history to the file."""
# 1. Populate the mock readline history.
mock_readline.add_history.side_effect(None) # Temporarily disable side effect to just record calls
# ============================================================================
# Tests for exit setup and history filtering
# ============================================================================
def test_exit_setup_writes_and_filters_duplicate_commands(fs: FakeFilesystem, mock_readline: Any) -> None:
mock_readline.add_history.side_effect = None
mock_readline.add_history("start server")
mock_readline.add_history("stop client")
mock_readline.add_history("invalid command")
mock_readline.add_history("start server") # Add a duplicate.
mock_readline.add_history("start server")
# 2. Simulate the state of the history file after readline.write_history_file would have run.
raw_history_content = "\n".join(["start server", "stop client", "invalid command", "start server"])
fs.create_file(self.HISTORY_FILE, contents=raw_history_content)
raw_history_content: str = "\n".join(["start server", "stop client", "invalid command", "start server"])
fs.create_file(HISTORY_FILE, contents=raw_history_content) # pyright: ignore[reportUnknownMemberType]
# 3. Call the method under test.
completer = AutoCompleter(history_filename=self.HISTORY_FILE)
completer: AutoCompleter = AutoCompleter(history_filename=HISTORY_FILE)
completer.exit_setup(all_commands=["start", "stop"], ignore_command_register=False)
# 4. Assert that readline's write function was called.
mock_readline.write_history_file.assert_called_once_with(self.HISTORY_FILE)
mock_readline.write_history_file.assert_called_once_with(HISTORY_FILE)
# 5. Assert the file was correctly re-written with filtered and unique content.
with open(self.HISTORY_FILE, "r") as f:
content = f.read()
lines = sorted(content.strip().split("\n"))
with open(HISTORY_FILE) as f:
content: str = f.read()
lines: list[str] = sorted(content.strip().split("\n"))
assert lines == ["start server", "stop client"]
def test_exit_setup_with_no_history_filename(self, mock_readline):
"""Tests that exit_setup does nothing if no filename is provided."""
completer = AutoCompleter(history_filename=None)
completer.exit_setup(all_commands=self.COMMANDS, ignore_command_register=False)
def test_exit_setup_skips_writing_when_no_history_filename(mock_readline: Any) -> None:
completer: AutoCompleter = AutoCompleter(history_filename=None)
completer.exit_setup(all_commands=COMMANDS, ignore_command_register=False)
mock_readline.write_history_file.assert_not_called()
def test_complete_with_no_matches(self, mock_readline):
"""Tests the _complete method when there are no matching history items."""
# ============================================================================
# Tests for autocomplete functionality
# ============================================================================
def test_complete_returns_none_when_no_matches_found(mock_readline: Any) -> None:
cmd: str
for cmd in ["start", "stop"]:
mock_readline.add_history(cmd)
completer = AutoCompleter()
completer: AutoCompleter = AutoCompleter()
assert completer._complete("run", 0) is None
assert completer._complete("run", 1) is None
def test_complete_with_one_match(self, mock_readline):
"""Tests the _complete method when there is exactly one match."""
def test_complete_returns_single_match(mock_readline: Any) -> None:
mock_readline.add_history("start server")
mock_readline.add_history("stop server")
completer = AutoCompleter()
completer: AutoCompleter = AutoCompleter()
assert completer._complete("start", 0) == "start server"
assert completer._complete("start", 1) is None # Subsequent states yield no matches
assert completer._complete("start", 1) is None
def test_complete_with_multiple_matches(self, mock_readline):
"""Tests _complete with multiple matches that share a common prefix."""
def test_complete_inserts_common_prefix_for_multiple_matches(mock_readline: Any) -> None:
mock_readline.add_history("status client")
mock_readline.add_history("status server")
mock_readline.add_history("stop")
completer = AutoCompleter()
completer: AutoCompleter = AutoCompleter()
# On state 0, it should insert the common prefix via readline and return None.
result = completer._complete("stat", 0)
result: str | None = completer._complete("stat", 0)
assert result is None
mock_readline.insert_text.assert_called_once_with("us ") # Completes "stat" to "status "
mock_readline.insert_text.assert_called_once_with("us ")
mock_readline.redisplay.assert_called_once()
# On subsequent states, it should do nothing.
mock_readline.reset_mock()
result_state_1 = completer._complete("stat", 1)
result_state_1: str | None = completer._complete("stat", 1)
assert result_state_1 is None
mock_readline.insert_text.assert_not_called()
class TestHelperFunctions:
"""Test suite for helper functions in the autocompleter module."""
# ============================================================================
# Tests for helper functions
# ============================================================================
def test_is_command_exist(self):
"""Tests the _is_command_exist helper function."""
existing = ["start", "stop", "status"]
# Case-sensitive check
def test_is_command_exist_checks_case_sensitive_when_enabled() -> None:
existing: list[str] = ["start", "stop", "status"]
assert _is_command_exist("start", existing, ignore_command_register=False) is True
assert _is_command_exist("START", existing, ignore_command_register=False) is False
assert _is_command_exist("unknown", existing, ignore_command_register=False) is False
# Case-insensitive check
def test_is_command_exist_checks_case_insensitive_when_enabled() -> None:
existing: list[str] = ["start", "stop", "status"]
assert _is_command_exist("start", existing, ignore_command_register=True) is True
assert _is_command_exist("START", existing, ignore_command_register=True) is True
assert _is_command_exist("unknown", existing, ignore_command_register=True) is False
def test_get_history_items(self, mock_readline):
"""Tests the _get_history_items helper function."""
def test_get_history_items_returns_empty_list_initially(mock_readline: Any) -> None:
assert _get_history_items() == []
def test_get_history_items_returns_all_added_items(mock_readline: Any) -> None:
mock_readline.add_history("first item")
mock_readline.add_history("second item")
+87 -36
View File
@@ -1,63 +1,114 @@
import re
import unittest
from argenta.command.exceptions import (EmptyInputCommandException,
import pytest
from argenta.command.exceptions import (
EmptyInputCommandException,
RepeatedInputFlagsException,
UnprocessedInputFlagException)
UnprocessedInputFlagException,
)
from argenta.command.flag import Flag, InputFlag
from argenta.command.flag.flags import Flags
from argenta.command.flag.models import PossibleValues, ValidationStatus
from argenta.command.models import Command, InputCommand
class TestInputCommand(unittest.TestCase):
def test_parse_correct_raw_command(self):
self.assertEqual(InputCommand.parse('ssh --host 192.168.0.3').trigger, 'ssh')
# ============================================================================
# Tests for InputCommand parsing - successful cases
# ============================================================================
def test_parse_raw_command_without_flag_name_with_value(self):
with self.assertRaises(UnprocessedInputFlagException):
def test_parse_extracts_trigger_from_command_with_flags() -> None:
assert InputCommand.parse('ssh --host 192.168.0.3').trigger == 'ssh'
def test_parse_returns_input_command_instance() -> None:
cmd = InputCommand.parse('ssh --host 192.168.0.3')
assert isinstance(cmd, InputCommand)
def test_parse_handles_triple_prefix_flags() -> None:
assert InputCommand.parse(
'ssh ---host 192.168.0.0'
).input_flags.get_flag_by_name('host') == \
InputFlag('host', input_value='192.168.0.0', prefix='---')
# ============================================================================
# Tests for InputCommand parsing - error cases
# ============================================================================
def test_parse_raises_error_for_value_without_flag_name() -> None:
with pytest.raises(UnprocessedInputFlagException):
InputCommand.parse('ssh 192.168.0.3')
def test_parse_raw_command_with_repeated_flag_name(self):
with self.assertRaises(RepeatedInputFlagsException):
def test_parse_raises_error_for_repeated_flag_names() -> None:
with pytest.raises(RepeatedInputFlagsException):
InputCommand.parse('ssh --host 192.168.0.3 --host 172.198.0.43')
def test_parse_empty_raw_command(self):
with self.assertRaises(EmptyInputCommandException):
def test_parse_raises_error_for_unprocessed_entity_after_flags() -> None:
with pytest.raises(UnprocessedInputFlagException):
InputCommand.parse('ssh --host 192.168.0.3 9977')
def test_parse_raises_error_for_empty_command() -> None:
with pytest.raises(EmptyInputCommandException):
InputCommand.parse('')
def test_validate_invalid_input_flag1(self):
command = Command('some', flags=Flag('test'))
self.assertEqual(command.validate_input_flag(InputFlag('test', input_value='', status=None)), ValidationStatus.INVALID)
def test_validate_valid_input_flag2(self):
# ============================================================================
# Tests for flag validation - valid flags
# ============================================================================
def test_validate_input_flag_returns_valid_for_registered_flag() -> None:
command = Command('some', flags=Flags([Flag('test'), Flag('more')]))
self.assertEqual(command.validate_input_flag(InputFlag('more', input_value='random-value', status=None)), ValidationStatus.VALID)
assert command.validate_input_flag(InputFlag('more', input_value='random-value', status=None)) == ValidationStatus.VALID
def test_validate_undefined_input_flag1(self):
# ============================================================================
# Tests for flag validation - invalid flags
# ============================================================================
def test_validate_input_flag_returns_invalid_for_flag_with_empty_value() -> None:
command = Command('some', flags=Flag('test'))
self.assertEqual(command.validate_input_flag(InputFlag('more', input_value='', status=None)), ValidationStatus.UNDEFINED)
assert command.validate_input_flag(InputFlag('test', input_value='', status=None)) == ValidationStatus.INVALID
def test_validate_undefined_input_flag2(self):
command = Command('some', flags=Flags([Flag('test'), Flag('more')]))
self.assertEqual(command.validate_input_flag(InputFlag('case', input_value='', status=None)), ValidationStatus.UNDEFINED)
def test_validate_undefined_input_flag3(self):
command = Command('some')
self.assertEqual(command.validate_input_flag(InputFlag('case', input_value='', status=None)), ValidationStatus.UNDEFINED)
def test_invalid_input_flag1(self):
def test_validate_input_flag_returns_invalid_when_value_provided_for_neither_flag() -> None:
command = Command('some', flags=Flag('test', possible_values=PossibleValues.NEITHER))
self.assertEqual(command.validate_input_flag(InputFlag('test', input_value='example', status=None)), ValidationStatus.INVALID)
assert command.validate_input_flag(InputFlag('test', input_value='example', status=None)) == ValidationStatus.INVALID
def test_invalid_input_flag2(self):
def test_validate_input_flag_returns_invalid_when_value_not_in_allowed_list() -> None:
command = Command('some', flags=Flag('test', possible_values=['some', 'case']))
self.assertEqual(command.validate_input_flag(InputFlag('test', input_value='slay', status=None)), ValidationStatus.INVALID)
assert command.validate_input_flag(InputFlag('test', input_value='slay', status=None)) == ValidationStatus.INVALID
def test_invalid_input_flag3(self):
command = Command('some', flags=Flag('test', possible_values=re.compile(r'^ex\d{, 2}op$')))
self.assertEqual(command.validate_input_flag(InputFlag('test', input_value='example', status=None)), ValidationStatus.INVALID)
def test_isinstance_parse_correct_raw_command(self):
cmd = InputCommand.parse('ssh --host 192.168.0.3')
self.assertIsInstance(cmd, InputCommand)
def test_validate_input_flag_returns_invalid_when_value_does_not_match_regex() -> None:
command = Command('some', flags=Flag('test', possible_values=re.compile(r'^ex\d{1,2}op$')))
assert command.validate_input_flag(InputFlag('test', input_value='example', status=None)) == ValidationStatus.INVALID
# ============================================================================
# Tests for flag validation - undefined flags
# ============================================================================
def test_validate_input_flag_returns_undefined_for_unregistered_flag_name() -> None:
command = Command('some', flags=Flag('test'))
assert command.validate_input_flag(InputFlag('more', input_value='', status=None)) == ValidationStatus.UNDEFINED
def test_validate_input_flag_returns_undefined_for_unregistered_flag_in_multiple_flags() -> None:
command = Command('some', flags=Flags([Flag('test'), Flag('more')]))
assert command.validate_input_flag(InputFlag('case', input_value='', status=None)) == ValidationStatus.UNDEFINED
def test_validate_input_flag_returns_undefined_when_command_has_no_flags() -> None:
command = Command('some')
assert command.validate_input_flag(InputFlag('case', input_value='', status=None)) == ValidationStatus.UNDEFINED
+120
View File
@@ -0,0 +1,120 @@
from typing import Generator
import pytest
from dishka import Container, make_container
from argenta import App, DataBridge, Router
from argenta.di.integration import (
FromDishka,
_auto_inject_handlers,
_get_container_from_response,
setup_dishka,
)
from argenta.di.providers import SystemProvider
from argenta.orchestrator.argparser import ArgParser, ArgSpace
from argenta.response import ResponseStatus
from argenta.response.entity import Response
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture
def argparser() -> ArgParser:
return ArgParser(processed_args=[])
@pytest.fixture
def container(argparser: ArgParser) -> Generator[Container, None, None]:
container = make_container(SystemProvider(), context={ArgParser: argparser})
yield container
container.close()
# ============================================================================
# Tests for container retrieval from response
# ============================================================================
def test_get_container_from_response_extracts_container_from_first_response_arg(container: Container) -> None:
Response.patch_by_container(container)
response = Response(ResponseStatus.ALL_FLAGS_VALID)
assert _get_container_from_response((response,), {}) == container
def test_get_container_from_response_extracts_container_from_second_response_arg(container: Container) -> None:
Response.patch_by_container(container)
response = Response(ResponseStatus.ALL_FLAGS_VALID)
assert _get_container_from_response((object(), response,), {}) == container
def test_get_container_from_response_raises_error_when_container_not_patched() -> None:
delattr(Response, '__dishka_container__')
response = Response(ResponseStatus.ALL_FLAGS_VALID)
with pytest.raises(RuntimeError):
_get_container_from_response((response,), {})
def test_get_container_from_response_raises_error_when_no_response_in_args(container: Container) -> None:
Response.patch_by_container(container)
with pytest.raises(RuntimeError):
_get_container_from_response((), {})
# ============================================================================
# Tests for dishka setup
# ============================================================================
def test_setup_dishka_with_auto_inject_enabled(container: Container) -> None:
app = App()
router = Router()
@router.command('command')
def handler(_res: Response, data_bridge: FromDishka[DataBridge]) -> None:
print(data_bridge)
app.include_router(router)
assert setup_dishka(app, container, auto_inject=True) is None
def test_setup_dishka_with_auto_inject_disabled(container: Container) -> None:
app = App()
assert setup_dishka(app, container, auto_inject=False) is None
# ============================================================================
# Tests for auto injection
# ============================================================================
def test_auto_inject_handlers_injects_dependencies_into_handlers(container: Container) -> None:
Response.patch_by_container(container)
app = App()
router = Router()
@router.command('command')
def handler(_res: Response, data_bridge: FromDishka[DataBridge]) -> None:
print(data_bridge)
app.include_router(router)
_auto_inject_handlers(app)
_auto_inject_handlers(app) # check idempotency
# ============================================================================
# Tests for container dependency resolution
# ============================================================================F
def test_container_resolves_argspace_dependency(container: Container) -> None:
assert isinstance(container.get(ArgSpace), ArgSpace)
def test_container_resolves_databridge_dependency(container: Container) -> None:
assert isinstance(container.get(DataBridge), DataBridge)
+52 -12
View File
@@ -1,21 +1,61 @@
import unittest
from argenta.app.dividing_line import DynamicDividingLine, StaticDividingLine
class TestDividingLine(unittest.TestCase):
def test_get_static_dividing_line_full_line(self):
# ============================================================================
# Tests for StaticDividingLine - full line generation
# ============================================================================
def test_static_dividing_line_generates_default_length_with_override() -> None:
line = StaticDividingLine('-')
self.assertEqual(line.get_full_static_line(is_override=True).count('-'), 25)
assert line.get_full_static_line(is_override=True).count('-') == 25
def test_get_dynamic_dividing_line_full_line(self):
line = DynamicDividingLine()
self.assertEqual(line.get_full_dynamic_line(length=20, is_override=True).count('-'), 20)
def test_get_dividing_line_unit_part(self):
def test_static_dividing_line_generates_custom_length_with_formatting() -> None:
line = StaticDividingLine('-', length=5)
assert line.get_full_static_line(is_override=False) == '\n[dim]-----[/dim]\n'
# ============================================================================
# Tests for StaticDividingLine - unit part extraction
# ============================================================================
def test_static_dividing_line_returns_space_for_empty_unit() -> None:
line = StaticDividingLine('')
self.assertEqual(line.get_unit_part(), ' ')
assert line.get_unit_part() == ' '
def test_get_dividing_line2_unit_part(self):
def test_static_dividing_line_returns_first_character_as_unit() -> None:
line = StaticDividingLine('+-0987654321!@#$%^&*()_')
self.assertEqual(line.get_unit_part(), '+')
assert line.get_unit_part() == '+'
# ============================================================================
# Tests for DynamicDividingLine - full line generation
# ============================================================================
def test_dynamic_dividing_line_generates_line_with_specified_length_and_override() -> None:
line = DynamicDividingLine()
assert line.get_full_dynamic_line(length=20, is_override=True).count('-') == 20
def test_dynamic_dividing_line_generates_line_with_specified_length_and_formatting() -> None:
line = DynamicDividingLine()
assert line.get_full_dynamic_line(length=5, is_override=False) == '\n[dim]-----[/dim]\n'
# ============================================================================
# Tests for DynamicDividingLine - unit part extraction
# ============================================================================
def test_dynamic_dividing_line_returns_space_for_empty_unit() -> None:
line = DynamicDividingLine('')
assert line.get_unit_part() == ' '
def test_dynamic_dividing_line_returns_first_character_as_unit() -> None:
line = DynamicDividingLine('45n352834528&^%@&*T$G')
assert line.get_unit_part() == '4'
+246 -61
View File
@@ -1,91 +1,226 @@
import re
import unittest
import pytest
from argenta.command.flag import Flag, InputFlag, PossibleValues
from argenta.command.flag.flags import Flags, InputFlags
class TestFlag(unittest.TestCase):
def test_get_string_entity(self):
self.assertEqual(Flag(name='test').string_entity,
'--test')
# ============================================================================
# Tests for Flag - basic properties
# ============================================================================
def test_get_string_entity2(self):
self.assertEqual(Flag(name='test',
prefix='---').string_entity,
'---test')
def test_get_flag_name(self):
self.assertEqual(Flag(name='test').name,
'test')
def test_flag_string_entity_with_default_prefix() -> None:
assert Flag(name='test').string_entity == '--test'
def test_get_flag_prefix(self):
self.assertEqual(Flag(name='test').prefix,
'--')
def test_get_flag_prefix2(self):
self.assertEqual(Flag(name='test',
prefix='--').prefix,
'--')
def test_flag_string_entity_with_custom_prefix() -> None:
assert Flag(name='test', prefix='---').string_entity == '---test'
def test_get_flag_value_without_set(self):
self.assertEqual(InputFlag(name='test', input_value='', status=None).input_value,
'')
def test_get_flag_value_with_set(self):
flag = InputFlag(name='test', input_value='example', status=None)
self.assertEqual(flag.input_value, 'example')
def test_flag_name_property() -> None:
assert Flag(name='test').name == 'test'
def test_validate_incorrect_flag_value_with_list_of_possible_flag_values(self):
def test_flag_prefix_property_default() -> None:
assert Flag(name='test').prefix == '--'
def test_flag_prefix_property_custom() -> None:
assert Flag(name='test', prefix='--').prefix == '--'
# ============================================================================
# Tests for Flag - string representations
# ============================================================================
def test_flag_str_representation() -> None:
flag = Flag('two')
assert str(flag) == '--two'
def test_flag_repr_representation() -> None:
flag = Flag('two')
assert repr(flag) == 'Flag<name=two, prefix=-->'
def test_flag_equality_with_non_flag_raises_error() -> None:
flag = Flag('two')
not_flag = object()
with pytest.raises(NotImplementedError):
flag == not_flag # pyright: ignore[reportUnusedExpression]
# ============================================================================
# Tests for Flag - value validation with list of possible values
# ============================================================================
def test_flag_validates_value_in_allowed_list() -> None:
flag = Flag(name='test', possible_values=['1', '2', '3'])
self.assertEqual(flag.validate_input_flag_value('bad value'), False)
assert flag.validate_input_flag_value('1') is True
def test_validate_correct_flag_value_with_list_of_possible_flag_values(self):
def test_flag_rejects_value_not_in_allowed_list() -> None:
flag = Flag(name='test', possible_values=['1', '2', '3'])
self.assertEqual(flag.validate_input_flag_value('1'), True)
assert flag.validate_input_flag_value('bad value') is False
def test_validate_incorrect_flag_value_with_pattern_of_possible_flag_values(self):
# ============================================================================
# Tests for Flag - value validation with regex pattern
# ============================================================================
def test_flag_validates_value_matching_regex_pattern() -> None:
flag = Flag(name='test', possible_values=re.compile(r'192.168.\d+.\d+'))
self.assertEqual(flag.validate_input_flag_value('152.123.9.8'), False)
assert flag.validate_input_flag_value('192.168.9.8') is True
def test_validate_correct_flag_value_with_pattern_of_possible_flag_values(self):
def test_flag_rejects_value_not_matching_regex_pattern() -> None:
flag = Flag(name='test', possible_values=re.compile(r'192.168.\d+.\d+'))
self.assertEqual(flag.validate_input_flag_value('192.168.9.8'), True)
assert flag.validate_input_flag_value('152.123.9.8') is False
def test_validate_correct_empty_flag_value_without_possible_flag_values(self):
# ============================================================================
# Tests for Flag - value validation with NEITHER and ALL
# ============================================================================
def test_flag_validates_empty_value_when_neither_allowed() -> None:
flag = Flag(name='test', possible_values=PossibleValues.NEITHER)
self.assertEqual(flag.validate_input_flag_value(''), True)
assert flag.validate_input_flag_value('') is True
def test_validate_correct_empty_flag_value_with_possible_flag_values(self):
def test_flag_rejects_non_empty_value_when_neither_allowed() -> None:
flag = Flag(name='test', possible_values=PossibleValues.NEITHER)
self.assertEqual(flag.validate_input_flag_value(''), True)
assert flag.validate_input_flag_value('random value') is False
def test_validate_incorrect_random_flag_value_without_possible_flag_values(self):
flag = Flag(name='test', possible_values=PossibleValues.NEITHER)
self.assertEqual(flag.validate_input_flag_value('random value'), False)
def test_validate_correct_random_flag_value_with_possible_flag_values(self):
def test_flag_validates_any_value_when_all_allowed() -> None:
flag = Flag(name='test', possible_values=PossibleValues.ALL)
self.assertEqual(flag.validate_input_flag_value('random value'), True)
assert flag.validate_input_flag_value('random value') is True
def test_get_input_flag1(self):
# ============================================================================
# Tests for InputFlag - basic properties
# ============================================================================
def test_input_flag_stores_empty_value() -> None:
assert InputFlag(name='test', input_value='', status=None).input_value == ''
def test_input_flag_stores_provided_value() -> None:
flag = InputFlag(name='test', input_value='example', status=None)
assert flag.input_value == 'example'
# ============================================================================
# Tests for InputFlag - string representations
# ============================================================================
def test_input_flag_str_representation() -> None:
flag = InputFlag('two', input_value='value')
assert str(flag) == '--two value'
def test_input_flag_repr_representation() -> None:
flag = InputFlag('two', input_value='some_value')
assert repr(flag) == 'InputFlag<name=two, prefix=--, value=some_value, status=None>'
def test_input_flag_equality_with_non_flag_raises_error() -> None:
flag = InputFlag('two', input_value='')
not_flag = object()
with pytest.raises(NotImplementedError):
flag == not_flag # pyright: ignore[reportUnusedExpression]
# ============================================================================
# Tests for InputFlags collection - retrieval
# ============================================================================
def test_input_flags_get_by_name_finds_single_flag() -> None:
flag = InputFlag(name='test', input_value='', status=None)
input_flags = InputFlags([flag])
self.assertEqual(input_flags.get_flag_by_name('test'), flag)
assert input_flags.get_flag_by_name('test') == flag
def test_get_input_flag2(self):
def test_input_flags_get_by_name_finds_flag_in_multiple() -> None:
flag = InputFlag(name='test', input_value='', status=None)
flag2 = InputFlag(name='some', input_value='', status=None)
input_flags = InputFlags([flag, flag2])
self.assertEqual(input_flags.get_flag_by_name('some'), flag2)
assert input_flags.get_flag_by_name('some') == flag2
def test_get_undefined_input_flag(self):
def test_input_flags_get_by_name_returns_none_for_missing_flag() -> None:
flag = InputFlag(name='test', input_value='', status=None)
flag2 = InputFlag(name='some', input_value='', status=None)
input_flags = InputFlags([flag, flag2])
self.assertEqual(input_flags.get_flag_by_name('case'), None)
assert input_flags.get_flag_by_name('case') is None
def test_get_flags(self):
# ============================================================================
# Tests for InputFlags collection - equality and containment
# ============================================================================
def test_input_flags_not_equal_when_different_length() -> None:
flags = InputFlags([InputFlag('some', input_value='')])
flags2 = InputFlags([
InputFlag('some', input_value=''),
InputFlag('some2', input_value='')
])
assert flags != flags2
def test_input_flags_not_equal_to_non_input_flags() -> None:
flags = InputFlags([InputFlag('some', input_value='')])
not_flags = object()
assert flags != not_flags
def test_input_flags_contains_existing_flag() -> None:
flag = InputFlag('some', input_value='')
flags = InputFlags([flag])
assert flag in flags
def test_input_flags_does_not_contain_missing_flag() -> None:
flags = InputFlags([InputFlag('some', input_value='')])
flag = InputFlag('nonexists', input_value='')
assert flag not in flags
def test_input_flags_contains_raises_error_for_non_flag() -> None:
flags = InputFlags([InputFlag('some', input_value='')])
not_flag = object
with pytest.raises(TypeError):
not_flag in flags # pyright: ignore[reportUnusedExpression]
# ============================================================================
# Tests for Flags collection - adding flags
# ============================================================================
def test_flags_add_single_flag() -> None:
flags = Flags()
flags.add_flag(Flag('test'))
assert len(flags.flags) == 1
def test_flags_add_multiple_flags() -> None:
flags = Flags()
flags.add_flags([Flag('test'), Flag('test2')])
assert len(flags.flags) == 2
def test_flags_stores_added_flags() -> None:
flags = Flags()
list_of_flags = [
Flag('test1'),
@@ -93,37 +228,87 @@ class TestFlag(unittest.TestCase):
Flag('test3'),
]
flags.add_flags(list_of_flags)
self.assertEqual(flags.flags,
list_of_flags)
def test_add_flag(self):
flags = Flags()
flags.add_flag(Flag('test'))
self.assertEqual(len(flags.flags), 1)
def test_add_flags(self):
flags = Flags()
flags.add_flags([Flag('test'), Flag('test2')])
self.assertEqual(len(flags.flags), 2)
assert flags.flags == list_of_flags
# ============================================================================
# Tests for Flags collection - retrieval
# ============================================================================
def test_flags_get_by_name_finds_flag() -> None:
flags = Flags([Flag('some')])
assert flags.get_flag_by_name('some') == Flag('some')
# ============================================================================
# Tests for Flags collection - equality and containment
# ============================================================================
def test_flags_equal_when_same_flags() -> None:
flags = Flags([Flag('some')])
flags2 = Flags([Flag('some')])
assert flags == flags2
def test_flags_not_equal_when_different_flags() -> None:
flags = Flags([Flag('some')])
flags2 = Flags([Flag('other')])
assert flags != flags2
def test_flags_not_equal_when_different_length() -> None:
flags = Flags([Flag('some')])
flags2 = Flags([Flag('some'), Flag('other')])
assert flags != flags2
def test_flags_not_equal_to_non_flags() -> None:
flags = Flags([Flag('some')])
not_flags = object()
assert flags != not_flags
def test_flags_contains_existing_flag() -> None:
flags = Flags([Flag('some')])
flag = Flag('some')
assert flag in flags
def test_flags_does_not_contain_missing_flag() -> None:
flags = Flags([Flag('some')])
flag = Flag('nonexists')
assert flag not in flags
def test_flags_contains_raises_error_for_non_flag() -> None:
flags = Flags([Flag('some')])
not_flag = object
with pytest.raises(TypeError):
not_flag in flags # pyright: ignore[reportUnusedExpression]
# ============================================================================
# Tests for Flags collection - special methods
# ============================================================================
def test_flags_len_returns_count() -> None:
flags = Flags([Flag('one'), Flag('two')])
assert len(flags) == 2
def test_flags_bool_returns_true_when_not_empty() -> None:
flags = Flags([Flag('one'), Flag('two')])
assert bool(flags)
def test_flags_bool_returns_false_when_empty() -> None:
flags = Flags([])
assert not bool(flags)
def test_flags_getitem_returns_flag_at_index() -> None:
flags = Flags([Flag('one'), Flag('two')])
assert flags[1] == Flag('two')
+259
View File
@@ -0,0 +1,259 @@
import pytest
from dishka import Provider
from pytest_mock import MockerFixture
from argenta import App, Router
from argenta.command import Command
from argenta.orchestrator import Orchestrator
from argenta.orchestrator.argparser import ArgParser
from argenta.response import Response
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture
def mock_argparser(mocker: MockerFixture) -> ArgParser:
"""Create a mock ArgParser that doesn't actually parse sys.argv"""
argparser = ArgParser(processed_args=[])
mocker.patch.object(argparser, '_parse_args')
return argparser
@pytest.fixture
def sample_app() -> App:
"""Create a sample App for testing"""
return App(override_system_messages=True)
@pytest.fixture
def sample_router() -> Router:
"""Create a sample Router with a test command"""
router = Router()
@router.command(Command('test'))
def handler(_res: Response) -> None:
print('test command executed')
return router
# ============================================================================
# Tests for Orchestrator initialization
# ============================================================================
def test_orchestrator_initializes_with_default_argparser(mocker: MockerFixture) -> None:
"""Test Orchestrator initialization with default ArgParser"""
mocker.patch('sys.argv', ['test_program'])
orchestrator = Orchestrator()
assert orchestrator._arg_parser is not None
assert isinstance(orchestrator._arg_parser, ArgParser)
def test_orchestrator_initializes_with_custom_argparser(mock_argparser: ArgParser) -> None:
"""Test Orchestrator initialization with custom ArgParser"""
orchestrator = Orchestrator(arg_parser=mock_argparser)
assert orchestrator._arg_parser is mock_argparser
def test_orchestrator_initializes_with_custom_providers(mocker: MockerFixture) -> None:
"""Test Orchestrator initialization with custom providers"""
mocker.patch('sys.argv', ['test_program'])
custom_provider = Provider()
orchestrator = Orchestrator(custom_providers=[custom_provider])
assert custom_provider in orchestrator._custom_providers
def test_orchestrator_initializes_with_auto_inject_disabled(mocker: MockerFixture) -> None:
"""Test Orchestrator initialization with auto_inject_handlers disabled"""
mocker.patch('sys.argv', ['test_program'])
orchestrator = Orchestrator(auto_inject_handlers=False)
assert orchestrator._auto_inject_handlers is False
def test_orchestrator_initializes_with_auto_inject_enabled(mocker: MockerFixture) -> None:
"""Test Orchestrator initialization with auto_inject_handlers enabled (default)"""
mocker.patch('sys.argv', ['test_program'])
orchestrator = Orchestrator()
assert orchestrator._auto_inject_handlers is True
def test_orchestrator_parses_args_on_initialization(mocker: MockerFixture, mock_argparser: ArgParser) -> None:
"""Test that Orchestrator calls _parse_args on initialization"""
parse_spy = mocker.spy(mock_argparser, '_parse_args')
_orchestrator = Orchestrator(arg_parser=mock_argparser)
parse_spy.assert_called_once()
# ============================================================================
# Tests for start_polling method
# ============================================================================
def test_start_polling_creates_dishka_container(
mocker: MockerFixture, mock_argparser: ArgParser, sample_app: App
) -> None:
"""Test that start_polling creates a dishka container"""
mock_make_container = mocker.patch('argenta.orchestrator.entity.make_container')
_mock_setup_dishka = mocker.patch('argenta.orchestrator.entity.setup_dishka')
mocker.patch.object(sample_app, 'run_polling')
orchestrator = Orchestrator(arg_parser=mock_argparser)
orchestrator.start_polling(sample_app)
mock_make_container.assert_called_once()
assert mock_make_container.call_args[1]['context'] == {ArgParser: mock_argparser}
def test_start_polling_calls_setup_dishka_with_auto_inject_enabled(
mocker: MockerFixture, mock_argparser: ArgParser, sample_app: App
) -> None:
"""Test that start_polling calls setup_dishka with auto_inject=True"""
mock_container = mocker.MagicMock() # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
mocker.patch('argenta.orchestrator.entity.make_container', return_value=mock_container)
mock_setup_dishka = mocker.patch('argenta.orchestrator.entity.setup_dishka')
mocker.patch.object(sample_app, 'run_polling')
orchestrator = Orchestrator(arg_parser=mock_argparser, auto_inject_handlers=True)
orchestrator.start_polling(sample_app)
mock_setup_dishka.assert_called_once_with(sample_app, mock_container, auto_inject=True)
def test_start_polling_calls_setup_dishka_with_auto_inject_disabled(
mocker: MockerFixture, mock_argparser: ArgParser, sample_app: App
) -> None:
"""Test that start_polling calls setup_dishka with auto_inject=False"""
mock_container = mocker.MagicMock() # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
mocker.patch('argenta.orchestrator.entity.make_container', return_value=mock_container)
mock_setup_dishka = mocker.patch('argenta.orchestrator.entity.setup_dishka')
mocker.patch.object(sample_app, 'run_polling')
orchestrator = Orchestrator(arg_parser=mock_argparser, auto_inject_handlers=False)
orchestrator.start_polling(sample_app)
mock_setup_dishka.assert_called_once_with(sample_app, mock_container, auto_inject=False)
def test_start_polling_calls_app_run_polling(
mocker: MockerFixture, mock_argparser: ArgParser, sample_app: App
) -> None:
"""Test that start_polling calls app.run_polling()"""
mocker.patch('argenta.orchestrator.entity.make_container')
mocker.patch('argenta.orchestrator.entity.setup_dishka')
mock_run_polling = mocker.patch.object(sample_app, 'run_polling')
orchestrator = Orchestrator(arg_parser=mock_argparser)
orchestrator.start_polling(sample_app)
mock_run_polling.assert_called_once()
def test_start_polling_includes_custom_providers_in_container(
mocker: MockerFixture, mock_argparser: ArgParser, sample_app: App
) -> None:
"""Test that start_polling includes custom providers in container"""
custom_provider = Provider()
mock_make_container = mocker.patch('argenta.orchestrator.entity.make_container')
mocker.patch('argenta.orchestrator.entity.setup_dishka')
mocker.patch.object(sample_app, 'run_polling')
orchestrator = Orchestrator(arg_parser=mock_argparser, custom_providers=[custom_provider])
orchestrator.start_polling(sample_app)
# Check that custom_provider was passed to make_container
call_args = mock_make_container.call_args[0]
assert custom_provider in call_args
# ============================================================================
# Tests for integration with App
# ============================================================================
def test_orchestrator_integrates_with_app_with_router(
mocker: MockerFixture, mock_argparser: ArgParser, sample_app: App, sample_router: Router
) -> None:
"""Test that Orchestrator properly integrates with App that has routers"""
mocker.patch('argenta.orchestrator.entity.make_container')
mocker.patch('argenta.orchestrator.entity.setup_dishka')
mock_run_polling = mocker.patch.object(sample_app, 'run_polling')
sample_app.include_router(sample_router)
orchestrator = Orchestrator(arg_parser=mock_argparser)
orchestrator.start_polling(sample_app)
mock_run_polling.assert_called_once()
assert len(sample_app.registered_routers.registered_routers) == 1
# ============================================================================
# Tests for ArgParser integration
# ============================================================================
def test_orchestrator_passes_argparser_to_container_context(
mocker: MockerFixture, mock_argparser: ArgParser, sample_app: App
) -> None:
"""Test that Orchestrator passes ArgParser instance to container context"""
mock_make_container = mocker.patch('argenta.orchestrator.entity.make_container')
mocker.patch('argenta.orchestrator.entity.setup_dishka')
mocker.patch.object(sample_app, 'run_polling')
orchestrator = Orchestrator(arg_parser=mock_argparser)
orchestrator.start_polling(sample_app)
# Verify that ArgParser was passed in context
call_kwargs = mock_make_container.call_args[1]
assert 'context' in call_kwargs
assert ArgParser in call_kwargs['context']
assert call_kwargs['context'][ArgParser] is mock_argparser
# ============================================================================
# Tests for error handling
# ============================================================================
def test_orchestrator_handles_app_run_polling_exception(
mocker: MockerFixture, mock_argparser: ArgParser, sample_app: App
) -> None:
"""Test that Orchestrator propagates exceptions from app.run_polling()"""
mocker.patch('argenta.orchestrator.entity.make_container')
mocker.patch('argenta.orchestrator.entity.setup_dishka')
mocker.patch.object(sample_app, 'run_polling', side_effect=RuntimeError("Test error"))
orchestrator = Orchestrator(arg_parser=mock_argparser)
with pytest.raises(RuntimeError, match="Test error"):
orchestrator.start_polling(sample_app)
# ============================================================================
# Tests for multiple providers
# ============================================================================
def test_orchestrator_accepts_multiple_custom_providers(
mocker: MockerFixture, mock_argparser: ArgParser, sample_app: App
) -> None:
"""Test that Orchestrator accepts multiple custom providers"""
provider1 = Provider()
provider2 = Provider()
mock_make_container = mocker.patch('argenta.orchestrator.entity.make_container')
mocker.patch('argenta.orchestrator.entity.setup_dishka')
mocker.patch.object(sample_app, 'run_polling')
orchestrator = Orchestrator(
arg_parser=mock_argparser,
custom_providers=[provider1, provider2]
)
orchestrator.start_polling(sample_app)
call_args = mock_make_container.call_args[0]
assert provider1 in call_args
assert provider2 in call_args
+101 -60
View File
@@ -1,91 +1,137 @@
import unittest
from datetime import date, datetime
from argenta.data_bridge import DataBridge
from argenta.command.flag.models import InputFlag
import pytest
from argenta.command.flag.flags.models import InputFlags
from argenta.command.flag.models import InputFlag
from argenta.data_bridge import DataBridge
from argenta.response.entity import EMPTY_INPUT_FLAGS, Response
from argenta.response.status import ResponseStatus
class TestDataBridge(unittest.TestCase):
def setUp(self):
"""Create a new DataBridge instance for each test"""
self.data_bridge = DataBridge()
# ============================================================================
# Fixtures
# ============================================================================
def test_update_data_basic(self):
@pytest.fixture
def data_bridge() -> DataBridge:
"""Create a new DataBridge instance for each test"""
return DataBridge()
# ============================================================================
# Tests for DataBridge - basic data operations
# ============================================================================
def test_databridge_update_stores_basic_data(data_bridge: DataBridge) -> None:
"""Test basic data update functionality"""
test_data = {"key1": "value1", "key2": "value2"}
self.data_bridge.update(test_data)
self.assertEqual(self.data_bridge.get_all(), test_data)
data_bridge.update(test_data)
assert data_bridge.get_all() == test_data
def test_update_data_with_datetime(self):
def test_databridge_update_stores_datetime_objects(data_bridge: DataBridge) -> None:
"""Test updating data with datetime objects"""
test_datetime = datetime(2024, 1, 15, 10, 30, 45)
test_data = {"created_at": test_datetime, "name": "test"}
self.data_bridge.update(test_data)
data_bridge.update(test_data)
result = self.data_bridge.get_all()
self.assertEqual(result["created_at"], test_datetime)
self.assertEqual(result["name"], "test")
result = data_bridge.get_all()
assert result["created_at"] == test_datetime
assert result["name"] == "test"
def test_update_data_multiple_calls(self):
def test_databridge_multiple_updates_merge_data(data_bridge: DataBridge) -> None:
"""Test multiple update calls merge data"""
first_data = {"key1": "value1"}
second_data = {"key2": "value2"}
self.data_bridge.update(first_data)
self.data_bridge.update(second_data)
self.assertEqual(len(self.data_bridge.get_all()), 2)
data_bridge.update(first_data)
data_bridge.update(second_data)
assert len(data_bridge.get_all()) == 2
def test_get_data_empty(self):
# ============================================================================
# Tests for DataBridge - data retrieval
# ============================================================================
def test_databridge_get_all_returns_empty_dict_initially(data_bridge: DataBridge) -> None:
"""Test get_all returns empty dict when no data"""
self.assertEqual(self.data_bridge.get_all(), {})
assert data_bridge.get_all() == {}
def test_clear_data(self):
"""Test clear_all removes all data"""
self.data_bridge.update({"key": "value"})
self.assertNotEqual(self.data_bridge.get_all(), {})
self.data_bridge.clear_all()
self.assertEqual(self.data_bridge.get_all(), {})
def test_delete_from_data(self):
"""Test delete_by_key removes specific key"""
test_data = {"key1": "value1", "key2": "value2"}
self.data_bridge.update(test_data)
self.data_bridge.delete_by_key("key1")
result = self.data_bridge.get_all()
self.assertNotIn("key1", result)
self.assertIn("key2", result)
def test_delete_from_data_nonexistent_key(self):
"""Test delete_by_key with nonexistent key raises KeyError"""
with self.assertRaises(KeyError):
self.data_bridge.delete_by_key("nonexistent_key")
def test_get_by_key(self):
def test_databridge_get_by_key_retrieves_correct_values(data_bridge: DataBridge) -> None:
"""Test get_by_key retrieves correct value"""
test_data = {"key1": "value1", "key2": date(2024, 1, 1)}
self.data_bridge.update(test_data)
self.assertEqual(self.data_bridge.get_by_key("key1"), "value1")
self.assertEqual(self.data_bridge.get_by_key("key2"), date(2024, 1, 1))
self.assertIsNone(self.data_bridge.get_by_key("nonexistent"))
data_bridge.update(test_data)
assert data_bridge.get_by_key("key1") == "value1"
assert data_bridge.get_by_key("key2") == date(2024, 1, 1)
class TestResponse(unittest.TestCase):
def test_response_initialization_basic(self):
def test_databridge_get_by_key_returns_none_for_missing_key(data_bridge: DataBridge) -> None:
"""Test get_by_key returns None for nonexistent key"""
test_data = {"key1": "value1"}
data_bridge.update(test_data)
assert data_bridge.get_by_key("nonexistent") is None
# ============================================================================
# Tests for DataBridge - data deletion
# ============================================================================
def test_databridge_clear_all_removes_all_data(data_bridge: DataBridge) -> None:
"""Test clear_all removes all data"""
data_bridge.update({"key": "value"})
assert data_bridge.get_all() != {}
data_bridge.clear_all()
assert data_bridge.get_all() == {}
def test_databridge_delete_by_key_removes_specific_key(data_bridge: DataBridge) -> None:
"""Test delete_by_key removes specific key"""
test_data = {"key1": "value1", "key2": "value2"}
data_bridge.update(test_data)
data_bridge.delete_by_key("key1")
result = data_bridge.get_all()
assert "key1" not in result
assert "key2" in result
def test_databridge_delete_by_key_raises_error_for_missing_key(data_bridge: DataBridge) -> None:
"""Test delete_by_key with nonexistent key raises KeyError"""
with pytest.raises(KeyError):
data_bridge.delete_by_key("nonexistent_key")
# ============================================================================
# Tests for Response - initialization
# ============================================================================
def test_response_initializes_with_status_and_empty_flags() -> None:
"""Test basic Response initialization"""
response = Response(ResponseStatus.ALL_FLAGS_VALID)
self.assertEqual(response.status, ResponseStatus.ALL_FLAGS_VALID)
self.assertEqual(response.input_flags, EMPTY_INPUT_FLAGS)
assert response.status == ResponseStatus.ALL_FLAGS_VALID
assert response.input_flags == EMPTY_INPUT_FLAGS
def test_response_initialization_with_flags(self):
def test_response_initializes_with_status_and_input_flags() -> None:
"""Test Response initialization with input flags"""
input_flags = InputFlags([InputFlag('test', input_value='value', status=None)])
response = Response(ResponseStatus.INVALID_VALUE_FLAGS, input_flags)
self.assertEqual(response.status, ResponseStatus.INVALID_VALUE_FLAGS)
self.assertEqual(response.input_flags, input_flags)
assert response.status == ResponseStatus.INVALID_VALUE_FLAGS
assert response.input_flags == input_flags
def test_response_status_types(self):
# ============================================================================
# Tests for Response - status types
# ============================================================================
def test_response_accepts_all_status_types() -> None:
"""Test Response with different status types"""
statuses = [
ResponseStatus.ALL_FLAGS_VALID,
@@ -94,10 +140,5 @@ class TestResponse(unittest.TestCase):
ResponseStatus.UNDEFINED_AND_INVALID_FLAGS
]
for status in statuses:
with self.subTest(status=status):
response = Response(status)
self.assertEqual(response.status, status)
if __name__ == '__main__':
unittest.main()
assert response.status == status
+181 -76
View File
@@ -1,103 +1,208 @@
import re
import unittest
from argenta.command import Command
import pytest
from pytest import CaptureFixture
from argenta.command import Command, InputCommand
from argenta.command.flag import Flag, InputFlag
from argenta.command.flag.flags import Flags, InputFlags
from argenta.command.flag.models import PossibleValues, ValidationStatus
from argenta.response.entity import Response
from argenta.router import Router
from argenta.router.entity import _structuring_input_flags, _validate_command, _validate_func_args # pyright: ignore[reportPrivateUsage]
from argenta.router.exceptions import (RepeatedFlagNameException,
from argenta.router.entity import _structuring_input_flags, _validate_func_args # pyright: ignore[reportPrivateUsage]
from argenta.router.exceptions import (
RepeatedFlagNameException,
RequiredArgumentNotPassedException,
TriggerContainSpacesException)
TriggerContainSpacesException,
)
class TestRouter(unittest.TestCase):
def test_register_command_with_spaces_in_trigger(self):
with self.assertRaises(TriggerContainSpacesException):
_validate_command(Command(trigger='command with spaces'))
# ============================================================================
# Tests for command validation
# ============================================================================
def test_register_command_with_repeated_flags(self):
with self.assertRaises(RepeatedFlagNameException):
_validate_command(Command(trigger='command', flags=Flags([Flag('test'), Flag('test')])))
def test_structuring_input_flags1(self):
cmd = Command('cmd')
input_flags = InputFlags([InputFlag('ssh', input_value=None, status=None)])
self.assertEqual(_structuring_input_flags(cmd, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value=None, status=ValidationStatus.UNDEFINED)]))
def test_validate_command_raises_error_for_trigger_with_spaces() -> None:
router = Router()
with pytest.raises(TriggerContainSpacesException):
router._validate_command(Command(trigger='command with spaces'))
def test_structuring_input_flags2(self):
cmd = Command('cmd')
input_flags = InputFlags([InputFlag('ssh', input_value='some', status=None)])
self.assertEqual(_structuring_input_flags(cmd, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='some', status=ValidationStatus.UNDEFINED)]))
def test_structuring_input_flags3(self):
cmd = Command('cmd', flags=Flag('port'))
input_flags = InputFlags([InputFlag('ssh', input_value='some2', status=None)])
self.assertEqual(_structuring_input_flags(cmd, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='some2', status=ValidationStatus.UNDEFINED)]))
def test_validate_command_raises_error_for_repeated_flag_names() -> None:
router = Router()
with pytest.raises(RepeatedFlagNameException):
router._validate_command(Command(trigger='command', flags=Flags([Flag('test'), Flag('test')])))
def test_structuring_input_flags4(self):
command = Command('cmd', flags=Flag('ssh', possible_values=PossibleValues.NEITHER))
input_flags = InputFlags([InputFlag('ssh', input_value='some3', status=None)])
self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='some3', status=ValidationStatus.INVALID)]))
def test_structuring_input_flags5(self):
command = Command('cmd', flags=Flag('ssh', possible_values=re.compile(r'some[1-5]$')))
input_flags = InputFlags([InputFlag('ssh', input_value='some40', status=None)])
self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='some40', status=ValidationStatus.INVALID)]))
# ============================================================================
# Tests for function argument validation
# ============================================================================
def test_structuring_input_flags6(self):
command = Command('cmd', flags=Flag('ssh', possible_values=['example']))
input_flags = InputFlags([InputFlag('ssh', input_value='example2', status=None)])
self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='example2', status=ValidationStatus.INVALID)]))
def test_structuring_input_flags7(self):
command = Command('cmd', flags=Flag('port'))
input_flags = InputFlags([InputFlag('port', input_value='some2', status=None)])
self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('port', input_value='some2', status=ValidationStatus.VALID)]))
def test_structuring_input_flags8(self):
command = Command('cmd', flags=Flag('port', possible_values=['some2', 'some3']))
input_flags = InputFlags([InputFlag('port', input_value='some2', status=None)])
self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('port', input_value='some2', status=ValidationStatus.VALID)]))
def test_structuring_input_flags9(self):
command = Command('cmd', flags=Flag('ssh', possible_values=re.compile(r'more[1-5]$')))
input_flags = InputFlags([InputFlag('ssh', input_value='more5', status=None)])
self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='more5', status=ValidationStatus.VALID)]))
def test_structuring_input_flags10(self):
command = Command('cmd', flags=Flag('ssh', possible_values=PossibleValues.NEITHER))
input_flags = InputFlags([InputFlag('ssh', input_value=None, status=None)])
self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value=None, status=ValidationStatus.VALID)]))
def test_validate_incorrect_func_args1(self):
def handler():
def test_validate_func_args_raises_error_for_missing_response_parameter() -> None:
def handler() -> None:
pass
with self.assertRaises(RequiredArgumentNotPassedException):
with pytest.raises(RequiredArgumentNotPassedException):
_validate_func_args(handler) # pyright: ignore[reportArgumentType]
def test_get_router_aliases(self):
router = Router()
@router.command(Command('some', aliases=['test', 'case']))
def handler(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
pass
self.assertListEqual(router.aliases, ['test', 'case'])
def test_get_router_aliases2(self):
router = Router()
@router.command(Command('some', aliases=['test', 'case']))
def handler(response: Response): # pyright: ignore[reportUnusedFunction]
def test_validate_func_args_prints_warning_for_wrong_type_hint(capsys: CaptureFixture[str]) -> None:
class NotResponse:
pass
@router.command(Command('ext', aliases=['more', 'foo']))
def handler2(response: Response): # pyright: ignore[reportUnusedFunction]
pass
self.assertListEqual(router.aliases, ['test', 'case', 'more', 'foo'])
def test_get_router_aliases3(self):
def func(_response: NotResponse) -> None:
pass
_validate_func_args(func)
output = capsys.readouterr()
assert "WARNING" in output.out
def test_validate_func_args_accepts_missing_type_hint(capsys: CaptureFixture[str]) -> None:
def func(response) -> None: # pyright: ignore[reportMissingParameterType, reportUnknownParameterType]
pass
_validate_func_args(func) # pyright: ignore[reportUnknownArgumentType]
output = capsys.readouterr()
assert output.out == ''
# ============================================================================
# Tests for input flag structuring - undefined flags
# ============================================================================
def test_structuring_input_flags_marks_unregistered_flag_as_undefined() -> None:
cmd = Command('cmd')
input_flags = InputFlags([InputFlag('ssh', input_value='', status=None)])
assert _structuring_input_flags(cmd, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='', status=ValidationStatus.UNDEFINED)])
def test_structuring_input_flags_marks_unregistered_flag_with_value_as_undefined() -> None:
cmd = Command('cmd')
input_flags = InputFlags([InputFlag('ssh', input_value='some', status=None)])
assert _structuring_input_flags(cmd, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='some', status=ValidationStatus.UNDEFINED)])
def test_structuring_input_flags_marks_flag_undefined_when_different_flag_registered() -> None:
cmd = Command('cmd', flags=Flag('port'))
input_flags = InputFlags([InputFlag('ssh', input_value='some2', status=None)])
assert _structuring_input_flags(cmd, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='some2', status=ValidationStatus.UNDEFINED)])
# ============================================================================
# Tests for input flag structuring - invalid flags
# ============================================================================
def test_structuring_input_flags_marks_flag_invalid_when_value_provided_for_neither() -> None:
command = Command('cmd', flags=Flag('ssh', possible_values=PossibleValues.NEITHER))
input_flags = InputFlags([InputFlag('ssh', input_value='some3', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='some3', status=ValidationStatus.INVALID)])
def test_structuring_input_flags_marks_flag_invalid_when_value_not_matching_regex() -> None:
command = Command('cmd', flags=Flag('ssh', possible_values=re.compile(r'some[1-5]$')))
input_flags = InputFlags([InputFlag('ssh', input_value='some40', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='some40', status=ValidationStatus.INVALID)])
def test_structuring_input_flags_marks_flag_invalid_when_value_not_in_list() -> None:
command = Command('cmd', flags=Flag('ssh', possible_values=['example']))
input_flags = InputFlags([InputFlag('ssh', input_value='example2', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='example2', status=ValidationStatus.INVALID)])
# ============================================================================
# Tests for input flag structuring - valid flags
# ============================================================================
def test_structuring_input_flags_marks_registered_flag_as_valid() -> None:
command = Command('cmd', flags=Flag('port'))
input_flags = InputFlags([InputFlag('port', input_value='some2', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('port', input_value='some2', status=ValidationStatus.VALID)])
def test_structuring_input_flags_marks_flag_valid_when_value_in_list() -> None:
command = Command('cmd', flags=Flag('port', possible_values=['some2', 'some3']))
input_flags = InputFlags([InputFlag('port', input_value='some2', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('port', input_value='some2', status=ValidationStatus.VALID)])
def test_structuring_input_flags_marks_flag_valid_when_value_matches_regex() -> None:
command = Command('cmd', flags=Flag('ssh', possible_values=re.compile(r'more[1-5]$')))
input_flags = InputFlags([InputFlag('ssh', input_value='more5', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='more5', status=ValidationStatus.VALID)])
def test_structuring_input_flags_marks_flag_valid_when_empty_value_for_neither() -> None:
command = Command('cmd', flags=Flag('ssh', possible_values=PossibleValues.NEITHER))
input_flags = InputFlags([InputFlag('ssh', input_value='', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='', status=ValidationStatus.VALID)])
# ============================================================================
# Tests for router aliases
# ============================================================================
def test_router_aliases_returns_command_aliases() -> None:
router = Router()
@router.command(Command('some', aliases={'test', 'case'}))
def handler(_response: Response) -> None:
pass
assert router.aliases == {'test', 'case'}
def test_router_aliases_returns_combined_aliases_from_multiple_commands() -> None:
router = Router()
@router.command(Command('some', aliases={'test', 'case'}))
def handler(_response: Response) -> None:
pass
@router.command(Command('ext', aliases={'more', 'foo'}))
def handler2(_response: Response) -> None:
pass
assert router.aliases == {'test', 'case', 'more', 'foo'}
def test_router_aliases_returns_empty_set_when_no_aliases() -> None:
router = Router()
@router.command(Command('some'))
def handler(response: Response): # pyright: ignore[reportUnusedFunction]
def handler(_response: Response) -> None:
pass
self.assertListEqual(router.aliases, [])
assert router.aliases == set()
# ============================================================================
# Tests for handler finding and execution
# ============================================================================
def test_finds_appropriate_handler_executes_handler_by_alias(capsys: CaptureFixture[str]) -> None:
router = Router()
@router.command(Command('hello', aliases={'hi'}))
def handler(_res: Response) -> None:
print("Hello World!")
router.finds_appropriate_handler(InputCommand('hi'))
output = capsys.readouterr()
assert "Hello World!" in output.out
def test_finds_appropriate_handler_executes_handler_with_flags_by_alias(capsys: CaptureFixture[str]) -> None:
router = Router()
@router.command(Command('hello', flags=Flag('flag'), aliases={'hi'}))
def handler(_res: Response) -> None:
print("Hello World!")
router.finds_appropriate_handler(InputCommand('hi'))
output = capsys.readouterr()
assert "Hello World!" in output.out