Merge branch 'main' into benchmarks

This commit is contained in:
2026-01-22 04:56:27 +03:00
9 changed files with 337 additions and 333 deletions
+120 -79
View File
@@ -1,97 +1,138 @@
__all__ = ["AutoCompleter"]
import os
import readline
from typing import Never
import sys
from typing import Callable, Iterable
from prompt_toolkit import PromptSession, HTML
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import Completer, Completion, CompleteEvent
from prompt_toolkit.document import Document
from prompt_toolkit.formatted_text import StyleAndTextTuples
from prompt_toolkit.history import History, ThreadedHistory, FileHistory, InMemoryHistory
from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent
from prompt_toolkit.lexers import Lexer
from prompt_toolkit.styles import Style
class CommandLexer(Lexer):
def __init__(self, valid_commands: set[str]) -> None:
self.valid_commands: set[str] = valid_commands
def lex_document(self, document: Document) -> Callable[[int], StyleAndTextTuples]:
def get_line_tokens(lineno: int) -> StyleAndTextTuples:
if lineno >= len(document.lines):
return []
line_text: str = document.lines[lineno]
if not line_text.strip():
return [("", line_text)]
first_word: str = line_text.split()[0] if line_text.split() else ""
if first_word in self.valid_commands:
return [("class:valid", line_text)]
else:
return [("class:invalid", line_text)]
return get_line_tokens
class HistoryCompleter(Completer):
def __init__(self, history_container: History, static_commands: set[str]) -> None:
self.history_container: History = history_container
self.static_commands: set[str] = static_commands
def get_completions(self, document: Document, complete_event: CompleteEvent) -> Iterable[Completion]:
text: str = document.text_before_cursor
history_items: set[str] = set(self.history_container.load_history_strings())
all_candidates: set[str] = history_items.union(self.static_commands)
matches: list[str] = sorted(cmd for cmd in all_candidates if cmd.startswith(text))
if not matches:
return
for match in matches:
yield Completion(
match,
start_position=-len(text),
display=match
)
@staticmethod
def _find_common_prefix(matches: list[str]) -> str:
if not matches:
return ""
common: str = matches[0]
for match in matches[1:]:
i: int = 0
while i < len(common) and i < len(match) and common[i] == match[i]:
i += 1
common = common[:i]
return common
class AutoCompleter:
def __init__(
self, history_filename: str | None = None, autocomplete_button: str = "tab"
self,
history_filename: str | None = None,
autocomplete_button: str = "tab",
command_highlighting: bool = True,
auto_suggestions: bool = True,
) -> None:
"""
Public. Configures and implements auto-completion of input command
:param history_filename: the name of the file for saving the history of the autocompleter
:param autocomplete_button: the button for auto-completion
:return: None
"""
self.history_filename: str | None = history_filename
self.autocomplete_button: str = autocomplete_button
def _complete(self, text: str, state: int) -> str | None:
"""
Private. Auto-completion function
:param text: part of the command being entered
:param state: the current cursor position is relative to the beginning of the line
:return: the desired candidate as str or None
"""
matches: list[str] = sorted(
cmd for cmd in _get_history_items() if cmd.startswith(text)
)
if len(matches) > 1:
common_prefix = matches[0]
for match in matches[1:]:
i = 0
while (
i < len(common_prefix)
and i < len(match)
and common_prefix[i] == match[i]
):
i += 1
common_prefix = common_prefix[:i]
if state == 0:
readline.insert_text(common_prefix[len(text) :])
readline.redisplay()
return None
elif len(matches) == 1:
return matches[0] if state == 0 else None
else:
return None
self.command_highlighting: bool = command_highlighting
self.auto_suggestions: bool = auto_suggestions
self._session: PromptSession[str] | None = None
self._fallback_mode: bool = False
def initial_setup(self, all_commands: set[str]) -> None:
"""
Private. Initial setup function
:param all_commands: Registered commands for adding them to the autocomplete history
:return: None
"""
if self.history_filename:
if os.path.exists(self.history_filename):
readline.read_history_file(self.history_filename)
if not sys.stdin.isatty():
self._session = None
self._fallback_mode = True
return
kb = KeyBindings()
def _(event: KeyPressEvent) -> None:
buff = event.app.current_buffer
if buff.complete_state:
buff.complete_next()
else:
for line in all_commands:
readline.add_history(line)
completions = list(buff.completer.get_completions(buff.document, CompleteEvent()))
if len(completions) == 1:
buff.apply_completion(completions[0])
else:
buff.start_completion(select_first=False)
if not self.history_filename:
for line in all_commands:
readline.add_history(line)
kb.add(self.autocomplete_button)(_)
readline.set_completer(self._complete)
readline.set_completer_delims(readline.get_completer_delims().replace(" ", ""))
readline.parse_and_bind(f"{self.autocomplete_button}: complete")
history: InMemoryHistory | ThreadedHistory
def exit_setup(self, all_commands: set[str]) -> None:
"""
Private. Exit setup function
:return: None
"""
if self.history_filename:
readline.write_history_file(self.history_filename)
with open(self.history_filename, "r") as history_file:
raw_history = history_file.read()
pretty_history: list[str] = []
for line in set(raw_history.strip().split("\n")):
if line.split()[0] in all_commands:
pretty_history.append(line)
with open(self.history_filename, "w") as history_file:
_ = history_file.write("\n".join(pretty_history))
history = ThreadedHistory(FileHistory(self.history_filename))
else:
history = InMemoryHistory()
style = Style.from_dict({'valid': '#00ff00', 'invalid': '#ff0000'})
def _get_history_items() -> list[str] | list[Never]:
"""
Private. Returns a list of all commands entered by the user
:return: all commands entered by the user as list[str] | list[Never]
"""
return [
readline.get_history_item(i)
for i in range(1, readline.get_current_history_length() + 1)
]
self._session = PromptSession(
history=history,
completer=HistoryCompleter(history, all_commands),
complete_while_typing=False,
key_bindings=kb,
auto_suggest=AutoSuggestFromHistory() if self.auto_suggestions else None,
style=style if self.command_highlighting else style,
lexer=CommandLexer(all_commands) if self.command_highlighting else None,
)
def prompt(self, prompt_text: str | HTML = ">>> ") -> str:
if self._fallback_mode:
return input(prompt_text if isinstance(prompt_text, str) else ">>> ")
if self._session is None:
raise RuntimeError("Call initial_setup() before using prompt()")
return self._session.prompt(
HTML(prompt_text) if isinstance(prompt_text, str) else prompt_text
)
+6 -6
View File
@@ -6,6 +6,7 @@ from contextlib import redirect_stdout
from typing import Callable, Never, TypeAlias
from art import text2art
from prompt_toolkit import HTML
from rich.console import Console
from rich.markup import escape
@@ -37,7 +38,7 @@ class BaseApp:
def __init__(
self,
*,
prompt: str,
prompt: str | HTML,
initial_message: str,
farewell_message: str,
exit_command: Command,
@@ -48,7 +49,7 @@ class BaseApp:
autocompleter: AutoCompleter,
print_func: Printer,
) -> None:
self._prompt: str = prompt
self._prompt: str | HTML = prompt
self._print_func: Printer = print_func
self._exit_command: Command = exit_command
self._dividing_line: StaticDividingLine | DynamicDividingLine = dividing_line
@@ -307,7 +308,7 @@ class BaseApp:
Private. Sets up default app view
:return: None
"""
self._prompt = f"[italic dim bold]{self._prompt}[/italic dim bold]"
self._prompt = f"<gray><b>{self._prompt}</b></gray>"
self._initial_message = (
"\n" + f"[bold red]{text2art(self._initial_message, font='tarty1')}" + "\n"
)
@@ -404,7 +405,7 @@ class App(BaseApp):
def __init__(
self,
*,
prompt: str = "What do you want to do?\n\n",
prompt: str | HTML = ">>> ",
initial_message: str = "Argenta\n",
farewell_message: str = "\nSee you\n",
exit_command: Command = DEFAULT_EXIT_COMMAND,
@@ -455,7 +456,7 @@ class App(BaseApp):
if self._repeat_command_groups_printing:
self._print_command_group_description()
raw_command: str = Console().input(self._prompt)
raw_command: str = self._autocompleter.prompt(self._prompt)
try:
input_command: InputCommand = InputCommand.parse(raw_command=raw_command)
@@ -468,7 +469,6 @@ class App(BaseApp):
if self._is_exit_command(input_command):
self.system_router.finds_appropriate_handler(input_command)
self._autocompleter.exit_setup(self.registered_routers.get_triggers())
return
if self._is_unknown_command(input_command):