feat: impl docs (#4)

The entire public api is covered with documentation in two languages - Russian and English.

the library now supports the latest three versions of python - 3.12, 3.13 and 3.14

minor design changes: now, when a Boolean flag is entered, its value is an empty string, not None.

tests have been adapted to the supported versions of python, readmi has been redesigned in two languages, German is no longer available.
This commit is contained in:
kolo
2025-12-04 21:55:19 +03:00
committed by GitHub
parent a2ac6a608f
commit ce7e24b924
210 changed files with 13770 additions and 1183 deletions
+6 -6
View File
@@ -1,6 +1,6 @@
__all__ = ["App", "Orchestrator", "Router"]
from argenta.orchestrator.entity import Orchestrator
from argenta.app.models import App
from argenta.router.entity import Router
from argenta.app.models import App as App
from argenta.command.models import Command as Command
from argenta.data_bridge.entity import DataBridge as DataBridge
from argenta.orchestrator.entity import Orchestrator as Orchestrator
from argenta.response.entity import Response as Response
from argenta.router.entity import Router as Router
+5 -12
View File
@@ -1,12 +1,5 @@
__all__ = [
"App",
"PredefinedMessages",
"DynamicDividingLine",
"StaticDividingLine",
"AutoCompleter"
]
from argenta.app.models import App
from argenta.app.defaults import PredefinedMessages
from argenta.app.dividing_line.models import DynamicDividingLine, StaticDividingLine
from argenta.app.autocompleter.entity import AutoCompleter
from argenta.app.autocompleter.entity import AutoCompleter as AutoCompleter
from argenta.app.defaults import PredefinedMessages as PredefinedMessages
from argenta.app.dividing_line.models import DynamicDividingLine as DynamicDividingLine
from argenta.app.dividing_line.models import StaticDividingLine as StaticDividingLine
from argenta.app.models import App as App
+1 -4
View File
@@ -1,4 +1 @@
__all__ = ["AutoCompleter"]
from argenta.app.autocompleter.entity import AutoCompleter
from argenta.app.autocompleter.entity import AutoCompleter as AutoCompleter
+23 -21
View File
@@ -1,12 +1,12 @@
__all__ = ["AutoCompleter"]
import os
import readline
from typing import Never
class AutoCompleter:
def __init__(
self, history_filename: str | None = None, autocomplete_button: str = "tab"
) -> None:
def __init__(self, history_filename: str | None = None, autocomplete_button: str = "tab") -> None:
"""
Public. Configures and implements auto-completion of input command
:param history_filename: the name of the file for saving the history of the autocompleter
@@ -23,22 +23,16 @@ class AutoCompleter:
:param state: the current cursor position is relative to the beginning of the line
:return: the desired candidate as str or None
"""
matches: list[str] = sorted(
cmd for cmd in _get_history_items() if cmd.startswith(text)
)
matches: list[str] = sorted(cmd for cmd in _get_history_items() if cmd.startswith(text))
if len(matches) > 1:
common_prefix = matches[0]
for match in matches[1:]:
i = 0
while (
i < len(common_prefix)
and i < len(match)
and common_prefix[i] == match[i]
):
while i < len(common_prefix) and i < len(match) and common_prefix[i] == match[i]:
i += 1
common_prefix = common_prefix[:i]
if state == 0:
readline.insert_text(common_prefix[len(text) :])
readline.insert_text(common_prefix[len(text) :])
readline.redisplay()
return None
elif len(matches) == 1:
@@ -54,37 +48,45 @@ class AutoCompleter:
"""
if self.history_filename:
if os.path.exists(self.history_filename):
readline.read_history_file(self.history_filename)
readline.read_history_file(self.history_filename)
else:
for line in all_commands:
readline.add_history(line)
readline.add_history(line)
if not self.history_filename:
for line in all_commands:
readline.add_history(line)
readline.set_completer(self._complete)
readline.set_completer_delims(readline.get_completer_delims().replace(" ", ""))
readline.parse_and_bind(f"{self.autocomplete_button}: complete")
def exit_setup(self, all_commands: list[str]) -> None:
def exit_setup(self, all_commands: list[str], ignore_command_register: bool) -> None:
"""
Private. Exit setup function
:return: None
"""
if self.history_filename:
readline.write_history_file(self.history_filename)
readline.write_history_file(self.history_filename)
with open(self.history_filename, "r") as history_file:
raw_history = history_file.read()
pretty_history: list[str] = []
for line in set(raw_history.strip().split("\n")):
if line.split()[0] in all_commands:
if _is_command_exist(line.split()[0], all_commands, ignore_command_register):
pretty_history.append(line)
with open(self.history_filename, "w") as history_file:
_ = history_file.write("\n".join(pretty_history))
def _is_command_exist(command: str, existing_commands: list[str], ignore_command_register: bool) -> bool:
if ignore_command_register:
return command.lower() in existing_commands
return command in existing_commands
def _get_history_items() -> list[str] | list[Never]:
"""
Private. Returns a list of all commands entered by the user
:return: all commands entered by the user as list[str] | list[Never]
"""
return [
readline.get_history_item(i)
for i in range(1, readline.get_current_history_length() + 1)
]
return [readline.get_history_item(i) for i in range(1, readline.get_current_history_length() + 1)]
+3
View File
@@ -1,3 +1,5 @@
__all__ = ["PredefinedMessages"]
from enum import StrEnum
@@ -5,6 +7,7 @@ class PredefinedMessages(StrEnum):
"""
Public. A dataclass with predetermined messages for quick use
"""
USAGE = "[b dim]Usage[/b dim]: [i]<command> <[green]flags[/green]>[/i]"
HELP = "[b dim]Help[/b dim]: [i]<command>[/i] [b red]--help[/b red]"
AUTOCOMPLETE = "[b dim]Autocomplete[/b dim]: [i]<part>[/i] [bold]<tab>"
+2 -4
View File
@@ -1,4 +1,2 @@
__all__ = ["StaticDividingLine", "DynamicDividingLine"]
from argenta.app.dividing_line.models import StaticDividingLine, DynamicDividingLine
from argenta.app.dividing_line.models import DynamicDividingLine as DynamicDividingLine
from argenta.app.dividing_line.models import StaticDividingLine as StaticDividingLine
+2
View File
@@ -1,3 +1,5 @@
__all__ = ["StaticDividingLine", "DynamicDividingLine"]
from abc import ABC
+48 -103
View File
@@ -1,9 +1,11 @@
__all__ = ["App"]
import io
import re
from contextlib import redirect_stdout
from typing import Never, TypeAlias
from art import text2art # pyright: ignore[reportMissingTypeStubs, reportUnknownVariableType]
from art import text2art
from rich.console import Console
from rich.markup import escape
@@ -41,7 +43,7 @@ class BaseApp:
system_router_title: str | None,
ignore_command_register: bool,
dividing_line: StaticDividingLine | DynamicDividingLine,
repeat_command_groups: bool,
repeat_command_groups_printing: bool,
override_system_messages: bool,
autocompleter: AutoCompleter,
print_func: Printer,
@@ -52,7 +54,7 @@ class BaseApp:
self._system_router_title: str | None = system_router_title
self._dividing_line: StaticDividingLine | DynamicDividingLine = dividing_line
self._ignore_command_register: bool = ignore_command_register
self._repeat_command_groups_description: bool = repeat_command_groups
self._repeat_command_groups_printing_description: bool = repeat_command_groups_printing
self._override_system_messages: bool = override_system_messages
self._autocompleter: AutoCompleter = autocompleter
@@ -74,25 +76,21 @@ class BaseApp:
else self._matching_default_triggers_with_routers
)
self._incorrect_input_syntax_handler: NonStandardBehaviorHandler[str] = (
lambda _: print_func(f"Incorrect flag syntax: {_}")
self._incorrect_input_syntax_handler: NonStandardBehaviorHandler[str] = lambda _: print_func(
f"Incorrect flag syntax: {_}"
)
self._repeated_input_flags_handler: NonStandardBehaviorHandler[str] = (
lambda _: print_func(f"Repeated input flags: {_}")
self._repeated_input_flags_handler: NonStandardBehaviorHandler[str] = lambda _: print_func(
f"Repeated input flags: {_}"
)
self._empty_input_command_handler: EmptyCommandHandler = lambda: print_func(
"Empty input command"
self._empty_input_command_handler: EmptyCommandHandler = lambda: print_func("Empty input command")
self._unknown_command_handler: NonStandardBehaviorHandler[InputCommand] = lambda _: print_func(
f"Unknown command: {_.trigger}"
)
self._unknown_command_handler: NonStandardBehaviorHandler[InputCommand] = (
lambda _: print_func(f"Unknown command: {_.trigger}")
)
self._exit_command_handler: NonStandardBehaviorHandler[Response] = (
lambda _: print_func(self._farewell_message)
self._exit_command_handler: NonStandardBehaviorHandler[Response] = lambda _: print_func(
self._farewell_message
)
def set_description_message_pattern(
self, _: DescriptionMessageGenerator, /
) -> None:
def set_description_message_pattern(self, _: DescriptionMessageGenerator, /) -> None:
"""
Public. Sets the output pattern of the available commands
:param _: output pattern of the available commands
@@ -100,9 +98,7 @@ class BaseApp:
"""
self._description_message_gen = _
def set_incorrect_input_syntax_handler(
self, _: NonStandardBehaviorHandler[str], /
) -> None:
def set_incorrect_input_syntax_handler(self, _: NonStandardBehaviorHandler[str], /) -> None:
"""
Public. Sets the handler for incorrect flags when entering a command
:param _: handler for incorrect flags when entering a command
@@ -110,9 +106,7 @@ class BaseApp:
"""
self._incorrect_input_syntax_handler = _
def set_repeated_input_flags_handler(
self, _: NonStandardBehaviorHandler[str], /
) -> None:
def set_repeated_input_flags_handler(self, _: NonStandardBehaviorHandler[str], /) -> None:
"""
Public. Sets the handler for repeated flags when entering a command
:param _: handler for repeated flags when entering a command
@@ -120,9 +114,7 @@ class BaseApp:
"""
self._repeated_input_flags_handler = _
def set_unknown_command_handler(
self, _: NonStandardBehaviorHandler[InputCommand], /
) -> None:
def set_unknown_command_handler(self, _: NonStandardBehaviorHandler[InputCommand], /) -> None:
"""
Public. Sets the handler for unknown commands when entering a command
:param _: handler for unknown commands when entering a command
@@ -138,9 +130,7 @@ class BaseApp:
"""
self._empty_input_command_handler = _
def set_exit_command_handler(
self, _: NonStandardBehaviorHandler[Response], /
) -> None:
def set_exit_command_handler(self, _: NonStandardBehaviorHandler[Response], /) -> None:
"""
Public. Sets the handler for exit command when entering a command
:param _: handler for exit command when entering a command
@@ -176,11 +166,7 @@ class BaseApp:
clear_text = re.sub(r"\u001b\[[0-9;]*m", "", text)
max_length_line = max([len(line) for line in clear_text.split("\n")])
max_length_line = (
max_length_line
if 10 <= max_length_line <= 80
else 80
if max_length_line > 80
else 10
max_length_line if 10 <= max_length_line <= 80 else 80 if max_length_line > 80 else 10
)
self._print_func(
@@ -197,15 +183,11 @@ class BaseApp:
elif isinstance(self._dividing_line, StaticDividingLine): # pyright: ignore[reportUnnecessaryIsInstance]
self._print_func(
self._dividing_line.get_full_static_line(
is_override=self._override_system_messages
)
self._dividing_line.get_full_static_line(is_override=self._override_system_messages)
)
print(text.strip("\n"))
self._print_func(
self._dividing_line.get_full_static_line(
is_override=self._override_system_messages
)
self._dividing_line.get_full_static_line(is_override=self._override_system_messages)
)
else:
@@ -239,14 +221,10 @@ class BaseApp:
"""
input_command_trigger = command.trigger
if self._ignore_command_register:
if input_command_trigger.lower() in list(
self._current_matching_triggers_with_routers.keys()
):
if input_command_trigger.lower() in list(self._current_matching_triggers_with_routers.keys()):
return False
else:
if input_command_trigger in list(
self._current_matching_triggers_with_routers.keys()
):
if input_command_trigger in list(self._current_matching_triggers_with_routers.keys()):
return False
return True
@@ -304,9 +282,7 @@ class BaseApp:
:return: None
"""
self._prompt = f"[italic dim bold]{self._prompt}"
self._initial_message = (
"\n" + f"[bold red]{text2art(self._initial_message, font='tarty1')}" + "\n"
)
self._initial_message = "\n" + f"[bold red]{text2art(self._initial_message, font='tarty1')}" + "\n"
self._farewell_message = (
"[bold red]\n\n"
+ str(text2art(self._farewell_message, font="chanky")) # pyright: ignore[reportUnknownArgumentType]
@@ -324,20 +300,14 @@ class BaseApp:
self._repeated_input_flags_handler = lambda raw_command: self._print_func(
f"[red bold]Repeated input flags: {escape(raw_command)}"
)
self._empty_input_command_handler = lambda: self._print_func(
"[red bold]Empty input command"
)
self._empty_input_command_handler = lambda: self._print_func("[red bold]Empty input command")
def unknown_command_handler(command: InputCommand) -> None:
cmd_trg: str = command.trigger
mst_sim_cmd: str | None = self._most_similar_command(cmd_trg)
first_part_of_text = (
f"[red]Unknown command:[/red] [blue]{escape(cmd_trg)}[/blue]"
)
first_part_of_text = f"[red]Unknown command:[/red] [blue]{escape(cmd_trg)}[/blue]"
second_part_of_text = (
("[red], most similar:[/red] " + ("[blue]" + mst_sim_cmd + "[/blue]"))
if mst_sim_cmd
else ""
("[red], most similar:[/red] " + ("[blue]" + mst_sim_cmd + "[/blue]")) if mst_sim_cmd else ""
)
self._print_func(first_part_of_text + second_part_of_text)
@@ -357,13 +327,9 @@ class BaseApp:
for trigger in combined:
self._matching_default_triggers_with_routers[trigger] = router_entity
self._matching_lower_triggers_with_routers[trigger.lower()] = (
router_entity
)
self._matching_lower_triggers_with_routers[trigger.lower()] = router_entity
self._autocompleter.initial_setup(
list(self._current_matching_triggers_with_routers.keys())
)
self._autocompleter.initial_setup(list(self._current_matching_triggers_with_routers.keys()))
seen = {}
for item in list(self._current_matching_triggers_with_routers.keys()):
@@ -383,7 +349,7 @@ class BaseApp:
self._print_func(message)
if self._messages_on_startup:
print("\n")
if not self._repeat_command_groups_description:
if not self._repeat_command_groups_printing_description:
self._print_command_group_description()
@@ -406,7 +372,7 @@ class App(BaseApp):
system_router_title: str | None = "System points:",
ignore_command_register: bool = True,
dividing_line: AVAILABLE_DIVIDING_LINES = DEFAULT_DIVIDING_LINE,
repeat_command_groups: bool = True,
repeat_command_groups_printing: bool = True,
override_system_messages: bool = False,
autocompleter: AutoCompleter = DEFAULT_AUTOCOMPLETER,
print_func: Printer = DEFAULT_PRINT_FUNC,
@@ -421,7 +387,7 @@ class App(BaseApp):
:param system_router_title: system router title
:param ignore_command_register: whether to ignore the case of the entered commands
:param dividing_line: the entity of the dividing line
:param repeat_command_groups: whether to repeat the available commands and their description
:param repeat_command_groups_printing: whether to repeat the available commands and their description
:param override_system_messages: whether to redefine the default formatting of system messages
:param autocompleter: the entity of the autocompleter
:param print_func: system messages text output function
@@ -435,7 +401,7 @@ class App(BaseApp):
system_router_title=system_router_title,
ignore_command_register=ignore_command_register,
dividing_line=dividing_line,
repeat_command_groups=repeat_command_groups,
repeat_command_groups_printing=repeat_command_groups_printing,
override_system_messages=override_system_messages,
autocompleter=autocompleter,
print_func=print_func,
@@ -448,15 +414,13 @@ class App(BaseApp):
"""
self._pre_cycle_setup()
while True:
if self._repeat_command_groups_description:
if self._repeat_command_groups_printing_description:
self._print_command_group_description()
raw_command: str = Console().input(self._prompt)
try:
input_command: InputCommand = InputCommand.parse(
raw_command=raw_command
)
input_command: InputCommand = InputCommand.parse(raw_command=raw_command)
except InputCommandException as error:
with redirect_stdout(io.StringIO()) as stderr:
self._error_handler(error, raw_command)
@@ -467,7 +431,7 @@ class App(BaseApp):
if self._is_exit_command(input_command):
system_router.finds_appropriate_handler(input_command)
self._autocompleter.exit_setup(
list(self._current_matching_triggers_with_routers.keys())
list(self._current_matching_triggers_with_routers.keys()), self._ignore_command_register
)
return
@@ -478,40 +442,21 @@ class App(BaseApp):
self._print_framed_text(stdout_res)
continue
processing_router = self._current_matching_triggers_with_routers[
input_command.trigger.lower()
]
processing_router = self._current_matching_triggers_with_routers[input_command.trigger.lower()]
if processing_router.disable_redirect_stdout:
if isinstance(self._dividing_line, StaticDividingLine):
self._print_func(
self._dividing_line.get_full_static_line(
is_override=self._override_system_messages
)
dividing_line_unit_part: str = self._dividing_line.get_unit_part()
self._print_func(
StaticDividingLine(dividing_line_unit_part).get_full_static_line(
is_override=self._override_system_messages
)
processing_router.finds_appropriate_handler(input_command)
self._print_func(
self._dividing_line.get_full_static_line(
is_override=self._override_system_messages
)
)
else:
dividing_line_unit_part: str = self._dividing_line.get_unit_part()
self._print_func(
StaticDividingLine(
dividing_line_unit_part
).get_full_static_line(
is_override=self._override_system_messages
)
)
processing_router.finds_appropriate_handler(input_command)
self._print_func(
StaticDividingLine(
dividing_line_unit_part
).get_full_static_line(
is_override=self._override_system_messages
)
)
processing_router.finds_appropriate_handler(input_command)
self._print_func(
StaticDividingLine(dividing_line_unit_part).get_full_static_line(
is_override=self._override_system_messages
)
)
else:
with redirect_stdout(io.StringIO()) as stdout:
processing_router.finds_appropriate_handler(input_command)
+6 -3
View File
@@ -1,16 +1,19 @@
__all__ = ["NonStandardBehaviorHandler", "EmptyCommandHandler", "Printer", "DescriptionMessageGenerator"]
from typing import Protocol, TypeVar
T = TypeVar('T', contravariant=True) # noqa: WPS111
T = TypeVar("T", contravariant=True) # noqa: WPS111
class NonStandardBehaviorHandler(Protocol[T]):
def __call__(self, __param: T) -> None:
raise NotImplementedError
class EmptyCommandHandler(Protocol):
def __call__(self) -> None:
raise NotImplementedError
class Printer(Protocol):
def __call__(self, __text: str) -> None:
@@ -1,3 +1,5 @@
__all__ = ["RegisteredRouters"]
from typing import Iterator, Optional
from argenta.router import Router
+8 -12
View File
@@ -1,12 +1,8 @@
__all__ = [
"Command",
"PossibleValues",
"PredefinedFlags",
"InputCommand",
"Flags",
"Flag"
]
from argenta.command.models import Command, InputCommand
from argenta.command.flag import defaults as PredefinedFlags
from argenta.command.flag import (Flag, Flags, PossibleValues)
from argenta.command.flag import Flag as Flag
from argenta.command.flag import Flags as Flags
from argenta.command.flag import InputFlag as InputFlag
from argenta.command.flag import InputFlags as InputFlags
from argenta.command.flag import PossibleValues as PossibleValues
from argenta.command.flag.defaults import PredefinedFlags as PredefinedFlags
from argenta.command.models import Command as Command
from argenta.command.models import InputCommand as InputCommand
+13 -5
View File
@@ -1,12 +1,21 @@
from argenta.command.flag.models import Flag, InputFlag
__all__ = [
"InputCommandException",
"UnprocessedInputFlagException",
"RepeatedInputFlagsException",
"EmptyInputCommandException",
]
from abc import ABC, abstractmethod
from typing import override
from argenta.command.flag.models import Flag, InputFlag
class InputCommandException(ABC, Exception):
"""
Private. Base exception class for all exceptions raised when parse input command
"""
@override
@abstractmethod
def __str__(self) -> str:
@@ -17,6 +26,7 @@ class UnprocessedInputFlagException(InputCommandException):
"""
Private. Raised when an unprocessed input flag is detected
"""
@override
def __str__(self) -> str:
return "Unprocessed Input Flags"
@@ -34,16 +44,14 @@ class RepeatedInputFlagsException(InputCommandException):
@override
def __str__(self) -> str:
string_entity: str = self.flag.string_entity
return (
"Repeated Input Flags\n"
f"Duplicate flag was detected in the input: '{string_entity}'"
)
return f"Repeated Input Flags\nDuplicate flag was detected in the input: '{string_entity}'"
class EmptyInputCommandException(InputCommandException):
"""
Private. Raised when an empty input command is detected
"""
@override
def __str__(self) -> str:
return "Input Command is empty"
+6 -11
View File
@@ -1,11 +1,6 @@
__all__ = [
"Flag",
"InputFlag",
"Flags",
"PossibleValues",
"ValidationStatus"
]
from argenta.command.flag.models import Flag, InputFlag, PossibleValues, ValidationStatus
from argenta.command.flag.flags.models import Flags
from argenta.command.flag.flags.models import Flags as Flags
from argenta.command.flag.flags.models import InputFlags as InputFlags
from argenta.command.flag.models import Flag as Flag
from argenta.command.flag.models import InputFlag as InputFlag
from argenta.command.flag.models import PossibleValues as PossibleValues
from argenta.command.flag.models import ValidationStatus as ValidationStatus
+21 -19
View File
@@ -1,27 +1,29 @@
from typing import Literal
from argenta.command.flag.models import Flag, PossibleValues
import re
__all__ = ["PredefinedFlags"]
import re
from typing import Literal
from argenta.command.flag.models import Flag, PossibleValues
DEFAULT_PREFIX: Literal["-", "--", "---"] = "-"
HELP = Flag(name="help", possible_values=PossibleValues.NEITHER)
SHORT_HELP = Flag(name="H", prefix=DEFAULT_PREFIX, possible_values=PossibleValues.NEITHER)
INFO = Flag(name="info", possible_values=PossibleValues.NEITHER) # noqa: WPS110
SHORT_INFO = Flag(name="I", prefix=DEFAULT_PREFIX, possible_values=PossibleValues.NEITHER)
class PredefinedFlags:
HELP = Flag(name="help", possible_values=PossibleValues.NEITHER)
SHORT_HELP = Flag(name="H", prefix=DEFAULT_PREFIX, possible_values=PossibleValues.NEITHER)
ALL = Flag(name="all", possible_values=PossibleValues.NEITHER)
SHORT_ALL = Flag(name="A", prefix=DEFAULT_PREFIX, possible_values=PossibleValues.NEITHER)
INFO = Flag(name="info", possible_values=PossibleValues.NEITHER) # noqa: WPS110
SHORT_INFO = Flag(name="I", prefix=DEFAULT_PREFIX, possible_values=PossibleValues.NEITHER)
HOST = Flag(
name="host", possible_values=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
)
SHORT_HOST = Flag(
name="H",
prefix=DEFAULT_PREFIX,
possible_values=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"),
)
ALL = Flag(name="all", possible_values=PossibleValues.NEITHER)
SHORT_ALL = Flag(name="A", prefix=DEFAULT_PREFIX, possible_values=PossibleValues.NEITHER)
PORT = Flag(name="port", possible_values=re.compile(r"^\d{1,5}$"))
SHORT_PORT = Flag(name="P", prefix=DEFAULT_PREFIX, possible_values=re.compile(r"^\d{1,5}$"))
HOST = Flag(name="host", possible_values=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"))
SHORT_HOST = Flag(
name="H",
prefix=DEFAULT_PREFIX,
possible_values=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"),
)
PORT = Flag(name="port", possible_values=re.compile(r"^\d{1,5}$"))
SHORT_PORT = Flag(name="P", prefix=DEFAULT_PREFIX, possible_values=re.compile(r"^\d{1,5}$"))
+2 -10
View File
@@ -1,10 +1,2 @@
__all__ = [
"Flags",
"InputFlags"
]
from argenta.command.flag.flags.models import (
Flags,
InputFlags
)
from argenta.command.flag.flags.models import Flags as Flags
from argenta.command.flag.flags.models import InputFlags as InputFlags
+11 -7
View File
@@ -1,7 +1,9 @@
from argenta.command.flag.models import InputFlag, Flag
from typing import Generic, TypeVar, override
from collections.abc import Iterator
__all__ = ["Flags", "InputFlags"]
from collections.abc import Iterator
from typing import Generic, TypeVar, override
from argenta.command.flag.models import Flag, InputFlag
FlagType = TypeVar("FlagType")
@@ -30,6 +32,9 @@ class BaseFlags(Generic[FlagType]):
:return: None
"""
self.flags.extend(flags)
def __len__(self) -> int:
return len(self.flags)
def __iter__(self) -> Iterator[FlagType]:
return iter(self.flags)
@@ -52,7 +57,7 @@ class Flags(BaseFlags[Flag]):
:return: entity of the flag or None
"""
return next((flag for flag in self.flags if flag.name == name), None)
@override
def __eq__(self, other: object) -> bool:
if not isinstance(other, Flags):
@@ -82,9 +87,9 @@ class InputFlags(BaseFlags[InputFlag]):
:return: entity of the flag or None
"""
return next((flag for flag in self.flags if flag.name == name), None)
@override
def __eq__(self, other: object) -> bool:
def __eq__(self, other: object) -> bool:
if not isinstance(other, InputFlags):
raise NotImplementedError
@@ -103,4 +108,3 @@ class InputFlags(BaseFlags[InputFlag]):
return False
else:
raise TypeError
+44 -34
View File
@@ -1,23 +1,30 @@
__all__ = ["PossibleValues", "ValidationStatus", "Flag", "InputFlag"]
from enum import Enum
from re import Pattern
from typing import Literal, override
PREFIX_TYPE = Literal["-", "--", "---"]
class PossibleValues(Enum):
NEITHER = 'NEITHER'
ALL = 'ALL'
NEITHER = "NEITHER"
ALL = "ALL"
class ValidationStatus(Enum):
VALID = 'VALID'
INVALID = 'INVALID'
UNDEFINED = 'UNDEFINED'
VALID = "VALID"
INVALID = "INVALID"
UNDEFINED = "UNDEFINED"
class Flag:
def __init__(
self, name: str, *,
prefix: Literal["-", "--", "---"] = "--",
self,
name: str,
*,
prefix: PREFIX_TYPE = "--",
possible_values: list[str] | Pattern[str] | PossibleValues = PossibleValues.ALL,
) -> None:
"""
@@ -28,26 +35,29 @@ class Flag:
:return: None
"""
self.name: str = name
self.prefix: Literal["-", "--", "---"] = prefix
self.prefix: PREFIX_TYPE = prefix
self.possible_values: list[str] | Pattern[str] | PossibleValues = possible_values
def validate_input_flag_value(self, input_flag_value: str | None) -> bool:
def validate_input_flag_value(self, input_flag_value: str) -> bool:
"""
Private. Validates the input flag value
:param input_flag_value: The input flag value to validate
:return: whether the entered flag is valid as bool
"""
if self.possible_values == PossibleValues.NEITHER:
return input_flag_value is None
return input_flag_value == ''
if self.possible_values == PossibleValues.ALL:
return input_flag_value != ''
if isinstance(self.possible_values, Pattern):
return isinstance(input_flag_value, str) and bool(self.possible_values.match(input_flag_value))
return bool(self.possible_values.match(input_flag_value))
if isinstance(self.possible_values, list):
return input_flag_value in self.possible_values
return False
return True
@property
def string_entity(self) -> str:
"""
@@ -56,17 +66,17 @@ class Flag:
"""
string_entity: str = self.prefix + self.name
return string_entity
@override
def __str__(self) -> str:
return self.string_entity
@override
def __repr__(self) -> str:
return f'Flag<name={self.name}, prefix={self.prefix}>'
return f"Flag<name={self.name}, prefix={self.prefix}>"
@override
def __eq__(self, other: object) -> bool:
def __eq__(self, other: object) -> bool:
if isinstance(other, Flag):
return self.string_entity == other.string_entity
else:
@@ -75,10 +85,12 @@ class Flag:
class InputFlag:
def __init__(
self, name: str, *,
prefix: Literal['-', '--', '---'] = '--',
input_value: str | None,
status: ValidationStatus | None
self,
name: str,
*,
prefix: PREFIX_TYPE = "--",
input_value: str,
status: ValidationStatus | None,
):
"""
Public. The entity of the flag of the entered command
@@ -88,10 +100,10 @@ class InputFlag:
:return: None
"""
self.name: str = name
self.prefix: Literal['-', '--', '---'] = prefix
self.input_value: str | None = input_value
self.prefix: PREFIX_TYPE = prefix
self.input_value: str = input_value
self.status: ValidationStatus | None = status
@property
def string_entity(self) -> str:
"""
@@ -103,17 +115,15 @@ class InputFlag:
@override
def __str__(self) -> str:
return f'{self.string_entity} {self.input_value}'
@override
def __repr__(self) -> str:
return f'InputFlag<name={self.name}, prefix={self.prefix}, value={self.input_value}, status={self.status}>'
return f"{self.string_entity} {self.input_value}"
@override
def __eq__(self, other: object) -> bool:
def __repr__(self) -> str:
return f"InputFlag<name={self.name}, prefix={self.prefix}, value={self.input_value}, status={self.status}>"
@override
def __eq__(self, other: object) -> bool:
if isinstance(other, InputFlag):
return (
self.name == other.name
)
return self.name == other.name
else:
raise NotImplementedError
+63 -85
View File
@@ -1,18 +1,23 @@
from argenta.command.flag.models import Flag, InputFlag, ValidationStatus
from argenta.command.flag.flags.models import InputFlags, Flags
from argenta.command.exceptions import (
UnprocessedInputFlagException,
RepeatedInputFlagsException,
EmptyInputCommandException,
)
__all__ = ["Command", "InputCommand"]
import shlex
from typing import Never, Self, cast, Literal
from argenta.command.exceptions import (
EmptyInputCommandException,
RepeatedInputFlagsException,
UnprocessedInputFlagException,
)
from argenta.command.flag.flags.models import Flags, InputFlags
from argenta.command.flag.models import Flag, InputFlag, ValidationStatus
ParseFlagsResult = tuple[InputFlags, str | None, str | None]
ParseResult = tuple[str, InputFlags]
MIN_FLAG_PREFIX: str = "-"
PREFIX_TYPE = Literal["-", "--", "---"]
DEFAULT_WITHOUT_FLAGS: Flags = Flags()
DEFAULT_WITHOUT_ALIASES: list[Never] = []
DEFAULT_WITHOUT_INPUT_FLAGS: InputFlags = InputFlags()
@@ -20,10 +25,11 @@ DEFAULT_WITHOUT_INPUT_FLAGS: InputFlags = InputFlags()
class Command:
def __init__(
self,
trigger: str, *,
description: str | None = None,
trigger: str,
*,
description: str = "Some useful command",
flags: Flag | Flags = DEFAULT_WITHOUT_FLAGS,
aliases: list[str] | None = None,
aliases: list[str] | list[Never] = DEFAULT_WITHOUT_ALIASES,
):
"""
Public. The command that can and should be registered in the Router
@@ -34,12 +40,10 @@ class Command:
"""
self.registered_flags: Flags = flags if isinstance(flags, Flags) else Flags([flags])
self.trigger: str = trigger
self.description: str = description if description else "Command without description"
self.aliases: list[str] = aliases if aliases else []
self.description: str = description
self.aliases: list[str] | list[Never] = aliases
def validate_input_flag(
self, flag: InputFlag
) -> ValidationStatus:
def validate_input_flag(self, flag: InputFlag) -> ValidationStatus:
"""
Private. Validates the input flag
:param flag: input flag for validation
@@ -57,8 +61,7 @@ class Command:
class InputCommand:
def __init__(self, trigger: str, *,
input_flags: InputFlag | InputFlags = DEFAULT_WITHOUT_INPUT_FLAGS):
def __init__(self, trigger: str, *, input_flags: InputFlag | InputFlags = DEFAULT_WITHOUT_INPUT_FLAGS):
"""
Private. The model of the input command, after parsing
:param trigger:the trigger of the command
@@ -66,7 +69,9 @@ class InputCommand:
:return: None
"""
self.trigger: str = trigger
self.input_flags: InputFlags = input_flags if isinstance(input_flags, InputFlags) else InputFlags([input_flags])
self.input_flags: InputFlags = (
input_flags if isinstance(input_flags, InputFlags) else InputFlags([input_flags])
)
@classmethod
def parse(cls, raw_command: str) -> Self:
@@ -75,77 +80,50 @@ class InputCommand:
:param raw_command: raw input command
:return: model of the input command, after parsing as InputCommand
"""
trigger, input_flags = CommandParser(raw_command).parse_raw_command()
return cls(trigger=trigger, input_flags=input_flags)
tokens = shlex.split(raw_command)
class CommandParser:
def __init__(self, raw_command: str) -> None:
self.raw_command: str = raw_command
self._parsed_input_flags: InputFlags = InputFlags()
def parse_raw_command(self) -> ParseResult:
if not self.raw_command:
raise EmptyInputCommandException()
input_flags, crnt_flag_name, crnt_flag_val = self._parse_flags(self.raw_command.split()[1:])
if any([crnt_flag_name, crnt_flag_val]):
raise UnprocessedInputFlagException()
else:
return (self.raw_command.split()[0], input_flags)
def _parse_flags(self, _tokens: list[str] | list[Never]) -> ParseFlagsResult:
crnt_flg_name, crnt_flg_val = None, None
for index, token in enumerate(_tokens):
crnt_flg_name, crnt_flg_val = _parse_single_token(token, crnt_flg_name, crnt_flg_val)
if not crnt_flg_name or self._is_next_token_value(index, _tokens):
continue
if not tokens:
raise EmptyInputCommandException
command = tokens[0]
flags: InputFlags = InputFlags()
i = 1
while i < len(tokens):
token = tokens[i]
if token.startswith("---"):
prefix = "---"
name = token[3:]
elif token.startswith("--"):
prefix = "--"
name = token[2:]
elif token.startswith("-"):
prefix = "-"
name = token[1:]
else:
raise UnprocessedInputFlagException
if not name:
raise UnprocessedInputFlagException
if i + 1 < len(tokens) and not tokens[i + 1].startswith("-"):
input_value = tokens[i + 1]
i += 2
else:
input_value = ""
i += 1
input_flag = InputFlag(
name=crnt_flg_name[crnt_flg_name.rfind(MIN_FLAG_PREFIX) + 1:],
prefix=cast(
Literal["-", "--", "---"],
crnt_flg_name[:crnt_flg_name.rfind(MIN_FLAG_PREFIX) + 1],
),
input_value=crnt_flg_val,
name=name,
prefix=cast(PREFIX_TYPE, prefix), # pyright: ignore[reportUnnecessaryCast]
input_value=input_value,
status=None
)
if input_flag in self._parsed_input_flags:
if input_flag in flags:
raise RepeatedInputFlagsException(input_flag)
self._parsed_input_flags.add_flag(input_flag)
crnt_flg_name, crnt_flg_val = None, None
return (self._parsed_input_flags, crnt_flg_name, crnt_flg_val)
def _is_next_token_value(self, current_index: int,
_tokens: list[str] | list[Never]) -> bool:
next_index = current_index + 1
if next_index >= len(_tokens):
return False
flags.add_flag(input_flag)
next_token = _tokens[next_index]
return not next_token.startswith(MIN_FLAG_PREFIX)
def _parse_single_token(
token: str,
crnt_flag_name: str | None,
crnt_flag_val: str | None
) -> tuple[str | None, str | None]:
if not token.startswith(MIN_FLAG_PREFIX):
if not crnt_flag_name or crnt_flag_val:
raise UnprocessedInputFlagException
return crnt_flag_name, token
prefix = token[:token.rfind(MIN_FLAG_PREFIX)]
if len(token) < 2 or len(prefix) > 2:
raise UnprocessedInputFlagException
new_flag_name = token
new_flag_value = None
return new_flag_name, new_flag_value
return cls(command, input_flags=flags)
+3
View File
@@ -0,0 +1,3 @@
__all__ = ["DataBridge"]
from .entity import DataBridge as DataBridge
+23
View File
@@ -0,0 +1,23 @@
__all__ = ["DataBridge"]
from typing import Any
class DataBridge:
def __init__(self, initial_data: dict[str, Any] | None = None) -> None:
self._data: dict[str, Any] = initial_data if initial_data else {}
def update(self, data: dict[str, Any]) -> None:
self._data.update(data)
def get_all(self) -> dict[str, Any]:
return self._data
def clear_all(self) -> None:
self._data.clear()
def get_by_key(self, key: str) -> Any:
return self._data.get(key)
def delete_by_key(self, key: str) -> None:
self._data.pop(key)
+1 -1
View File
@@ -1,2 +1,2 @@
from argenta.di.integration import inject as inject
from argenta.di.integration import FromDishka as FromDishka
from argenta.di.integration import inject as inject
+6 -9
View File
@@ -3,11 +3,10 @@ __all__ = ["inject", "setup_dishka", "FromDishka"]
from typing import Any, Callable, TypeVar
from dishka import Container, FromDishka
from dishka.integrations.base import wrap_injection, is_dishka_injected
from argenta.response import Response
from argenta.app import App
from dishka.integrations.base import is_dishka_injected, wrap_injection
from argenta.app.models import App
from argenta.response.entity import Response
T = TypeVar("T")
@@ -20,20 +19,18 @@ def inject(func: Callable[..., T]) -> Callable[..., T]:
)
def setup_dishka(app: App, *, auto_inject: bool = False) -> None:
def setup_dishka(app: App, container: Container, *, auto_inject: bool = False) -> None:
if auto_inject:
_auto_inject_handlers(app)
Response.patch_by_container(container)
def _get_container_from_response(
args: tuple[Any, ...], kwargs: dict[str, Any]
) -> Container:
def _get_container_from_response(args: tuple[Any, ...], kwargs: dict[str, Any]) -> Container:
for arg in args:
if isinstance(arg, Response):
if hasattr(arg, "_dishka_container"):
return arg._dishka_container # pyright: ignore[reportPrivateUsage]
break
raise RuntimeError("dishka container not found in Response")
+12 -7
View File
@@ -1,14 +1,19 @@
from argenta.orchestrator.argparser import ArgParser
from dishka import Provider, provide, Scope
__all__ = [
"SystemProvider",
]
from dishka import Provider, Scope, provide
from argenta.data_bridge import DataBridge
from argenta.orchestrator.argparser import ArgParser
from argenta.orchestrator.argparser.entity import ArgSpace
class SystemProvider(Provider):
def __init__(self, arg_parser: ArgParser):
super().__init__()
self._arg_parser: ArgParser = arg_parser
@provide(scope=Scope.APP)
def get_argspace(self, arg_parser: ArgParser) -> ArgSpace:
return arg_parser.parsed_argspace
@provide(scope=Scope.APP)
def get_argspace(self) -> ArgSpace:
return self._arg_parser.parse_args()
def get_data_bridge(self) -> DataBridge:
return DataBridge()
+1 -4
View File
@@ -1,4 +1 @@
__all__ = ["get_time_of_pre_cycle_setup"]
from argenta.metrics.main import get_time_of_pre_cycle_setup
from argenta.metrics.main import get_time_of_pre_cycle_setup as get_time_of_pre_cycle_setup
+5 -1
View File
@@ -1,3 +1,7 @@
__all__ = [
"get_time_of_pre_cycle_setup",
]
import io
from contextlib import redirect_stdout
from time import time
@@ -13,6 +17,6 @@ def get_time_of_pre_cycle_setup(app: App) -> float:
"""
start = time()
with redirect_stdout(io.StringIO()):
app._pre_cycle_setup() # pyright: ignore[reportPrivateUsage]
app._pre_cycle_setup() # pyright: ignore[reportPrivateUsage]
end = time()
return end - start
+2 -4
View File
@@ -1,4 +1,2 @@
__all__ = ["ArgParser", "Orchestrator"]
from argenta.orchestrator.argparser.entity import ArgParser
from argenta.orchestrator.entity import Orchestrator
from argenta.orchestrator.argparser.entity import ArgParser as ArgParser
from argenta.orchestrator.entity import Orchestrator as Orchestrator
@@ -1,9 +1,4 @@
__all__ = [
"ArgParser",
"BooleanArgument",
"ValueArgument"
]
from argenta.orchestrator.argparser.entity import ArgParser
from argenta.orchestrator.argparser.arguments import BooleanArgument, ValueArgument
from argenta.orchestrator.argparser.arguments import BooleanArgument as BooleanArgument
from argenta.orchestrator.argparser.arguments import ValueArgument as ValueArgument
from argenta.orchestrator.argparser.entity import ArgParser as ArgParser
from argenta.orchestrator.argparser.entity import ArgSpace as ArgSpace
@@ -1,8 +1,3 @@
__all__ = ["BooleanArgument", "ValueArgument", "InputArgument"]
from argenta.orchestrator.argparser.arguments.models import (
BooleanArgument,
ValueArgument,
InputArgument
)
from argenta.orchestrator.argparser.arguments.models import BooleanArgument as BooleanArgument
from argenta.orchestrator.argparser.arguments.models import InputArgument as InputArgument
from argenta.orchestrator.argparser.arguments.models import ValueArgument as ValueArgument
@@ -1,3 +1,5 @@
__all__ = ["BooleanArgument", "ValueArgument", "InputArgument"]
from typing import Literal
@@ -5,10 +7,8 @@ class BaseArgument:
"""
Private. Base class for all arguments
"""
def __init__(self, name: str, *,
help: str,
is_deprecated: bool,
prefix: Literal["-", "--", "---"]):
def __init__(self, name: str, *, help: str, is_deprecated: bool, prefix: Literal["-", "--", "---"]):
"""
Public. Boolean argument, does not require a value
:param name: name of the argument
@@ -20,20 +20,24 @@ class BaseArgument:
self.help: str = help
self.is_deprecated: bool = is_deprecated
self.prefix: Literal["-", "--", "---"] = prefix
@property
def string_entity(self) -> str:
return self.prefix + self.name
class ValueArgument(BaseArgument):
def __init__(self, name: str, *,
prefix: Literal["-", "--", "---"] = "--",
help: str = "Help message for the value argument",
possible_values: list[str] | None = None,
default: str | None = None,
is_required: bool = False,
is_deprecated: bool = False):
def __init__(
self,
name: str,
*,
prefix: Literal["-", "--", "---"] = "--",
help: str = "Help message for the value argument",
possible_values: list[str] | None = None,
default: str | None = None,
is_required: bool = False,
is_deprecated: bool = False,
):
"""
Public. Value argument, must have the value
:param name: name of the argument
@@ -52,10 +56,14 @@ class ValueArgument(BaseArgument):
class BooleanArgument(BaseArgument):
def __init__(self, name: str, *,
prefix: Literal["-", "--", "---"] = "--",
help: str = "Help message for the boolean argument",
is_deprecated: bool = False):
def __init__(
self,
name: str,
*,
prefix: Literal["-", "--", "---"] = "--",
help: str = "Help message for the boolean argument",
is_deprecated: bool = False,
):
"""
Public. Boolean argument, does not require a value
:param name: name of the argument
@@ -68,15 +76,13 @@ class BooleanArgument(BaseArgument):
class InputArgument:
def __init__(self, name: str,
value: str | None,
founder_class: type[BaseArgument]) -> None:
def __init__(self, name: str, value: str | Literal[True], founder_class: type[BaseArgument]) -> None:
self.name: str = name
self.value: str | None = value
self.value: str | Literal[True] = value
self.founder_class: type[BaseArgument] = founder_class
def __str__(self) -> str:
return f"InputArgument({self.name}={self.value})"
def __repr__(self) -> str:
return f"InputArgument<name={self.name}, value={self.value}, founder_class={self.founder_class.__name__}>"
+98 -42
View File
@@ -1,3 +1,9 @@
__all__ = [
"ArgSpace",
"ArgParser",
]
import sys
from argparse import ArgumentParser, Namespace
from typing import Never, Self
@@ -5,40 +11,63 @@ from argenta.orchestrator.argparser.arguments.models import (
BaseArgument,
BooleanArgument,
InputArgument,
ValueArgument
ValueArgument,
)
class ArgSpace:
def __init__(self, all_arguments: list[InputArgument]) -> None:
self.all_arguments = all_arguments
@classmethod
def from_namespace(cls, namespace: Namespace,
processed_args: list[ValueArgument | BooleanArgument]) -> Self:
name_type_paired_args: dict[str, type[BaseArgument]] = {
arg.name: type(arg)
for arg in processed_args
self._name_object_paired_args: dict[str, InputArgument] = {}
self._type_object_paired_args: dict[type[BaseArgument], list[InputArgument]] = {
BooleanArgument: [],
ValueArgument: []
}
return cls([InputArgument(name=name,
value=value,
founder_class=name_type_paired_args[name])
for name, value in vars(namespace).items()])
self._setup_getters()
@classmethod
def from_namespace(
cls,
namespace: Namespace,
processed_args: list[ValueArgument | BooleanArgument]
) -> Self:
name_type_paired_processed_args: dict[str, type[BaseArgument]] = {
arg.name: type(arg) for arg in processed_args
}
parsed_arguments: list[InputArgument] = []
for name, value in vars(namespace).items():
parsed_arguments.append(
InputArgument(
name=name,
value=value,
founder_class=name_type_paired_processed_args[name]
)
)
return cls(parsed_arguments)
def _setup_getters(self) -> None:
if not self.all_arguments:
return
for input_arg in self.all_arguments:
self._name_object_paired_args[input_arg.name] = input_arg
self._type_object_paired_args[input_arg.founder_class].append(input_arg)
def get_by_name(self, name: str) -> InputArgument | None:
for arg in self.all_arguments:
if arg.name == name:
return arg
return None
return self._name_object_paired_args.get(name)
def get_by_type(self, arg_type: type[BaseArgument]) -> list[InputArgument] | list[Never]:
return [arg for arg in self.all_arguments if arg.founder_class is arg_type]
return self._type_object_paired_args.get(arg_type, [])
class ArgParser:
def __init__(
self,
processed_args: list[ValueArgument | BooleanArgument], *,
processed_args: list[ValueArgument | BooleanArgument],
*,
name: str = "Argenta",
description: str = "Argenta available arguments",
epilog: str = "github.com/koloideal/Argenta | made by kolo",
@@ -55,24 +84,51 @@ class ArgParser:
self.epilog: str = epilog
self.processed_args: list[ValueArgument | BooleanArgument] = processed_args
self._core: ArgumentParser = ArgumentParser(prog=name, description=description, epilog=epilog)
for arg in processed_args:
if isinstance(arg, BooleanArgument):
_ = self._core.add_argument(arg.string_entity,
action=arg.action,
help=arg.help,
deprecated=arg.is_deprecated)
else:
_ = self._core.add_argument(arg.string_entity,
action=arg.action,
help=arg.help,
default=arg.default,
choices=arg.possible_values,
required=arg.is_required,
deprecated=arg.is_deprecated)
self.parsed_argspace: ArgSpace = ArgSpace([])
def parse_args(self) -> ArgSpace:
return ArgSpace.from_namespace(namespace=self._core.parse_args(),
processed_args=self.processed_args)
self._core: ArgumentParser = ArgumentParser(prog=name, description=description, epilog=epilog)
self._register_args(processed_args)
def _parse_args(self) -> None:
self.parsed_argspace = ArgSpace.from_namespace(
namespace=self._core.parse_args(), processed_args=self.processed_args
)
def _register_args(self, processed_args: list[ValueArgument | BooleanArgument]) -> None:
if sys.version_info >= (3, 13):
for arg in processed_args:
if isinstance(arg, BooleanArgument):
_ = self._core.add_argument(
arg.string_entity,
action=arg.action,
help=arg.help,
deprecated=arg.is_deprecated
)
else:
_ = self._core.add_argument(
arg.string_entity,
action=arg.action,
help=arg.help,
default=arg.default,
choices=arg.possible_values,
required=arg.is_required,
deprecated=arg.is_deprecated,
)
else:
for arg in processed_args:
if isinstance(arg, BooleanArgument):
_ = self._core.add_argument(
arg.string_entity,
action=arg.action,
help=arg.help,
)
else:
_ = self._core.add_argument(
arg.string_entity,
action=arg.action,
help=arg.help,
default=arg.default,
choices=arg.possible_values,
required=arg.is_required
)
+17 -12
View File
@@ -1,20 +1,22 @@
from argenta.app import App
from argenta.response import Response
from argenta.orchestrator.argparser import ArgParser
from argenta.di.integration import setup_dishka
from argenta.di.providers import SystemProvider
__all__ = ["Orchestrator"]
from dishka import Provider, make_container
from argenta.app import App
from argenta.di.integration import setup_dishka
from argenta.di.providers import SystemProvider
from argenta.orchestrator.argparser import ArgParser
DEFAULT_ARGPARSER: ArgParser = ArgParser(processed_args=[])
class Orchestrator:
def __init__(self, arg_parser: ArgParser = DEFAULT_ARGPARSER,
custom_providers: list[Provider] = [],
auto_inject_handlers: bool = True):
def __init__(
self,
arg_parser: ArgParser = DEFAULT_ARGPARSER,
custom_providers: list[Provider] = [],
auto_inject_handlers: bool = True,
):
"""
Public. An orchestrator and configurator that defines the behavior of an integrated system, one level higher than the App
:param arg_parser: Cmd argument parser and configurator at startup
@@ -24,14 +26,17 @@ class Orchestrator:
self._custom_providers: list[Provider] = custom_providers
self._auto_inject_handlers: bool = auto_inject_handlers
self._arg_parser._parse_args()
def start_polling(self, app: App) -> None:
"""
Public. Starting the user input processing cycle
:param app: a running application
:return: None
"""
container = make_container(SystemProvider(self._arg_parser), *self._custom_providers)
Response.patch_by_container(container)
setup_dishka(app, auto_inject=self._auto_inject_handlers)
container = make_container(
SystemProvider(), *self._custom_providers, context={ArgParser: self._arg_parser}
)
setup_dishka(app, container, auto_inject=self._auto_inject_handlers)
app.run_polling()
+2 -5
View File
@@ -1,5 +1,2 @@
__all__ = ["Response", "ResponseStatus"]
from argenta.response.entity import Response
from argenta.response.status import ResponseStatus
from argenta.response.entity import Response as Response
from argenta.response.status import ResponseStatus as ResponseStatus
+2 -23
View File
@@ -1,35 +1,14 @@
from typing import Any
__all__ = ["Response"]
from dishka import Container
from argenta.command.flag.flags.models import InputFlags
from argenta.response.status import ResponseStatus
EMPTY_INPUT_FLAGS: InputFlags = InputFlags()
class DataBridge:
_data: dict[str, Any] = {}
@classmethod
def update_data(cls, data: dict[str, Any]) -> None:
cls._data.update(data)
@classmethod
def get_data(cls) -> dict[str, Any]:
return cls._data
@classmethod
def clear_data(cls) -> None:
cls._data.clear()
@classmethod
def delete_from_data(cls, key: str) -> None:
cls._data.pop(key)
class Response(DataBridge):
class Response:
_dishka_container: Container
def __init__(
+6 -4
View File
@@ -1,3 +1,5 @@
__all__ = ["ResponseStatus"]
from enum import Enum
@@ -8,12 +10,12 @@ class ResponseStatus(Enum):
UNDEFINED_AND_INVALID_FLAGS = "UNDEFINED_AND_INVALID_FLAGS"
@classmethod
def from_flags(cls, *, has_invalid_value_flags: bool, has_undefined_flags: bool) -> 'ResponseStatus':
def from_flags(cls, *, has_invalid_value_flags: bool, has_undefined_flags: bool) -> "ResponseStatus":
key = (has_invalid_value_flags, has_undefined_flags)
status_map: dict[tuple[bool, bool], ResponseStatus] = {
(True, True): cls.UNDEFINED_AND_INVALID_FLAGS,
(True, False): cls.INVALID_VALUE_FLAGS,
(False, True): cls.UNDEFINED_FLAGS,
(True, True): cls.UNDEFINED_AND_INVALID_FLAGS,
(True, False): cls.INVALID_VALUE_FLAGS,
(False, True): cls.UNDEFINED_FLAGS,
(False, False): cls.ALL_FLAGS_VALID,
}
return status_map[key]
+1 -4
View File
@@ -1,4 +1 @@
from argenta.router.entity import Router
__all__ = ["Router"]
from argenta.router.entity import Router as Router
+3 -3
View File
@@ -1,3 +1,5 @@
__all__ = ["CommandHandler", "CommandHandlers"]
from collections.abc import Iterator
from typing import Callable
@@ -30,9 +32,7 @@ class CommandHandlers:
Private. The model that unites all CommandHandler of the routers
:param command_handlers: list of CommandHandlers for register
"""
self.command_handlers: list[CommandHandler] = (
command_handlers if command_handlers else []
)
self.command_handlers: list[CommandHandler] = command_handlers if command_handlers else []
def add_handler(self, command_handler: CommandHandler) -> None:
"""
+2 -1
View File
@@ -1,4 +1,5 @@
__all__ = ["system_router"]
from argenta.router import Router
system_router = Router(title="System points:")
+12 -20
View File
@@ -1,27 +1,29 @@
__all__ = ["Router"]
from inspect import get_annotations, getfullargspec, getsourcefile, getsourcelines
from typing import Callable, TypeAlias
from inspect import getfullargspec, get_annotations, getsourcefile, getsourcelines
from rich.console import Console
from argenta.command import Command, InputCommand
from argenta.command.flag import ValidationStatus
from argenta.response import Response, ResponseStatus
from argenta.router.command_handler.entity import CommandHandlers, CommandHandler
from argenta.command.flag.flags import Flags, InputFlags
from argenta.response import Response, ResponseStatus
from argenta.router.command_handler.entity import CommandHandler, CommandHandlers
from argenta.router.exceptions import (
RepeatedFlagNameException,
RequiredArgumentNotPassedException,
TriggerContainSpacesException,
)
HandlerFunc: TypeAlias = Callable[..., None]
class Router:
def __init__(
self,
*,
title: str | None = "Default title",
*,
disable_redirect_stdout: bool = False,
):
"""
@@ -76,9 +78,7 @@ class Router:
if input_command_name.lower() in handle_command.aliases:
self.process_input_command(input_command_flags, command_handler)
def process_input_command(
self, input_command_flags: InputFlags, command_handler: CommandHandler
) -> None:
def process_input_command(self, input_command_flags: InputFlags, command_handler: CommandHandler) -> None:
"""
Private. Processes input command with the appropriate handler
:param input_command_flags: input command flags as InputFlags
@@ -88,9 +88,7 @@ class Router:
handle_command = command_handler.handled_command
if handle_command.registered_flags.flags:
if input_command_flags.flags:
response: Response = _structuring_input_flags(
handle_command, input_command_flags
)
response: Response = _structuring_input_flags(handle_command, input_command_flags)
command_handler.handling(response)
else:
response = Response(ResponseStatus.ALL_FLAGS_VALID)
@@ -101,9 +99,7 @@ class Router:
for input_flag in input_command_flags:
input_flag.status = ValidationStatus.UNDEFINED
undefined_flags.add_flag(input_flag)
response = Response(
ResponseStatus.UNDEFINED_FLAGS, input_flags=undefined_flags
)
response = Response(ResponseStatus.UNDEFINED_FLAGS, input_flags=undefined_flags)
command_handler.handling(response)
else:
response = Response(ResponseStatus.ALL_FLAGS_VALID)
@@ -140,15 +136,11 @@ class CommandDecorator:
def __call__(self, handler_func: Callable[..., None]) -> Callable[..., None]:
_validate_func_args(handler_func)
self.router.command_handlers.add_handler(
CommandHandler(handler_func, self.command)
)
self.router.command_handlers.add_handler(CommandHandler(handler_func, self.command))
return handler_func
def _structuring_input_flags(
handled_command: Command, input_flags: InputFlags
) -> Response:
def _structuring_input_flags(handled_command: Command, input_flags: InputFlags) -> Response:
"""
Private. Validates flags of input command
:param handled_command: entity of the handled command
+2
View File
@@ -1,3 +1,5 @@
__all__ = ["RepeatedFlagNameException", "RequiredArgumentNotPassedException", "TriggerContainSpacesException"]
from typing import override