This commit is contained in:
2025-04-14 14:54:17 +03:00
parent a5fdcab862
commit 3ef8707cfa
11 changed files with 185 additions and 89 deletions
+32 -7
View File
@@ -3,13 +3,25 @@ import readline
class AutoCompleter:
def __init__(self, history_filename: str = False, autocomplete_button: str = 'tab'):
def __init__(self, history_filename: str = False, autocomplete_button: str = 'tab') -> None:
"""
Public. Configures and implements auto-completion of input command
:param history_filename: the name of the file for saving the history of the autocompleter
:param autocomplete_button: the button for auto-completion
:return: None
"""
self.history_filename = history_filename
self.autocomplete_button = autocomplete_button
self.matches = []
self.matches: list[str] = []
def complete(self, text, state):
matches = sorted(cmd for cmd in self.get_history_items() if cmd.startswith(text))
def complete(self, text, state) -> str | None:
"""
Private. Auto-completion function
:param text: part of the command being entered
:param state: the current cursor position is relative to the beginning of the line
:return: the desired candidate as str or None
"""
matches: list[str] = sorted(cmd for cmd in self.get_history_items() if cmd.startswith(text))
if len(matches) > 1:
common_prefix = matches[0]
for match in matches[1:]:
@@ -26,7 +38,12 @@ class AutoCompleter:
else:
return None
def initial_setup(self, all_commands: list[str]):
def initial_setup(self, all_commands: list[str]) -> None:
"""
Public. Initial setup function
:param all_commands: Registered commands for adding them to the autocomplete history
:return: None
"""
if self.history_filename:
if os.path.exists(self.history_filename):
readline.read_history_file(self.history_filename)
@@ -38,10 +55,18 @@ class AutoCompleter:
readline.set_completer_delims(readline.get_completer_delims().replace(' ', ''))
readline.parse_and_bind(f'{self.autocomplete_button}: complete')
def exit_setup(self):
def exit_setup(self) -> None:
"""
Public. Exit setup function
:return: None
"""
if self.history_filename:
readline.write_history_file(self.history_filename)
@staticmethod
def get_history_items():
def get_history_items() -> list[str] | list:
"""
Private. Returns a list of all commands entered by the user
:return: all commands entered by the user as list[str]
"""
return [readline.get_history_item(i) for i in range(1, readline.get_current_history_length() + 1)]
+4 -1
View File
@@ -2,7 +2,10 @@ from dataclasses import dataclass
@dataclass
class PredeterminedMessages:
class PredefinedMessages:
"""
A dataclass with predetermined messages for quick use
"""
USAGE = '[b dim]Usage[/b dim]: [i]<command> <[green]flags[/green]>[/i]'
HELP = '[b dim]Help[/b dim]: [i]<command>[/i] [b red]--help[/b red]'
AUTOCOMPLETE = '[b dim]Autocomplete[/b dim]: [i]<part>[/i] [bold]<tab>'
+55 -11
View File
@@ -1,24 +1,68 @@
class BaseDividingLine:
def __init__(self, unit_part: str = '-'):
self.unit_part = unit_part
from abc import ABC
def get_unit_part(self):
if len(self.unit_part) == 0:
class BaseDividingLine(ABC):
def __init__(self, unit_part: str = '-') -> None:
"""
Private. The basic dividing line
:param unit_part: the single part of the dividing line
:return: None
"""
self._unit_part = unit_part
def get_unit_part(self) -> str:
"""
Private. Returns the unit part of the dividing line
:return: unit_part of dividing line as str
"""
if len(self._unit_part) == 0:
return ' '
else:
return self.unit_part[0]
return self._unit_part[0]
class StaticDividingLine(BaseDividingLine):
def __init__(self, unit_part: str = '-', length: int = 25):
def __init__(self, unit_part: str = '-', length: int = 25) -> None:
"""
Public. The static dividing line
:param unit_part: the single part of the dividing line
:param length: the length of the dividing line
:return: None
"""
super().__init__(unit_part)
self.length = length
def get_full_line(self):
return f'\n[dim]{self.length * self.get_unit_part()}[/dim]\n'
def get_full_static_line(self, is_override: bool) -> str:
"""
Private. Returns the full line of the dividing line
:param is_override: has the default text layout been redefined
:return: full line of dividing line as str
"""
if is_override:
return f'\n{self.length * self.get_unit_part()}\n'
else:
return f'\n[dim]{self.length * self.get_unit_part()}[/dim]\n'
class DynamicDividingLine(BaseDividingLine):
def get_full_line(self, length: int):
return f'\n[dim]{self.get_unit_part() * length}[/dim]\n'
def __init__(self, unit_part: str = '-') -> None:
"""
Public. The dynamic dividing line
:param unit_part: the single part of the dividing line
:return: None
"""
super().__init__(unit_part)
def get_full_dynamic_line(self, length: int, is_override: bool) -> str:
"""
Private. Returns the full line of the dividing line
:param length: the length of the dividing line
:param is_override: has the default text layout been redefined
:return: full line of dividing line as str
"""
if is_override:
return f'\n{length * self.get_unit_part()}\n'
else:
return f'\n[dim]{self.get_unit_part() * length}[/dim]\n'
+4 -6
View File
@@ -1,10 +1,8 @@
class NoRegisteredRoutersException(Exception):
def __str__(self):
return "No Registered Router Found"
class NoRegisteredHandlersException(Exception):
def __init__(self, router_name):
"""
The router has no registered handlers
"""
def __init__(self, router_name) -> None:
self.router_name = router_name
def __str__(self):
return f"No Registered Handlers Found For '{self.router_name}'"
+36 -37
View File
@@ -15,8 +15,7 @@ from argenta.command.exceptions import (UnprocessedInputFlagException,
RepeatedInputFlagsException,
EmptyInputCommandException,
BaseInputCommandException)
from argenta.app.exceptions import (NoRegisteredRoutersException,
NoRegisteredHandlersException)
from argenta.app.exceptions import NoRegisteredHandlersException
from argenta.app.registered_routers.entity import RegisteredRouters
@@ -48,17 +47,18 @@ class AppInit:
self._initial_message = initial_message
self._description_message_gen: Callable[[str, str], str] = lambda command, description: f'[bold red]{escape('['+command+']')}[/bold red] [blue dim]*=*=*[/blue dim] [bold yellow italic]{escape(description)}'
self._description_message_gen: Callable[[str, str], str] = lambda command, description: f'[{command}] *=*=* {description}'
self._registered_routers: RegisteredRouters = RegisteredRouters()
self._messages_on_startup = []
self._invalid_input_flags_handler: Callable[[str], None] = lambda raw_command: print_func(f'[red bold]Incorrect flag syntax: {escape(raw_command)}')
self._repeated_input_flags_handler: Callable[[str], None] = lambda raw_command: print_func(f'[red bold]Repeated input flags: {escape(raw_command)}')
self._empty_input_command_handler: Callable[[], None] = lambda: print_func('[red bold]Empty input command')
self._unknown_command_handler: Callable[[InputCommand], None] = lambda command: print_func(f"[red bold]Unknown command: {escape(command.get_trigger())}")
self._invalid_input_flags_handler: Callable[[str], None] = lambda raw_command: print_func(f'Incorrect flag syntax: {raw_command}')
self._repeated_input_flags_handler: Callable[[str], None] = lambda raw_command: print_func(f'Repeated input flags: {raw_command}')
self._empty_input_command_handler: Callable[[], None] = lambda: print_func('Empty input command')
self._unknown_command_handler: Callable[[InputCommand], None] = lambda command: print_func(f"Unknown command: {command.get_trigger()}")
self._exit_command_handler: Callable[[], None] = lambda: print_func(self._farewell_message)
class AppSetters(AppInit):
def set_description_message_pattern(self, pattern: Callable[[str, str], str]) -> None:
self._description_message_gen: Callable[[str, str], str] = pattern
@@ -84,6 +84,7 @@ class AppSetters(AppInit):
self._exit_command_handler = handler
class AppPrinters(AppInit):
def _print_command_group_description(self):
for registered_router in self._registered_routers:
@@ -96,22 +97,21 @@ class AppPrinters(AppInit):
self._print_func('')
def _print_framed_text_with_dynamic_line(self, text: str):
clear_text = re.sub(r'\u001b\[[0-9;]*m', '', text)
max_length_line = max([len(line) for line in clear_text.split('\n')])
max_length_line = max_length_line if 10 <= max_length_line <= 80 else 80 if max_length_line > 80 else 10
self._print_func(self._dividing_line.get_full_line(max_length_line))
print(text.strip('\n'))
self._print_func(self._dividing_line.get_full_line(max_length_line))
def _print_framed_text(self, text: str):
if isinstance(self._dividing_line, StaticDividingLine):
self._print_func(self._dividing_line.get_full_line())
self._print_func(self._dividing_line.get_full_static_line(self._override_system_messages))
self._print_func(text)
self._print_func(self._dividing_line.get_full_line())
self._print_func(self._dividing_line.get_full_static_line(self._override_system_messages))
elif isinstance(self._dividing_line, DynamicDividingLine):
self._print_framed_text_with_dynamic_line(text)
clear_text = re.sub(r'\u001b\[[0-9;]*m', '', text)
max_length_line = max([len(line) for line in clear_text.split('\n')])
max_length_line = max_length_line if 10 <= max_length_line <= 80 else 80 if max_length_line > 80 else 10
self._print_func(self._dividing_line.get_full_dynamic_line(max_length_line, self._override_system_messages))
print(text.strip('\n'))
self._print_func(self._dividing_line.get_full_dynamic_line(max_length_line, self._override_system_messages))
class AppNonStandardHandlers(AppPrinters):
@@ -140,15 +140,10 @@ class AppNonStandardHandlers(AppPrinters):
return False
elif command.get_trigger() in handled_command_trigger:
return False
if isinstance(self._dividing_line, StaticDividingLine):
self._print_func(self._dividing_line.get_full_line())
with redirect_stdout(io.StringIO()) as f:
self._unknown_command_handler(command)
self._print_func(self._dividing_line.get_full_line())
elif isinstance(self._dividing_line, DynamicDividingLine):
with redirect_stdout(io.StringIO()) as f:
self._unknown_command_handler(command)
res: str = f.getvalue()
self._print_framed_text_with_dynamic_line(res)
res: str = f.getvalue()
self._print_framed_text(res)
return True
@@ -162,18 +157,15 @@ class AppNonStandardHandlers(AppPrinters):
self._empty_input_command_handler()
class AppValidators(AppInit):
def _validate_number_of_routers(self) -> None:
if not self._registered_routers:
raise NoRegisteredRoutersException()
def _validate_included_routers(self) -> None:
for router in self._registered_routers:
if not router.get_command_handlers():
raise NoRegisteredHandlersException(router.get_name())
class AppSetups(AppValidators, AppPrinters):
def _setup_system_router(self):
system_router.set_title(self._system_points_title)
@@ -189,14 +181,19 @@ class AppSetups(AppValidators, AppPrinters):
def _setup_default_view(self):
if not self._override_system_messages:
self._initial_message = f'\n[bold red]{text2art(self._initial_message, font='tarty1')}\n\n'
self._farewell_message = (
f'[bold red]\n{text2art(f'\n{self._farewell_message}\n', font='chanky')}[/bold red]\n'
f'[red i]github.com/koloideal/Argenta[/red i] | [red bold i]made by kolo[/red bold i]\n')
self._farewell_message = (f'[bold red]\n{text2art(f'\n{self._farewell_message}\n', font='chanky')}[/bold red]\n'
f'[red i]github.com/koloideal/Argenta[/red i] | [red bold i]made by kolo[/red bold i]\n')
self._description_message_gen = lambda command, description: (f'[bold red]{escape('[' + command + ']')}[/bold red] '
f'[blue dim]*=*=*[/blue dim] '
f'[bold yellow italic]{escape(description)}')
self._invalid_input_flags_handler = lambda raw_command: self._print_func(f'[red bold]Incorrect flag syntax: {escape(raw_command)}')
self._repeated_input_flags_handler = lambda raw_command: self._print_func(f'[red bold]Repeated input flags: {escape(raw_command)}')
self._empty_input_command_handler = lambda: self._print_func('[red bold]Empty input command')
self._unknown_command_handler = lambda command: self._print_func(f"[red bold]Unknown command: {escape(command.get_trigger())}")
def _pre_cycle_setup(self):
self._setup_default_view()
self._setup_system_router()
self._validate_number_of_routers()
self._validate_included_routers()
all_triggers: list[str] = []
@@ -209,12 +206,14 @@ class AppSetups(AppValidators, AppPrinters):
for message in self._messages_on_startup:
self._print_func(message)
print('\n\n')
if self._messages_on_startup:
print('\n\n')
if not self._repeat_command_groups_description:
self._print_command_group_description()
class App(AppSetters, AppNonStandardHandlers, AppSetups):
def run_polling(self) -> None:
self._pre_cycle_setup()
+21 -2
View File
@@ -3,15 +3,34 @@ from argenta.router import Router
class RegisteredRouters:
def __init__(self, registered_routers: list[Router] = None) -> None:
"""
Private. Combines registered routers
:param registered_routers: list of the registered routers
:return: None
"""
self._registered_routers = registered_routers if registered_routers else []
def get_registered_routers(self) -> list[Router]:
"""
Private. Returns the registered routers
:return: registered routers as list[Router]
"""
return self._registered_routers
def add_registered_router(self, router: Router):
def add_registered_router(self, router: Router) -> None:
"""
Private. Adds a new registered router
:param router: registered router
:return: None
"""
self._registered_routers.append(router)
def add_registered_routers(self, *routers: Router):
def add_registered_routers(self, *routers: Router) -> None:
"""
Private. Adds new registered routers
:param routers: registered routers
:return: None
"""
self._registered_routers.extend(routers)
def __iter__(self):
+9 -6
View File
@@ -6,23 +6,24 @@ from argenta.orchestrator.argparse.arguments.models import (BooleanArgument,
class ArgParse:
def __init__(self, name: str = 'Argenta',
def __init__(self,
processed_args: list[PositionalArgument | OptionalArgument | BooleanArgument],
name: str = 'Argenta',
description: str = 'Argenta available arguments',
epilog: str = 'github.com/koloideal/Argenta | made by kolo',
args: list[PositionalArgument | OptionalArgument | BooleanArgument] = None) -> None:
epilog: str = 'github.com/koloideal/Argenta | made by kolo') -> None:
"""
Cmd argument parser and configurator at startup
:param name: the name of the ArgParse instance
:param description: the description of the ArgParse instance
:param epilog: the epilog of the ArgParse instance
:param args: registered and processed arguments
:param processed_args: registered and processed arguments
"""
self.name = name
self.description = description
self.epilog = epilog
self.entity = ArgumentParser(prog=name, description=description, epilog=epilog)
self.args: list[PositionalArgument | OptionalArgument | BooleanArgument] | None = args
self.entity: ArgumentParser = ArgumentParser(prog=name, description=description, epilog=epilog)
self.args: list[PositionalArgument | OptionalArgument | BooleanArgument] | None = processed_args
def set_args(self, *args: PositionalArgument | OptionalArgument | BooleanArgument):
"""
@@ -37,6 +38,8 @@ class ArgParse:
Registers initialized command line arguments
:return:
"""
if not self.args:
return
for arg in self.args:
if type(arg) is PositionalArgument:
self.entity.add_argument(arg.get_string_entity())
+11 -5
View File
@@ -1,15 +1,18 @@
from argparse import Namespace
from argenta.app import App
from argenta.orchestrator.argparse import ArgParse
class Orchestrator:
def __init__(self, arg_parser: ArgParse):
def __init__(self, arg_parser: ArgParse = False):
"""
An orchestrator and configurator that defines the behavior of an integrated system, one level higher than the App
:param arg_parser: Cmd argument parser and configurator at startup
"""
self.arg_parser: ArgParse = arg_parser
self.arg_parser.register_args()
self.arg_parser: ArgParse | False = arg_parser
if arg_parser:
self.arg_parser.register_args()
@staticmethod
def start_polling(app: App) -> None:
@@ -20,10 +23,13 @@ class Orchestrator:
"""
app.run_polling()
def get_args(self):
def get_input_args(self) -> Namespace | None:
"""
Returns the arguments parsed
:return:
"""
return self.arg_parser.entity.parse_args()
if self.arg_parser:
return self.arg_parser.entity.parse_args()
else:
return None
+4 -5
View File
@@ -1,9 +1,8 @@
from argenta.app import App
from argenta.app.defaults import PredeterminedMessages
from argenta.app.defaults import PredefinedMessages
from argenta.orchestrator import Orchestrator
app = App(repeat_command_groups=True)
app.add_message_on_startup(PredeterminedMessages.USAGE + '\n\n')
app.run_polling()
orchestrator = Orchestrator()
orchestrator.start_polling(app)
+7 -7
View File
@@ -1,7 +1,7 @@
from mock.mock_app.handlers.routers import work_router, settings_router
from argenta.app import App
from argenta.app.defaults import PredeterminedMessages
from argenta.app.defaults import PredefinedMessages
from argenta.app.dividing_line import DynamicDividingLine
from argenta.app.autocompleter import AutoCompleter
from argenta.orchestrator import Orchestrator
@@ -11,7 +11,7 @@ from argenta.orchestrator.argparse.arguments import (PositionalArgument,
BooleanArgument)
arg_parser = ArgParse(args=[PositionalArgument('test'), OptionalArgument('some'), BooleanArgument('verbose')])
arg_parser = ArgParse(processed_args=[BooleanArgument('repeat')])
app: App = App(dividing_line=DynamicDividingLine(),
autocompleter=AutoCompleter('./mock/.hist'))
orchestrator: Orchestrator = Orchestrator(arg_parser)
@@ -20,12 +20,12 @@ orchestrator: Orchestrator = Orchestrator(arg_parser)
def main():
app.include_routers(work_router, settings_router)
app.add_message_on_startup(PredeterminedMessages.USAGE)
app.add_message_on_startup(PredeterminedMessages.AUTOCOMPLETE)
app.add_message_on_startup(PredeterminedMessages.HELP)
app.add_message_on_startup(PredefinedMessages.USAGE)
app.add_message_on_startup(PredefinedMessages.AUTOCOMPLETE)
app.add_message_on_startup(PredefinedMessages.HELP)
print(orchestrator.get_args())
#orchestrator.start_polling(app)
print(orchestrator.get_input_args())
orchestrator.start_polling(app)
if __name__ == "__main__":
main()
+2 -2
View File
@@ -6,11 +6,11 @@ import unittest
class TestDividingLine(unittest.TestCase):
def test_get_static_dividing_line_full_line(self):
line = StaticDividingLine('-')
self.assertEqual(line.get_full_line().count('-'), 25)
self.assertEqual(line.get_full_static_line(True).count('-'), 25)
def test_get_dynamic_dividing_line_full_line(self):
line = DynamicDividingLine()
self.assertEqual(line.get_full_line(20).count('-'), 20)
self.assertEqual(line.get_full_dynamic_line(20, True).count('-'), 20)
def test_get_dividing_line_unit_part(self):
line = StaticDividingLine('')