From b9b83540e25c9700f50d4ce9cf411284b656e07d Mon Sep 17 00:00:00 2001 From: kolo Date: Fri, 13 Mar 2026 16:50:38 +0300 Subject: [PATCH] perf boooooooooooooost --- .gitignore | 3 +- .../infrastructure/entrypoint_resolver.py | 31 +- .../app/autocompleter/_ext_features_impl.py | 122 ++++++++ src/argenta/app/autocompleter/entity.py | 136 ++------- src/argenta/app/models.py | 7 +- src/argenta/orchestrator/entity.py | 4 +- src/argenta/response/entity.py | 7 +- src/argenta/router/entity.py | 5 +- tests/unit_tests/test_autocompleter.py | 278 ++++++++---------- 9 files changed, 297 insertions(+), 296 deletions(-) create mode 100644 src/argenta/app/autocompleter/_ext_features_impl.py diff --git a/.gitignore b/.gitignore index 46af4f2..557f006 100644 --- a/.gitignore +++ b/.gitignore @@ -321,4 +321,5 @@ http-client.private.env.json .idea/.cache/.Apifox_Helper .idea/ApifoxUploaderProjectSetting.xml -.zed \ No newline at end of file +.zed +test.py \ No newline at end of file diff --git a/src/argenta/_cli/infrastructure/entrypoint_resolver.py b/src/argenta/_cli/infrastructure/entrypoint_resolver.py index 24a9f13..42d2f67 100644 --- a/src/argenta/_cli/infrastructure/entrypoint_resolver.py +++ b/src/argenta/_cli/infrastructure/entrypoint_resolver.py @@ -52,24 +52,30 @@ class EntrypointResolver: @overload def parse_entrypoint_with_type( - self, entrypoint_type: type[CallableEntryPoint] + self, + entrypoint_object_name: str, + entrypoint_type: type[CallableEntryPoint] ) -> EntryPoint[Callable[[], None]]: ... @overload def parse_entrypoint_with_type( - self, entrypoint_type: type[EntryPointAsApp] + self, + entrypoint_object_name: str, + entrypoint_type: type[EntryPointAsApp] ) -> EntryPoint[App]: ... def parse_entrypoint_with_type( - self, entrypoint_type: type[CallableEntryPoint] | type[EntryPointAsApp] + self, + entrypoint_object_name: str, + entrypoint_type: type[CallableEntryPoint] | type[EntryPointAsApp] ) -> EntryPoint[Callable[[], None]] | EntryPoint[App]: if entrypoint_type is CallableEntryPoint: - return self._parse_callable_entrypoint() + return self._parse_callable_entrypoint(entrypoint_object_name) elif entrypoint_type is EntryPointAsApp: - return self._parse_entrypoint_as_app() + return self._parse_entrypoint_as_app(entrypoint_object_name) raise NotImplementedError - def _parse_callable_entrypoint(self) -> CallableEntryPoint: - resolved_entrypoint = self._resolve_from_string() + def _parse_callable_entrypoint(self, entrypoint_object_name: str) -> CallableEntryPoint: + resolved_entrypoint = self._resolve_from_string(entrypoint_object_name) instance_object = resolved_entrypoint[1] if not callable(instance_object): raise EntrypointNotCallableError(repr(instance_object)) @@ -82,20 +88,21 @@ class EntrypointResolver: instance_object = cast(Callable[[], None], instance_object) return CallableEntryPoint(raw_path=resolved_entrypoint[0], instance_object=instance_object) - def _parse_entrypoint_as_app(self) -> EntryPointAsApp: - resolved_entrypoint = self._resolve_from_string() + def _parse_entrypoint_as_app(self, entrypoint_object_name: str) -> EntryPointAsApp: + resolved_entrypoint = self._resolve_from_string(entrypoint_object_name) instance_object = resolved_entrypoint[1] if not isinstance(instance_object, App): raise EntrypointNotAppInstanceError(repr(instance_object)) return EntryPointAsApp(raw_path=resolved_entrypoint[0], instance_object=instance_object) - def _resolve_from_string(self) -> tuple[str, object]: - file_path, _, attr_name = self._path_to_entrypoint.partition(":") + def _resolve_from_string(self, entrypoint_object_name: str) -> tuple[str, object]: + file_path: str = self._path_to_entrypoint + attr_name: str = entrypoint_object_name if not file_path or not attr_name: raise ResolveFromStringError( - f'"{self._path_to_entrypoint}" must be in format ":"' + f'"{self._path_to_entrypoint}" must be in format ""' ) path = Path(file_path).resolve() diff --git a/src/argenta/app/autocompleter/_ext_features_impl.py b/src/argenta/app/autocompleter/_ext_features_impl.py new file mode 100644 index 0000000..d5d5317 --- /dev/null +++ b/src/argenta/app/autocompleter/_ext_features_impl.py @@ -0,0 +1,122 @@ +__all__ = ['build_session', 'do_prompt'] + +from typing import Callable, Iterable + +from prompt_toolkit import HTML, PromptSession +from prompt_toolkit.auto_suggest import AutoSuggestFromHistory +from prompt_toolkit.completion import CompleteEvent, Completer, Completion, ThreadedCompleter +from prompt_toolkit.cursor_shapes import CursorShape +from prompt_toolkit.document import Document +from prompt_toolkit.formatted_text import StyleAndTextTuples +from prompt_toolkit.history import FileHistory, History, InMemoryHistory, ThreadedHistory +from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent +from prompt_toolkit.lexers import Lexer +from prompt_toolkit.styles import Style + + +class CommandLexer(Lexer): + def __init__(self, valid_commands: set[str]) -> None: + self.valid_commands: set[str] = valid_commands + + def lex_document(self, document: Document) -> Callable[[int], StyleAndTextTuples]: + def get_line_tokens(lineno: int) -> StyleAndTextTuples: + if lineno >= len(document.lines): + return [] + + line_text: str = document.lines[lineno] + + if not line_text.strip(): + return [("", line_text)] + + first_word: str = line_text.split()[0] if line_text.split() else "" + + if first_word in self.valid_commands: + return [("class:valid", line_text)] + else: + return [("class:invalid", line_text)] + + return get_line_tokens + + +class HistoryCompleter(Completer): + def __init__(self, history_container: History, static_commands: set[str]) -> None: + self.history_container: History = history_container + self.static_commands: set[str] = static_commands + + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> Iterable[Completion]: + text: str = document.text_before_cursor + history_items: set[str] = set(self.history_container.load_history_strings()) + all_candidates: set[str] = history_items.union(self.static_commands) + matches: list[str] = sorted(cmd for cmd in all_candidates if cmd.startswith(text)) + + if not matches: + return + + for match in matches: + yield Completion(match, start_position=-len(text), display=match) + + @staticmethod + def _find_common_prefix(matches: list[str]) -> str: + if not matches: + return "" + common: str = matches[0] + for match in matches[1:]: + i: int = 0 + while i < len(common) and i < len(match) and common[i] == match[i]: + i += 1 + common = common[:i] + return common + + +def build_session( + history_filename: str | None, + autocomplete_button: str, + command_highlighting: bool, + auto_suggestions: bool, + all_commands: set[str], +) -> PromptSession[str]: + kb = KeyBindings() + + def _(event: KeyPressEvent) -> None: + buff = event.app.current_buffer + if buff.complete_state: + buff.complete_next() + return + comps_gen = iter(buff.completer.get_completions(buff.document, CompleteEvent())) + try: + first = next(comps_gen) + except StopIteration: + return + try: + _ = next(comps_gen) + buff.start_completion(select_first=False) + except StopIteration: + buff.apply_completion(first) + + kb.add(autocomplete_button)(_) + + history: InMemoryHistory | ThreadedHistory + if history_filename: + history = ThreadedHistory(FileHistory(history_filename)) + else: + history = InMemoryHistory() + + style = Style.from_dict({"valid": "#00ff00", "invalid": "#ff0000"}) + return PromptSession( + history=history, + completer=ThreadedCompleter(HistoryCompleter(history, all_commands)), + complete_while_typing=False, + key_bindings=kb, + auto_suggest=AutoSuggestFromHistory() if auto_suggestions else None, + style=style if command_highlighting else None, + lexer=CommandLexer(all_commands) if command_highlighting else None, + ) + + +def do_prompt(session: PromptSession[str], prompt_text: str | HTML) -> str: + return session.prompt( + HTML(prompt_text) if isinstance(prompt_text, str) else prompt_text, + cursor=CursorShape.BLINKING_BEAM, + ) diff --git a/src/argenta/app/autocompleter/entity.py b/src/argenta/app/autocompleter/entity.py index 8b0704d..6151f34 100644 --- a/src/argenta/app/autocompleter/entity.py +++ b/src/argenta/app/autocompleter/entity.py @@ -1,86 +1,21 @@ +from __future__ import annotations + __all__ = ["AutoCompleter"] import sys -from typing import Callable, Iterable +from typing import TYPE_CHECKING -from prompt_toolkit import HTML, PromptSession -from prompt_toolkit.auto_suggest import AutoSuggestFromHistory -from prompt_toolkit.completion import (CompleteEvent, Completer, Completion, - ThreadedCompleter) -from prompt_toolkit.cursor_shapes import CursorShape -from prompt_toolkit.document import Document -from prompt_toolkit.formatted_text import StyleAndTextTuples -from prompt_toolkit.history import FileHistory, History, InMemoryHistory, ThreadedHistory -from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent -from prompt_toolkit.lexers import Lexer -from prompt_toolkit.styles import Style - - -class CommandLexer(Lexer): - def __init__(self, valid_commands: set[str]) -> None: - self.valid_commands: set[str] = valid_commands - - def lex_document(self, document: Document) -> Callable[[int], StyleAndTextTuples]: - def get_line_tokens(lineno: int) -> StyleAndTextTuples: - if lineno >= len(document.lines): - return [] - - line_text: str = document.lines[lineno] - - if not line_text.strip(): - return [("", line_text)] - - first_word: str = line_text.split()[0] if line_text.split() else "" - - if first_word in self.valid_commands: - return [("class:valid", line_text)] - else: - return [("class:invalid", line_text)] - - return get_line_tokens - - -class HistoryCompleter(Completer): - def __init__(self, history_container: History, static_commands: set[str]) -> None: - self.history_container: History = history_container - self.static_commands: set[str] = static_commands - - def get_completions(self, document: Document, complete_event: CompleteEvent) -> Iterable[Completion]: - text: str = document.text_before_cursor - history_items: set[str] = set(self.history_container.load_history_strings()) - all_candidates: set[str] = history_items.union(self.static_commands) - matches: list[str] = sorted(cmd for cmd in all_candidates if cmd.startswith(text)) - - if not matches: - return - - for match in matches: - yield Completion( - match, - start_position=-len(text), - display=match - ) - - @staticmethod - def _find_common_prefix(matches: list[str]) -> str: - if not matches: - return "" - common: str = matches[0] - for match in matches[1:]: - i: int = 0 - while i < len(common) and i < len(match) and common[i] == match[i]: - i += 1 - common = common[:i] - return common +if TYPE_CHECKING: + from prompt_toolkit import PromptSession, HTML class AutoCompleter: def __init__( - self, - history_filename: str | None = None, - autocomplete_button: str = "tab", - command_highlighting: bool = True, - auto_suggestions: bool = True, + self, + history_filename: str | None = None, + autocomplete_button: str = "tab", + command_highlighting: bool = True, + auto_suggestions: bool = True, ) -> None: self.history_filename: str | None = history_filename self.autocomplete_button: str = autocomplete_button @@ -94,42 +29,15 @@ class AutoCompleter: self._session = None self._fallback_mode = True return + + from ._ext_features_impl import build_session - kb = KeyBindings() - - def _(event: KeyPressEvent) -> None: - buff = event.app.current_buffer - if buff.complete_state: - buff.complete_next() - return - comps_gen = iter(buff.completer.get_completions(buff.document, CompleteEvent())) - try: - first = next(comps_gen) - except StopIteration: - return - try: - _ = next(comps_gen) - buff.start_completion(select_first=False) - except StopIteration: - buff.apply_completion(first) - - kb.add(self.autocomplete_button)(_) - - history: InMemoryHistory | ThreadedHistory - if self.history_filename: - history = ThreadedHistory(FileHistory(self.history_filename)) - else: - history = InMemoryHistory() - - style = Style.from_dict({'valid': '#00ff00', 'invalid': '#ff0000'}) - self._session = PromptSession( - history=history, - completer=ThreadedCompleter(HistoryCompleter(history, all_commands)), - complete_while_typing=False, - key_bindings=kb, - auto_suggest=AutoSuggestFromHistory() if self.auto_suggestions else None, - style=style if self.command_highlighting else None, - lexer=CommandLexer(all_commands) if self.command_highlighting else None, + self._session = build_session( + self.history_filename, + self.autocomplete_button, + self.command_highlighting, + self.auto_suggestions, + all_commands ) def prompt(self, prompt_text: str | HTML = ">>> ") -> str: @@ -137,7 +45,7 @@ class AutoCompleter: return input(prompt_text if isinstance(prompt_text, str) else ">>> ") if self._session is None: raise RuntimeError("Call initial_setup() before using prompt()") - return self._session.prompt( - HTML(prompt_text) if isinstance(prompt_text, str) else prompt_text, - cursor=CursorShape.BLINKING_BEAM - ) \ No newline at end of file + + from ._ext_features_impl import do_prompt + + return do_prompt(self._session, prompt_text) diff --git a/src/argenta/app/models.py b/src/argenta/app/models.py index 5d163f3..bf0eeaf 100644 --- a/src/argenta/app/models.py +++ b/src/argenta/app/models.py @@ -3,8 +3,6 @@ __all__ = ["App"] import difflib from typing import Never, TypeAlias -from rich.console import Console - from argenta.app.autocompleter import AutoCompleter from argenta.app.behavior_handlers.models import (BehaviorHandlersFabric, BehaviorHandlersSettersMixin) @@ -189,7 +187,7 @@ class App(BaseApp): repeat_command_groups_printing: bool = False, override_system_messages: bool = False, autocompleter: AutoCompleter | None = None, - printer: Printer = Console().print, + printer: Printer | None = None, ) -> None: """ Public. The essence of the application itself. @@ -206,6 +204,7 @@ class App(BaseApp): :param printer: system messages text output function :return: None """ + from rich.console import Console super().__init__( prompt=prompt, initial_message=initial_message, @@ -216,7 +215,7 @@ class App(BaseApp): repeat_command_groups_printing=repeat_command_groups_printing, override_system_messages=override_system_messages, autocompleter=autocompleter or AutoCompleter(), - printer=printer, + printer=printer or Console().print, ) def include_router(self, router: Router) -> None: diff --git a/src/argenta/orchestrator/entity.py b/src/argenta/orchestrator/entity.py index 39175e2..13d16af 100644 --- a/src/argenta/orchestrator/entity.py +++ b/src/argenta/orchestrator/entity.py @@ -28,7 +28,7 @@ class Orchestrator: self._auto_inject_handlers: bool = auto_inject_handlers if self._arg_parser is not None: - self._arg_parser._parse_args() + self._arg_parser._parse_args() # pyright: ignore[reportPrivateUsage] def run_repl(self, app: App) -> None: """ @@ -41,4 +41,4 @@ class Orchestrator: ) setup_dishka(app, container, auto_inject=self._auto_inject_handlers) - app._run_repl() + app._run_repl() # pyright: ignore[reportPrivateUsage] diff --git a/src/argenta/response/entity.py b/src/argenta/response/entity.py index 09a0296..df54f4d 100644 --- a/src/argenta/response/entity.py +++ b/src/argenta/response/entity.py @@ -1,6 +1,11 @@ +from __future__ import annotations + __all__ = ["Response"] -from dishka import Container +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from dishka import Container from argenta.command import InputFlags from argenta.response.status import ResponseStatus diff --git a/src/argenta/router/entity.py b/src/argenta/router/entity.py index 9a6de00..9ca2d46 100644 --- a/src/argenta/router/entity.py +++ b/src/argenta/router/entity.py @@ -3,8 +3,6 @@ __all__ = ["Router"] from inspect import get_annotations, getfullargspec, getsourcefile, getsourcelines from typing import Callable -from rich.console import Console - from argenta.app.protocols import HandlerFunc from argenta.command import Command, InputCommand, InputFlags from argenta.command.flag import ValidationStatus @@ -20,7 +18,7 @@ from argenta.router.exceptions import (RepeatedAliasNameException, class Router: def __init__( self, - title: str = "Default title", + title: str = "Title", *, disable_redirect_stdout: bool = False, ): @@ -175,6 +173,7 @@ class Router: response_arg_annotation = func_annotations.get(response_arg) if response_arg_annotation is not None and response_arg_annotation is not Response: + from rich.console import Console source_line: int = getsourcelines(func)[1] Console().print( f'\nFile "{getsourcefile(func)}", line {source_line}\n[b red]WARNING:[/b red] [i]The typehint ' diff --git a/tests/unit_tests/test_autocompleter.py b/tests/unit_tests/test_autocompleter.py index c900a2c..f82dd2f 100644 --- a/tests/unit_tests/test_autocompleter.py +++ b/tests/unit_tests/test_autocompleter.py @@ -10,14 +10,11 @@ from prompt_toolkit.completion import CompleteEvent from prompt_toolkit.document import Document from prompt_toolkit.history import InMemoryHistory -from argenta.app.autocompleter.entity import ( - AutoCompleter, - CommandLexer, - HistoryCompleter -) - +from argenta.app.autocompleter._ext_features_impl import CommandLexer, HistoryCompleter +from argenta.app.autocompleter.entity import AutoCompleter COMMANDS: set[str] = {"start", "stop", "status"} +_IMPL = "argenta.app.autocompleter._ext_features_impl" def test_autocompleter_initializes_with_default_params() -> None: @@ -33,7 +30,7 @@ def test_autocompleter_initializes_with_custom_params() -> None: history_filename="test.txt", autocomplete_button="c-space", command_highlighting=False, - auto_suggestions=False + auto_suggestions=False, ) assert completer.history_filename == "test.txt" assert completer.autocomplete_button == "c-space" @@ -79,13 +76,13 @@ def test_history_completer_returns_matching_commands() -> None: history = InMemoryHistory() history.append_string("start server") history.append_string("stop server") - + completer = HistoryCompleter(history, {"status"}) doc = Document("sta") - + completions = list(completer.get_completions(doc, CompleteEvent())) completion_texts = [c.text for c in completions] - + assert "start server" in completion_texts assert "status" in completion_texts assert "stop server" not in completion_texts @@ -95,13 +92,13 @@ def test_history_completer_returns_all_when_empty_input() -> None: history = InMemoryHistory() history.append_string("start") history.append_string("stop") - + completer = HistoryCompleter(history, {"status"}) doc = Document("") - + completions = list(completer.get_completions(doc, CompleteEvent())) completion_texts = [c.text for c in completions] - + assert len(completion_texts) == 3 assert "start" in completion_texts assert "stop" in completion_texts @@ -111,10 +108,10 @@ def test_history_completer_returns_all_when_empty_input() -> None: def test_history_completer_returns_empty_when_no_matches() -> None: history = InMemoryHistory() history.append_string("start") - + completer = HistoryCompleter(history, {"stop"}) doc = Document("xyz") - + completions = list(completer.get_completions(doc, CompleteEvent())) assert len(completions) == 0 @@ -123,10 +120,10 @@ def test_history_completer_deduplicates_commands() -> None: history = InMemoryHistory() history.append_string("start") history.append_string("start") - + completer = HistoryCompleter(history, {"start"}) doc = Document("sta") - + completions = list(completer.get_completions(doc, CompleteEvent())) assert len(completions) == 1 @@ -136,13 +133,13 @@ def test_history_completer_sorts_results() -> None: history.append_string("stop") history.append_string("start") history.append_string("status") - + completer = HistoryCompleter(history, set()) doc = Document("st") - + completions = list(completer.get_completions(doc, CompleteEvent())) completion_texts = [c.text for c in completions] - + assert completion_texts == ["start", "status", "stop"] @@ -182,7 +179,7 @@ def test_history_completer_returns_early_when_no_matches() -> None: history = InMemoryHistory() completer = HistoryCompleter(history, {"start", "stop"}) doc = Document("xyz") - + result = completer.get_completions(doc, CompleteEvent()) completions = list(result) assert completions == [] @@ -190,28 +187,32 @@ def test_history_completer_returns_early_when_no_matches() -> None: def test_autocompleter_initial_setup_with_commands() -> None: completer = AutoCompleter() - - with patch.object(sys.stdin, 'isatty', return_value=True), \ - patch('argenta.app.autocompleter.entity.PromptSession') as mock_session: + + with ( + patch.object(sys.stdin, "isatty", return_value=True), + patch(f"{_IMPL}.PromptSession") as mock_session, + ): completer.initial_setup({"start", "stop", "status"}) - + assert completer._session is not None assert completer._fallback_mode is False mock_session.assert_called_once() def test_autocompleter_initial_setup_with_history_file() -> None: - with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f: + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: history_file = f.name - + try: completer = AutoCompleter(history_filename=history_file) - - with patch.object(sys.stdin, 'isatty', return_value=True), \ - patch('argenta.app.autocompleter.entity.PromptSession'), \ - patch('argenta.app.autocompleter.entity.ThreadedHistory') as mock_threaded_history: + + with ( + patch.object(sys.stdin, "isatty", return_value=True), + patch(f"{_IMPL}.PromptSession"), + patch(f"{_IMPL}.ThreadedHistory") as mock_threaded_history, + ): completer.initial_setup({"start", "stop"}) - + assert completer._session is not None assert completer._fallback_mode is False mock_threaded_history.assert_called_once() @@ -222,12 +223,14 @@ def test_autocompleter_initial_setup_with_history_file() -> None: def test_autocompleter_initial_setup_without_history_file() -> None: completer = AutoCompleter(history_filename=None) - - with patch.object(sys.stdin, 'isatty', return_value=True), \ - patch('argenta.app.autocompleter.entity.PromptSession'), \ - patch('argenta.app.autocompleter.entity.InMemoryHistory') as mock_in_memory: + + with ( + patch.object(sys.stdin, "isatty", return_value=True), + patch(f"{_IMPL}.PromptSession"), + patch(f"{_IMPL}.InMemoryHistory") as mock_in_memory, + ): completer.initial_setup({"start", "stop"}) - + assert completer._session is not None assert completer._fallback_mode is False mock_in_memory.assert_called_once() @@ -235,96 +238,90 @@ def test_autocompleter_initial_setup_without_history_file() -> None: def test_autocompleter_initial_setup_with_custom_autocomplete_button() -> None: completer = AutoCompleter(autocomplete_button="c-space") - - with patch.object(sys.stdin, 'isatty', return_value=True), \ - patch('argenta.app.autocompleter.entity.PromptSession'): + + with ( + patch.object(sys.stdin, "isatty", return_value=True), + patch(f"{_IMPL}.PromptSession"), + ): completer.initial_setup({"start", "stop"}) - + assert completer._session is not None assert completer.autocomplete_button == "c-space" def test_autocompleter_initial_setup_without_auto_suggestions() -> None: completer = AutoCompleter(auto_suggestions=False) - - with patch.object(sys.stdin, 'isatty', return_value=True), \ - patch('argenta.app.autocompleter.entity.PromptSession') as mock_session: + + with ( + patch.object(sys.stdin, "isatty", return_value=True), + patch(f"{_IMPL}.PromptSession") as mock_session, + ): completer.initial_setup({"start", "stop"}) - + assert completer._session is not None call_kwargs = mock_session.call_args[1] - assert call_kwargs['auto_suggest'] is None + assert call_kwargs["auto_suggest"] is None def test_autocompleter_initial_setup_without_command_highlighting() -> None: completer = AutoCompleter(command_highlighting=False) - - with patch.object(sys.stdin, 'isatty', return_value=True), \ - patch('argenta.app.autocompleter.entity.PromptSession') as mock_session: + + with ( + patch.object(sys.stdin, "isatty", return_value=True), + patch(f"{_IMPL}.PromptSession") as mock_session, + ): completer.initial_setup({"start", "stop"}) - + assert completer._session is not None call_kwargs = mock_session.call_args[1] - assert call_kwargs['style'] is None - assert call_kwargs['lexer'] is None + assert call_kwargs["style"] is None + assert call_kwargs["lexer"] is None -def test_autocompleter_key_binding_handler_with_complete_state() -> None: - completer = AutoCompleter() - +def _setup_captured_handler(completer: AutoCompleter) -> Callable[[Any], None] | None: + """Вспомогательная функция: поднимает initial_setup и захватывает kb-хендлер.""" captured_handler: Callable[[Any], None] | None = None - + def capture_kb_add(key: str) -> Callable[[Callable[[Any], None]], Callable[[Any], None]]: def decorator(func: Callable[[Any], None]) -> Callable[[Any], None]: nonlocal captured_handler captured_handler = func return func return decorator - - with patch.object(sys.stdin, 'isatty', return_value=True), \ - patch('argenta.app.autocompleter.entity.PromptSession'), \ - patch('argenta.app.autocompleter.entity.KeyBindings') as mock_kb_class: - + + with ( + patch.object(sys.stdin, "isatty", return_value=True), + patch(f"{_IMPL}.PromptSession"), + patch(f"{_IMPL}.KeyBindings") as mock_kb_class, + ): mock_kb = MagicMock() mock_kb.add = capture_kb_add mock_kb_class.return_value = mock_kb - completer.initial_setup({"start", "stop"}) - + + return captured_handler + + +def test_autocompleter_key_binding_handler_with_complete_state() -> None: + completer = AutoCompleter() + captured_handler = _setup_captured_handler(completer) assert captured_handler is not None - + mock_event = MagicMock() mock_buff = MagicMock() mock_buff.complete_state = True mock_event.app.current_buffer = mock_buff - + captured_handler(mock_event) - + mock_buff.complete_next.assert_called_once() def test_autocompleter_key_binding_handler_no_completions() -> None: completer = AutoCompleter() - - captured_handler: Callable[[Any], None] | None = None - - def capture_kb_add(key: str) -> Callable[[Callable[[Any], None]], Callable[[Any], None]]: - def decorator(func: Callable[[Any], None]) -> Callable[[Any], None]: - nonlocal captured_handler - captured_handler = func - return func - return decorator - - with patch.object(sys.stdin, 'isatty', return_value=True), \ - patch('argenta.app.autocompleter.entity.PromptSession'), \ - patch('argenta.app.autocompleter.entity.KeyBindings') as mock_kb_class: - - mock_kb = MagicMock() - mock_kb.add = capture_kb_add - mock_kb_class.return_value = mock_kb - - completer.initial_setup({"start", "stop"}) - + captured_handler = _setup_captured_handler(completer) + assert captured_handler is not None + mock_event = MagicMock() mock_buff = MagicMock() mock_buff.complete_state = False @@ -332,36 +329,18 @@ def test_autocompleter_key_binding_handler_no_completions() -> None: mock_completer.get_completions.return_value = iter([]) mock_buff.completer = mock_completer mock_event.app.current_buffer = mock_buff - - assert captured_handler is not None + captured_handler(mock_event) - + mock_buff.start_completion.assert_not_called() mock_buff.apply_completion.assert_not_called() def test_autocompleter_key_binding_handler_single_completion() -> None: completer = AutoCompleter() - - captured_handler: Callable[[Any], None] | None = None - - def capture_kb_add(key: str) -> Callable[[Callable[[Any], None]], Callable[[Any], None]]: - def decorator(func: Callable[[Any], None]) -> Callable[[Any], None]: - nonlocal captured_handler - captured_handler = func - return func - return decorator - - with patch.object(sys.stdin, 'isatty', return_value=True), \ - patch('argenta.app.autocompleter.entity.PromptSession'), \ - patch('argenta.app.autocompleter.entity.KeyBindings') as mock_kb_class: - - mock_kb = MagicMock() - mock_kb.add = capture_kb_add - mock_kb_class.return_value = mock_kb - - completer.initial_setup({"start", "stop"}) - + captured_handler = _setup_captured_handler(completer) + assert captured_handler is not None + mock_event = MagicMock() mock_buff = MagicMock() mock_buff.complete_state = False @@ -370,36 +349,18 @@ def test_autocompleter_key_binding_handler_single_completion() -> None: mock_completer.get_completions.return_value = iter([mock_completion]) mock_buff.completer = mock_completer mock_event.app.current_buffer = mock_buff - - assert captured_handler is not None + captured_handler(mock_event) - + mock_buff.apply_completion.assert_called_once_with(mock_completion) mock_buff.start_completion.assert_not_called() def test_autocompleter_key_binding_handler_multiple_completions() -> None: completer = AutoCompleter() - - captured_handler: Callable[[Any], None] | None = None - - def capture_kb_add(key: str) -> Callable[[Callable[[Any], None]], Callable[[Any], None]]: - def decorator(func: Callable[[Any], None]) -> Callable[[Any], None]: - nonlocal captured_handler - captured_handler = func - return func - return decorator - - with patch.object(sys.stdin, 'isatty', return_value=True), \ - patch('argenta.app.autocompleter.entity.PromptSession'), \ - patch('argenta.app.autocompleter.entity.KeyBindings') as mock_kb_class: - - mock_kb = MagicMock() - mock_kb.add = capture_kb_add - mock_kb_class.return_value = mock_kb - - completer.initial_setup({"start", "stop"}) - + captured_handler = _setup_captured_handler(completer) + assert captured_handler is not None + mock_event = MagicMock() mock_buff = MagicMock() mock_buff.complete_state = False @@ -409,54 +370,53 @@ def test_autocompleter_key_binding_handler_multiple_completions() -> None: mock_completer.get_completions.return_value = iter([mock_completion1, mock_completion2]) mock_buff.completer = mock_completer mock_event.app.current_buffer = mock_buff - - assert captured_handler is not None + captured_handler(mock_event) - + mock_buff.start_completion.assert_called_once_with(select_first=False) mock_buff.apply_completion.assert_not_called() def test_autocompleter_prompt_in_fallback_mode_with_string() -> None: completer = AutoCompleter() - - with patch.object(sys.stdin, 'isatty', return_value=False): + + with patch.object(sys.stdin, "isatty", return_value=False): completer.initial_setup({"start", "stop"}) - + assert completer._fallback_mode is True - - with patch('builtins.input', return_value='test input'): + + with patch("builtins.input", return_value="test input"): result = completer.prompt(">>> ") - - assert result == 'test input' + + assert result == "test input" def test_autocompleter_prompt_in_fallback_mode_with_html() -> None: completer = AutoCompleter() - - with patch.object(sys.stdin, 'isatty', return_value=False): + + with patch.object(sys.stdin, "isatty", return_value=False): completer.initial_setup({"start", "stop"}) - + assert completer._fallback_mode is True - - with patch('builtins.input', return_value='test input'): + + with patch("builtins.input", return_value="test input"): result = completer.prompt(HTML(">>> ")) - - assert result == 'test input' + + assert result == "test input" def test_autocompleter_prompt_with_html_in_normal_mode() -> None: completer = AutoCompleter() - + mock_session = MagicMock() - mock_session.prompt.return_value = 'test result' + mock_session.prompt.return_value = "test result" completer._session = mock_session completer._fallback_mode = False - + html_prompt = HTML(">>> ") result = completer.prompt(html_prompt) - - assert result == 'test result' + + assert result == "test result" mock_session.prompt.assert_called_once() call_args = mock_session.prompt.call_args assert call_args[0][0] == html_prompt @@ -464,15 +424,15 @@ def test_autocompleter_prompt_with_html_in_normal_mode() -> None: def test_autocompleter_prompt_with_string_in_normal_mode() -> None: completer = AutoCompleter() - + mock_session = MagicMock() - mock_session.prompt.return_value = 'test result' + mock_session.prompt.return_value = "test result" completer._session = mock_session completer._fallback_mode = False - + result = completer.prompt(">>> ") - - assert result == 'test result' + + assert result == "test result" mock_session.prompt.assert_called_once() call_args = mock_session.prompt.call_args assert isinstance(call_args[0][0], HTML)