mirror of
https://github.com/koloideal/Argenta.git
synced 2026-06-10 10:05:28 +03:00
benchs
This commit is contained in:
@@ -618,11 +618,17 @@ def test_app_handlers_work_with_multiple_routers() -> None:
|
||||
|
||||
app.set_unknown_command_handler(custom_handler)
|
||||
|
||||
# Both commands should be known
|
||||
assert not app._is_unknown_command(InputCommand('cmd1'))
|
||||
assert not app._is_unknown_command(InputCommand('cmd2'))
|
||||
|
||||
# Unknown command should trigger handler
|
||||
assert app._is_unknown_command(InputCommand('unknown'))
|
||||
app._unknown_command_handler(InputCommand('unknown'))
|
||||
assert call_tracker['called']
|
||||
|
||||
|
||||
def test_process_exist_and_valid_command_raises_runtime_error_when_router_not_found() -> None:
|
||||
app = App()
|
||||
app._pre_cycle_setup()
|
||||
|
||||
with pytest.raises(RuntimeError, match="Router for 'nonexistent' not found. Panic!"):
|
||||
app._process_exist_and_valid_command(InputCommand('nonexistent'))
|
||||
|
||||
@@ -1,4 +1,12 @@
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
from typing import Any, Callable
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from prompt_toolkit import HTML
|
||||
from prompt_toolkit.completion import CompleteEvent
|
||||
from prompt_toolkit.document import Document
|
||||
from prompt_toolkit.history import InMemoryHistory
|
||||
|
||||
@@ -75,7 +83,7 @@ def test_history_completer_returns_matching_commands() -> None:
|
||||
completer = HistoryCompleter(history, {"status"})
|
||||
doc = Document("sta")
|
||||
|
||||
completions = list(completer.get_completions(doc, None))
|
||||
completions = list(completer.get_completions(doc, CompleteEvent()))
|
||||
completion_texts = [c.text for c in completions]
|
||||
|
||||
assert "start server" in completion_texts
|
||||
@@ -91,7 +99,7 @@ def test_history_completer_returns_all_when_empty_input() -> None:
|
||||
completer = HistoryCompleter(history, {"status"})
|
||||
doc = Document("")
|
||||
|
||||
completions = list(completer.get_completions(doc, None))
|
||||
completions = list(completer.get_completions(doc, CompleteEvent()))
|
||||
completion_texts = [c.text for c in completions]
|
||||
|
||||
assert len(completion_texts) == 3
|
||||
@@ -107,7 +115,7 @@ def test_history_completer_returns_empty_when_no_matches() -> None:
|
||||
completer = HistoryCompleter(history, {"stop"})
|
||||
doc = Document("xyz")
|
||||
|
||||
completions = list(completer.get_completions(doc, None))
|
||||
completions = list(completer.get_completions(doc, CompleteEvent()))
|
||||
assert len(completions) == 0
|
||||
|
||||
|
||||
@@ -119,7 +127,7 @@ def test_history_completer_deduplicates_commands() -> None:
|
||||
completer = HistoryCompleter(history, {"start"})
|
||||
doc = Document("sta")
|
||||
|
||||
completions = list(completer.get_completions(doc, None))
|
||||
completions = list(completer.get_completions(doc, CompleteEvent()))
|
||||
assert len(completions) == 1
|
||||
|
||||
|
||||
@@ -132,7 +140,7 @@ def test_history_completer_sorts_results() -> None:
|
||||
completer = HistoryCompleter(history, set())
|
||||
doc = Document("st")
|
||||
|
||||
completions = list(completer.get_completions(doc, None))
|
||||
completions = list(completer.get_completions(doc, CompleteEvent()))
|
||||
completion_texts = [c.text for c in completions]
|
||||
|
||||
assert completion_texts == ["start", "status", "stop"]
|
||||
@@ -160,3 +168,311 @@ def test_find_common_prefix_with_empty_list() -> None:
|
||||
matches: list[str] = []
|
||||
prefix = HistoryCompleter._find_common_prefix(matches)
|
||||
assert prefix == ""
|
||||
|
||||
|
||||
def test_command_lexer_handles_out_of_range_lineno() -> None:
|
||||
lexer = CommandLexer({"start", "stop"})
|
||||
doc = Document("start")
|
||||
get_line_tokens = lexer.lex_document(doc)
|
||||
tokens = get_line_tokens(1)
|
||||
assert tokens == []
|
||||
|
||||
|
||||
def test_history_completer_returns_early_when_no_matches() -> None:
|
||||
history = InMemoryHistory()
|
||||
completer = HistoryCompleter(history, {"start", "stop"})
|
||||
doc = Document("xyz")
|
||||
|
||||
result = completer.get_completions(doc, CompleteEvent())
|
||||
completions = list(result)
|
||||
assert completions == []
|
||||
|
||||
|
||||
def test_autocompleter_initial_setup_with_commands() -> None:
|
||||
completer = AutoCompleter()
|
||||
|
||||
with patch.object(sys.stdin, 'isatty', return_value=True), \
|
||||
patch('argenta.app.autocompleter.entity.PromptSession') as mock_session:
|
||||
completer.initial_setup({"start", "stop", "status"})
|
||||
|
||||
assert completer._session is not None
|
||||
assert completer._fallback_mode is False
|
||||
mock_session.assert_called_once()
|
||||
|
||||
|
||||
def test_autocompleter_initial_setup_with_history_file() -> None:
|
||||
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
|
||||
history_file = f.name
|
||||
|
||||
try:
|
||||
completer = AutoCompleter(history_filename=history_file)
|
||||
|
||||
with patch.object(sys.stdin, 'isatty', return_value=True), \
|
||||
patch('argenta.app.autocompleter.entity.PromptSession'), \
|
||||
patch('argenta.app.autocompleter.entity.ThreadedHistory') as mock_threaded_history:
|
||||
completer.initial_setup({"start", "stop"})
|
||||
|
||||
assert completer._session is not None
|
||||
assert completer._fallback_mode is False
|
||||
mock_threaded_history.assert_called_once()
|
||||
finally:
|
||||
if os.path.exists(history_file):
|
||||
os.unlink(history_file)
|
||||
|
||||
|
||||
def test_autocompleter_initial_setup_without_history_file() -> None:
|
||||
completer = AutoCompleter(history_filename=None)
|
||||
|
||||
with patch.object(sys.stdin, 'isatty', return_value=True), \
|
||||
patch('argenta.app.autocompleter.entity.PromptSession'), \
|
||||
patch('argenta.app.autocompleter.entity.InMemoryHistory') as mock_in_memory:
|
||||
completer.initial_setup({"start", "stop"})
|
||||
|
||||
assert completer._session is not None
|
||||
assert completer._fallback_mode is False
|
||||
mock_in_memory.assert_called_once()
|
||||
|
||||
|
||||
def test_autocompleter_initial_setup_with_custom_autocomplete_button() -> None:
|
||||
completer = AutoCompleter(autocomplete_button="c-space")
|
||||
|
||||
with patch.object(sys.stdin, 'isatty', return_value=True), \
|
||||
patch('argenta.app.autocompleter.entity.PromptSession'):
|
||||
completer.initial_setup({"start", "stop"})
|
||||
|
||||
assert completer._session is not None
|
||||
assert completer.autocomplete_button == "c-space"
|
||||
|
||||
|
||||
def test_autocompleter_initial_setup_without_auto_suggestions() -> None:
|
||||
completer = AutoCompleter(auto_suggestions=False)
|
||||
|
||||
with patch.object(sys.stdin, 'isatty', return_value=True), \
|
||||
patch('argenta.app.autocompleter.entity.PromptSession') as mock_session:
|
||||
completer.initial_setup({"start", "stop"})
|
||||
|
||||
assert completer._session is not None
|
||||
call_kwargs = mock_session.call_args[1]
|
||||
assert call_kwargs['auto_suggest'] is None
|
||||
|
||||
|
||||
def test_autocompleter_initial_setup_without_command_highlighting() -> None:
|
||||
completer = AutoCompleter(command_highlighting=False)
|
||||
|
||||
with patch.object(sys.stdin, 'isatty', return_value=True), \
|
||||
patch('argenta.app.autocompleter.entity.PromptSession') as mock_session:
|
||||
completer.initial_setup({"start", "stop"})
|
||||
|
||||
assert completer._session is not None
|
||||
call_kwargs = mock_session.call_args[1]
|
||||
assert call_kwargs['style'] is None
|
||||
assert call_kwargs['lexer'] is None
|
||||
|
||||
|
||||
def test_autocompleter_key_binding_handler_with_complete_state() -> None:
|
||||
completer = AutoCompleter()
|
||||
|
||||
captured_handler: Callable[[Any], None] | None = None
|
||||
|
||||
def capture_kb_add(key: str) -> Callable[[Callable[[Any], None]], Callable[[Any], None]]:
|
||||
def decorator(func: Callable[[Any], None]) -> Callable[[Any], None]:
|
||||
nonlocal captured_handler
|
||||
captured_handler = func
|
||||
return func
|
||||
return decorator
|
||||
|
||||
with patch.object(sys.stdin, 'isatty', return_value=True), \
|
||||
patch('argenta.app.autocompleter.entity.PromptSession'), \
|
||||
patch('argenta.app.autocompleter.entity.KeyBindings') as mock_kb_class:
|
||||
|
||||
mock_kb = MagicMock()
|
||||
mock_kb.add = capture_kb_add
|
||||
mock_kb_class.return_value = mock_kb
|
||||
|
||||
completer.initial_setup({"start", "stop"})
|
||||
|
||||
assert captured_handler is not None
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_buff = MagicMock()
|
||||
mock_buff.complete_state = True
|
||||
mock_event.app.current_buffer = mock_buff
|
||||
|
||||
captured_handler(mock_event)
|
||||
|
||||
mock_buff.complete_next.assert_called_once()
|
||||
|
||||
|
||||
def test_autocompleter_key_binding_handler_no_completions() -> None:
|
||||
completer = AutoCompleter()
|
||||
|
||||
captured_handler: Callable[[Any], None] | None = None
|
||||
|
||||
def capture_kb_add(key: str) -> Callable[[Callable[[Any], None]], Callable[[Any], None]]:
|
||||
def decorator(func: Callable[[Any], None]) -> Callable[[Any], None]:
|
||||
nonlocal captured_handler
|
||||
captured_handler = func
|
||||
return func
|
||||
return decorator
|
||||
|
||||
with patch.object(sys.stdin, 'isatty', return_value=True), \
|
||||
patch('argenta.app.autocompleter.entity.PromptSession'), \
|
||||
patch('argenta.app.autocompleter.entity.KeyBindings') as mock_kb_class:
|
||||
|
||||
mock_kb = MagicMock()
|
||||
mock_kb.add = capture_kb_add
|
||||
mock_kb_class.return_value = mock_kb
|
||||
|
||||
completer.initial_setup({"start", "stop"})
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_buff = MagicMock()
|
||||
mock_buff.complete_state = False
|
||||
mock_completer = MagicMock()
|
||||
mock_completer.get_completions.return_value = iter([])
|
||||
mock_buff.completer = mock_completer
|
||||
mock_event.app.current_buffer = mock_buff
|
||||
|
||||
assert captured_handler is not None
|
||||
captured_handler(mock_event)
|
||||
|
||||
mock_buff.start_completion.assert_not_called()
|
||||
mock_buff.apply_completion.assert_not_called()
|
||||
|
||||
|
||||
def test_autocompleter_key_binding_handler_single_completion() -> None:
|
||||
completer = AutoCompleter()
|
||||
|
||||
captured_handler: Callable[[Any], None] | None = None
|
||||
|
||||
def capture_kb_add(key: str) -> Callable[[Callable[[Any], None]], Callable[[Any], None]]:
|
||||
def decorator(func: Callable[[Any], None]) -> Callable[[Any], None]:
|
||||
nonlocal captured_handler
|
||||
captured_handler = func
|
||||
return func
|
||||
return decorator
|
||||
|
||||
with patch.object(sys.stdin, 'isatty', return_value=True), \
|
||||
patch('argenta.app.autocompleter.entity.PromptSession'), \
|
||||
patch('argenta.app.autocompleter.entity.KeyBindings') as mock_kb_class:
|
||||
|
||||
mock_kb = MagicMock()
|
||||
mock_kb.add = capture_kb_add
|
||||
mock_kb_class.return_value = mock_kb
|
||||
|
||||
completer.initial_setup({"start", "stop"})
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_buff = MagicMock()
|
||||
mock_buff.complete_state = False
|
||||
mock_completion = MagicMock()
|
||||
mock_completer = MagicMock()
|
||||
mock_completer.get_completions.return_value = iter([mock_completion])
|
||||
mock_buff.completer = mock_completer
|
||||
mock_event.app.current_buffer = mock_buff
|
||||
|
||||
assert captured_handler is not None
|
||||
captured_handler(mock_event)
|
||||
|
||||
mock_buff.apply_completion.assert_called_once_with(mock_completion)
|
||||
mock_buff.start_completion.assert_not_called()
|
||||
|
||||
|
||||
def test_autocompleter_key_binding_handler_multiple_completions() -> None:
|
||||
completer = AutoCompleter()
|
||||
|
||||
captured_handler: Callable[[Any], None] | None = None
|
||||
|
||||
def capture_kb_add(key: str) -> Callable[[Callable[[Any], None]], Callable[[Any], None]]:
|
||||
def decorator(func: Callable[[Any], None]) -> Callable[[Any], None]:
|
||||
nonlocal captured_handler
|
||||
captured_handler = func
|
||||
return func
|
||||
return decorator
|
||||
|
||||
with patch.object(sys.stdin, 'isatty', return_value=True), \
|
||||
patch('argenta.app.autocompleter.entity.PromptSession'), \
|
||||
patch('argenta.app.autocompleter.entity.KeyBindings') as mock_kb_class:
|
||||
|
||||
mock_kb = MagicMock()
|
||||
mock_kb.add = capture_kb_add
|
||||
mock_kb_class.return_value = mock_kb
|
||||
|
||||
completer.initial_setup({"start", "stop"})
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_buff = MagicMock()
|
||||
mock_buff.complete_state = False
|
||||
mock_completion1 = MagicMock()
|
||||
mock_completion2 = MagicMock()
|
||||
mock_completer = MagicMock()
|
||||
mock_completer.get_completions.return_value = iter([mock_completion1, mock_completion2])
|
||||
mock_buff.completer = mock_completer
|
||||
mock_event.app.current_buffer = mock_buff
|
||||
|
||||
assert captured_handler is not None
|
||||
captured_handler(mock_event)
|
||||
|
||||
mock_buff.start_completion.assert_called_once_with(select_first=False)
|
||||
mock_buff.apply_completion.assert_not_called()
|
||||
|
||||
|
||||
def test_autocompleter_prompt_in_fallback_mode_with_string() -> None:
|
||||
completer = AutoCompleter()
|
||||
|
||||
with patch.object(sys.stdin, 'isatty', return_value=False):
|
||||
completer.initial_setup({"start", "stop"})
|
||||
|
||||
assert completer._fallback_mode is True
|
||||
|
||||
with patch('builtins.input', return_value='test input'):
|
||||
result = completer.prompt(">>> ")
|
||||
|
||||
assert result == 'test input'
|
||||
|
||||
|
||||
def test_autocompleter_prompt_in_fallback_mode_with_html() -> None:
|
||||
completer = AutoCompleter()
|
||||
|
||||
with patch.object(sys.stdin, 'isatty', return_value=False):
|
||||
completer.initial_setup({"start", "stop"})
|
||||
|
||||
assert completer._fallback_mode is True
|
||||
|
||||
with patch('builtins.input', return_value='test input'):
|
||||
result = completer.prompt(HTML("<b>>>> </b>"))
|
||||
|
||||
assert result == 'test input'
|
||||
|
||||
|
||||
def test_autocompleter_prompt_with_html_in_normal_mode() -> None:
|
||||
completer = AutoCompleter()
|
||||
|
||||
mock_session = MagicMock()
|
||||
mock_session.prompt.return_value = 'test result'
|
||||
completer._session = mock_session
|
||||
completer._fallback_mode = False
|
||||
|
||||
html_prompt = HTML("<b>>>> </b>")
|
||||
result = completer.prompt(html_prompt)
|
||||
|
||||
assert result == 'test result'
|
||||
mock_session.prompt.assert_called_once()
|
||||
call_args = mock_session.prompt.call_args
|
||||
assert call_args[0][0] == html_prompt
|
||||
|
||||
|
||||
def test_autocompleter_prompt_with_string_in_normal_mode() -> None:
|
||||
completer = AutoCompleter()
|
||||
|
||||
mock_session = MagicMock()
|
||||
mock_session.prompt.return_value = 'test result'
|
||||
completer._session = mock_session
|
||||
completer._fallback_mode = False
|
||||
|
||||
result = completer.prompt(">>> ")
|
||||
|
||||
assert result == 'test result'
|
||||
mock_session.prompt.assert_called_once()
|
||||
call_args = mock_session.prompt.call_args
|
||||
assert isinstance(call_args[0][0], HTML)
|
||||
|
||||
@@ -58,6 +58,11 @@ def test_parse_raises_error_for_empty_command() -> None:
|
||||
with pytest.raises(EmptyInputCommandException):
|
||||
InputCommand.parse('')
|
||||
|
||||
|
||||
def test_parse_raises_error_slash_on_the_end() -> None:
|
||||
with pytest.raises(UnprocessedInputFlagException):
|
||||
InputCommand.parse('ssh --host 192.168.0.3\\')
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Tests for flag validation - valid flags
|
||||
|
||||
@@ -164,6 +164,49 @@ def test_input_flags_get_by_name_returns_none_for_missing_flag() -> None:
|
||||
assert input_flags.get_flag_by_name('case') is None
|
||||
|
||||
|
||||
def test_input_flags_get_by_name_with_status_finds_matching_flag() -> None:
|
||||
from argenta.command.flag import ValidationStatus
|
||||
|
||||
flag1 = InputFlag(name='test', input_value='valid', status=ValidationStatus.VALID)
|
||||
flag2 = InputFlag(name='other', input_value='invalid', status=ValidationStatus.INVALID)
|
||||
input_flags = InputFlags([flag1, flag2])
|
||||
|
||||
result = input_flags.get_flag_by_name('test', with_status=ValidationStatus.VALID)
|
||||
assert result == flag1
|
||||
|
||||
|
||||
def test_input_flags_get_by_name_with_status_returns_none_when_status_mismatch() -> None:
|
||||
from argenta.command.flag import ValidationStatus
|
||||
|
||||
flag = InputFlag(name='test', input_value='value', status=ValidationStatus.VALID)
|
||||
input_flags = InputFlags([flag])
|
||||
|
||||
result = input_flags.get_flag_by_name('test', with_status=ValidationStatus.INVALID)
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_input_flags_get_by_name_with_status_returns_default_when_not_found() -> None:
|
||||
from argenta.command.flag import ValidationStatus
|
||||
|
||||
flag = InputFlag(name='test', input_value='value', status=ValidationStatus.VALID)
|
||||
input_flags = InputFlags([flag])
|
||||
|
||||
result = input_flags.get_flag_by_name('missing', with_status=ValidationStatus.VALID, default='default_value')
|
||||
assert result == 'default_value'
|
||||
|
||||
|
||||
def test_input_flags_get_by_name_with_status_filters_by_both_name_and_status() -> None:
|
||||
from argenta.command.flag import ValidationStatus
|
||||
|
||||
flag1 = InputFlag(name='test', input_value='value1', status=ValidationStatus.VALID)
|
||||
flag2 = InputFlag(name='test', input_value='value2', status=ValidationStatus.INVALID)
|
||||
flag3 = InputFlag(name='other', input_value='value3', status=ValidationStatus.VALID)
|
||||
input_flags = InputFlags([flag1, flag2, flag3])
|
||||
|
||||
result = input_flags.get_flag_by_name('test', with_status=ValidationStatus.INVALID)
|
||||
assert result == flag2
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Tests for InputFlags collection - equality and containment
|
||||
# ============================================================================
|
||||
|
||||
@@ -248,6 +248,17 @@ def test_finds_appropriate_handler_executes_handler_with_flags_by_alias(capsys:
|
||||
assert "Hello World!" in output.out
|
||||
|
||||
|
||||
def test_finds_appropriate_handler_raises_runtime_error_when_handler_not_found() -> None:
|
||||
router = Router()
|
||||
|
||||
@router.command('hello')
|
||||
def handler(_res: Response) -> None:
|
||||
pass
|
||||
|
||||
with pytest.raises(RuntimeError, match="Handler for 'unknown' command not found. Panic!"):
|
||||
router.finds_appropriate_handler(InputCommand('unknown'))
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Tests for alias and trigger collision detection
|
||||
# ============================================================================
|
||||
|
||||
@@ -141,3 +141,15 @@ class TestViewer:
|
||||
|
||||
mock_output_generator.assert_called_once()
|
||||
assert mock_printer.call_count >= 2
|
||||
|
||||
def test_view_framed_text_with_unimplemented_dividing_line(self, mock_printer: Mock, mock_output_generator: Mock):
|
||||
class NotImplementedDividingLine:
|
||||
pass
|
||||
|
||||
renderer = PlainRenderer()
|
||||
dividing_line = NotImplementedDividingLine()
|
||||
viewer = Viewer(mock_printer, renderer, dividing_line, False)
|
||||
|
||||
with pytest.raises(NotImplementedError):
|
||||
viewer.view_framed_text_from_generator(mock_output_generator, is_stdout_redirected_by_router=True)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user