This commit is contained in:
2026-02-01 00:26:25 +03:00
parent 5f6b3368e1
commit 24aa75eb37
11 changed files with 187 additions and 186 deletions
+2 -2
View File
@@ -1,6 +1,6 @@
from rich.markup import escape
from argenta import Response
from argenta.response.entity import Response
from argenta.app.presentation.renderers import Renderer
from argenta.app.protocols import (
NonStandardBehaviorHandler,
@@ -53,7 +53,7 @@ class BehaviorHandlersFabric:
return unknown_command_handler
def generate_exit_command_handler(self, farewell_message: str) -> NonStandardBehaviorHandler[Response]:
return lambda _: self._printer(self._renderer.render_farewell_message(farewell_message))
return lambda _: self._printer(farewell_message)
def generate_description_message_generator(self) -> DescriptionMessageGenerator:
return lambda command, description: self._renderer.render_text_for_description_message_generator(
+4 -4
View File
@@ -41,9 +41,9 @@ class StaticDividingLine(BaseDividingLine):
:return: full line of dividing line as str
"""
if is_override:
return f"\n{self.length * self.get_unit_part()}\n"
return self.length * self.get_unit_part()
else:
return f"\n[dim]{self.length * self.get_unit_part()}[/dim]\n"
return f"[dim]{self.length * self.get_unit_part()}[/dim]"
class DynamicDividingLine(BaseDividingLine):
@@ -63,6 +63,6 @@ class DynamicDividingLine(BaseDividingLine):
:return: full line of dividing line as str
"""
if is_override:
return f"\n{length * self.get_unit_part()}\n"
return length * self.get_unit_part()
else:
return f"\n[dim]{self.get_unit_part() * length}[/dim]\n"
return f"[dim]{self.get_unit_part() * length}[/dim]"
+28 -100
View File
@@ -1,9 +1,6 @@
__all__ = ["App"]
import io
import re
from contextlib import redirect_stdout
from typing import Callable, Never, TypeAlias
from typing import Never, TypeAlias
from rich.console import Console
@@ -31,7 +28,6 @@ from argenta.router import Router
Matches: TypeAlias = list[str] | list[Never]
_ANSI_ESCAPE_RE: re.Pattern[str] = re.compile(r"\u001b\[[0-9;]*m")
class BaseApp:
@@ -47,10 +43,10 @@ class BaseApp:
repeat_command_groups_printing: bool,
override_system_messages: bool,
autocompleter: AutoCompleter,
print_func: Printer,
printer: Printer,
) -> None:
self._prompt: str = prompt
self._print_func: Printer = print_func
self._printer: Printer = printer
self._exit_command: Command = exit_command
self._dividing_line: StaticDividingLine | DynamicDividingLine | None = dividing_line
self._repeat_command_groups_printing: bool = repeat_command_groups_printing
@@ -58,7 +54,6 @@ class BaseApp:
self._autocompleter: AutoCompleter = autocompleter
self._system_router: Router = Router(title=system_router_title)
self._stdout_buffer: io.StringIO = io.StringIO()
self.registered_routers: RegisteredRouters = RegisteredRouters()
self._messages_on_startup: list[str] = []
@@ -67,11 +62,16 @@ class BaseApp:
else:
self._renderer: Renderer = RichRenderer()
self._viewer: Viewer = Viewer(self._print_func, self._renderer)
self._viewer: Viewer = Viewer(
printer=self._printer,
renderer=self._renderer,
dividing_line=self._dividing_line,
override_system_messages=self._override_system_messages,
)
self._handlers_fabric: BehaviorHandlersFabric = BehaviorHandlersFabric(
self._print_func,
self._renderer,
self._most_similar_command
printer=self._printer,
renderer=self._renderer,
most_similar_command_getter=self._most_similar_command
)
self._initial_message: str = self._renderer.render_initial_message(initial_message)
@@ -131,34 +131,6 @@ class BaseApp:
"""
self._exit_command_handler = _
def _print_static_framed_text(self, text: str) -> None:
"""
Private. Outputs text by framing it in a static or dynamic split strip
:param text: framed text
:return: None
"""
match self._dividing_line:
case StaticDividingLine() as dividing_line:
self._print_func(dividing_line.get_full_static_line(is_override=self._override_system_messages))
print(text.strip("\n"))
self._print_func(dividing_line.get_full_static_line(is_override=self._override_system_messages))
case DynamicDividingLine() as dividing_line:
self._print_func(
StaticDividingLine(dividing_line.get_unit_part()).get_full_static_line(
is_override=self._override_system_messages
)
)
print(text.strip("\n"))
self._print_func(
StaticDividingLine(dividing_line.get_unit_part()).get_full_static_line(
is_override=self._override_system_messages
)
)
case None:
print("\n" + text.strip("\n") + "\n")
case _:
raise NotImplementedError(f"Dividing line with type {self._dividing_line} is not implemented")
def _is_exit_command(self, command: InputCommand) -> bool:
"""
Private. Checks if the given command is an exit command
@@ -178,18 +150,6 @@ class BaseApp:
return True
return False
def _capture_stdout(self, func: Callable[[], None]) -> str:
"""
Private. Captures stdout from a function call using a reusable buffer
:param func: function to execute with captured stdout
:return: captured stdout as string
"""
self._stdout_buffer.seek(0)
self._stdout_buffer.truncate(0)
with redirect_stdout(self._stdout_buffer):
func()
return self._stdout_buffer.getvalue()
def _error_handler(self, error: InputCommandException, raw_command: str) -> None:
"""
Private. Handles parsing errors of the entered command
@@ -276,46 +236,14 @@ class BaseApp:
if not processing_router:
raise RuntimeError(f"Router for '{input_command.trigger}' not found. Panic!")
match (self._dividing_line, processing_router.disable_redirect_stdout):
case (None, bool()):
processing_router.finds_appropriate_handler(input_command)
case (DynamicDividingLine(), False):
stdout_result = self._capture_stdout(lambda: processing_router.finds_appropriate_handler(input_command))
clear_text = _ANSI_ESCAPE_RE.sub("", stdout_result)
max_length_line = max([len(line) for line in clear_text.split("\n")])
max_length_line = (
max_length_line if 10 <= max_length_line <= 100 else 100 if max_length_line > 100 else 10
)
self._print_func(
self._dividing_line.get_full_dynamic_line(
length=max_length_line, is_override=self._override_system_messages
)
)
print(clear_text.strip("\n"))
self._print_func(
self._dividing_line.get_full_dynamic_line(
length=max_length_line, is_override=self._override_system_messages
)
)
case (StaticDividingLine() as dividing_line, bool()) | (DynamicDividingLine() as dividing_line, True):
self._print_func(
StaticDividingLine(dividing_line.get_unit_part()).get_full_static_line(
is_override=self._override_system_messages
)
)
processing_router.finds_appropriate_handler(input_command)
self._print_func(
StaticDividingLine(dividing_line.get_unit_part()).get_full_static_line(
is_override=self._override_system_messages
)
)
case _:
raise NotImplementedError(f"Dividing line with type {self._dividing_line} is not implemented")
self._viewer.view_framed_text_from_generator(
output_text_generator=lambda: processing_router.finds_appropriate_handler(input_command),
is_stdout_redirected_by_router=processing_router.is_redirect_stdout_disabled
)
AVAILABLE_DIVIDING_LINES: TypeAlias = StaticDividingLine | DynamicDividingLine
DEFAULT_PRINT_FUNC: Printer = Console().print
AVAILABLE_DIVIDING_LINES: TypeAlias = StaticDividingLine | DynamicDividingLine | None
DEFAULT_PRINTER: Printer = Console().print
DEFAULT_EXIT_COMMAND: Command = Command("q", description="Exit command")
@@ -328,11 +256,11 @@ class App(BaseApp):
farewell_message: str = "See you",
exit_command: Command = DEFAULT_EXIT_COMMAND,
system_router_title: str = "System points:",
dividing_line: AVAILABLE_DIVIDING_LINES | None = None,
dividing_line: AVAILABLE_DIVIDING_LINES = None,
repeat_command_groups_printing: bool = False,
override_system_messages: bool = False,
autocompleter: AutoCompleter | None = None,
print_func: Printer = DEFAULT_PRINT_FUNC,
printer: Printer = DEFAULT_PRINTER,
) -> None:
"""
Public. The essence of the application itself.
@@ -346,7 +274,7 @@ class App(BaseApp):
:param repeat_command_groups_printing: whether to repeat the available commands and their description
:param override_system_messages: whether to redefine the default formatting of system messages
:param autocompleter: the entity of the autocompleter
:param print_func: system messages text output function
:param printer: system messages text output function
:return: None
"""
super().__init__(
@@ -359,7 +287,7 @@ class App(BaseApp):
repeat_command_groups_printing=repeat_command_groups_printing,
override_system_messages=override_system_messages,
autocompleter=autocompleter or AutoCompleter(),
print_func=print_func,
printer=printer,
)
def run_polling(self) -> None:
@@ -367,7 +295,7 @@ class App(BaseApp):
Private. Starts the user input processing cycle
:return: None
"""
self._print_func(self._initial_message)
self._viewer.view_initial_message(self._initial_message)
self._pre_cycle_setup()
while True:
if self._repeat_command_groups_printing:
@@ -380,10 +308,9 @@ class App(BaseApp):
try:
input_command: InputCommand = InputCommand.parse(raw_command=raw_command)
except InputCommandException as error: # noqa F841
stderr_result = self._capture_stdout(
lambda: self._error_handler(error, raw_command) # noqa F821
self._viewer.view_framed_text_from_generator(
output_text_generator=lambda: self._error_handler(error, raw_command)
)
self._print_static_framed_text(stderr_result)
continue
if self._is_exit_command(input_command):
@@ -391,8 +318,9 @@ class App(BaseApp):
return
if self._is_unknown_command(input_command):
stdout_res = self._capture_stdout(lambda: self._unknown_command_handler(input_command))
self._print_static_framed_text(stdout_res)
self._viewer.view_framed_text_from_generator(
output_text_generator=lambda: self._unknown_command_handler(input_command)
)
continue
self._process_exist_and_valid_command(input_command)
+9 -9
View File
@@ -78,7 +78,7 @@ class RichRenderer(Renderer):
@staticmethod
def render_messages_on_startup(messages: Iterable[str]) -> str:
return "\n".join(messages)
return "\n" + "\n".join(messages)
@staticmethod
def render_command_groups_description(
@@ -87,10 +87,10 @@ class RichRenderer(Renderer):
) -> str:
command_groups_description = ""
for registered_router in registered_routers:
command_groups_description += "\n" + registered_router.title
command_groups_description += "\n\n" + registered_router.title
for command_handler in registered_router.command_handlers:
handled_command = command_handler.handled_command
command_groups_description += description_message_generator(
command_groups_description += '\n' + description_message_generator(
handled_command.trigger,
handled_command.description,
)
@@ -114,7 +114,7 @@ class RichRenderer(Renderer):
most_similar_command_trigger: str | None
) -> str:
return (
f"[red]Unknown command:[/red][blue]{command_trigger}[/blue]"
f"[red]Unknown command:[/red] [blue]{command_trigger}[/blue]"
+ (f"[red], most similar:[/red][blue]{most_similar_command_trigger}[/blue]"
if most_similar_command_trigger else "")
)
@@ -131,14 +131,14 @@ class PlainRenderer(Renderer):
@staticmethod
def render_farewell_message(text: str) -> str:
return f"{text} | [https://github.com/koloideal/Argenta](https://github.com/koloideal/Argenta) | made by kolo"
return f"\n{text} | https://github.com/koloideal/Argenta | made by kolo"
@staticmethod
def render_text_for_description_message_generator(command: str, description: str) -> str:
return f"{command} *=*=* {description}"
def render_messages_on_startup(self, messages: Iterable[str]) -> str:
return "\n".join(messages)
return "\n" + "\n".join(messages)
@staticmethod
def render_command_groups_description(
@@ -147,10 +147,10 @@ class PlainRenderer(Renderer):
) -> str:
command_groups_description = ""
for registered_router in registered_routers:
command_groups_description += "\n" + registered_router.title
command_groups_description += "\n\n" + registered_router.title
for command_handler in registered_router.command_handlers:
handled_command = command_handler.handled_command
command_groups_description += description_message_generator(
command_groups_description += "\n" + description_message_generator(
handled_command.trigger,
handled_command.description,
)
@@ -175,7 +175,7 @@ class PlainRenderer(Renderer):
) -> str:
return (
f"Unknown command: {command_trigger}"
+ (f", most similar:{most_similar_command_trigger}"
+ (f", most similar: {most_similar_command_trigger}"
if most_similar_command_trigger else "")
)
+71 -2
View File
@@ -1,14 +1,43 @@
from typing import Iterable
__all__ = ["Viewer"]
import re
from contextlib import redirect_stdout
from io import StringIO
from typing import Iterable, Callable, TypeAlias
from rich.text import Text
from argenta.app import StaticDividingLine, DynamicDividingLine
from argenta.app.presentation.renderers import Renderer
from argenta.app.protocols import Printer, DescriptionMessageGenerator
from argenta.app.registered_routers.entity import RegisteredRouters
AVAILABLE_DIVIDING_LINES: TypeAlias = StaticDividingLine | DynamicDividingLine | None
class Viewer:
def __init__(self, printer: Printer, renderer: Renderer):
ANSI_ESCAPE_RE: re.Pattern[str] = re.compile(r"\u001b\[[0-9;]*m")
def __init__(
self,
printer: Printer,
renderer: Renderer,
dividing_line: AVAILABLE_DIVIDING_LINES,
override_system_messages: bool
):
self._printer = printer
self._renderer = renderer
self._dividing_line = dividing_line
self._override_system_messages = override_system_messages
self._stdout_buffer: StringIO = StringIO()
def _capture_stdout(self, func: Callable[[], None]) -> str:
self._stdout_buffer.seek(0)
self._stdout_buffer.truncate(0)
with redirect_stdout(self._stdout_buffer):
func()
return self._stdout_buffer.getvalue()
def view_messages_on_startup(self, messages: Iterable[str]) -> None:
self._printer(self._renderer.render_messages_on_startup(messages))
@@ -25,3 +54,43 @@ class Viewer:
)
)
def view_initial_message(self, initial_message: str) -> None:
self._printer(initial_message)
def view_framed_text_from_generator(
self,
output_text_generator: Callable[[], None],
is_stdout_redirected_by_router: bool = False,
) -> None:
match (self._dividing_line, is_stdout_redirected_by_router):
case (None, bool()):
output_text_generator()
case (DynamicDividingLine(), False):
stdout_result = self._capture_stdout(
lambda: output_text_generator()
)
clear_text = self.ANSI_ESCAPE_RE.sub("", stdout_result)
max_length_line = max([len(line) for line in clear_text.split("\n")])
max_length_line = (
max_length_line
if 10 <= max_length_line <= 100
else 100
if max_length_line > 100
else 10
)
dividing_line_as_str: str = self._dividing_line.get_full_dynamic_line(
length=max_length_line, is_override=self._override_system_messages
)
self._printer(dividing_line_as_str + "\n")
self._printer(Text.from_ansi(stdout_result.strip("\n")).markup)
self._printer('\n' + dividing_line_as_str)
case (StaticDividingLine() as dividing_line, bool()) | (DynamicDividingLine() as dividing_line, True):
dividing_line_as_str: str = StaticDividingLine(dividing_line.get_unit_part()).get_full_static_line(
is_override=self._override_system_messages
)
self._printer(dividing_line_as_str + '\n')
output_text_generator()
self._printer('\n' + dividing_line_as_str)
case _:
raise NotImplementedError(f"Dividing line with type {self._dividing_line} is not implemented")