This commit is contained in:
2026-01-15 14:52:41 +03:00
parent 3cd74fc186
commit 18f62d3e7c
5 changed files with 51 additions and 31 deletions
+27 -11
View File
@@ -3,7 +3,7 @@ __all__ = ["App"]
import io
import re
from contextlib import redirect_stdout
from typing import Never, TypeAlias
from typing import Callable, Never, TypeAlias
from art import text2art
from rich.console import Console
@@ -30,6 +30,8 @@ from argenta.router import Router
Matches: TypeAlias = list[str] | list[Never]
_ANSI_ESCAPE_RE: re.Pattern[str] = re.compile(r"\u001b\[[0-9;]*m")
class BaseApp:
def __init__(
@@ -58,6 +60,8 @@ class BaseApp:
self._farewell_message: str = farewell_message
self._initial_message: str = initial_message
self._stdout_buffer: io.StringIO = io.StringIO()
self._description_message_gen: DescriptionMessageGenerator = (
lambda command, description: f"{command} *=*=* {description}"
)
@@ -160,7 +164,7 @@ class BaseApp:
:return: None
"""
if isinstance(self._dividing_line, DynamicDividingLine):
clear_text = re.sub(r"\u001b\[[0-9;]*m", "", text)
clear_text = _ANSI_ESCAPE_RE.sub("", text)
max_length_line = max([len(line) for line in clear_text.split("\n")])
max_length_line = (
max_length_line
@@ -217,6 +221,18 @@ class BaseApp:
return True
return False
def _capture_stdout(self, func: Callable[[], None]) -> str:
"""
Private. Captures stdout from a function call using a reusable buffer
:param func: function to execute with captured stdout
:return: captured stdout as string
"""
self._stdout_buffer.seek(0)
self._stdout_buffer.truncate(0)
with redirect_stdout(self._stdout_buffer):
func()
return self._stdout_buffer.getvalue()
def _error_handler(self, error: InputCommandException, raw_command: str) -> None:
"""
Private. Handles parsing errors of the entered command
@@ -368,9 +384,9 @@ class BaseApp:
)
)
else:
with redirect_stdout(io.StringIO()) as stdout:
processing_router.finds_appropriate_handler(input_command)
stdout_result: str = stdout.getvalue()
stdout_result = self._capture_stdout(
lambda: processing_router.finds_appropriate_handler(input_command)
)
self._print_framed_text(stdout_result)
@@ -442,9 +458,9 @@ class App(BaseApp):
try:
input_command: InputCommand = InputCommand.parse(raw_command=raw_command)
except InputCommandException as error:
with redirect_stdout(io.StringIO()) as stderr:
self._error_handler(error, raw_command)
stderr_result: str = stderr.getvalue()
stderr_result = self._capture_stdout(
lambda: self._error_handler(error, raw_command)
)
self._print_framed_text(stderr_result)
continue
@@ -454,9 +470,9 @@ class App(BaseApp):
return
if self._is_unknown_command(input_command):
with redirect_stdout(io.StringIO()) as stdout:
self._unknown_command_handler(input_command)
stdout_res: str = stdout.getvalue()
stdout_res = self._capture_stdout(
lambda: self._unknown_command_handler(input_command)
)
self._print_framed_text(stdout_res)
continue
+10 -3
View File
@@ -1,8 +1,10 @@
__all__ = ["NonStandardBehaviorHandler", "EmptyCommandHandler", "Printer", "DescriptionMessageGenerator"]
__all__ = ["NonStandardBehaviorHandler", "EmptyCommandHandler", "Printer", "DescriptionMessageGenerator", "HandlerFunc"]
from typing import Protocol, TypeVar
from typing import ParamSpec, Protocol, TypeVar
from argenta.response import Response
T = TypeVar("T", contravariant=True) # noqa: WPS111
T = TypeVar("T", contravariant=True)
P = ParamSpec("P")
class NonStandardBehaviorHandler(Protocol[T]):
@@ -23,3 +25,8 @@ class Printer(Protocol):
class DescriptionMessageGenerator(Protocol):
def __call__(self, _command: str, _description: str, /) -> str:
raise NotImplementedError
class HandlerFunc(Protocol):
def __call__(self, response: Response) -> None:
raise NotImplementedError
+2 -2
View File
@@ -14,7 +14,7 @@ class Orchestrator:
def __init__(
self,
arg_parser: ArgParser = DEFAULT_ARGPARSER,
custom_providers: list[Provider] = [],
custom_providers: list[Provider] | None = None,
auto_inject_handlers: bool = True,
):
"""
@@ -23,7 +23,7 @@ class Orchestrator:
:return: None
"""
self._arg_parser: ArgParser = arg_parser
self._custom_providers: list[Provider] = custom_providers
self._custom_providers: list[Provider] = custom_providers or []
self._auto_inject_handlers: bool = auto_inject_handlers
self._arg_parser._parse_args() # pyright: ignore[reportPrivateUsage]
+2 -3
View File
@@ -1,13 +1,12 @@
__all__ = ["CommandHandler", "CommandHandlers"]
from collections.abc import Iterator
from typing import Callable, Never
from typing import Never
from argenta.app.protocols import HandlerFunc
from argenta.command import Command
from argenta.response import Response
HandlerFunc = Callable[..., None]
class CommandHandler:
def __init__(self, handler_as_func: HandlerFunc, handled_command: Command):
+3 -5
View File
@@ -1,10 +1,11 @@
__all__ = ["Router"]
from inspect import get_annotations, getfullargspec, getsourcefile, getsourcelines
from typing import Callable, TypeAlias
from typing import Callable
from rich.console import Console
from argenta.app.protocols import HandlerFunc
from argenta.command import Command, InputCommand
from argenta.command.flag import ValidationStatus
from argenta.command.flag.flags import InputFlags
@@ -16,8 +17,6 @@ from argenta.router.exceptions import (RepeatedAliasNameException,
RequiredArgumentNotPassedException,
TriggerContainSpacesException)
HandlerFunc: TypeAlias = Callable[..., None]
class Router:
def __init__(
@@ -176,8 +175,7 @@ def _validate_func_args(func: HandlerFunc) -> None:
response_arg_annotation = func_annotations.get(response_arg)
if response_arg_annotation is not None:
if response_arg_annotation is not Response:
if response_arg_annotation is not None and response_arg_annotation is not Response:
source_line: int = getsourcelines(func)[1]
Console().print(
f'\nFile "{getsourcefile(func)}", line {source_line}\n[b red]WARNING:[/b red] [i]The typehint '