pre-release v1.0.0

This commit is contained in:
2025-04-24 21:26:41 +03:00
parent 5bcae8fe68
commit 89f09c42f8
34 changed files with 78 additions and 40 deletions
View File
View File
+3
View File
@@ -0,0 +1,3 @@
__all__ = ["App"]
from argenta.app.models import App
@@ -0,0 +1,4 @@
__all__ = ["AutoCompleter"]
from argenta.app.autocompleter.entity import AutoCompleter
+72
View File
@@ -0,0 +1,72 @@
import os
import readline
class AutoCompleter:
def __init__(self, history_filename: str = False, autocomplete_button: str = 'tab') -> None:
"""
Public. Configures and implements auto-completion of input command
:param history_filename: the name of the file for saving the history of the autocompleter
:param autocomplete_button: the button for auto-completion
:return: None
"""
self.history_filename = history_filename
self.autocomplete_button = autocomplete_button
self.matches: list[str] = []
def complete(self, text, state) -> str | None:
"""
Private. Auto-completion function
:param text: part of the command being entered
:param state: the current cursor position is relative to the beginning of the line
:return: the desired candidate as str or None
"""
matches: list[str] = sorted(cmd for cmd in self.get_history_items() if cmd.startswith(text))
if len(matches) > 1:
common_prefix = matches[0]
for match in matches[1:]:
i = 0
while i < len(common_prefix) and i < len(match) and common_prefix[i] == match[i]:
i += 1
common_prefix = common_prefix[:i]
if state == 0:
readline.insert_text(common_prefix[len(text):])
readline.redisplay()
return None
elif len(matches) == 1:
return matches[0] if state == 0 else None
else:
return None
def initial_setup(self, all_commands: list[str]) -> None:
"""
Private. Initial setup function
:param all_commands: Registered commands for adding them to the autocomplete history
:return: None
"""
if self.history_filename:
if os.path.exists(self.history_filename):
readline.read_history_file(self.history_filename)
else:
for line in all_commands:
readline.add_history(line)
readline.set_completer(self.complete)
readline.set_completer_delims(readline.get_completer_delims().replace(' ', ''))
readline.parse_and_bind(f'{self.autocomplete_button}: complete')
def exit_setup(self) -> None:
"""
Private. Exit setup function
:return: None
"""
if self.history_filename:
readline.write_history_file(self.history_filename)
@staticmethod
def get_history_items() -> list[str] | list:
"""
Private. Returns a list of all commands entered by the user
:return: all commands entered by the user as list[str]
"""
return [readline.get_history_item(i) for i in range(1, readline.get_current_history_length() + 1)]
+12
View File
@@ -0,0 +1,12 @@
from dataclasses import dataclass
@dataclass
class PredefinedMessages:
"""
Public. A dataclass with predetermined messages for quick use
"""
USAGE = '[b dim]Usage[/b dim]: [i]<command> <[green]flags[/green]>[/i]'
HELP = '[b dim]Help[/b dim]: [i]<command>[/i] [b red]--help[/b red]'
AUTOCOMPLETE = '[b dim]Autocomplete[/b dim]: [i]<part>[/i] [bold]<tab>'
@@ -0,0 +1,4 @@
__all__ = ["StaticDividingLine", "DynamicDividingLine"]
from argenta.app.dividing_line.models import StaticDividingLine, DynamicDividingLine
+68
View File
@@ -0,0 +1,68 @@
from abc import ABC
class BaseDividingLine(ABC):
def __init__(self, unit_part: str = '-') -> None:
"""
Private. The basic dividing line
:param unit_part: the single part of the dividing line
:return: None
"""
self._unit_part = unit_part
def get_unit_part(self) -> str:
"""
Private. Returns the unit part of the dividing line
:return: unit_part of dividing line as str
"""
if len(self._unit_part) == 0:
return ' '
else:
return self._unit_part[0]
class StaticDividingLine(BaseDividingLine):
def __init__(self, unit_part: str = '-', length: int = 25) -> None:
"""
Public. The static dividing line
:param unit_part: the single part of the dividing line
:param length: the length of the dividing line
:return: None
"""
super().__init__(unit_part)
self.length = length
def get_full_static_line(self, is_override: bool) -> str:
"""
Private. Returns the full line of the dividing line
:param is_override: has the default text layout been redefined
:return: full line of dividing line as str
"""
if is_override:
return f'\n{self.length * self.get_unit_part()}\n'
else:
return f'\n[dim]{self.length * self.get_unit_part()}[/dim]\n'
class DynamicDividingLine(BaseDividingLine):
def __init__(self, unit_part: str = '-') -> None:
"""
Public. The dynamic dividing line
:param unit_part: the single part of the dividing line
:return: None
"""
super().__init__(unit_part)
def get_full_dynamic_line(self, length: int, is_override: bool) -> str:
"""
Private. Returns the full line of the dividing line
:param length: the length of the dividing line
:param is_override: has the default text layout been redefined
:return: full line of dividing line as str
"""
if is_override:
return f'\n{length * self.get_unit_part()}\n'
else:
return f'\n[dim]{self.get_unit_part() * length}[/dim]\n'
+8
View File
@@ -0,0 +1,8 @@
class NoRegisteredHandlersException(Exception):
"""
The router has no registered handlers
"""
def __init__(self, router_name) -> None:
self.router_name = router_name
def __str__(self):
return f"No Registered Handlers Found For '{self.router_name}'"
+393
View File
@@ -0,0 +1,393 @@
from typing import Callable
from rich.console import Console
from rich.markup import escape
from art import text2art
from contextlib import redirect_stdout
import io
import re
from argenta.command.models import Command, InputCommand
from argenta.router import Router
from argenta.router.defaults import system_router
from argenta.app.autocompleter import AutoCompleter
from argenta.app.dividing_line.models import StaticDividingLine, DynamicDividingLine
from argenta.command.exceptions import (UnprocessedInputFlagException,
RepeatedInputFlagsException,
EmptyInputCommandException,
BaseInputCommandException)
from argenta.app.exceptions import NoRegisteredHandlersException
from argenta.app.registered_routers.entity import RegisteredRouters
class BaseApp:
def __init__(self,
prompt: str,
initial_message: str,
farewell_message: str,
exit_command: Command,
system_router_title: str | None,
ignore_command_register: bool,
dividing_line: StaticDividingLine | DynamicDividingLine,
repeat_command_groups: bool,
override_system_messages: bool,
autocompleter: AutoCompleter,
print_func: Callable[[str], None]) -> None:
self._prompt = prompt
self._print_func = print_func
self._exit_command = exit_command
self._system_router_title = system_router_title
self._dividing_line = dividing_line
self._ignore_command_register = ignore_command_register
self._repeat_command_groups_description = repeat_command_groups
self._override_system_messages = override_system_messages
self._autocompleter = autocompleter
self._farewell_message = farewell_message
self._initial_message = initial_message
self._description_message_gen: Callable[[str, str], str] = lambda command, description: f'[{command}] *=*=* {description}'
self._registered_routers: RegisteredRouters = RegisteredRouters()
self._messages_on_startup: list[str] = []
self._all_registered_triggers_in_lower: list[str] = []
self._all_registered_triggers_in_default_case: list[str] = []
self._invalid_input_flags_handler: Callable[[str], None] = lambda raw_command: print_func(f'Incorrect flag syntax: {raw_command}')
self._repeated_input_flags_handler: Callable[[str], None] = lambda raw_command: print_func(f'Repeated input flags: {raw_command}')
self._empty_input_command_handler: Callable[[], None] = lambda: print_func('Empty input command')
self._unknown_command_handler: Callable[[InputCommand], None] = lambda command: print_func(f"Unknown command: {command.get_trigger()}")
self._exit_command_handler: Callable[[], None] = lambda: print_func(self._farewell_message)
def set_description_message_pattern(self, pattern: Callable[[str, str], str]) -> None:
"""
Public. Sets the output pattern of the available commands
:param pattern: output pattern of the available commands
:return: None
"""
self._description_message_gen: Callable[[str, str], str] = pattern
def set_invalid_input_flags_handler(self, handler: Callable[[str], None]) -> None:
"""
Public. Sets the handler for incorrect flags when entering a command
:param handler: handler for incorrect flags when entering a command
:return: None
"""
self._invalid_input_flags_handler = handler
def set_repeated_input_flags_handler(self, handler: Callable[[str], None]) -> None:
"""
Public. Sets the handler for repeated flags when entering a command
:param handler: handler for repeated flags when entering a command
:return: None
"""
self._repeated_input_flags_handler = handler
def set_unknown_command_handler(self, handler: Callable[[str], None]) -> None:
"""
Public. Sets the handler for unknown commands when entering a command
:param handler: handler for unknown commands when entering a command
:return: None
"""
self._unknown_command_handler = handler
def set_empty_command_handler(self, handler: Callable[[], None]) -> None:
"""
Public. Sets the handler for empty commands when entering a command
:param handler: handler for empty commands when entering a command
:return: None
"""
self._empty_input_command_handler = handler
def set_exit_command_handler(self, handler: Callable[[], None]) -> None:
"""
Public. Sets the handler for exit command when entering a command
:param handler: handler for exit command when entering a command
:return: None
"""
self._exit_command_handler = handler
def _print_command_group_description(self) -> None:
"""
Private. Prints the description of the available commands
:return: None
"""
for registered_router in self._registered_routers:
if registered_router.get_title():
self._print_func(registered_router.get_title())
for command_handler in registered_router.get_command_handlers():
self._print_func(self._description_message_gen(
command_handler.get_handled_command().get_trigger(),
command_handler.get_handled_command().get_description()))
self._print_func('')
def _print_framed_text(self, text: str) -> None:
"""
Private. Outputs text by framing it in a static or dynamic split strip
:param text: framed text
:return: None
"""
if isinstance(self._dividing_line, StaticDividingLine):
self._print_func(self._dividing_line.get_full_static_line(self._override_system_messages))
self._print_func(text)
self._print_func(self._dividing_line.get_full_static_line(self._override_system_messages))
elif isinstance(self._dividing_line, DynamicDividingLine):
clear_text = re.sub(r'\u001b\[[0-9;]*m', '', text)
max_length_line = max([len(line) for line in clear_text.split('\n')])
max_length_line = max_length_line if 10 <= max_length_line <= 80 else 80 if max_length_line > 80 else 10
self._print_func(self._dividing_line.get_full_dynamic_line(max_length_line, self._override_system_messages))
print(text.strip('\n'))
self._print_func(self._dividing_line.get_full_dynamic_line(max_length_line, self._override_system_messages))
def _is_exit_command(self, command: InputCommand) -> bool:
"""
Private. Checks if the given command is an exit command
:param command: command to check
:return: is it an exit command or not as bool
"""
if self._ignore_command_register:
if command.get_trigger().lower() == self._exit_command.get_trigger().lower():
return True
elif command.get_trigger().lower() in [x.lower() for x in self._exit_command.get_aliases()]:
return True
else:
if command.get_trigger() == self._exit_command.get_trigger():
return True
elif command.get_trigger() in self._exit_command.get_aliases():
return True
return False
def _is_unknown_command(self, command: InputCommand) -> bool:
"""
Private. Checks if the given command is an unknown command
:param command: command to check
:return: is it an unknown command or not as bool
"""
input_command_trigger = command.get_trigger()
if self._ignore_command_register:
if input_command_trigger.lower() in self._all_registered_triggers_in_lower:
return False
else:
if input_command_trigger in self._all_registered_triggers_in_default_case:
return False
with redirect_stdout(io.StringIO()) as f:
self._unknown_command_handler(command)
res: str = f.getvalue()
self._print_framed_text(res)
return True
def _error_handler(self, error: BaseInputCommandException, raw_command: str) -> None:
"""
Private. Handles parsing errors of the entered command
:param error: error being handled
:param raw_command: the raw input command
:return: None
"""
match error:
case UnprocessedInputFlagException():
self._invalid_input_flags_handler(raw_command)
case RepeatedInputFlagsException():
self._repeated_input_flags_handler(raw_command)
case EmptyInputCommandException():
self._empty_input_command_handler()
def _validate_included_routers(self) -> None:
"""
Private. Validates included routers
:return: None
"""
for router in self._registered_routers:
if not router.get_command_handlers():
raise NoRegisteredHandlersException(router.get_name())
def _setup_system_router(self) -> None:
"""
Private. Sets up system router
:return: None
"""
system_router.set_title(self._system_router_title)
@system_router.command(self._exit_command)
def exit_command():
self._exit_command_handler()
if system_router not in self._registered_routers.get_registered_routers():
system_router.set_command_register_ignore(self._ignore_command_register)
self._registered_routers.add_registered_router(system_router)
def _setup_default_view(self) -> None:
"""
Private. Sets up default app view
:return: None
"""
if not self._override_system_messages:
self._initial_message = f'\n[bold red]{text2art(self._initial_message, font="tarty1")}\n\n'
self._farewell_message = (f'[bold red]\n{text2art(f"\n{self._farewell_message}\n", font="chanky")}[/bold red]\n'
f'[red i]github.com/koloideal/Argenta[/red i] | [red bold i]made by kolo[/red bold i]\n')
self._description_message_gen = lambda command, description: (f'[bold red]{escape("[" + command + "]")}[/bold red] '
f'[blue dim]*=*=*[/blue dim] '
f'[bold yellow italic]{escape(description)}')
self._invalid_input_flags_handler = lambda raw_command: self._print_func(f'[red bold]Incorrect flag syntax: {escape(raw_command)}')
self._repeated_input_flags_handler = lambda raw_command: self._print_func(f'[red bold]Repeated input flags: {escape(raw_command)}')
self._empty_input_command_handler = lambda: self._print_func('[red bold]Empty input command')
self._unknown_command_handler = lambda command: self._print_func(f"[red bold]Unknown command: {escape(command.get_trigger())}")
def _pre_cycle_setup(self) -> None:
"""
Private. Configures various aspects of the application before the start of the cycle
:return: None
"""
self._setup_default_view()
self._setup_system_router()
self._validate_included_routers()
for router_entity in self._registered_routers:
self._all_registered_triggers_in_default_case.extend(router_entity.get_triggers())
self._all_registered_triggers_in_default_case.extend(router_entity.get_aliases())
self._all_registered_triggers_in_lower.extend([x.lower() for x in router_entity.get_triggers()])
self._all_registered_triggers_in_lower.extend([x.lower() for x in router_entity.get_aliases()])
self._autocompleter.initial_setup(self._all_registered_triggers_in_lower)
self._print_func(self._initial_message)
for message in self._messages_on_startup:
self._print_func(message)
if self._messages_on_startup:
print('\n\n')
if not self._repeat_command_groups_description:
self._print_command_group_description()
class App(BaseApp):
def __init__(self,
prompt: str = '[italic dim bold]What do you want to do?\n',
initial_message: str = '\nArgenta\n',
farewell_message: str = '\nSee you\n',
exit_command: Command = Command('Q', 'Exit command'),
system_router_title: str | None = 'System points:',
ignore_command_register: bool = True,
dividing_line: StaticDividingLine | DynamicDividingLine = StaticDividingLine(),
repeat_command_groups: bool = True,
override_system_messages: bool = False,
autocompleter: AutoCompleter = AutoCompleter(),
print_func: Callable[[str], None] = Console().print) -> None:
"""
Public. The essence of the application itself.
Configures and manages all aspects of the behavior and presentation of the user interacting with the user
:param prompt: displayed before entering the command
:param initial_message: displayed at the start of the app
:param farewell_message: displayed at the end of the app
:param exit_command: the entity of the command that will be terminated when entered
:param system_router_title: system router title
:param ignore_command_register: whether to ignore the case of the entered commands
:param dividing_line: the entity of the dividing line
:param repeat_command_groups: whether to repeat the available commands and their description
:param override_system_messages: whether to redefine the default formatting of system messages
:param autocompleter: the entity of the autocompleter
:param print_func: system messages text output function
:return: None
"""
super().__init__(prompt=prompt,
initial_message=initial_message,
farewell_message=farewell_message,
exit_command=exit_command,
system_router_title=system_router_title,
ignore_command_register=ignore_command_register,
dividing_line=dividing_line,
repeat_command_groups=repeat_command_groups,
override_system_messages=override_system_messages,
autocompleter=autocompleter,
print_func=print_func)
def run_polling(self) -> None:
"""
Private. Starts the user input processing cycle
:return: None
"""
self._pre_cycle_setup()
while True:
if self._repeat_command_groups_description:
self._print_command_group_description()
raw_command: str = Console().input(self._prompt)
try:
input_command: InputCommand = InputCommand.parse(raw_command=raw_command)
except BaseInputCommandException as error:
with redirect_stdout(io.StringIO()) as f:
self._error_handler(error, raw_command)
res: str = f.getvalue()
self._print_framed_text(res)
continue
if self._is_exit_command(input_command):
system_router.finds_appropriate_handler(input_command)
self._autocompleter.exit_setup()
return
if self._is_unknown_command(input_command):
continue
with redirect_stdout(io.StringIO()) as f:
for registered_router in self._registered_routers:
registered_router.finds_appropriate_handler(input_command)
res: str = f.getvalue()
self._print_framed_text(res)
if not self._repeat_command_groups_description:
self._print_func(self._prompt)
def include_router(self, router: Router) -> None:
"""
Public. Registers the router in the application
:param router: registered router
:return: None
"""
router.set_command_register_ignore(self._ignore_command_register)
self._registered_routers.add_registered_router(router)
def include_routers(self, *routers: Router) -> None:
"""
Public. Registers the routers in the application
:param routers: registered routers
:return: None
"""
for router in routers:
self.include_router(router)
def add_message_on_startup(self, message: str) -> None:
"""
Public. Adds a message that will be displayed when the application is launched
:param message: the message being added
:return: None
"""
self._messages_on_startup.append(message)
@@ -0,0 +1,40 @@
from argenta.router import Router
class RegisteredRouters:
def __init__(self, registered_routers: list[Router] = None) -> None:
"""
Private. Combines registered routers
:param registered_routers: list of the registered routers
:return: None
"""
self._registered_routers = registered_routers if registered_routers else []
def get_registered_routers(self) -> list[Router]:
"""
Private. Returns the registered routers
:return: registered routers as list[Router]
"""
return self._registered_routers
def add_registered_router(self, router: Router) -> None:
"""
Private. Adds a new registered router
:param router: registered router
:return: None
"""
self._registered_routers.append(router)
def add_registered_routers(self, *routers: Router) -> None:
"""
Private. Adds new registered routers
:param routers: registered routers
:return: None
"""
self._registered_routers.extend(routers)
def __iter__(self):
return iter(self._registered_routers)
def __next__(self):
return next(iter(self._registered_routers))
+3
View File
@@ -0,0 +1,3 @@
__all__ = ["Command"]
from argenta.command.models import Command
+35
View File
@@ -0,0 +1,35 @@
from argenta.command.flag.models import InputFlag, Flag
class BaseInputCommandException(Exception):
"""
Private. Base exception class for all exceptions raised when parse input command
"""
pass
class UnprocessedInputFlagException(BaseInputCommandException):
"""
Private. Raised when an unprocessed input flag is detected
"""
def __str__(self):
return "Unprocessed Input Flags"
class RepeatedInputFlagsException(BaseInputCommandException):
"""
Private. Raised when repeated input flags are detected
"""
def __init__(self, flag: Flag | InputFlag):
self.flag = flag
def __str__(self):
return ("Repeated Input Flags\n"
f"Duplicate flag was detected in the input: '{self.flag.get_string_entity()}'")
class EmptyInputCommandException(BaseInputCommandException):
"""
Private. Raised when an empty input command is detected
"""
def __str__(self):
return "Input Command is empty"
+4
View File
@@ -0,0 +1,4 @@
__all__ = ('InputFlags', 'InputFlag', 'Flag', 'Flags')
from argenta.command.flag.models import InputFlags, InputFlag, Flags, Flag
+24
View File
@@ -0,0 +1,24 @@
from dataclasses import dataclass
from argenta.command.flag.models import Flag
import re
@dataclass
class PredefinedFlags:
"""
Public. A dataclass with predefined flags and most frequently used flags for quick use
"""
HELP = Flag(name='help', possible_values=False)
SHORT_HELP = Flag(name='h', prefix='-', possible_values=False)
INFO = Flag(name='info', possible_values=False)
SHORT_INFO = Flag(name='i', prefix='-', possible_values=False)
ALL = Flag(name='all', possible_values=False)
SHORT_ALL = Flag(name='a', prefix='-', possible_values=False)
HOST = Flag(name='host', possible_values=re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'))
SHORT_HOST = Flag(name='h', prefix='-', possible_values=re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'))
PORT = Flag(name='port', possible_values=re.compile(r'^\d{1,5}$'))
SHORT_PORT = Flag(name='p', prefix='-', possible_values=re.compile(r'^\d{1,5}$'))
+218
View File
@@ -0,0 +1,218 @@
from typing import Literal, Pattern
from abc import ABC, abstractmethod
class BaseFlag(ABC):
def __init__(self, name: str,
prefix: Literal['-', '--', '---'] = '--') -> None:
"""
Private. Base class for flags
:param name: the name of the flag
:param prefix: the prefix of the flag
:return: None
"""
self._name = name
self._prefix = prefix
def get_string_entity(self) -> str:
"""
Public. Returns a string representation of the flag
:return: string representation of the flag as str
"""
string_entity: str = self._prefix + self._name
return string_entity
def get_name(self) -> str:
"""
Public. Returns the name of the flag
:return: the name of the flag as str
"""
return self._name
def get_prefix(self) -> str:
"""
Public. Returns the prefix of the flag
:return: the prefix of the flag as str
"""
return self._prefix
class InputFlag(BaseFlag):
def __init__(self, name: str,
prefix: Literal['-', '--', '---'] = '--',
value: str = None):
"""
Public. The entity of the flag of the entered command
:param name: the name of the input flag
:param prefix: the prefix of the input flag
:param value: the value of the input flag
:return: None
"""
super().__init__(name, prefix)
self._flag_value = value
def get_value(self) -> str | None:
"""
Public. Returns the value of the flag
:return: the value of the flag as str
"""
return self._flag_value
def set_value(self, value):
"""
Private. Sets the value of the flag
:param value: the fag value to set
:return: None
"""
self._flag_value = value
class Flag(BaseFlag):
def __init__(self, name: str,
prefix: Literal['-', '--', '---'] = '--',
possible_values: list[str] | Pattern[str] | False = True) -> None:
"""
Public. The entity of the flag being registered for subsequent processing
:param name: The name of the flag
:param prefix: The prefix of the flag
:param possible_values: The possible values of the flag, if False then the flag cannot have a value
:return: None
"""
super().__init__(name, prefix)
self.possible_values = possible_values
def validate_input_flag_value(self, input_flag_value: str | None):
"""
Private. Validates the input flag value
:param input_flag_value: The input flag value to validate
:return: whether the entered flag is valid as bool
"""
if self.possible_values is False:
if input_flag_value is None:
return True
else:
return False
elif isinstance(self.possible_values, Pattern):
if isinstance(input_flag_value, str):
is_valid = bool(self.possible_values.match(input_flag_value))
if bool(is_valid):
return True
else:
return False
else:
return False
elif isinstance(self.possible_values, list):
if input_flag_value in self.possible_values:
return True
else:
return False
else:
return True
class BaseFlags(ABC):
"""
Private. Base class for groups of flags
"""
__slots__ = ('_flags',)
@abstractmethod
def get_flags(self):
"""
Public. Returns a list of flags
:return: list of flags
"""
pass
@abstractmethod
def add_flag(self, flag: Flag | InputFlag):
"""
Public. Adds a flag to the list of flags
:param flag: flag to add
:return: None
"""
pass
@abstractmethod
def add_flags(self, flags: list[Flag] | list[InputFlag]):
"""
Public. Adds a list of flags to the list of flags
:param flags: list of flags to add
:return: None
"""
pass
@abstractmethod
def get_flag(self, name: str):
"""
Public. Returns the flag entity by its name or None if not found
:param name: the name of the flag to get
:return: entity of the flag or None
"""
pass
def __iter__(self):
return iter(self._flags)
def __next__(self):
return next(iter(self))
def __getitem__(self, item):
return self._flags[item]
class Flags(BaseFlags, ABC):
def __init__(self, *flags: Flag):
"""
Public. A model that combines the registered flags
:param flags: the flags that will be registered
:return: None
"""
self._flags = flags if flags else []
def get_flags(self) -> list[Flag]:
return self._flags
def add_flag(self, flag: Flag):
self._flags.append(flag)
def add_flags(self, flags: list[Flag]):
self._flags.extend(flags)
def get_flag(self, name: str) -> Flag | None:
if name in [flag.get_name() for flag in self._flags]:
return list(filter(lambda flag: flag.get_name() == name, self._flags))[0]
else:
return None
class InputFlags(BaseFlags, ABC):
def __init__(self, *flags: InputFlag):
"""
Public. A model that combines the input flags of the input command
:param flags: all input flags
:return: None
"""
self._flags = flags if flags else []
def get_flags(self) -> list[InputFlag]:
return self._flags
def add_flag(self, flag: InputFlag):
self._flags.append(flag)
def add_flags(self, flags: list[InputFlag]):
self._flags.extend(flags)
def get_flag(self, name: str) -> InputFlag | None:
if name in [flag.get_name() for flag in self._flags]:
return list(filter(lambda flag: flag.get_name() == name, self._flags))[0]
else:
return None
+164
View File
@@ -0,0 +1,164 @@
from argenta.command.flag.models import Flag, InputFlag, Flags, InputFlags
from argenta.command.exceptions import (UnprocessedInputFlagException,
RepeatedInputFlagsException,
EmptyInputCommandException)
from typing import Generic, TypeVar, cast, Literal
InputCommandType = TypeVar('InputCommandType')
class BaseCommand:
def __init__(self, trigger: str) -> None:
"""
Private. Base class for all commands
:param trigger: A string trigger, which, when entered by the user, indicates that the input corresponds to the command
"""
self._trigger = trigger
def get_trigger(self) -> str:
"""
Public. Returns the trigger of the command
:return: the trigger of the command as str
"""
return self._trigger
class Command(BaseCommand):
def __init__(self, trigger: str,
description: str = None,
flags: Flag | Flags = None,
aliases: list[str] = None):
"""
Public. The command that can and should be registered in the Router
:param trigger: A string trigger, which, when entered by the user, indicates that the input corresponds to the command
:param description: the description of the command
:param flags: processed commands
:param aliases: string synonyms for the main trigger
"""
super().__init__(trigger)
self._registered_flags: Flags = flags if isinstance(flags, Flags) else Flags(flags) if isinstance(flags, Flag) else Flags()
self._description = f'Description for "{self._trigger}" command' if not description else description
self._aliases = aliases if isinstance(aliases, list) else []
def get_registered_flags(self) -> Flags:
"""
Private. Returns the registered flags
:return: the registered flags as Flags
"""
return self._registered_flags
def get_aliases(self) -> list[str] | list:
"""
Public. Returns the aliases of the command
:return: the aliases of the command as list[str] | list
"""
return self._aliases
def validate_input_flag(self, flag: InputFlag) -> bool:
"""
Private. Validates the input flag
:param flag: input flag for validation
:return: is input flag valid as bool
"""
registered_flags: Flags | None = self.get_registered_flags()
if registered_flags:
if isinstance(registered_flags, Flag):
if registered_flags.get_string_entity() == flag.get_string_entity():
is_valid = registered_flags.validate_input_flag_value(flag.get_value())
if is_valid:
return True
else:
for registered_flag in registered_flags:
if registered_flag.get_string_entity() == flag.get_string_entity():
is_valid = registered_flag.validate_input_flag_value(flag.get_value())
if is_valid:
return True
return False
def get_description(self) -> str:
"""
Private. Returns the description of the command
:return: the description of the command as str
"""
return self._description
class InputCommand(BaseCommand, Generic[InputCommandType]):
def __init__(self, trigger: str,
input_flags: InputFlag | InputFlags = None):
"""
Private. The model of the input command, after parsing
:param trigger:the trigger of the command
:param input_flags: the input flags
:return: None
"""
super().__init__(trigger)
self._input_flags: InputFlags = input_flags if isinstance(input_flags, InputFlags) else InputFlags(input_flags) if isinstance(input_flags, InputFlag) else InputFlags()
def _set_input_flags(self, input_flags: InputFlags) -> None:
"""
Private. Sets the input flags
:param input_flags: the input flags to set
:return: None
"""
self._input_flags = input_flags
def get_input_flags(self) -> InputFlags:
"""
Private. Returns the input flags
:return: the input flags as InputFlags
"""
return self._input_flags
@staticmethod
def parse(raw_command: str) -> InputCommandType:
"""
Private. Parse the raw input command
:param raw_command: raw input command
:return: model of the input command, after parsing as InputCommand
"""
if not raw_command:
raise EmptyInputCommandException()
list_of_tokens = raw_command.split()
command = list_of_tokens.pop(0)
input_flags: InputFlags = InputFlags()
current_flag_name, current_flag_value = None, None
for k, _ in enumerate(list_of_tokens):
if _.startswith('-'):
if current_flag_name or len(_) < 2 or len(_[:_.rfind('-')]) > 3:
raise UnprocessedInputFlagException()
current_flag_name = _
else:
if not current_flag_name:
raise UnprocessedInputFlagException()
current_flag_value = _
if current_flag_name:
if not len(list_of_tokens) == k+1:
if not list_of_tokens[k+1].startswith('-'):
continue
input_flag = InputFlag(name=current_flag_name[current_flag_name.rfind('-')+1:],
prefix=cast(Literal['-', '--', '---'],
current_flag_name[:current_flag_name.rfind('-')+1]),
value=current_flag_value)
all_flags = [flag.get_string_entity() for flag in input_flags.get_flags()]
if input_flag.get_string_entity() not in all_flags:
input_flags.add_flag(input_flag)
else:
raise RepeatedInputFlagsException(input_flag)
current_flag_name, current_flag_value = None, None
if any([current_flag_name, current_flag_value]):
raise UnprocessedInputFlagException()
else:
return InputCommand(trigger=command, input_flags=input_flags)
+4
View File
@@ -0,0 +1,4 @@
__all__ = ["Orchestrator"]
from argenta.orchestrator.entity import Orchestrator
@@ -0,0 +1,4 @@
__all__ = ["ArgParser"]
from argenta.orchestrator.argparser.entity import ArgParser
@@ -0,0 +1,6 @@
__all__ = ["BooleanArgument", "PositionalArgument", "OptionalArgument"]
from argenta.orchestrator.argparser.arguments.models import (BooleanArgument,
PositionalArgument,
OptionalArgument)
@@ -0,0 +1,55 @@
from abc import ABC, abstractmethod
from typing import Literal
class BaseArgument(ABC):
"""
Private. Base class for all arguments
"""
@abstractmethod
def get_string_entity(self) -> str:
"""
Public. Returns the string representation of the argument
:return: the string representation as a str
"""
pass
class PositionalArgument(BaseArgument):
def __init__(self, name: str):
"""
Public. Required argument at startup
:param name: name of the argument, must not start with minus (-)
"""
self.name = name
def get_string_entity(self):
return self.name
class OptionalArgument(BaseArgument):
def __init__(self, name: str, prefix: Literal['-', '--', '---'] = '--'):
"""
Public. Optional argument, must have the value
:param name: name of the argument
:param prefix: prefix of the argument
"""
self.name = name
self.prefix = prefix
def get_string_entity(self):
return self.prefix + self.name
class BooleanArgument(BaseArgument):
def __init__(self, name: str, prefix: Literal['-', '--', '---'] = '--'):
"""
Public. Boolean argument, does not require a value
:param name: name of the argument
:param prefix: prefix of the argument
"""
self.name = name
self.prefix = prefix
def get_string_entity(self):
return self.prefix + self.name
@@ -0,0 +1,49 @@
from argparse import ArgumentParser
from argenta.orchestrator.argparser.arguments.models import (BooleanArgument,
OptionalArgument,
PositionalArgument)
class ArgParser:
def __init__(self,
processed_args: list[PositionalArgument | OptionalArgument | BooleanArgument],
name: str = 'Argenta',
description: str = 'Argenta available arguments',
epilog: str = 'github.com/koloideal/Argenta | made by kolo') -> None:
"""
Public. Cmd argument parser and configurator at startup
:param name: the name of the ArgParse instance
:param description: the description of the ArgParse instance
:param epilog: the epilog of the ArgParse instance
:param processed_args: registered and processed arguments
"""
self.name = name
self.description = description
self.epilog = epilog
self.entity: ArgumentParser = ArgumentParser(prog=name, description=description, epilog=epilog)
self.args: list[PositionalArgument | OptionalArgument | BooleanArgument] | None = processed_args
def set_args(self, *args: PositionalArgument | OptionalArgument | BooleanArgument) -> None:
"""
Public. Sets the arguments to be processed
:param args: processed arguments
:return: None
"""
self.args.extend(args)
def register_args(self) -> None:
"""
Private. Registers initialized command line arguments
:return: None
"""
if not self.args:
return
for arg in self.args:
if type(arg) is PositionalArgument:
self.entity.add_argument(arg.get_string_entity())
elif type(arg) is OptionalArgument:
self.entity.add_argument(arg.get_string_entity())
elif type(arg) is BooleanArgument:
self.entity.add_argument(arg.get_string_entity(), action='store_true')
+36
View File
@@ -0,0 +1,36 @@
from argparse import Namespace
from argenta.app import App
from argenta.orchestrator.argparser import ArgParser
class Orchestrator:
def __init__(self, arg_parser: ArgParser = False):
"""
Public. An orchestrator and configurator that defines the behavior of an integrated system, one level higher than the App
:param arg_parser: Cmd argument parser and configurator at startup
:return: None
"""
self.arg_parser: ArgParser | False = arg_parser
if arg_parser:
self.arg_parser.register_args()
@staticmethod
def start_polling(app: App) -> None:
"""
Public. Starting the user input processing cycle
:param app: a running application
:return: None
"""
app.run_polling()
def get_input_args(self) -> Namespace | None:
"""
Public. Returns the arguments parsed
:return: None
"""
if self.arg_parser:
return self.arg_parser.entity.parse_args()
else:
return None
+4
View File
@@ -0,0 +1,4 @@
__all__ = ["Router"]
from argenta.router.entity import Router
@@ -0,0 +1,79 @@
from typing import Callable
from argenta.command import Command
from argenta.command.flag import InputFlags
class CommandHandler:
def __init__(self, handler: Callable[[], None] | Callable[[InputFlags], None], handled_command: Command):
"""
Private. Entity of the model linking the handler and the command being processed
:param handler: the handler being called
:param handled_command: the command being processed
"""
self._handler = handler
self._handled_command = handled_command
def handling(self, input_flags: InputFlags = None) -> None:
"""
Private. Direct processing of an input command
:param input_flags: the flags of the input command
:return: None
"""
if input_flags is not None:
self._handler(input_flags)
else:
self._handler()
def get_handler(self) -> Callable[[], None] | Callable[[InputFlags], None]:
"""
Private. Returns the handler being called
:return: the handler being called as Callable[[], None] or Callable[[InputFlags], None]
"""
return self._handler
def get_handled_command(self) -> Command:
"""
Private. Returns the command being processed
:return: the command being processed as Command
"""
return self._handled_command
class CommandHandlers:
def __init__(self, command_handlers: list[CommandHandler] = None):
"""
Private. The model that unites all CommandHandler of the routers
:param command_handlers: list of CommandHandlers for register
"""
self.command_handlers = command_handlers if command_handlers else []
def get_command_handlers(self) -> list[CommandHandler]:
"""
Private. Returns the list of CommandHandlers
:return: the list of CommandHandlers as list[CommandHandler]
"""
return self.command_handlers
def add_command_handler(self, command_handler: CommandHandler) -> None:
"""
Private. Adds a CommandHandler to the list of CommandHandlers
:param command_handler: CommandHandler to be added
:return: None
"""
self.command_handlers.append(command_handler)
def add_command_handlers(self, *command_handlers: CommandHandler) -> None:
"""
Private. Extend a many CommandHandler to the list of CommandHandlers
:param command_handlers: many CommandHandler to be added
:return: None
"""
self.command_handlers.extend(command_handlers)
def __iter__(self):
return iter(self.command_handlers)
def __next__(self):
return next(iter(self.command_handlers))
+4
View File
@@ -0,0 +1,4 @@
from argenta.router import Router
system_router = Router(title='System points:')
+209
View File
@@ -0,0 +1,209 @@
from typing import Callable
from inspect import getfullargspec
from src.argenta.command import Command
from src.argenta.command.models import InputCommand
from src.argenta.router.command_handler.entity import CommandHandlers, CommandHandler
from src.argenta.command.flag.models import Flag, Flags, InputFlags
from src.argenta.router.exceptions import (RepeatedFlagNameException,
TooManyTransferredArgsException,
RequiredArgumentNotPassedException,
IncorrectNumberOfHandlerArgsException,
TriggerContainSpacesException)
class Router:
def __init__(self,
title: str = None):
"""
Public. Directly configures and manages handlers
:param title: the title of the router, displayed when displaying the available commands
:return: None
"""
self._title = title
self._command_handlers: CommandHandlers = CommandHandlers()
self._ignore_command_register: bool = False
self._not_valid_flag_handler: Callable[[Flag], None] = lambda flag: print(f"Undefined or incorrect input flag: {flag.get_string_entity()}{(' '+flag.get_value()) if flag.get_value() else ''}")
def command(self, command: Command) -> Callable:
"""
Public. Registers handler
:param command: Registered command
:return: decorated handler as Callable[[Any], Any]
"""
self._validate_command(command)
def command_decorator(func):
Router._validate_func_args(command, func)
self._command_handlers.add_command_handler(CommandHandler(func, command))
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
return command_decorator
def set_invalid_input_flag_handler(self, func) -> None:
"""
Public. Registers handler for invalid input flag
:param func: registered handler
:return: None
"""
processed_args = getfullargspec(func).args
if len(processed_args) != 1:
raise IncorrectNumberOfHandlerArgsException()
else:
self._not_valid_flag_handler = func
def finds_appropriate_handler(self, input_command: InputCommand) -> None:
"""
Private. Finds the appropriate handler for given input command and passes control to it
:param input_command: input command as InputCommand
:return: None
"""
input_command_name: str = input_command.get_trigger()
input_command_flags: InputFlags = input_command.get_input_flags()
for command_handler in self._command_handlers:
handle_command = command_handler.get_handled_command()
if input_command_name.lower() == handle_command.get_trigger().lower():
self.process_input_command(input_command_flags, command_handler)
if input_command_name.lower() in handle_command.get_aliases():
self.process_input_command(input_command_flags, command_handler)
def process_input_command(self, input_command_flags: InputFlags, command_handler: CommandHandler) -> None:
"""
Private. Processes input command with the appropriate handler
:param input_command_flags: input command flags as InputFlags
:param command_handler: command handler for input command as CommandHandler
:return: None
"""
handle_command = command_handler.get_handled_command()
if handle_command.get_registered_flags().get_flags():
if input_command_flags.get_flags():
if self._validate_input_flags(handle_command, input_command_flags):
command_handler.handling(input_command_flags)
return
else:
command_handler.handling(input_command_flags)
return
else:
if input_command_flags.get_flags():
self._not_valid_flag_handler(input_command_flags[0])
return
else:
command_handler.handling()
return
def _validate_input_flags(self, handled_command: Command, input_flags: InputFlags) -> bool:
"""
Private. Validates flags of input command
:param handled_command: entity of the handled command
:param input_flags:
:return: is flags of input command valid as bool
"""
for flag in input_flags:
is_valid: bool = handled_command.validate_input_flag(flag)
if not is_valid:
self._not_valid_flag_handler(flag)
return False
return True
@staticmethod
def _validate_command(command: Command) -> None:
"""
Private. Validates the command registered in handler
:param command: validated command
:return: None if command is valid else raise exception
"""
command_name: str = command.get_trigger()
if command_name.find(' ') != -1:
raise TriggerContainSpacesException()
flags: Flags = command.get_registered_flags()
if flags:
flags_name: list = [x.get_string_entity().lower() for x in flags]
if len(set(flags_name)) < len(flags_name):
raise RepeatedFlagNameException()
@staticmethod
def _validate_func_args(command: Command, func: Callable) -> None:
"""
Private. Validates the arguments of the handler
:param command: registered command in handler
:param func: entity of the handler func
:return: None if func is valid else raise exception
"""
registered_args = command.get_registered_flags()
transferred_args = getfullargspec(func).args
if registered_args.get_flags() and transferred_args:
if len(transferred_args) != 1:
raise TooManyTransferredArgsException()
elif registered_args.get_flags() and not transferred_args:
raise RequiredArgumentNotPassedException()
elif not registered_args.get_flags() and transferred_args:
raise TooManyTransferredArgsException()
def set_command_register_ignore(self, _: bool) -> None:
"""
Private. Sets the router behavior on the input commands register
:param _: is command register ignore
:return: None
"""
self._ignore_command_register = _
def get_triggers(self) -> list[str]:
"""
Public. Gets registered triggers
:return: registered in router triggers as list[str]
"""
all_triggers: list[str] = []
for command_handler in self._command_handlers:
all_triggers.append(command_handler.get_handled_command().get_trigger())
return all_triggers
def get_aliases(self) -> list[str]:
"""
Public. Gets registered aliases
:return: registered in router aliases as list[str]
"""
all_aliases: list[str] = []
for command_handler in self._command_handlers:
if command_handler.get_handled_command().get_aliases():
all_aliases.extend(command_handler.get_handled_command().get_aliases())
return all_aliases
def get_command_handlers(self) -> CommandHandlers:
"""
Private. Gets registered command handlers
:return: registered command handlers as CommandHandlers
"""
return self._command_handlers
def get_title(self) -> str | None:
"""
Public. Gets title of the router
:return: the title of the router as str or None
"""
return self._title
def set_title(self, title: str) -> None:
"""
Public. Sets the title of the router
:param title: title that will be setted
:return: None
"""
self._title = title
+38
View File
@@ -0,0 +1,38 @@
class RepeatedFlagNameException(Exception):
"""
Private. Raised when a repeated flag name is registered
"""
def __str__(self):
return "Repeated registered_flag name in register command"
class TooManyTransferredArgsException(Exception):
"""
Private. Raised when too many arguments are passed
"""
def __str__(self):
return "Too many transferred arguments"
class RequiredArgumentNotPassedException(Exception):
"""
Private. Raised when a required argument is not passed
"""
def __str__(self):
return "Required argument not passed"
class IncorrectNumberOfHandlerArgsException(Exception):
"""
Private. Raised when incorrect number of arguments are passed
"""
def __str__(self):
return "Handler has incorrect number of arguments"
class TriggerContainSpacesException(Exception):
"""
Private. Raised when there is a space in the trigger being registered
"""
def __str__(self):
return "Command trigger cannot contain spaces"