better testsssssss

This commit is contained in:
2025-12-07 01:33:39 +03:00
parent dee328525d
commit 9423034a08
9 changed files with 1035 additions and 610 deletions
+247 -193
View File
@@ -1,77 +1,203 @@
from argenta.app.dividing_line import DynamicDividingLine, StaticDividingLine import pytest
from argenta.app.protocols import DescriptionMessageGenerator, NonStandardBehaviorHandler
from pytest import CaptureFixture from pytest import CaptureFixture
from argenta.app import App from argenta.app import App
from argenta.response import Response from argenta.app.dividing_line import DynamicDividingLine, StaticDividingLine
from argenta.app.protocols import DescriptionMessageGenerator, NonStandardBehaviorHandler
from argenta.command.models import Command, InputCommand from argenta.command.models import Command, InputCommand
from argenta.response import Response
from argenta.router import Router from argenta.router import Router
import pytest
def test_is_exit_command1(): # ============================================================================
# Tests for exit command detection
# ============================================================================
def test_default_exit_command_lowercase_q_is_recognized() -> None:
app = App() app = App()
assert app._is_exit_command(InputCommand('q')) is True assert app._is_exit_command(InputCommand('q')) is True
def test_is_exit_command5(): def test_default_exit_command_uppercase_q_is_recognized() -> None:
app = App() app = App()
assert app._is_exit_command(InputCommand('Q')) is True assert app._is_exit_command(InputCommand('Q')) is True
def test_is_exit_command2(): def test_exit_command_not_recognized_when_case_sensitivity_enabled() -> None:
app = App(ignore_command_register=False) app = App(ignore_command_register=False)
assert app._is_exit_command(InputCommand('q')) is False assert app._is_exit_command(InputCommand('q')) is False
def test_is_exit_command3(): def test_custom_exit_command_is_recognized() -> None:
app = App(exit_command=Command('quit')) app = App(exit_command=Command('quit'))
assert app._is_exit_command(InputCommand('quit')) is True assert app._is_exit_command(InputCommand('quit')) is True
def test_is_exit_command4(): def test_custom_exit_command_case_insensitive_by_default() -> None:
app = App(exit_command=Command('quit')) app = App(exit_command=Command('quit'))
assert app._is_exit_command(InputCommand('qUIt')) is True assert app._is_exit_command(InputCommand('qUIt')) is True
def test_is_exit_command6(): def test_custom_exit_command_case_sensitive_when_enabled() -> None:
app = App(ignore_command_register=False, app = App(ignore_command_register=False, exit_command=Command('quit'))
exit_command=Command('quit'))
assert app._is_exit_command(InputCommand('qUIt')) is False assert app._is_exit_command(InputCommand('qUIt')) is False
def test_is_unknown_command1(): def test_exit_command_alias_is_recognized() -> None:
app = App(exit_command=Command('q', aliases={'exit'}))
assert app._is_exit_command(InputCommand('exit')) is True
def test_exit_command_alias_case_sensitive_when_enabled() -> None:
app = App(exit_command=Command('q', aliases={'exit'}), ignore_command_register=False)
assert app._is_exit_command(InputCommand('exit')) is True
def test_non_exit_command_is_not_recognized() -> None:
app = App(exit_command=Command('q', aliases={'exit'}))
assert app._is_exit_command(InputCommand('quit')) is False
def test_non_exit_command_with_wrong_case_is_not_recognized() -> None:
app = App(exit_command=Command('q', aliases={'exit'}), ignore_command_register=False)
assert app._is_exit_command(InputCommand('Exit')) is False
# ============================================================================
# Tests for unknown command detection
# ============================================================================
def test_registered_command_is_not_unknown() -> None:
app = App() app = App()
app.set_unknown_command_handler(lambda command: None) app.set_unknown_command_handler(lambda command: None)
app._current_matching_triggers_with_routers = {'fr': Router(), 'tr': Router(), 'de': Router()} app._current_matching_triggers_with_routers = {'fr': Router(), 'tr': Router(), 'de': Router()}
assert app._is_unknown_command(InputCommand('fr')) is False assert app._is_unknown_command(InputCommand('fr')) is False
def test_is_unknown_command2(): def test_unregistered_command_is_unknown() -> None:
app = App() app = App()
app.set_unknown_command_handler(lambda command: None) app.set_unknown_command_handler(lambda command: None)
app._current_matching_triggers_with_routers = {'fr': Router(), 'tr': Router(), 'de': Router()} app._current_matching_triggers_with_routers = {'fr': Router(), 'tr': Router(), 'de': Router()}
assert app._is_unknown_command(InputCommand('cr')) is True assert app._is_unknown_command(InputCommand('cr')) is True
def test_is_unknown_command3():
def test_command_with_wrong_case_is_unknown_when_case_sensitivity_enabled() -> None:
app = App(ignore_command_register=False) app = App(ignore_command_register=False)
app.set_unknown_command_handler(lambda command: None) app.set_unknown_command_handler(lambda command: None)
app._current_matching_triggers_with_routers = {'Pr': Router(), 'tW': Router(), 'deQW': Router()} app._current_matching_triggers_with_routers = {'Pr': Router(), 'tW': Router(), 'deQW': Router()}
assert app._is_unknown_command(InputCommand('pr')) is True assert app._is_unknown_command(InputCommand('pr')) is True
def test_is_unknown_command4(): def test_command_with_exact_case_is_not_unknown_when_case_sensitivity_enabled() -> None:
app = App(ignore_command_register=False) app = App(ignore_command_register=False)
app.set_unknown_command_handler(lambda command: None) app.set_unknown_command_handler(lambda command: None)
app._current_matching_triggers_with_routers = {'Pr': Router(), 'tW': Router(), 'deQW': Router()} app._current_matching_triggers_with_routers = {'Pr': Router(), 'tW': Router(), 'deQW': Router()}
assert app._is_unknown_command(InputCommand('tW')) is False assert app._is_unknown_command(InputCommand('tW')) is False
def test_add_messages_on_startup():
app = App()
app.add_message_on_startup('Some message')
assert app._messages_on_startup == ['Some message']
def test_include_routers(): # ============================================================================
# Tests for similar command suggestions
# ============================================================================
def test_most_similar_command_finds_closest_match() -> None:
app = App(override_system_messages=True)
router = Router()
@router.command(Command('port', aliases={'p'}))
def handler(_res: Response) -> None:
pass
@router.command(Command('host', aliases={'h'}))
def handler2(_res: Response) -> None:
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('por') == 'port'
def test_most_similar_command_prefers_shorter_match() -> None:
app = App(override_system_messages=True)
router = Router()
@router.command(Command('command'))
def handler(_res: Response) -> None:
pass
@router.command(Command('command_other'))
def handler2(_res: Response) -> None:
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('com') == 'command'
def test_most_similar_command_finds_longer_match_when_closer() -> None:
app = App(override_system_messages=True)
router = Router()
@router.command(Command('command'))
def handler(_res: Response) -> None:
pass
@router.command(Command('command_other'))
def handler2(_res: Response) -> None:
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('command_') == 'command_other'
def test_most_similar_command_returns_none_for_no_match() -> None:
app = App(override_system_messages=True)
router = Router()
@router.command(Command('command'))
def handler(_res: Response) -> None:
pass
@router.command(Command('command_other'))
def handler2(_res: Response) -> None:
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('nonexists') is None
def test_most_similar_command_matches_aliases() -> None:
app = App(override_system_messages=True)
router = Router()
@router.command(Command('command', aliases={'other_name'}))
def handler(_res: Response) -> None:
pass
@router.command(Command('command_other', aliases={'more_name'}))
def handler2(_res: Response) -> None:
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('othe') == 'other_name'
# ============================================================================
# Tests for router registration
# ============================================================================
def test_include_routers_registers_multiple_routers() -> None:
app = App() app = App()
router = Router() router = Router()
router2 = Router() router2 = Router()
@@ -79,16 +205,17 @@ def test_include_routers():
assert app.registered_routers.registered_routers == [router, router2] assert app.registered_routers.registered_routers == [router, router2]
def test_overlapping_aliases(capsys: CaptureFixture[str]):
def test_overlapping_aliases_prints_warning(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True) app = App(override_system_messages=True)
router = Router() router = Router()
@router.command(Command('test', aliases={'alias'})) @router.command(Command('test', aliases={'alias'}))
def handler(res: Response): def handler(_res: Response) -> None:
pass pass
@router.command(Command('test2', aliases={'alias'})) @router.command(Command('test2', aliases={'alias'}))
def handler2(res: Response): def handler2(_res: Response) -> None:
pass pass
app.include_routers(router) app.include_routers(router)
@@ -98,178 +225,19 @@ def test_overlapping_aliases(capsys: CaptureFixture[str]):
assert "Overlapping" in captured.out assert "Overlapping" in captured.out
def test_print_framed_text(capsys: CaptureFixture[str]):
app = App(
override_system_messages=True,
dividing_line=StaticDividingLine(length=5)
)
app._print_framed_text('test')
captured = capsys.readouterr() # ============================================================================
# Tests for startup messages
# ============================================================================
assert '\n-----\n\ntest\n\n-----\n' in captured.out
def test_print_framed_text2(capsys: CaptureFixture[str]): def test_add_message_on_startup_stores_message() -> None:
app = App(
override_system_messages=True,
dividing_line=DynamicDividingLine()
)
app._print_framed_text('test as test as test')
captured = capsys.readouterr()
assert '\n' + '-'*20 + '\n\ntest as test as test\n\n' + '-'*20 + '\n' in captured.out
def test_print_framed_text3(capsys: CaptureFixture[str]):
app = App(
override_system_messages=True,
dividing_line=DynamicDividingLine()
)
app._print_framed_text('some long test')
captured = capsys.readouterr()
assert '\n--------------\n\nsome long test\n\n--------------\n' in captured.out
def test_print_framed_text4():
class OtherDividingLine:
pass
app = App(
override_system_messages=True,
dividing_line=OtherDividingLine() # pyright: ignore[reportArgumentType]
)
with pytest.raises(NotImplementedError):
app._print_framed_text('some long test')
def test_most_similiar_command():
app = App(override_system_messages=True)
router = Router()
@router.command(Command('port', aliases={'p'}))
def handler(res: Response):
pass
@router.command(Command('host', aliases={'h'}))
def handler2(res: Response):
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('por') == 'port'
def test_most_similiar_command2():
app = App(override_system_messages=True)
router = Router()
@router.command(Command('command'))
def handler(res: Response):
pass
@router.command(Command('command_other'))
def handler2(res: Response):
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('com') == 'command'
def test_most_similiar_command3():
app = App(override_system_messages=True)
router = Router()
@router.command(Command('command'))
def handler(res: Response):
pass
@router.command(Command('command_other'))
def handler2(res: Response):
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('command_') == 'command_other'
def test_most_similiar_command4():
app = App(override_system_messages=True)
router = Router()
@router.command(Command('command'))
def handler(res: Response):
pass
@router.command(Command('command_other'))
def handler2(res: Response):
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('nonexists') is None
def test_most_similiar_command5():
app = App(override_system_messages=True)
router = Router()
@router.command(Command('command', aliases={'other_name'}))
def handler(res: Response):
pass
@router.command(Command('command_other', aliases={'more_name'}))
def handler2(res: Response):
pass
app.include_routers(router)
app._pre_cycle_setup()
assert app._most_similar_command('othe') == 'other_name'
def test_app_set_description_message_gen():
app = App() app = App()
descr_gen: DescriptionMessageGenerator = lambda command, description: command + '-+-' + description app.add_message_on_startup('Some message')
app.set_description_message_pattern(descr_gen) assert app._messages_on_startup == ['Some message']
assert app._description_message_gen is descr_gen
def test_app_set_exit_handler(): def test_pre_cycle_setup_prints_startup_messages(capsys: CaptureFixture[str]) -> None:
app = App()
handler: NonStandardBehaviorHandler[Response] = lambda response: print('goodbye')
app.set_exit_command_handler(handler)
assert app._exit_command_handler is handler
def test_app_is_exit_command():
app = App(exit_command=Command('q', aliases={'exit'}))
assert app._is_exit_command(InputCommand('exit')) is True
def test_app_is_exit_command2():
app = App(exit_command=Command('q', aliases={'exit'}), ignore_command_register=False)
assert app._is_exit_command(InputCommand('exit')) is True
def test_app_is_exit_command3():
app = App(exit_command=Command('q', aliases={'exit'}))
assert app._is_exit_command(InputCommand('quit')) is False
def test_app_is_exit_command4():
app = App(exit_command=Command('q', aliases={'exit'}), ignore_command_register=False)
assert app._is_exit_command(InputCommand('Exit')) is False
def test_app_setup_default_view():
app = App(prompt='>>')
app._setup_default_view()
assert app._prompt == '[italic dim bold]>>'
def test_app_setup_default_view2():
app = App()
app._setup_default_view()
assert app._unknown_command_handler(InputCommand('nonexists')) is None
def test_app_pre_cycle_setup(capsys: CaptureFixture[str]):
app = App() app = App()
app.add_message_on_startup('some message') app.add_message_on_startup('some message')
app._pre_cycle_setup() app._pre_cycle_setup()
@@ -277,12 +245,99 @@ def test_app_pre_cycle_setup(capsys: CaptureFixture[str]):
assert 'some message' in stdout.out assert 'some message' in stdout.out
def test_app_with_router_with_disable_redirect_stdout(capsys: CaptureFixture[str]):
# ============================================================================
# Tests for framed text printing
# ============================================================================
def test_print_framed_text_with_static_dividing_line(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True, dividing_line=StaticDividingLine(length=5))
app._print_framed_text('test')
captured = capsys.readouterr()
assert '\n-----\n\ntest\n\n-----\n' in captured.out
def test_print_framed_text_with_dynamic_dividing_line_short_text(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True, dividing_line=DynamicDividingLine())
app._print_framed_text('some long test')
captured = capsys.readouterr()
assert '\n--------------\n\nsome long test\n\n--------------\n' in captured.out
def test_print_framed_text_with_dynamic_dividing_line_long_text(capsys: CaptureFixture[str]) -> None:
app = App(override_system_messages=True, dividing_line=DynamicDividingLine())
app._print_framed_text('test as test as test')
captured = capsys.readouterr()
assert '\n' + '-'*20 + '\n\ntest as test as test\n\n' + '-'*20 + '\n' in captured.out
def test_print_framed_text_with_unsupported_dividing_line_raises_error() -> None:
class OtherDividingLine:
pass
app = App(override_system_messages=True, dividing_line=OtherDividingLine()) # pyright: ignore[reportArgumentType]
with pytest.raises(NotImplementedError):
app._print_framed_text('some long test')
# ============================================================================
# Tests for handler configuration
# ============================================================================
def test_set_description_message_pattern_stores_generator() -> None:
app = App()
descr_gen: DescriptionMessageGenerator = lambda command, description: command + '-+-' + description
app.set_description_message_pattern(descr_gen)
assert app._description_message_gen is descr_gen
def test_set_exit_command_handler_stores_handler() -> None:
app = App()
handler: NonStandardBehaviorHandler[Response] = lambda response: print('goodbye')
app.set_exit_command_handler(handler)
assert app._exit_command_handler is handler
# ============================================================================
# Tests for default view setup
# ============================================================================
def test_setup_default_view_formats_prompt() -> None:
app = App(prompt='>>')
app._setup_default_view()
assert app._prompt == '[italic dim bold]>>'
def test_setup_default_view_sets_default_unknown_command_handler() -> None:
app = App()
app._setup_default_view()
assert app._unknown_command_handler(InputCommand('nonexists')) is None
# ============================================================================
# Tests for command processing
# ============================================================================
def test_process_command_with_router_with_disabled_stdout_redirect(capsys: CaptureFixture[str]) -> None:
app = App(repeat_command_groups_printing=True) app = App(repeat_command_groups_printing=True)
router = Router(disable_redirect_stdout=True) router = Router(disable_redirect_stdout=True)
@router.command('command') @router.command('command')
def handler(res: Response): def handler(_res: Response) -> None:
print("Hello!") print("Hello!")
app.include_router(router) app.include_router(router)
@@ -293,4 +348,3 @@ def test_app_with_router_with_disable_redirect_stdout(capsys: CaptureFixture[str
stdout = capsys.readouterr() stdout = capsys.readouterr()
assert 'Hello!' in stdout.out assert 'Hello!' in stdout.out
+121 -61
View File
@@ -1,7 +1,6 @@
import sys import sys
from argparse import Namespace from argparse import Namespace
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from unittest.mock import call
import pytest import pytest
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
@@ -18,7 +17,12 @@ if TYPE_CHECKING:
from pytest_mock.plugin import MockType from pytest_mock.plugin import MockType
def test_value_argument_creation() -> None: # ============================================================================
# Tests for argument model creation
# ============================================================================
def test_value_argument_stores_all_properties() -> None:
arg: ValueArgument = ValueArgument( arg: ValueArgument = ValueArgument(
name="test_arg", name="test_arg",
prefix="--", prefix="--",
@@ -39,7 +43,7 @@ def test_value_argument_creation() -> None:
assert arg.string_entity == "--test_arg" assert arg.string_entity == "--test_arg"
def test_boolean_argument_creation() -> None: def test_boolean_argument_stores_all_properties() -> None:
arg: BooleanArgument = BooleanArgument( arg: BooleanArgument = BooleanArgument(
name="verbose", prefix="-", help="Enable verbose mode.", is_deprecated=True name="verbose", prefix="-", help="Enable verbose mode.", is_deprecated=True
) )
@@ -51,7 +55,7 @@ def test_boolean_argument_creation() -> None:
assert arg.string_entity == "-verbose" assert arg.string_entity == "-verbose"
def test_input_argument_creation() -> None: def test_input_argument_stores_all_properties() -> None:
arg: InputArgument = InputArgument( arg: InputArgument = InputArgument(
name="file", value="/path/to/file", founder_class=ValueArgument name="file", value="/path/to/file", founder_class=ValueArgument
) )
@@ -60,6 +64,21 @@ def test_input_argument_creation() -> None:
assert arg.founder_class is ValueArgument assert arg.founder_class is ValueArgument
def test_input_argument_str_representation() -> None:
arg = InputArgument('host', value='192.168.0.0', founder_class=ValueArgument)
assert str(arg) == 'InputArgument(host=192.168.0.0)'
def test_input_argument_repr_representation() -> None:
arg = InputArgument('host', value='192.168.0.0', founder_class=ValueArgument)
assert repr(arg) == "InputArgument<name=host, value=192.168.0.0, founder_class=ValueArgument>"
# ============================================================================
# Fixtures for ArgSpace tests
# ============================================================================
@pytest.fixture @pytest.fixture
def mock_arguments() -> list[InputArgument]: def mock_arguments() -> list[InputArgument]:
return [ return [
@@ -74,42 +93,49 @@ def arg_space(mock_arguments: list[InputArgument]) -> ArgSpace:
return ArgSpace(all_arguments=mock_arguments) return ArgSpace(all_arguments=mock_arguments)
def test_argspace_initialization(arg_space: ArgSpace, mock_arguments: list[InputArgument]) -> None: # ============================================================================
# Tests for ArgSpace initialization and basic operations
# ============================================================================
def test_argspace_initializes_with_arguments(arg_space: ArgSpace, mock_arguments: list[InputArgument]) -> None:
assert len(arg_space.all_arguments) == 3 assert len(arg_space.all_arguments) == 3
assert arg_space.all_arguments == mock_arguments assert arg_space.all_arguments == mock_arguments
def test_argspace_get_by_name(arg_space: ArgSpace, mock_arguments: list[InputArgument]) -> None: def test_argspace_get_by_name_finds_existing_argument(arg_space: ArgSpace, mock_arguments: list[InputArgument]) -> None:
found_arg: InputArgument | None = arg_space.get_by_name("arg1") found_arg: InputArgument | None = arg_space.get_by_name("arg1")
assert found_arg is not None assert found_arg is not None
assert found_arg == mock_arguments[0] assert found_arg == mock_arguments[0]
def test_argspace_get_by_name_not_found(arg_space: ArgSpace) -> None: def test_argspace_get_by_name_returns_none_for_missing_argument(arg_space: ArgSpace) -> None:
found_arg: InputArgument | None = arg_space.get_by_name("non_existent_arg") found_arg: InputArgument | None = arg_space.get_by_name("non_existent_arg")
assert found_arg is None assert found_arg is None
def test_argspace_get_by_type(arg_space: ArgSpace, mock_arguments: list[InputArgument]) -> None: def test_argspace_get_by_type_filters_value_arguments(arg_space: ArgSpace, mock_arguments: list[InputArgument]) -> None:
value_args: list[InputArgument] = arg_space.get_by_type(ValueArgument) value_args = arg_space.get_by_type(ValueArgument)
assert len(value_args) == 2 assert len(value_args) == 2
assert mock_arguments[0] in value_args assert mock_arguments[0] in value_args
assert mock_arguments[2] in value_args assert mock_arguments[2] in value_args
bool_args: list[InputArgument] = arg_space.get_by_type(BooleanArgument)
def test_argspace_get_by_type_filters_boolean_arguments(arg_space: ArgSpace, mock_arguments: list[InputArgument]) -> None:
bool_args = arg_space.get_by_type(BooleanArgument)
assert len(bool_args) == 1 assert len(bool_args) == 1
assert mock_arguments[1] in bool_args assert mock_arguments[1] in bool_args
def test_argspace_get_by_type_not_found(arg_space: ArgSpace) -> None: def test_argspace_get_by_type_returns_empty_list_for_unknown_type(arg_space: ArgSpace) -> None:
class OtherArgument(BaseArgument): class OtherArgument(BaseArgument):
pass pass
other_args: list[InputArgument] = arg_space.get_by_type(OtherArgument) # pyright: ignore[reportAssignmentType] other_args = arg_space.get_by_type(OtherArgument) # pyright: ignore[reportAssignmentType]
assert other_args == [] assert other_args == []
def test_argspace_from_namespace() -> None: def test_argspace_from_namespace_creates_argspace_from_parsed_namespace() -> None:
namespace: Namespace = Namespace(config="config.json", debug=True, verbose=False) namespace: Namespace = Namespace(config="config.json", debug=True, verbose=False)
processed_args: list[ValueArgument | BooleanArgument] = [ processed_args: list[ValueArgument | BooleanArgument] = [
ValueArgument(name="config", prefix="--"), ValueArgument(name="config", prefix="--"),
@@ -132,6 +158,11 @@ def test_argspace_from_namespace() -> None:
assert debug_arg.founder_class is BooleanArgument assert debug_arg.founder_class is BooleanArgument
# ============================================================================
# Fixtures for ArgParser tests
# ============================================================================
@pytest.fixture @pytest.fixture
def value_arg() -> ValueArgument: def value_arg() -> ValueArgument:
return ValueArgument( return ValueArgument(
@@ -153,7 +184,12 @@ def processed_args(value_arg: ValueArgument, bool_arg: BooleanArgument) -> list[
return [value_arg, bool_arg] return [value_arg, bool_arg]
def test_argparser_initialization(processed_args: list[ValueArgument | BooleanArgument]) -> None: # ============================================================================
# Tests for ArgParser initialization
# ============================================================================
def test_argparser_initializes_with_all_properties(processed_args: list[ValueArgument | BooleanArgument]) -> None:
parser: ArgParser = ArgParser( parser: ArgParser = ArgParser(
processed_args=processed_args, processed_args=processed_args,
name="TestApp", name="TestApp",
@@ -168,61 +204,93 @@ def test_argparser_initialization(processed_args: list[ValueArgument | BooleanAr
assert parser.parsed_argspace.all_arguments == [] assert parser.parsed_argspace.all_arguments == []
# ============================================================================
# Tests for ArgParser argument registration (Python version specific)
# ============================================================================
@pytest.mark.skipif(sys.version_info < (3, 13), reason="requires python3.13 or higher") @pytest.mark.skipif(sys.version_info < (3, 13), reason="requires python3.13 or higher")
def test_argparser_register_args_py313( def test_argparser_registers_arguments_with_deprecated_flag_py313(
mocker: MockerFixture, value_arg: ValueArgument, bool_arg: BooleanArgument mocker: MockerFixture, value_arg: ValueArgument, bool_arg: BooleanArgument
) -> None: ) -> None:
mock_add_argument: MockType = mocker.patch("argparse.ArgumentParser.add_argument") mock_add_argument: MockType = mocker.patch("argparse.ArgumentParser.add_argument")
parser: ArgParser = ArgParser(processed_args=[value_arg, bool_arg]) # pyright: ignore[reportUnusedVariable] _parser: ArgParser = ArgParser(processed_args=[value_arg, bool_arg])
expected_calls: list[call] = [ # ArgParser may add additional arguments (like help), so check at least 2
call( assert mock_add_argument.call_count >= 2
value_arg.string_entity,
action=value_arg.action, # Check that value_arg was registered correctly
help=value_arg.help, value_arg_call = None
default=value_arg.default, bool_arg_call = None
choices=value_arg.possible_values,
required=value_arg.is_required, for call_args in mock_add_argument.call_args_list:
deprecated=value_arg.is_deprecated, args, kwargs = call_args
), if len(args) > 0 and args[0] == value_arg.string_entity:
call( value_arg_call = (args, kwargs)
bool_arg.string_entity, elif len(args) > 0 and args[0] == bool_arg.string_entity:
action=bool_arg.action, bool_arg_call = (args, kwargs)
help=bool_arg.help,
deprecated=bool_arg.is_deprecated, assert value_arg_call is not None, "value_arg was not registered"
), _, value_kwargs = value_arg_call
] assert value_kwargs['action'] == value_arg.action
mock_add_argument.assert_has_calls(expected_calls, any_order=True) assert value_kwargs['help'] == value_arg.help
assert value_kwargs['default'] == value_arg.default
assert value_kwargs['choices'] == value_arg.possible_values
assert value_kwargs['required'] == value_arg.is_required
assert value_kwargs['deprecated'] == value_arg.is_deprecated
assert bool_arg_call is not None, "bool_arg was not registered"
_, bool_kwargs = bool_arg_call
assert bool_kwargs['action'] == bool_arg.action
assert bool_kwargs['help'] == bool_arg.help
assert bool_kwargs['deprecated'] == bool_arg.is_deprecated
@pytest.mark.skipif(sys.version_info > (3, 12), reason="for more latest python version has been other test") @pytest.mark.skipif(sys.version_info > (3, 12), reason="for more latest python version has been other test")
def test_argparser_register_args_py312( def test_argparser_registers_arguments_without_deprecated_flag_py312(
mocker: MockerFixture, value_arg: ValueArgument, bool_arg: BooleanArgument mocker: MockerFixture, value_arg: ValueArgument, bool_arg: BooleanArgument
) -> None: ) -> None:
mock_add_argument: MockType = mocker.patch("argparse.ArgumentParser.add_argument") mock_add_argument: MockType = mocker.patch("argparse.ArgumentParser.add_argument")
parser: ArgParser = ArgParser(processed_args=[value_arg, bool_arg]) _parser: ArgParser = ArgParser(processed_args=[value_arg, bool_arg])
expected_calls: list[call] = [ # ArgParser may add additional arguments (like help), so check at least 2
call( assert mock_add_argument.call_count >= 2
value_arg.string_entity,
action=value_arg.action, # Check that value_arg was registered correctly
help=value_arg.help, value_arg_call = None
default=value_arg.default, bool_arg_call = None
choices=value_arg.possible_values,
required=value_arg.is_required, for call_args in mock_add_argument.call_args_list:
), args, kwargs = call_args
call( if len(args) > 0 and args[0] == value_arg.string_entity:
bool_arg.string_entity, value_arg_call = (args, kwargs)
action=bool_arg.action, elif len(args) > 0 and args[0] == bool_arg.string_entity:
help=bool_arg.help, bool_arg_call = (args, kwargs)
),
] assert value_arg_call is not None, "value_arg was not registered"
mock_add_argument.assert_has_calls(expected_calls, any_order=True) _, value_kwargs = value_arg_call
assert value_kwargs['action'] == value_arg.action
assert value_kwargs['help'] == value_arg.help
assert value_kwargs['default'] == value_arg.default
assert value_kwargs['choices'] == value_arg.possible_values
assert value_kwargs['required'] == value_arg.is_required
assert 'deprecated' not in value_kwargs
assert bool_arg_call is not None, "bool_arg was not registered"
_, bool_kwargs = bool_arg_call
assert bool_kwargs['action'] == bool_arg.action
assert bool_kwargs['help'] == bool_arg.help
assert 'deprecated' not in bool_kwargs
def test_argparser_parse_args_populates_argspace( # ============================================================================
# Tests for ArgParser argument parsing
# ============================================================================
def test_argparser_parse_args_populates_argspace_correctly(
mocker: MockerFixture, processed_args: list[ValueArgument | BooleanArgument] mocker: MockerFixture, processed_args: list[ValueArgument | BooleanArgument]
) -> None: ) -> None:
mock_namespace: Namespace = Namespace(config='config.json', debug=True) mock_namespace: Namespace = Namespace(config='config.json', debug=True)
@@ -245,11 +313,3 @@ def test_argparser_parse_args_populates_argspace(
assert debug_arg is not None assert debug_arg is not None
assert debug_arg.value is True assert debug_arg.value is True
assert debug_arg.founder_class is BooleanArgument assert debug_arg.founder_class is BooleanArgument
def test_str_input_argument():
arg = InputArgument('host', value='192.168.0.0', founder_class=ValueArgument)
assert str(arg) == 'InputArgument(host=192.168.0.0)'
def test_repr_input_argument():
arg = InputArgument('host', value='192.168.0.0', founder_class=ValueArgument)
assert repr(arg) == "InputArgument<name=host, value=192.168.0.0, founder_class=ValueArgument>"
+61 -25
View File
@@ -1,9 +1,9 @@
import os import os
from typing import Any
import pytest import pytest
from pyfakefs.fake_filesystem import FakeFilesystem from pyfakefs.fake_filesystem import FakeFilesystem
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from pytest_mock.plugin import MockType
from argenta.app.autocompleter.entity import ( from argenta.app.autocompleter.entity import (
AutoCompleter, AutoCompleter,
@@ -12,8 +12,17 @@ from argenta.app.autocompleter.entity import (
) )
HISTORY_FILE: str = "test_history.txt"
COMMANDS: list[str] = ["start", "stop", "status"]
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture @pytest.fixture
def mock_readline(mocker: MockerFixture) -> MockType: def mock_readline(mocker: MockerFixture) -> Any:
_history: list[str] = [] _history: list[str] = []
def add_history(item: str) -> None: def add_history(item: str) -> None:
@@ -30,31 +39,37 @@ def mock_readline(mocker: MockerFixture) -> MockType:
def clear_history() -> None: def clear_history() -> None:
_history.clear() _history.clear()
mock: MockType = mocker.MagicMock() mock: Any = mocker.MagicMock() # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
mocker.patch('argenta.app.autocompleter.entity.readline', mock) mocker.patch('argenta.app.autocompleter.entity.readline', mock) # pyright: ignore[reportUnknownArgumentType]
mock.reset_mock() mock.reset_mock() # pyright: ignore[reportUnknownMemberType]
clear_history() clear_history()
mock.add_history.side_effect = add_history mock.add_history.side_effect = add_history # pyright: ignore[reportUnknownMemberType]
mock.get_history_item.side_effect = get_history_item mock.get_history_item.side_effect = get_history_item # pyright: ignore[reportUnknownMemberType]
mock.get_current_history_length.side_effect = get_current_history_length mock.get_current_history_length.side_effect = get_current_history_length # pyright: ignore[reportUnknownMemberType]
mock.get_completer_delims.return_value = " " mock.get_completer_delims.return_value = " " # pyright: ignore[reportUnknownMemberType]
return mock return mock # pyright: ignore[reportReturnType, reportUnknownVariableType]
HISTORY_FILE: str = "test_history.txt" # ============================================================================
COMMANDS: list[str] = ["start", "stop", "status"] # Tests for AutoCompleter initialization
# ============================================================================
def test_initialization() -> None: def test_autocompleter_initializes_with_history_file_and_button() -> None:
completer: AutoCompleter = AutoCompleter(history_filename=HISTORY_FILE, autocomplete_button="tab") completer: AutoCompleter = AutoCompleter(history_filename=HISTORY_FILE, autocomplete_button="tab")
assert completer.history_filename == HISTORY_FILE assert completer.history_filename == HISTORY_FILE
assert completer.autocomplete_button == "tab" assert completer.autocomplete_button == "tab"
def test_initial_setup_if_history_file_does_not_exist(fs: FakeFilesystem, mock_readline: MockType) -> None: # ============================================================================
# Tests for initial setup
# ============================================================================
def test_initial_setup_creates_history_when_file_does_not_exist(fs: FakeFilesystem, mock_readline: Any) -> None:
if os.path.exists(HISTORY_FILE): if os.path.exists(HISTORY_FILE):
os.remove(HISTORY_FILE) os.remove(HISTORY_FILE)
@@ -68,8 +83,8 @@ def test_initial_setup_if_history_file_does_not_exist(fs: FakeFilesystem, mock_r
mock_readline.parse_and_bind.assert_called_with("tab: complete") mock_readline.parse_and_bind.assert_called_with("tab: complete")
def test_initial_setup_if_history_file_exists(fs: FakeFilesystem, mock_readline: MockType) -> None: def test_initial_setup_reads_existing_history_file(fs: FakeFilesystem, mock_readline: Any) -> None:
fs.create_file(HISTORY_FILE, contents="previous_command\n") fs.create_file(HISTORY_FILE, contents="previous_command\n") # pyright: ignore[reportUnknownMemberType]
completer: AutoCompleter = AutoCompleter(history_filename=HISTORY_FILE) completer: AutoCompleter = AutoCompleter(history_filename=HISTORY_FILE)
completer.initial_setup(COMMANDS) completer.initial_setup(COMMANDS)
@@ -80,7 +95,7 @@ def test_initial_setup_if_history_file_exists(fs: FakeFilesystem, mock_readline:
mock_readline.parse_and_bind.assert_called_once() mock_readline.parse_and_bind.assert_called_once()
def test_initial_setup_with_no_history_filename(mock_readline: MockType) -> None: def test_initial_setup_works_without_history_filename(mock_readline: Any) -> None:
completer: AutoCompleter = AutoCompleter(history_filename=None) completer: AutoCompleter = AutoCompleter(history_filename=None)
completer.initial_setup(COMMANDS) completer.initial_setup(COMMANDS)
@@ -88,7 +103,12 @@ def test_initial_setup_with_no_history_filename(mock_readline: MockType) -> None
assert mock_readline.add_history.call_count == len(COMMANDS) assert mock_readline.add_history.call_count == len(COMMANDS)
def test_exit_setup_writes_and_filters_history(fs: FakeFilesystem, mock_readline: MockType) -> None: # ============================================================================
# Tests for exit setup and history filtering
# ============================================================================
def test_exit_setup_writes_and_filters_duplicate_commands(fs: FakeFilesystem, mock_readline: Any) -> None:
mock_readline.add_history.side_effect = None mock_readline.add_history.side_effect = None
mock_readline.add_history("start server") mock_readline.add_history("start server")
mock_readline.add_history("stop client") mock_readline.add_history("stop client")
@@ -96,7 +116,7 @@ def test_exit_setup_writes_and_filters_history(fs: FakeFilesystem, mock_readline
mock_readline.add_history("start server") mock_readline.add_history("start server")
raw_history_content: str = "\n".join(["start server", "stop client", "invalid command", "start server"]) raw_history_content: str = "\n".join(["start server", "stop client", "invalid command", "start server"])
fs.create_file(HISTORY_FILE, contents=raw_history_content) fs.create_file(HISTORY_FILE, contents=raw_history_content) # pyright: ignore[reportUnknownMemberType]
completer: AutoCompleter = AutoCompleter(history_filename=HISTORY_FILE) completer: AutoCompleter = AutoCompleter(history_filename=HISTORY_FILE)
completer.exit_setup(all_commands=["start", "stop"], ignore_command_register=False) completer.exit_setup(all_commands=["start", "stop"], ignore_command_register=False)
@@ -109,13 +129,18 @@ def test_exit_setup_writes_and_filters_history(fs: FakeFilesystem, mock_readline
assert lines == ["start server", "stop client"] assert lines == ["start server", "stop client"]
def test_exit_setup_with_no_history_filename(mock_readline: MockType) -> None: def test_exit_setup_skips_writing_when_no_history_filename(mock_readline: Any) -> None:
completer: AutoCompleter = AutoCompleter(history_filename=None) completer: AutoCompleter = AutoCompleter(history_filename=None)
completer.exit_setup(all_commands=COMMANDS, ignore_command_register=False) completer.exit_setup(all_commands=COMMANDS, ignore_command_register=False)
mock_readline.write_history_file.assert_not_called() mock_readline.write_history_file.assert_not_called()
def test_complete_with_no_matches(mock_readline: MockType) -> None: # ============================================================================
# Tests for autocomplete functionality
# ============================================================================
def test_complete_returns_none_when_no_matches_found(mock_readline: Any) -> None:
cmd: str cmd: str
for cmd in ["start", "stop"]: for cmd in ["start", "stop"]:
mock_readline.add_history(cmd) mock_readline.add_history(cmd)
@@ -125,7 +150,7 @@ def test_complete_with_no_matches(mock_readline: MockType) -> None:
assert completer._complete("run", 1) is None assert completer._complete("run", 1) is None
def test_complete_with_one_match(mock_readline: MockType) -> None: def test_complete_returns_single_match(mock_readline: Any) -> None:
mock_readline.add_history("start server") mock_readline.add_history("start server")
mock_readline.add_history("stop server") mock_readline.add_history("stop server")
@@ -134,7 +159,7 @@ def test_complete_with_one_match(mock_readline: MockType) -> None:
assert completer._complete("start", 1) is None assert completer._complete("start", 1) is None
def test_complete_with_multiple_matches(mock_readline: MockType) -> None: def test_complete_inserts_common_prefix_for_multiple_matches(mock_readline: Any) -> None:
mock_readline.add_history("status client") mock_readline.add_history("status client")
mock_readline.add_history("status server") mock_readline.add_history("status server")
mock_readline.add_history("stop") mock_readline.add_history("stop")
@@ -152,21 +177,32 @@ def test_complete_with_multiple_matches(mock_readline: MockType) -> None:
mock_readline.insert_text.assert_not_called() mock_readline.insert_text.assert_not_called()
def test_is_command_exist() -> None: # ============================================================================
# Tests for helper functions
# ============================================================================
def test_is_command_exist_checks_case_sensitive_when_enabled() -> None:
existing: list[str] = ["start", "stop", "status"] existing: list[str] = ["start", "stop", "status"]
assert _is_command_exist("start", existing, ignore_command_register=False) is True assert _is_command_exist("start", existing, ignore_command_register=False) is True
assert _is_command_exist("START", existing, ignore_command_register=False) is False assert _is_command_exist("START", existing, ignore_command_register=False) is False
assert _is_command_exist("unknown", existing, ignore_command_register=False) is False assert _is_command_exist("unknown", existing, ignore_command_register=False) is False
def test_is_command_exist_checks_case_insensitive_when_enabled() -> None:
existing: list[str] = ["start", "stop", "status"]
assert _is_command_exist("start", existing, ignore_command_register=True) is True assert _is_command_exist("start", existing, ignore_command_register=True) is True
assert _is_command_exist("START", existing, ignore_command_register=True) is True assert _is_command_exist("START", existing, ignore_command_register=True) is True
assert _is_command_exist("unknown", existing, ignore_command_register=True) is False assert _is_command_exist("unknown", existing, ignore_command_register=True) is False
def test_get_history_items(mock_readline: MockType) -> None: def test_get_history_items_returns_empty_list_initially(mock_readline: Any) -> None:
assert _get_history_items() == [] assert _get_history_items() == []
def test_get_history_items_returns_all_added_items(mock_readline: Any) -> None:
mock_readline.add_history("first item") mock_readline.add_history("first item")
mock_readline.add_history("second item") mock_readline.add_history("second item")
+65 -38
View File
@@ -2,86 +2,113 @@ import re
import pytest import pytest
from argenta.command.exceptions import (EmptyInputCommandException, from argenta.command.exceptions import (
RepeatedInputFlagsException, EmptyInputCommandException,
UnprocessedInputFlagException) RepeatedInputFlagsException,
UnprocessedInputFlagException,
)
from argenta.command.flag import Flag, InputFlag from argenta.command.flag import Flag, InputFlag
from argenta.command.flag.flags import Flags from argenta.command.flag.flags import Flags
from argenta.command.flag.models import PossibleValues, ValidationStatus from argenta.command.flag.models import PossibleValues, ValidationStatus
from argenta.command.models import Command, InputCommand from argenta.command.models import Command, InputCommand
def test_parse_correct_raw_command(): # ============================================================================
# Tests for InputCommand parsing - successful cases
# ============================================================================
def test_parse_extracts_trigger_from_command_with_flags() -> None:
assert InputCommand.parse('ssh --host 192.168.0.3').trigger == 'ssh' assert InputCommand.parse('ssh --host 192.168.0.3').trigger == 'ssh'
def test_parse_raw_command_without_flag_name_with_value(): def test_parse_returns_input_command_instance() -> None:
with pytest.raises(UnprocessedInputFlagException): cmd = InputCommand.parse('ssh --host 192.168.0.3')
InputCommand.parse('ssh 192.168.0.3') assert isinstance(cmd, InputCommand)
def test_parse_raw_command_with_repeated_flag_name(): def test_parse_handles_triple_prefix_flags() -> None:
with pytest.raises(RepeatedInputFlagsException):
InputCommand.parse('ssh --host 192.168.0.3 --host 172.198.0.43')
def test_parse_raw_command_with_triple_prefix():
assert InputCommand.parse( assert InputCommand.parse(
'ssh ---host 192.168.0.0' 'ssh ---host 192.168.0.0'
).input_flags.get_flag_by_name('host') == \ ).input_flags.get_flag_by_name('host') == \
InputFlag('host', input_value='192.168.0.0', prefix='---') InputFlag('host', input_value='192.168.0.0', prefix='---')
def test_parse_raw_command_with_unprocessed_entity(): # ============================================================================
# Tests for InputCommand parsing - error cases
# ============================================================================
def test_parse_raises_error_for_value_without_flag_name() -> None:
with pytest.raises(UnprocessedInputFlagException):
InputCommand.parse('ssh 192.168.0.3')
def test_parse_raises_error_for_repeated_flag_names() -> None:
with pytest.raises(RepeatedInputFlagsException):
InputCommand.parse('ssh --host 192.168.0.3 --host 172.198.0.43')
def test_parse_raises_error_for_unprocessed_entity_after_flags() -> None:
with pytest.raises(UnprocessedInputFlagException): with pytest.raises(UnprocessedInputFlagException):
InputCommand.parse('ssh --host 192.168.0.3 9977') InputCommand.parse('ssh --host 192.168.0.3 9977')
def test_parse_empty_raw_command(): def test_parse_raises_error_for_empty_command() -> None:
with pytest.raises(EmptyInputCommandException): with pytest.raises(EmptyInputCommandException):
InputCommand.parse('') InputCommand.parse('')
def test_validate_invalid_input_flag1(): # ============================================================================
command = Command('some', flags=Flag('test')) # Tests for flag validation - valid flags
assert command.validate_input_flag(InputFlag('test', input_value='', status=None)) == ValidationStatus.INVALID # ============================================================================
def test_validate_valid_input_flag2(): def test_validate_input_flag_returns_valid_for_registered_flag() -> None:
command = Command('some', flags=Flags([Flag('test'), Flag('more')])) command = Command('some', flags=Flags([Flag('test'), Flag('more')]))
assert command.validate_input_flag(InputFlag('more', input_value='random-value', status=None)) == ValidationStatus.VALID assert command.validate_input_flag(InputFlag('more', input_value='random-value', status=None)) == ValidationStatus.VALID
def test_validate_undefined_input_flag1(): # ============================================================================
# Tests for flag validation - invalid flags
# ============================================================================
def test_validate_input_flag_returns_invalid_for_flag_with_empty_value() -> None:
command = Command('some', flags=Flag('test')) command = Command('some', flags=Flag('test'))
assert command.validate_input_flag(InputFlag('more', input_value='', status=None)) == ValidationStatus.UNDEFINED assert command.validate_input_flag(InputFlag('test', input_value='', status=None)) == ValidationStatus.INVALID
def test_validate_undefined_input_flag2(): def test_validate_input_flag_returns_invalid_when_value_provided_for_neither_flag() -> None:
command = Command('some', flags=Flags([Flag('test'), Flag('more')]))
assert command.validate_input_flag(InputFlag('case', input_value='', status=None)) == ValidationStatus.UNDEFINED
def test_validate_undefined_input_flag3():
command = Command('some')
assert command.validate_input_flag(InputFlag('case', input_value='', status=None)) == ValidationStatus.UNDEFINED
def test_invalid_input_flag1():
command = Command('some', flags=Flag('test', possible_values=PossibleValues.NEITHER)) command = Command('some', flags=Flag('test', possible_values=PossibleValues.NEITHER))
assert command.validate_input_flag(InputFlag('test', input_value='example', status=None)) == ValidationStatus.INVALID assert command.validate_input_flag(InputFlag('test', input_value='example', status=None)) == ValidationStatus.INVALID
def test_invalid_input_flag2(): def test_validate_input_flag_returns_invalid_when_value_not_in_allowed_list() -> None:
command = Command('some', flags=Flag('test', possible_values=['some', 'case'])) command = Command('some', flags=Flag('test', possible_values=['some', 'case']))
assert command.validate_input_flag(InputFlag('test', input_value='slay', status=None)) == ValidationStatus.INVALID assert command.validate_input_flag(InputFlag('test', input_value='slay', status=None)) == ValidationStatus.INVALID
def test_invalid_input_flag3(): def test_validate_input_flag_returns_invalid_when_value_does_not_match_regex() -> None:
command = Command('some', flags=Flag('test', possible_values=re.compile(r'^ex\d{, 2}op$'))) command = Command('some', flags=Flag('test', possible_values=re.compile(r'^ex\d{1,2}op$')))
assert command.validate_input_flag(InputFlag('test', input_value='example', status=None)) == ValidationStatus.INVALID assert command.validate_input_flag(InputFlag('test', input_value='example', status=None)) == ValidationStatus.INVALID
def test_isinstance_parse_correct_raw_command(): # ============================================================================
cmd = InputCommand.parse('ssh --host 192.168.0.3') # Tests for flag validation - undefined flags
assert isinstance(cmd, InputCommand) # ============================================================================
def test_validate_input_flag_returns_undefined_for_unregistered_flag_name() -> None:
command = Command('some', flags=Flag('test'))
assert command.validate_input_flag(InputFlag('more', input_value='', status=None)) == ValidationStatus.UNDEFINED
def test_validate_input_flag_returns_undefined_for_unregistered_flag_in_multiple_flags() -> None:
command = Command('some', flags=Flags([Flag('test'), Flag('more')]))
assert command.validate_input_flag(InputFlag('case', input_value='', status=None)) == ValidationStatus.UNDEFINED
def test_validate_input_flag_returns_undefined_when_command_has_no_flags() -> None:
command = Command('some')
assert command.validate_input_flag(InputFlag('case', input_value='', status=None)) == ValidationStatus.UNDEFINED
+58 -19
View File
@@ -1,81 +1,120 @@
from typing import Generator from typing import Generator
import pytest
from dishka import Container, make_container
from argenta import App, DataBridge, Router from argenta import App, DataBridge, Router
from argenta.di.integration import (
FromDishka,
_auto_inject_handlers,
_get_container_from_response,
setup_dishka,
)
from argenta.di.providers import SystemProvider from argenta.di.providers import SystemProvider
from argenta.orchestrator.argparser import ArgParser, ArgSpace from argenta.orchestrator.argparser import ArgParser, ArgSpace
from argenta.response import ResponseStatus from argenta.response import ResponseStatus
from dishka import Container, make_container
import pytest
from argenta.response.entity import Response from argenta.response.entity import Response
from argenta.di.integration import FromDishka, _get_container_from_response, setup_dishka, _auto_inject_handlers
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture @pytest.fixture
def argparser() -> ArgParser: def argparser() -> ArgParser:
return ArgParser(processed_args=[]) return ArgParser(processed_args=[])
@pytest.fixture @pytest.fixture
def container(argparser: ArgParser) -> Generator[Container]: def container(argparser: ArgParser) -> Generator[Container, None, None]:
container = make_container(SystemProvider(), context={ArgParser: argparser}) container = make_container(SystemProvider(), context={ArgParser: argparser})
yield container yield container
container.close() container.close()
def test_get_container_from_response(container: Container): # ============================================================================
# Tests for container retrieval from response
# ============================================================================
def test_get_container_from_response_extracts_container_from_first_response_arg(container: Container) -> None:
Response.patch_by_container(container) Response.patch_by_container(container)
response = Response(ResponseStatus.ALL_FLAGS_VALID) response = Response(ResponseStatus.ALL_FLAGS_VALID)
assert _get_container_from_response((response,), {}) == container assert _get_container_from_response((response,), {}) == container
def test_get_container_from_response4(container: Container):
def test_get_container_from_response_extracts_container_from_second_response_arg(container: Container) -> None:
Response.patch_by_container(container) Response.patch_by_container(container)
response = Response(ResponseStatus.ALL_FLAGS_VALID) response = Response(ResponseStatus.ALL_FLAGS_VALID)
assert _get_container_from_response((object(), response,), {}) == container assert _get_container_from_response((object(), response,), {}) == container
def test_get_container_from_response2(container: Container):
def test_get_container_from_response_raises_error_when_container_not_patched() -> None:
delattr(Response, '__dishka_container__') delattr(Response, '__dishka_container__')
response = Response(ResponseStatus.ALL_FLAGS_VALID) response = Response(ResponseStatus.ALL_FLAGS_VALID)
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
_get_container_from_response((response,), {}) _get_container_from_response((response,), {})
def test_get_container_from_response3(container: Container):
def test_get_container_from_response_raises_error_when_no_response_in_args(container: Container) -> None:
Response.patch_by_container(container) Response.patch_by_container(container)
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
assert _get_container_from_response((), {}) == container _get_container_from_response((), {})
def test_setup_dishka(container: Container):
# ============================================================================
# Tests for dishka setup
# ============================================================================
def test_setup_dishka_with_auto_inject_enabled(container: Container) -> None:
app = App() app = App()
router = Router() router = Router()
@router.command('command') @router.command('command')
def handler(res: Response, data_bridge: FromDishka[DataBridge]): def handler(_res: Response, data_bridge: FromDishka[DataBridge]) -> None:
print(data_bridge) print(data_bridge)
app.include_router(router) app.include_router(router)
assert setup_dishka(app, container, auto_inject=True) is None assert setup_dishka(app, container, auto_inject=True) is None
def test_setup_dishka2(container: Container):
def test_setup_dishka_with_auto_inject_disabled(container: Container) -> None:
app = App() app = App()
assert setup_dishka(app, container, auto_inject=False) is None assert setup_dishka(app, container, auto_inject=False) is None
def test_auto_inject_handlers(container: Container):
# ============================================================================
# Tests for auto injection
# ============================================================================
def test_auto_inject_handlers_injects_dependencies_into_handlers(container: Container) -> None:
Response.patch_by_container(container) Response.patch_by_container(container)
app = App() app = App()
router = Router() router = Router()
@router.command('command') @router.command('command')
def handler(res: Response, data_bridge: FromDishka[DataBridge]): def handler(_res: Response, data_bridge: FromDishka[DataBridge]) -> None:
print(data_bridge) print(data_bridge)
app.include_router(router) app.include_router(router)
_auto_inject_handlers(app) _auto_inject_handlers(app)
_auto_inject_handlers(app) # check idempotency _auto_inject_handlers(app) # check idempotency
def test_get_from_container(container: Container):
# ============================================================================
# Tests for container dependency resolution
# ============================================================================F
def test_container_resolves_argspace_dependency(container: Container) -> None:
assert isinstance(container.get(ArgSpace), ArgSpace) assert isinstance(container.get(ArgSpace), ArgSpace)
def test_get_from_container2(container: Container):
def test_container_resolves_databridge_dependency(container: Container) -> None:
assert isinstance(container.get(DataBridge), DataBridge) assert isinstance(container.get(DataBridge), DataBridge)
+35 -10
View File
@@ -1,36 +1,61 @@
from argenta.app.dividing_line import DynamicDividingLine, StaticDividingLine from argenta.app.dividing_line import DynamicDividingLine, StaticDividingLine
def test_get_static_dividing_line_full_line(): # ============================================================================
# Tests for StaticDividingLine - full line generation
# ============================================================================
def test_static_dividing_line_generates_default_length_with_override() -> None:
line = StaticDividingLine('-') line = StaticDividingLine('-')
assert line.get_full_static_line(is_override=True).count('-') == 25 assert line.get_full_static_line(is_override=True).count('-') == 25
def test_get_static_dividing_line2_full_line():
def test_static_dividing_line_generates_custom_length_with_formatting() -> None:
line = StaticDividingLine('-', length=5) line = StaticDividingLine('-', length=5)
assert line.get_full_static_line(is_override=False) == '\n[dim]-----[/dim]\n' assert line.get_full_static_line(is_override=False) == '\n[dim]-----[/dim]\n'
def test_get_dividing_line_unit_part():
# ============================================================================
# Tests for StaticDividingLine - unit part extraction
# ============================================================================
def test_static_dividing_line_returns_space_for_empty_unit() -> None:
line = StaticDividingLine('') line = StaticDividingLine('')
assert line.get_unit_part() == ' ' assert line.get_unit_part() == ' '
def test_get_dividing_line2_unit_part():
def test_static_dividing_line_returns_first_character_as_unit() -> None:
line = StaticDividingLine('+-0987654321!@#$%^&*()_') line = StaticDividingLine('+-0987654321!@#$%^&*()_')
assert line.get_unit_part() == '+' assert line.get_unit_part() == '+'
def test_get_dynamic_dividing_line_full_line():
# ============================================================================
# Tests for DynamicDividingLine - full line generation
# ============================================================================
def test_dynamic_dividing_line_generates_line_with_specified_length_and_override() -> None:
line = DynamicDividingLine() line = DynamicDividingLine()
assert line.get_full_dynamic_line(length=20, is_override=True).count('-') == 20 assert line.get_full_dynamic_line(length=20, is_override=True).count('-') == 20
def test_get_dynamic_dividing_line2_full_line():
def test_dynamic_dividing_line_generates_line_with_specified_length_and_formatting() -> None:
line = DynamicDividingLine() line = DynamicDividingLine()
assert line.get_full_dynamic_line(length=5, is_override=False) == '\n[dim]-----[/dim]\n' assert line.get_full_dynamic_line(length=5, is_override=False) == '\n[dim]-----[/dim]\n'
def test_get_dynamic_dividing_line_unit_part():
# ============================================================================
# Tests for DynamicDividingLine - unit part extraction
# ============================================================================
def test_dynamic_dividing_line_returns_space_for_empty_unit() -> None:
line = DynamicDividingLine('') line = DynamicDividingLine('')
assert line.get_unit_part() == ' ' assert line.get_unit_part() == ' '
def test_get_dynamic_dividing_line2_unit_part():
def test_dynamic_dividing_line_returns_first_character_as_unit() -> None:
line = DynamicDividingLine('45n352834528&^%@&*T$G') line = DynamicDividingLine('45n352834528&^%@&*T$G')
assert line.get_unit_part() == '4' assert line.get_unit_part() == '4'
+195 -107
View File
@@ -1,100 +1,226 @@
import re import re
import pytest
from argenta.command.flag import Flag, InputFlag, PossibleValues from argenta.command.flag import Flag, InputFlag, PossibleValues
from argenta.command.flag.flags import Flags, InputFlags from argenta.command.flag.flags import Flags, InputFlags
import pytest
def test_get_string_entity(): # ============================================================================
# Tests for Flag - basic properties
# ============================================================================
def test_flag_string_entity_with_default_prefix() -> None:
assert Flag(name='test').string_entity == '--test' assert Flag(name='test').string_entity == '--test'
def test_get_string_entity2(): def test_flag_string_entity_with_custom_prefix() -> None:
assert Flag(name='test', prefix='---').string_entity == '---test' assert Flag(name='test', prefix='---').string_entity == '---test'
def test_get_flag_name(): def test_flag_name_property() -> None:
assert Flag(name='test').name == 'test' assert Flag(name='test').name == 'test'
def test_get_flag_prefix(): def test_flag_prefix_property_default() -> None:
assert Flag(name='test').prefix == '--' assert Flag(name='test').prefix == '--'
def test_get_flag_prefix2(): def test_flag_prefix_property_custom() -> None:
assert Flag(name='test', prefix='--').prefix == '--' assert Flag(name='test', prefix='--').prefix == '--'
def test_get_flag_value_without_set(): # ============================================================================
assert InputFlag(name='test', input_value='', status=None).input_value == '' # Tests for Flag - string representations
# ============================================================================
def test_get_flag_value_with_set(): def test_flag_str_representation() -> None:
flag = InputFlag(name='test', input_value='example', status=None) flag = Flag('two')
assert flag.input_value == 'example' assert str(flag) == '--two'
def test_validate_incorrect_flag_value_with_list_of_possible_flag_values(): def test_flag_repr_representation() -> None:
flag = Flag(name='test', possible_values=['1', '2', '3']) flag = Flag('two')
assert flag.validate_input_flag_value('bad value') is False assert repr(flag) == 'Flag<name=two, prefix=-->'
def test_validate_correct_flag_value_with_list_of_possible_flag_values(): def test_flag_equality_with_non_flag_raises_error() -> None:
flag = Flag('two')
not_flag = object()
with pytest.raises(NotImplementedError):
flag == not_flag # pyright: ignore[reportUnusedExpression]
# ============================================================================
# Tests for Flag - value validation with list of possible values
# ============================================================================
def test_flag_validates_value_in_allowed_list() -> None:
flag = Flag(name='test', possible_values=['1', '2', '3']) flag = Flag(name='test', possible_values=['1', '2', '3'])
assert flag.validate_input_flag_value('1') is True assert flag.validate_input_flag_value('1') is True
def test_validate_incorrect_flag_value_with_pattern_of_possible_flag_values(): def test_flag_rejects_value_not_in_allowed_list() -> None:
flag = Flag(name='test', possible_values=re.compile(r'192.168.\d+.\d+')) flag = Flag(name='test', possible_values=['1', '2', '3'])
assert flag.validate_input_flag_value('152.123.9.8') is False assert flag.validate_input_flag_value('bad value') is False
def test_validate_correct_flag_value_with_pattern_of_possible_flag_values(): # ============================================================================
# Tests for Flag - value validation with regex pattern
# ============================================================================
def test_flag_validates_value_matching_regex_pattern() -> None:
flag = Flag(name='test', possible_values=re.compile(r'192.168.\d+.\d+')) flag = Flag(name='test', possible_values=re.compile(r'192.168.\d+.\d+'))
assert flag.validate_input_flag_value('192.168.9.8') is True assert flag.validate_input_flag_value('192.168.9.8') is True
def test_validate_correct_empty_flag_value_without_possible_flag_values(): def test_flag_rejects_value_not_matching_regex_pattern() -> None:
flag = Flag(name='test', possible_values=re.compile(r'192.168.\d+.\d+'))
assert flag.validate_input_flag_value('152.123.9.8') is False
# ============================================================================
# Tests for Flag - value validation with NEITHER and ALL
# ============================================================================
def test_flag_validates_empty_value_when_neither_allowed() -> None:
flag = Flag(name='test', possible_values=PossibleValues.NEITHER) flag = Flag(name='test', possible_values=PossibleValues.NEITHER)
assert flag.validate_input_flag_value('') is True assert flag.validate_input_flag_value('') is True
def test_validate_correct_empty_flag_value_with_possible_flag_values(): def test_flag_rejects_non_empty_value_when_neither_allowed() -> None:
flag = Flag(name='test', possible_values=PossibleValues.NEITHER)
assert flag.validate_input_flag_value('') is True
def test_validate_incorrect_random_flag_value_without_possible_flag_values():
flag = Flag(name='test', possible_values=PossibleValues.NEITHER) flag = Flag(name='test', possible_values=PossibleValues.NEITHER)
assert flag.validate_input_flag_value('random value') is False assert flag.validate_input_flag_value('random value') is False
def test_validate_correct_random_flag_value_with_possible_flag_values(): def test_flag_validates_any_value_when_all_allowed() -> None:
flag = Flag(name='test', possible_values=PossibleValues.ALL) flag = Flag(name='test', possible_values=PossibleValues.ALL)
assert flag.validate_input_flag_value('random value') is True assert flag.validate_input_flag_value('random value') is True
def test_get_input_flag1(): # ============================================================================
# Tests for InputFlag - basic properties
# ============================================================================
def test_input_flag_stores_empty_value() -> None:
assert InputFlag(name='test', input_value='', status=None).input_value == ''
def test_input_flag_stores_provided_value() -> None:
flag = InputFlag(name='test', input_value='example', status=None)
assert flag.input_value == 'example'
# ============================================================================
# Tests for InputFlag - string representations
# ============================================================================
def test_input_flag_str_representation() -> None:
flag = InputFlag('two', input_value='value')
assert str(flag) == '--two value'
def test_input_flag_repr_representation() -> None:
flag = InputFlag('two', input_value='some_value')
assert repr(flag) == 'InputFlag<name=two, prefix=--, value=some_value, status=None>'
def test_input_flag_equality_with_non_flag_raises_error() -> None:
flag = InputFlag('two', input_value='')
not_flag = object()
with pytest.raises(NotImplementedError):
flag == not_flag # pyright: ignore[reportUnusedExpression]
# ============================================================================
# Tests for InputFlags collection - retrieval
# ============================================================================
def test_input_flags_get_by_name_finds_single_flag() -> None:
flag = InputFlag(name='test', input_value='', status=None) flag = InputFlag(name='test', input_value='', status=None)
input_flags = InputFlags([flag]) input_flags = InputFlags([flag])
assert input_flags.get_flag_by_name('test') == flag assert input_flags.get_flag_by_name('test') == flag
def test_get_input_flag2(): def test_input_flags_get_by_name_finds_flag_in_multiple() -> None:
flag = InputFlag(name='test', input_value='', status=None) flag = InputFlag(name='test', input_value='', status=None)
flag2 = InputFlag(name='some', input_value='', status=None) flag2 = InputFlag(name='some', input_value='', status=None)
input_flags = InputFlags([flag, flag2]) input_flags = InputFlags([flag, flag2])
assert input_flags.get_flag_by_name('some') == flag2 assert input_flags.get_flag_by_name('some') == flag2
def test_get_undefined_input_flag(): def test_input_flags_get_by_name_returns_none_for_missing_flag() -> None:
flag = InputFlag(name='test', input_value='', status=None) flag = InputFlag(name='test', input_value='', status=None)
flag2 = InputFlag(name='some', input_value='', status=None) flag2 = InputFlag(name='some', input_value='', status=None)
input_flags = InputFlags([flag, flag2]) input_flags = InputFlags([flag, flag2])
assert input_flags.get_flag_by_name('case') is None assert input_flags.get_flag_by_name('case') is None
def test_get_flags(): # ============================================================================
# Tests for InputFlags collection - equality and containment
# ============================================================================
def test_input_flags_not_equal_when_different_length() -> None:
flags = InputFlags([InputFlag('some', input_value='')])
flags2 = InputFlags([
InputFlag('some', input_value=''),
InputFlag('some2', input_value='')
])
assert flags != flags2
def test_input_flags_not_equal_to_non_input_flags() -> None:
flags = InputFlags([InputFlag('some', input_value='')])
not_flags = object()
assert flags != not_flags
def test_input_flags_contains_existing_flag() -> None:
flag = InputFlag('some', input_value='')
flags = InputFlags([flag])
assert flag in flags
def test_input_flags_does_not_contain_missing_flag() -> None:
flags = InputFlags([InputFlag('some', input_value='')])
flag = InputFlag('nonexists', input_value='')
assert flag not in flags
def test_input_flags_contains_raises_error_for_non_flag() -> None:
flags = InputFlags([InputFlag('some', input_value='')])
not_flag = object
with pytest.raises(TypeError):
not_flag in flags # pyright: ignore[reportUnusedExpression]
# ============================================================================
# Tests for Flags collection - adding flags
# ============================================================================
def test_flags_add_single_flag() -> None:
flags = Flags()
flags.add_flag(Flag('test'))
assert len(flags.flags) == 1
def test_flags_add_multiple_flags() -> None:
flags = Flags()
flags.add_flags([Flag('test'), Flag('test2')])
assert len(flags.flags) == 2
def test_flags_stores_added_flags() -> None:
flags = Flags() flags = Flags()
list_of_flags = [ list_of_flags = [
Flag('test1'), Flag('test1'),
@@ -105,122 +231,84 @@ def test_get_flags():
assert flags.flags == list_of_flags assert flags.flags == list_of_flags
def test_add_flag(): # ============================================================================
flags = Flags() # Tests for Flags collection - retrieval
flags.add_flag(Flag('test')) # ============================================================================
assert len(flags.flags) == 1
def test_add_flags(): def test_flags_get_by_name_finds_flag() -> None:
flags = Flags() flags = Flags([Flag('some')])
flags.add_flags([Flag('test'), Flag('test2')]) assert flags.get_flag_by_name('some') == Flag('some')
assert len(flags.flags) == 2
def test_eq_flags():
# ============================================================================
# Tests for Flags collection - equality and containment
# ============================================================================
def test_flags_equal_when_same_flags() -> None:
flags = Flags([Flag('some')]) flags = Flags([Flag('some')])
flags2 = Flags([Flag('some')]) flags2 = Flags([Flag('some')])
assert flags == flags2 assert flags == flags2
def test_contains_flags():
flags = Flags([Flag('some')])
flag = Flag('some')
assert flag in flags
def test_eq_flags2(): def test_flags_not_equal_when_different_flags() -> None:
flags = Flags([Flag('some')]) flags = Flags([Flag('some')])
flags2 = Flags([Flag('other')]) flags2 = Flags([Flag('other')])
assert flags != flags2 assert flags != flags2
def test_eq_flags3():
def test_flags_not_equal_when_different_length() -> None:
flags = Flags([Flag('some')]) flags = Flags([Flag('some')])
flags2 = Flags([Flag('some'), Flag('other')]) flags2 = Flags([Flag('some'), Flag('other')])
assert flags != flags2 assert flags != flags2
def test_eq_flags4():
def test_flags_not_equal_to_non_flags() -> None:
flags = Flags([Flag('some')]) flags = Flags([Flag('some')])
not_flags = object() not_flags = object()
assert flags != not_flags assert flags != not_flags
def test_contains_flags2():
def test_flags_contains_existing_flag() -> None:
flags = Flags([Flag('some')])
flag = Flag('some')
assert flag in flags
def test_flags_does_not_contain_missing_flag() -> None:
flags = Flags([Flag('some')]) flags = Flags([Flag('some')])
flag = Flag('nonexists') flag = Flag('nonexists')
assert flag not in flags assert flag not in flags
def test_contains_flags3():
def test_flags_contains_raises_error_for_non_flag() -> None:
flags = Flags([Flag('some')]) flags = Flags([Flag('some')])
not_flag = object not_flag = object
with pytest.raises(TypeError): with pytest.raises(TypeError):
not_flag in flags # pyright: ignore[reportUnusedExpression] not_flag in flags # pyright: ignore[reportUnusedExpression]
def test_get_flag_by_name():
flags = Flags([Flag('some')])
assert flags.get_flag_by_name('some') == Flag('some')
def test_eq_input_flags3(): # ============================================================================
flags = InputFlags([InputFlag('some', input_value='')]) # Tests for Flags collection - special methods
flags2 = InputFlags([ # ============================================================================
InputFlag('some', input_value=''),
InputFlag('some2', input_value='')
])
assert flags != flags2
def test_eq_input_flags4():
flags = InputFlags([InputFlag('some', input_value='')])
not_flags = object()
assert flags != not_flags
def test_contains_input_flags2(): def test_flags_len_returns_count() -> None:
flags = InputFlags([InputFlag('some', input_value='')])
flag = InputFlag('nonexists', input_value='')
assert flag not in flags
def test_contains_input_flags3():
flags = InputFlags([InputFlag('some', input_value='')])
not_flag = object
with pytest.raises(TypeError):
not_flag in flags # pyright: ignore[reportUnusedExpression]
def test_len_flags():
flags = Flags([Flag('one'), Flag('two')]) flags = Flags([Flag('one'), Flag('two')])
assert len(flags) == 2 assert len(flags) == 2
def test_bool_flags():
def test_flags_bool_returns_true_when_not_empty() -> None:
flags = Flags([Flag('one'), Flag('two')]) flags = Flags([Flag('one'), Flag('two')])
assert bool(flags) assert bool(flags)
def test_bool_flags2():
def test_flags_bool_returns_false_when_empty() -> None:
flags = Flags([]) flags = Flags([])
assert not bool(flags) assert not bool(flags)
def test_getitem_flags():
def test_flags_getitem_returns_flag_at_index() -> None:
flags = Flags([Flag('one'), Flag('two')]) flags = Flags([Flag('one'), Flag('two')])
assert flags[1] == Flag('two') assert flags[1] == Flag('two')
def test_str_flag():
flag = Flag('two')
assert str(flag) == '--two'
def test_repr_flag():
flag = Flag('two')
assert repr(flag) == 'Flag<name=two, prefix=-->'
def test_eq_flag():
flag = Flag('two')
not_flag = object()
with pytest.raises(NotImplementedError):
flag == not_flag # pyright: ignore[reportUnusedExpression]
def test_str_input_flag():
flag = InputFlag('two', input_value='value')
assert str(flag) == '--two value'
def test_repr_input_flag():
flag = InputFlag('two', input_value='some_value')
assert repr(flag) == 'InputFlag<name=two, prefix=--, value=some_value, status=None>'
def test_eq_input_flag():
flag = InputFlag('two', input_value='')
not_flag = object()
with pytest.raises(NotImplementedError):
flag == not_flag # pyright: ignore[reportUnusedExpression]
+56 -20
View File
@@ -2,27 +2,37 @@ from datetime import date, datetime
import pytest import pytest
from argenta.data_bridge import DataBridge
from argenta.command.flag.models import InputFlag
from argenta.command.flag.flags.models import InputFlags from argenta.command.flag.flags.models import InputFlags
from argenta.command.flag.models import InputFlag
from argenta.data_bridge import DataBridge
from argenta.response.entity import EMPTY_INPUT_FLAGS, Response from argenta.response.entity import EMPTY_INPUT_FLAGS, Response
from argenta.response.status import ResponseStatus from argenta.response.status import ResponseStatus
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture @pytest.fixture
def data_bridge(): def data_bridge() -> DataBridge:
"""Create a new DataBridge instance for each test""" """Create a new DataBridge instance for each test"""
return DataBridge() return DataBridge()
def test_update_data_basic(data_bridge: DataBridge): # ============================================================================
# Tests for DataBridge - basic data operations
# ============================================================================
def test_databridge_update_stores_basic_data(data_bridge: DataBridge) -> None:
"""Test basic data update functionality""" """Test basic data update functionality"""
test_data = {"key1": "value1", "key2": "value2"} test_data = {"key1": "value1", "key2": "value2"}
data_bridge.update(test_data) data_bridge.update(test_data)
assert data_bridge.get_all() == test_data assert data_bridge.get_all() == test_data
def test_update_data_with_datetime(data_bridge: DataBridge): def test_databridge_update_stores_datetime_objects(data_bridge: DataBridge) -> None:
"""Test updating data with datetime objects""" """Test updating data with datetime objects"""
test_datetime = datetime(2024, 1, 15, 10, 30, 45) test_datetime = datetime(2024, 1, 15, 10, 30, 45)
test_data = {"created_at": test_datetime, "name": "test"} test_data = {"created_at": test_datetime, "name": "test"}
@@ -33,7 +43,7 @@ def test_update_data_with_datetime(data_bridge: DataBridge):
assert result["name"] == "test" assert result["name"] == "test"
def test_update_data_multiple_calls(data_bridge: DataBridge): def test_databridge_multiple_updates_merge_data(data_bridge: DataBridge) -> None:
"""Test multiple update calls merge data""" """Test multiple update calls merge data"""
first_data = {"key1": "value1"} first_data = {"key1": "value1"}
second_data = {"key2": "value2"} second_data = {"key2": "value2"}
@@ -42,12 +52,37 @@ def test_update_data_multiple_calls(data_bridge: DataBridge):
assert len(data_bridge.get_all()) == 2 assert len(data_bridge.get_all()) == 2
def test_get_data_empty(data_bridge: DataBridge): # ============================================================================
# Tests for DataBridge - data retrieval
# ============================================================================
def test_databridge_get_all_returns_empty_dict_initially(data_bridge: DataBridge) -> None:
"""Test get_all returns empty dict when no data""" """Test get_all returns empty dict when no data"""
assert data_bridge.get_all() == {} assert data_bridge.get_all() == {}
def test_clear_data(data_bridge: DataBridge): def test_databridge_get_by_key_retrieves_correct_values(data_bridge: DataBridge) -> None:
"""Test get_by_key retrieves correct value"""
test_data = {"key1": "value1", "key2": date(2024, 1, 1)}
data_bridge.update(test_data)
assert data_bridge.get_by_key("key1") == "value1"
assert data_bridge.get_by_key("key2") == date(2024, 1, 1)
def test_databridge_get_by_key_returns_none_for_missing_key(data_bridge: DataBridge) -> None:
"""Test get_by_key returns None for nonexistent key"""
test_data = {"key1": "value1"}
data_bridge.update(test_data)
assert data_bridge.get_by_key("nonexistent") is None
# ============================================================================
# Tests for DataBridge - data deletion
# ============================================================================
def test_databridge_clear_all_removes_all_data(data_bridge: DataBridge) -> None:
"""Test clear_all removes all data""" """Test clear_all removes all data"""
data_bridge.update({"key": "value"}) data_bridge.update({"key": "value"})
assert data_bridge.get_all() != {} assert data_bridge.get_all() != {}
@@ -55,7 +90,7 @@ def test_clear_data(data_bridge: DataBridge):
assert data_bridge.get_all() == {} assert data_bridge.get_all() == {}
def test_delete_from_data(data_bridge: DataBridge): def test_databridge_delete_by_key_removes_specific_key(data_bridge: DataBridge) -> None:
"""Test delete_by_key removes specific key""" """Test delete_by_key removes specific key"""
test_data = {"key1": "value1", "key2": "value2"} test_data = {"key1": "value1", "key2": "value2"}
data_bridge.update(test_data) data_bridge.update(test_data)
@@ -65,29 +100,25 @@ def test_delete_from_data(data_bridge: DataBridge):
assert "key2" in result assert "key2" in result
def test_delete_from_data_nonexistent_key(data_bridge: DataBridge): def test_databridge_delete_by_key_raises_error_for_missing_key(data_bridge: DataBridge) -> None:
"""Test delete_by_key with nonexistent key raises KeyError""" """Test delete_by_key with nonexistent key raises KeyError"""
with pytest.raises(KeyError): with pytest.raises(KeyError):
data_bridge.delete_by_key("nonexistent_key") data_bridge.delete_by_key("nonexistent_key")
def test_get_by_key(data_bridge: DataBridge): # ============================================================================
"""Test get_by_key retrieves correct value""" # Tests for Response - initialization
test_data = {"key1": "value1", "key2": date(2024, 1, 1)} # ============================================================================
data_bridge.update(test_data)
assert data_bridge.get_by_key("key1") == "value1"
assert data_bridge.get_by_key("key2") == date(2024, 1, 1)
assert data_bridge.get_by_key("nonexistent") is None
def test_response_initialization_basic(): def test_response_initializes_with_status_and_empty_flags() -> None:
"""Test basic Response initialization""" """Test basic Response initialization"""
response = Response(ResponseStatus.ALL_FLAGS_VALID) response = Response(ResponseStatus.ALL_FLAGS_VALID)
assert response.status == ResponseStatus.ALL_FLAGS_VALID assert response.status == ResponseStatus.ALL_FLAGS_VALID
assert response.input_flags == EMPTY_INPUT_FLAGS assert response.input_flags == EMPTY_INPUT_FLAGS
def test_response_initialization_with_flags(): def test_response_initializes_with_status_and_input_flags() -> None:
"""Test Response initialization with input flags""" """Test Response initialization with input flags"""
input_flags = InputFlags([InputFlag('test', input_value='value', status=None)]) input_flags = InputFlags([InputFlag('test', input_value='value', status=None)])
response = Response(ResponseStatus.INVALID_VALUE_FLAGS, input_flags) response = Response(ResponseStatus.INVALID_VALUE_FLAGS, input_flags)
@@ -95,7 +126,12 @@ def test_response_initialization_with_flags():
assert response.input_flags == input_flags assert response.input_flags == input_flags
def test_response_status_types(): # ============================================================================
# Tests for Response - status types
# ============================================================================
def test_response_accepts_all_status_types() -> None:
"""Test Response with different status types""" """Test Response with different status types"""
statuses = [ statuses = [
ResponseStatus.ALL_FLAGS_VALID, ResponseStatus.ALL_FLAGS_VALID,
+168 -108
View File
@@ -1,4 +1,5 @@
import re import re
import pytest import pytest
from pytest import CaptureFixture from pytest import CaptureFixture
@@ -9,131 +10,48 @@ from argenta.command.flag.models import PossibleValues, ValidationStatus
from argenta.response.entity import Response from argenta.response.entity import Response
from argenta.router import Router from argenta.router import Router
from argenta.router.entity import _structuring_input_flags, _validate_func_args # pyright: ignore[reportPrivateUsage] from argenta.router.entity import _structuring_input_flags, _validate_func_args # pyright: ignore[reportPrivateUsage]
from argenta.router.exceptions import (RepeatedFlagNameException, from argenta.router.exceptions import (
RequiredArgumentNotPassedException, RepeatedFlagNameException,
TriggerContainSpacesException) RequiredArgumentNotPassedException,
TriggerContainSpacesException,
)
def test_register_command_with_spaces_in_trigger(): # ============================================================================
# Tests for command validation
# ============================================================================
def test_validate_command_raises_error_for_trigger_with_spaces() -> None:
router = Router() router = Router()
with pytest.raises(TriggerContainSpacesException): with pytest.raises(TriggerContainSpacesException):
router._validate_command(Command(trigger='command with spaces')) router._validate_command(Command(trigger='command with spaces'))
def test_register_command_with_repeated_flags():
def test_validate_command_raises_error_for_repeated_flag_names() -> None:
router = Router() router = Router()
with pytest.raises(RepeatedFlagNameException): with pytest.raises(RepeatedFlagNameException):
router._validate_command(Command(trigger='command', flags=Flags([Flag('test'), Flag('test')]))) router._validate_command(Command(trigger='command', flags=Flags([Flag('test'), Flag('test')])))
def test_structuring_input_flags1():
cmd = Command('cmd')
input_flags = InputFlags([InputFlag('ssh', input_value='', status=None)])
assert _structuring_input_flags(cmd, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='', status=ValidationStatus.UNDEFINED)])
def test_structuring_input_flags2(): # ============================================================================
cmd = Command('cmd') # Tests for function argument validation
input_flags = InputFlags([InputFlag('ssh', input_value='some', status=None)]) # ============================================================================
assert _structuring_input_flags(cmd, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='some', status=ValidationStatus.UNDEFINED)])
def test_structuring_input_flags3():
cmd = Command('cmd', flags=Flag('port'))
input_flags = InputFlags([InputFlag('ssh', input_value='some2', status=None)])
assert _structuring_input_flags(cmd, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='some2', status=ValidationStatus.UNDEFINED)])
def test_structuring_input_flags4(): def test_validate_func_args_raises_error_for_missing_response_parameter() -> None:
command = Command('cmd', flags=Flag('ssh', possible_values=PossibleValues.NEITHER)) def handler() -> None:
input_flags = InputFlags([InputFlag('ssh', input_value='some3', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='some3', status=ValidationStatus.INVALID)])
def test_structuring_input_flags5():
command = Command('cmd', flags=Flag('ssh', possible_values=re.compile(r'some[1-5]$')))
input_flags = InputFlags([InputFlag('ssh', input_value='some40', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='some40', status=ValidationStatus.INVALID)])
def test_structuring_input_flags6():
command = Command('cmd', flags=Flag('ssh', possible_values=['example']))
input_flags = InputFlags([InputFlag('ssh', input_value='example2', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='example2', status=ValidationStatus.INVALID)])
def test_structuring_input_flags7():
command = Command('cmd', flags=Flag('port'))
input_flags = InputFlags([InputFlag('port', input_value='some2', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('port', input_value='some2', status=ValidationStatus.VALID)])
def test_structuring_input_flags8():
command = Command('cmd', flags=Flag('port', possible_values=['some2', 'some3']))
input_flags = InputFlags([InputFlag('port', input_value='some2', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('port', input_value='some2', status=ValidationStatus.VALID)])
def test_structuring_input_flags9():
command = Command('cmd', flags=Flag('ssh', possible_values=re.compile(r'more[1-5]$')))
input_flags = InputFlags([InputFlag('ssh', input_value='more5', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='more5', status=ValidationStatus.VALID)])
def test_structuring_input_flags10():
command = Command('cmd', flags=Flag('ssh', possible_values=PossibleValues.NEITHER))
input_flags = InputFlags([InputFlag('ssh', input_value='', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='', status=ValidationStatus.VALID)])
def test_validate_incorrect_func_args1():
def handler():
pass pass
with pytest.raises(RequiredArgumentNotPassedException): with pytest.raises(RequiredArgumentNotPassedException):
_validate_func_args(handler) # pyright: ignore[reportArgumentType] _validate_func_args(handler) # pyright: ignore[reportArgumentType]
def test_get_router_aliases():
router = Router() def test_validate_func_args_prints_warning_for_wrong_type_hint(capsys: CaptureFixture[str]) -> None:
@router.command(Command('some', aliases={'test', 'case'})) class NotResponse:
def handler(response: Response) -> None:
pass pass
assert router.aliases == {'test', 'case'}
def test_get_router_aliases2(): def func(_response: NotResponse) -> None:
router = Router()
@router.command(Command('some', aliases={'test', 'case'}))
def handler(response: Response):
pass pass
@router.command(Command('ext', aliases={'more', 'foo'}))
def handler2(response: Response):
pass
assert router.aliases == {'test', 'case', 'more', 'foo'}
def test_get_router_aliases3():
router = Router()
@router.command(Command('some'))
def handler(response: Response):
pass
assert router.aliases == set()
def test_find_appropiate_handler(capsys: pytest.CaptureFixture[str]):
router = Router()
@router.command(Command('hello', aliases={'hi'}))
def handler(res: Response):
print("Hello World!")
router.finds_appropriate_handler(InputCommand('hi'))
output = capsys.readouterr()
assert "Hello World!" in output.out
def test_find_appropiate_handler2(capsys: CaptureFixture[str]):
router = Router()
@router.command(Command('hello', flags=Flag('flag'), aliases={'hi'}))
def handler(res: Response):
print("Hello World!")
router.finds_appropriate_handler(InputCommand('hi'))
output = capsys.readouterr()
assert "Hello World!" in output.out
def test_wrong_typehint(capsys: pytest.CaptureFixture[str]):
class NotResponse: pass
def func(response: NotResponse): pass
_validate_func_args(func) _validate_func_args(func)
@@ -141,8 +59,150 @@ def test_wrong_typehint(capsys: pytest.CaptureFixture[str]):
assert "WARNING" in output.out assert "WARNING" in output.out
def test_missing_typehint(capsys: pytest.CaptureFixture[str]):
def func(response): pass # pyright: ignore[reportMissingParameterType, reportUnknownParameterType] def test_validate_func_args_accepts_missing_type_hint(capsys: CaptureFixture[str]) -> None:
def func(response) -> None: # pyright: ignore[reportMissingParameterType, reportUnknownParameterType]
pass
_validate_func_args(func) # pyright: ignore[reportUnknownArgumentType] _validate_func_args(func) # pyright: ignore[reportUnknownArgumentType]
output = capsys.readouterr() output = capsys.readouterr()
assert output.out == '' assert output.out == ''
# ============================================================================
# Tests for input flag structuring - undefined flags
# ============================================================================
def test_structuring_input_flags_marks_unregistered_flag_as_undefined() -> None:
cmd = Command('cmd')
input_flags = InputFlags([InputFlag('ssh', input_value='', status=None)])
assert _structuring_input_flags(cmd, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='', status=ValidationStatus.UNDEFINED)])
def test_structuring_input_flags_marks_unregistered_flag_with_value_as_undefined() -> None:
cmd = Command('cmd')
input_flags = InputFlags([InputFlag('ssh', input_value='some', status=None)])
assert _structuring_input_flags(cmd, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='some', status=ValidationStatus.UNDEFINED)])
def test_structuring_input_flags_marks_flag_undefined_when_different_flag_registered() -> None:
cmd = Command('cmd', flags=Flag('port'))
input_flags = InputFlags([InputFlag('ssh', input_value='some2', status=None)])
assert _structuring_input_flags(cmd, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='some2', status=ValidationStatus.UNDEFINED)])
# ============================================================================
# Tests for input flag structuring - invalid flags
# ============================================================================
def test_structuring_input_flags_marks_flag_invalid_when_value_provided_for_neither() -> None:
command = Command('cmd', flags=Flag('ssh', possible_values=PossibleValues.NEITHER))
input_flags = InputFlags([InputFlag('ssh', input_value='some3', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='some3', status=ValidationStatus.INVALID)])
def test_structuring_input_flags_marks_flag_invalid_when_value_not_matching_regex() -> None:
command = Command('cmd', flags=Flag('ssh', possible_values=re.compile(r'some[1-5]$')))
input_flags = InputFlags([InputFlag('ssh', input_value='some40', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='some40', status=ValidationStatus.INVALID)])
def test_structuring_input_flags_marks_flag_invalid_when_value_not_in_list() -> None:
command = Command('cmd', flags=Flag('ssh', possible_values=['example']))
input_flags = InputFlags([InputFlag('ssh', input_value='example2', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='example2', status=ValidationStatus.INVALID)])
# ============================================================================
# Tests for input flag structuring - valid flags
# ============================================================================
def test_structuring_input_flags_marks_registered_flag_as_valid() -> None:
command = Command('cmd', flags=Flag('port'))
input_flags = InputFlags([InputFlag('port', input_value='some2', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('port', input_value='some2', status=ValidationStatus.VALID)])
def test_structuring_input_flags_marks_flag_valid_when_value_in_list() -> None:
command = Command('cmd', flags=Flag('port', possible_values=['some2', 'some3']))
input_flags = InputFlags([InputFlag('port', input_value='some2', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('port', input_value='some2', status=ValidationStatus.VALID)])
def test_structuring_input_flags_marks_flag_valid_when_value_matches_regex() -> None:
command = Command('cmd', flags=Flag('ssh', possible_values=re.compile(r'more[1-5]$')))
input_flags = InputFlags([InputFlag('ssh', input_value='more5', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='more5', status=ValidationStatus.VALID)])
def test_structuring_input_flags_marks_flag_valid_when_empty_value_for_neither() -> None:
command = Command('cmd', flags=Flag('ssh', possible_values=PossibleValues.NEITHER))
input_flags = InputFlags([InputFlag('ssh', input_value='', status=None)])
assert _structuring_input_flags(command, input_flags).input_flags == InputFlags([InputFlag('ssh', input_value='', status=ValidationStatus.VALID)])
# ============================================================================
# Tests for router aliases
# ============================================================================
def test_router_aliases_returns_command_aliases() -> None:
router = Router()
@router.command(Command('some', aliases={'test', 'case'}))
def handler(_response: Response) -> None:
pass
assert router.aliases == {'test', 'case'}
def test_router_aliases_returns_combined_aliases_from_multiple_commands() -> None:
router = Router()
@router.command(Command('some', aliases={'test', 'case'}))
def handler(_response: Response) -> None:
pass
@router.command(Command('ext', aliases={'more', 'foo'}))
def handler2(_response: Response) -> None:
pass
assert router.aliases == {'test', 'case', 'more', 'foo'}
def test_router_aliases_returns_empty_set_when_no_aliases() -> None:
router = Router()
@router.command(Command('some'))
def handler(_response: Response) -> None:
pass
assert router.aliases == set()
# ============================================================================
# Tests for handler finding and execution
# ============================================================================
def test_finds_appropriate_handler_executes_handler_by_alias(capsys: CaptureFixture[str]) -> None:
router = Router()
@router.command(Command('hello', aliases={'hi'}))
def handler(_res: Response) -> None:
print("Hello World!")
router.finds_appropriate_handler(InputCommand('hi'))
output = capsys.readouterr()
assert "Hello World!" in output.out
def test_finds_appropriate_handler_executes_handler_with_flags_by_alias(capsys: CaptureFixture[str]) -> None:
router = Router()
@router.command(Command('hello', flags=Flag('flag'), aliases={'hi'}))
def handler(_res: Response) -> None:
print("Hello World!")
router.finds_appropriate_handler(InputCommand('hi'))
output = capsys.readouterr()
assert "Hello World!" in output.out