perf boooooooooooooost

This commit is contained in:
2026-03-13 16:50:38 +03:00
parent 1cd5c3759e
commit b9b83540e2
9 changed files with 297 additions and 296 deletions
@@ -52,24 +52,30 @@ class EntrypointResolver:
@overload
def parse_entrypoint_with_type(
self, entrypoint_type: type[CallableEntryPoint]
self,
entrypoint_object_name: str,
entrypoint_type: type[CallableEntryPoint]
) -> EntryPoint[Callable[[], None]]: ...
@overload
def parse_entrypoint_with_type(
self, entrypoint_type: type[EntryPointAsApp]
self,
entrypoint_object_name: str,
entrypoint_type: type[EntryPointAsApp]
) -> EntryPoint[App]: ...
def parse_entrypoint_with_type(
self, entrypoint_type: type[CallableEntryPoint] | type[EntryPointAsApp]
self,
entrypoint_object_name: str,
entrypoint_type: type[CallableEntryPoint] | type[EntryPointAsApp]
) -> EntryPoint[Callable[[], None]] | EntryPoint[App]:
if entrypoint_type is CallableEntryPoint:
return self._parse_callable_entrypoint()
return self._parse_callable_entrypoint(entrypoint_object_name)
elif entrypoint_type is EntryPointAsApp:
return self._parse_entrypoint_as_app()
return self._parse_entrypoint_as_app(entrypoint_object_name)
raise NotImplementedError
def _parse_callable_entrypoint(self) -> CallableEntryPoint:
resolved_entrypoint = self._resolve_from_string()
def _parse_callable_entrypoint(self, entrypoint_object_name: str) -> CallableEntryPoint:
resolved_entrypoint = self._resolve_from_string(entrypoint_object_name)
instance_object = resolved_entrypoint[1]
if not callable(instance_object):
raise EntrypointNotCallableError(repr(instance_object))
@@ -82,20 +88,21 @@ class EntrypointResolver:
instance_object = cast(Callable[[], None], instance_object)
return CallableEntryPoint(raw_path=resolved_entrypoint[0], instance_object=instance_object)
def _parse_entrypoint_as_app(self) -> EntryPointAsApp:
resolved_entrypoint = self._resolve_from_string()
def _parse_entrypoint_as_app(self, entrypoint_object_name: str) -> EntryPointAsApp:
resolved_entrypoint = self._resolve_from_string(entrypoint_object_name)
instance_object = resolved_entrypoint[1]
if not isinstance(instance_object, App):
raise EntrypointNotAppInstanceError(repr(instance_object))
return EntryPointAsApp(raw_path=resolved_entrypoint[0], instance_object=instance_object)
def _resolve_from_string(self) -> tuple[str, object]:
file_path, _, attr_name = self._path_to_entrypoint.partition(":")
def _resolve_from_string(self, entrypoint_object_name: str) -> tuple[str, object]:
file_path: str = self._path_to_entrypoint
attr_name: str = entrypoint_object_name
if not file_path or not attr_name:
raise ResolveFromStringError(
f'"{self._path_to_entrypoint}" must be in format "<path/to/file.py>:<attribute>"'
f'"{self._path_to_entrypoint}" must be in format "<path/to/file.py>"'
)
path = Path(file_path).resolve()
@@ -0,0 +1,122 @@
__all__ = ['build_session', 'do_prompt']
from typing import Callable, Iterable
from prompt_toolkit import HTML, PromptSession
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import CompleteEvent, Completer, Completion, ThreadedCompleter
from prompt_toolkit.cursor_shapes import CursorShape
from prompt_toolkit.document import Document
from prompt_toolkit.formatted_text import StyleAndTextTuples
from prompt_toolkit.history import FileHistory, History, InMemoryHistory, ThreadedHistory
from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent
from prompt_toolkit.lexers import Lexer
from prompt_toolkit.styles import Style
class CommandLexer(Lexer):
def __init__(self, valid_commands: set[str]) -> None:
self.valid_commands: set[str] = valid_commands
def lex_document(self, document: Document) -> Callable[[int], StyleAndTextTuples]:
def get_line_tokens(lineno: int) -> StyleAndTextTuples:
if lineno >= len(document.lines):
return []
line_text: str = document.lines[lineno]
if not line_text.strip():
return [("", line_text)]
first_word: str = line_text.split()[0] if line_text.split() else ""
if first_word in self.valid_commands:
return [("class:valid", line_text)]
else:
return [("class:invalid", line_text)]
return get_line_tokens
class HistoryCompleter(Completer):
def __init__(self, history_container: History, static_commands: set[str]) -> None:
self.history_container: History = history_container
self.static_commands: set[str] = static_commands
def get_completions(
self, document: Document, complete_event: CompleteEvent
) -> Iterable[Completion]:
text: str = document.text_before_cursor
history_items: set[str] = set(self.history_container.load_history_strings())
all_candidates: set[str] = history_items.union(self.static_commands)
matches: list[str] = sorted(cmd for cmd in all_candidates if cmd.startswith(text))
if not matches:
return
for match in matches:
yield Completion(match, start_position=-len(text), display=match)
@staticmethod
def _find_common_prefix(matches: list[str]) -> str:
if not matches:
return ""
common: str = matches[0]
for match in matches[1:]:
i: int = 0
while i < len(common) and i < len(match) and common[i] == match[i]:
i += 1
common = common[:i]
return common
def build_session(
history_filename: str | None,
autocomplete_button: str,
command_highlighting: bool,
auto_suggestions: bool,
all_commands: set[str],
) -> PromptSession[str]:
kb = KeyBindings()
def _(event: KeyPressEvent) -> None:
buff = event.app.current_buffer
if buff.complete_state:
buff.complete_next()
return
comps_gen = iter(buff.completer.get_completions(buff.document, CompleteEvent()))
try:
first = next(comps_gen)
except StopIteration:
return
try:
_ = next(comps_gen)
buff.start_completion(select_first=False)
except StopIteration:
buff.apply_completion(first)
kb.add(autocomplete_button)(_)
history: InMemoryHistory | ThreadedHistory
if history_filename:
history = ThreadedHistory(FileHistory(history_filename))
else:
history = InMemoryHistory()
style = Style.from_dict({"valid": "#00ff00", "invalid": "#ff0000"})
return PromptSession(
history=history,
completer=ThreadedCompleter(HistoryCompleter(history, all_commands)),
complete_while_typing=False,
key_bindings=kb,
auto_suggest=AutoSuggestFromHistory() if auto_suggestions else None,
style=style if command_highlighting else None,
lexer=CommandLexer(all_commands) if command_highlighting else None,
)
def do_prompt(session: PromptSession[str], prompt_text: str | HTML) -> str:
return session.prompt(
HTML(prompt_text) if isinstance(prompt_text, str) else prompt_text,
cursor=CursorShape.BLINKING_BEAM,
)
+22 -114
View File
@@ -1,86 +1,21 @@
from __future__ import annotations
__all__ = ["AutoCompleter"]
import sys
from typing import Callable, Iterable
from typing import TYPE_CHECKING
from prompt_toolkit import HTML, PromptSession
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import (CompleteEvent, Completer, Completion,
ThreadedCompleter)
from prompt_toolkit.cursor_shapes import CursorShape
from prompt_toolkit.document import Document
from prompt_toolkit.formatted_text import StyleAndTextTuples
from prompt_toolkit.history import FileHistory, History, InMemoryHistory, ThreadedHistory
from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent
from prompt_toolkit.lexers import Lexer
from prompt_toolkit.styles import Style
class CommandLexer(Lexer):
def __init__(self, valid_commands: set[str]) -> None:
self.valid_commands: set[str] = valid_commands
def lex_document(self, document: Document) -> Callable[[int], StyleAndTextTuples]:
def get_line_tokens(lineno: int) -> StyleAndTextTuples:
if lineno >= len(document.lines):
return []
line_text: str = document.lines[lineno]
if not line_text.strip():
return [("", line_text)]
first_word: str = line_text.split()[0] if line_text.split() else ""
if first_word in self.valid_commands:
return [("class:valid", line_text)]
else:
return [("class:invalid", line_text)]
return get_line_tokens
class HistoryCompleter(Completer):
def __init__(self, history_container: History, static_commands: set[str]) -> None:
self.history_container: History = history_container
self.static_commands: set[str] = static_commands
def get_completions(self, document: Document, complete_event: CompleteEvent) -> Iterable[Completion]:
text: str = document.text_before_cursor
history_items: set[str] = set(self.history_container.load_history_strings())
all_candidates: set[str] = history_items.union(self.static_commands)
matches: list[str] = sorted(cmd for cmd in all_candidates if cmd.startswith(text))
if not matches:
return
for match in matches:
yield Completion(
match,
start_position=-len(text),
display=match
)
@staticmethod
def _find_common_prefix(matches: list[str]) -> str:
if not matches:
return ""
common: str = matches[0]
for match in matches[1:]:
i: int = 0
while i < len(common) and i < len(match) and common[i] == match[i]:
i += 1
common = common[:i]
return common
if TYPE_CHECKING:
from prompt_toolkit import PromptSession, HTML
class AutoCompleter:
def __init__(
self,
history_filename: str | None = None,
autocomplete_button: str = "tab",
command_highlighting: bool = True,
auto_suggestions: bool = True,
self,
history_filename: str | None = None,
autocomplete_button: str = "tab",
command_highlighting: bool = True,
auto_suggestions: bool = True,
) -> None:
self.history_filename: str | None = history_filename
self.autocomplete_button: str = autocomplete_button
@@ -94,42 +29,15 @@ class AutoCompleter:
self._session = None
self._fallback_mode = True
return
from ._ext_features_impl import build_session
kb = KeyBindings()
def _(event: KeyPressEvent) -> None:
buff = event.app.current_buffer
if buff.complete_state:
buff.complete_next()
return
comps_gen = iter(buff.completer.get_completions(buff.document, CompleteEvent()))
try:
first = next(comps_gen)
except StopIteration:
return
try:
_ = next(comps_gen)
buff.start_completion(select_first=False)
except StopIteration:
buff.apply_completion(first)
kb.add(self.autocomplete_button)(_)
history: InMemoryHistory | ThreadedHistory
if self.history_filename:
history = ThreadedHistory(FileHistory(self.history_filename))
else:
history = InMemoryHistory()
style = Style.from_dict({'valid': '#00ff00', 'invalid': '#ff0000'})
self._session = PromptSession(
history=history,
completer=ThreadedCompleter(HistoryCompleter(history, all_commands)),
complete_while_typing=False,
key_bindings=kb,
auto_suggest=AutoSuggestFromHistory() if self.auto_suggestions else None,
style=style if self.command_highlighting else None,
lexer=CommandLexer(all_commands) if self.command_highlighting else None,
self._session = build_session(
self.history_filename,
self.autocomplete_button,
self.command_highlighting,
self.auto_suggestions,
all_commands
)
def prompt(self, prompt_text: str | HTML = ">>> ") -> str:
@@ -137,7 +45,7 @@ class AutoCompleter:
return input(prompt_text if isinstance(prompt_text, str) else ">>> ")
if self._session is None:
raise RuntimeError("Call initial_setup() before using prompt()")
return self._session.prompt(
HTML(prompt_text) if isinstance(prompt_text, str) else prompt_text,
cursor=CursorShape.BLINKING_BEAM
)
from ._ext_features_impl import do_prompt
return do_prompt(self._session, prompt_text)
+3 -4
View File
@@ -3,8 +3,6 @@ __all__ = ["App"]
import difflib
from typing import Never, TypeAlias
from rich.console import Console
from argenta.app.autocompleter import AutoCompleter
from argenta.app.behavior_handlers.models import (BehaviorHandlersFabric,
BehaviorHandlersSettersMixin)
@@ -189,7 +187,7 @@ class App(BaseApp):
repeat_command_groups_printing: bool = False,
override_system_messages: bool = False,
autocompleter: AutoCompleter | None = None,
printer: Printer = Console().print,
printer: Printer | None = None,
) -> None:
"""
Public. The essence of the application itself.
@@ -206,6 +204,7 @@ class App(BaseApp):
:param printer: system messages text output function
:return: None
"""
from rich.console import Console
super().__init__(
prompt=prompt,
initial_message=initial_message,
@@ -216,7 +215,7 @@ class App(BaseApp):
repeat_command_groups_printing=repeat_command_groups_printing,
override_system_messages=override_system_messages,
autocompleter=autocompleter or AutoCompleter(),
printer=printer,
printer=printer or Console().print,
)
def include_router(self, router: Router) -> None:
+2 -2
View File
@@ -28,7 +28,7 @@ class Orchestrator:
self._auto_inject_handlers: bool = auto_inject_handlers
if self._arg_parser is not None:
self._arg_parser._parse_args()
self._arg_parser._parse_args() # pyright: ignore[reportPrivateUsage]
def run_repl(self, app: App) -> None:
"""
@@ -41,4 +41,4 @@ class Orchestrator:
)
setup_dishka(app, container, auto_inject=self._auto_inject_handlers)
app._run_repl()
app._run_repl() # pyright: ignore[reportPrivateUsage]
+6 -1
View File
@@ -1,6 +1,11 @@
from __future__ import annotations
__all__ = ["Response"]
from dishka import Container
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from dishka import Container
from argenta.command import InputFlags
from argenta.response.status import ResponseStatus
+2 -3
View File
@@ -3,8 +3,6 @@ __all__ = ["Router"]
from inspect import get_annotations, getfullargspec, getsourcefile, getsourcelines
from typing import Callable
from rich.console import Console
from argenta.app.protocols import HandlerFunc
from argenta.command import Command, InputCommand, InputFlags
from argenta.command.flag import ValidationStatus
@@ -20,7 +18,7 @@ from argenta.router.exceptions import (RepeatedAliasNameException,
class Router:
def __init__(
self,
title: str = "Default title",
title: str = "Title",
*,
disable_redirect_stdout: bool = False,
):
@@ -175,6 +173,7 @@ class Router:
response_arg_annotation = func_annotations.get(response_arg)
if response_arg_annotation is not None and response_arg_annotation is not Response:
from rich.console import Console
source_line: int = getsourcelines(func)[1]
Console().print(
f'\nFile "{getsourcefile(func)}", line {source_line}\n[b red]WARNING:[/b red] [i]The typehint '