This commit is contained in:
2026-01-28 02:23:40 +03:00
parent 0567a3f4a3
commit 2a9281a421
9 changed files with 107 additions and 66 deletions
+3 -1
View File
@@ -6,6 +6,7 @@ from typing import Callable, Iterable
from prompt_toolkit import PromptSession, HTML
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import Completer, Completion, CompleteEvent, ThreadedCompleter
from prompt_toolkit.cursor_shapes import CursorShape
from prompt_toolkit.document import Document
from prompt_toolkit.formatted_text import StyleAndTextTuples
from prompt_toolkit.history import History, ThreadedHistory, FileHistory, InMemoryHistory
@@ -136,5 +137,6 @@ class AutoCompleter:
if self._session is None:
raise RuntimeError("Call initial_setup() before using prompt()")
return self._session.prompt(
HTML(prompt_text) if isinstance(prompt_text, str) else prompt_text
HTML(prompt_text) if isinstance(prompt_text, str) else prompt_text,
cursor=CursorShape.BLINKING_BEAM
)
+60 -50
View File
@@ -163,45 +163,23 @@ class BaseApp:
:param text: framed text
:return: None
"""
if isinstance(self._dividing_line, DynamicDividingLine):
clear_text = _ANSI_ESCAPE_RE.sub("", text)
max_length_line = max([len(line) for line in clear_text.split("\n")])
max_length_line = (
max_length_line
if 10 <= max_length_line <= 100
else 100
if max_length_line > 100
else 10
)
self._print_func(
self._dividing_line.get_full_dynamic_line(
length=max_length_line, is_override=self._override_system_messages
match self._dividing_line:
case (StaticDividingLine() as dividing_line) | (DynamicDividingLine() as dividing_line):
self._print_func(
StaticDividingLine(dividing_line.get_unit_part()).get_full_static_line(
is_override=self._override_system_messages
)
)
)
print(text.strip("\n"))
self._print_func(
self._dividing_line.get_full_dynamic_line(
length=max_length_line, is_override=self._override_system_messages
print(text.strip("\n"))
self._print_func(
StaticDividingLine(dividing_line.get_unit_part()).get_full_static_line(
is_override=self._override_system_messages
)
)
)
elif isinstance(self._dividing_line, StaticDividingLine):
self._print_func(
self._dividing_line.get_full_static_line(
is_override=self._override_system_messages
)
)
print(text.strip("\n"))
self._print_func(
self._dividing_line.get_full_static_line(
is_override=self._override_system_messages
)
)
elif self._dividing_line is None:
print('\n' + text.strip("\n") + '\n')
else:
raise NotImplementedError
case None:
print('\n' + text.strip("\n") + '\n')
case _:
raise NotImplementedError(f'Dividing line with type {self._dividing_line} is not implemented')
def _is_exit_command(self, command: InputCommand) -> bool:
"""
@@ -307,7 +285,8 @@ class BaseApp:
Private. Sets up default app view
:return: None
"""
self._prompt = f"\n<gray><b>{self._prompt}</b></gray>"
if isinstance(self._prompt, str):
self._prompt = f"\n<gray><b>{self._prompt}</b></gray>"
self._initial_message = (
"\n" + f"[bold red]{text2art(self._initial_message, font='tarty1')}"
)
@@ -354,15 +333,11 @@ class BaseApp:
"""
self._setup_system_router()
self._validate_routers_for_collisions()
self._autocompleter.initial_setup(self.registered_routers.get_triggers())
self._print_func(self._initial_message)
self._print_func('\n'.join(self._messages_on_startup))
for message in self._messages_on_startup:
self._print_func(message)
if self._messages_on_startup:
print("\n")
if not self._repeat_command_groups_printing:
self._print_command_group_description()
@@ -372,13 +347,48 @@ class BaseApp:
if not processing_router:
raise RuntimeError(f"Router for '{input_command.trigger}' not found. Panic!")
if processing_router.disable_redirect_stdout:
processing_router.finds_appropriate_handler(input_command)
else:
stdout_result = self._capture_stdout(
lambda: processing_router.finds_appropriate_handler(input_command)
)
self._print_framed_text(stdout_result)
match (self._dividing_line, processing_router.disable_redirect_stdout):
case (DynamicDividingLine(), False):
stdout_result = self._capture_stdout(
lambda: processing_router.finds_appropriate_handler(input_command)
)
clear_text = _ANSI_ESCAPE_RE.sub("", stdout_result)
max_length_line = max([len(line) for line in clear_text.split("\n")])
max_length_line = (
max_length_line
if 10 <= max_length_line <= 100
else 100
if max_length_line > 100
else 10
)
self._print_func(
self._dividing_line.get_full_dynamic_line(
length=max_length_line, is_override=self._override_system_messages
)
)
print(clear_text.strip("\n"))
self._print_func(
self._dividing_line.get_full_dynamic_line(
length=max_length_line, is_override=self._override_system_messages
)
)
case (StaticDividingLine() as dividing_line, bool()) | (DynamicDividingLine() as dividing_line, True):
self._print_func(
StaticDividingLine(dividing_line.get_unit_part()).get_full_static_line(
is_override=self._override_system_messages
)
)
processing_router.finds_appropriate_handler(input_command)
self._print_func(
StaticDividingLine(dividing_line.get_unit_part()).get_full_static_line(
is_override=self._override_system_messages
)
)
case (None, bool()):
processing_router.finds_appropriate_handler(input_command)
case _:
raise NotImplementedError(f'Dividing line with type {self._dividing_line} is not implemented')
AVAILABLE_DIVIDING_LINES: TypeAlias = StaticDividingLine | DynamicDividingLine
+8 -1
View File
@@ -90,7 +90,14 @@ class InputCommand:
:param raw_command: raw input command
:return: model of the input command, after parsing as InputCommand
"""
tokens = shlex.split(raw_command)
lexer = shlex.shlex(raw_command, posix=True)
lexer.whitespace_split = True
lexer.commenters = ""
try:
tokens = list(lexer)
except ValueError as e:
raise UnprocessedInputFlagException from e
if not tokens:
raise EmptyInputCommandException