57 Commits

Author SHA1 Message Date
kolo 77416cf22c release 1.1.1 2025-10-08 20:49:29 +03:00
kolo b6c84f1a91 fix ci 2025-10-08 13:44:12 +03:00
kolo c2d235e576 fix ci 2025-10-08 13:42:01 +03:00
kolo f7f5db58aa fix ci 2025-10-08 13:40:20 +03:00
kolo 73303b1c08 ref: typehints, enum instead of raw string, abc and other (#1)
Full code coverage with annotations, fixing errors in various linters: ruff, wps, etc. Fixing errors in type checkers: ty, mypy, pyright. Formatting and bringing code to a consistent style, applying best practices in various aspects.
2025-10-08 13:37:31 +03:00
kolo 22f1171192 fix: link to img in readme 2025-08-12 12:26:37 +03:00
kolo a844095fdc Update README.de.md 2025-08-06 14:57:31 +03:00
kolo a7c6a14705 Update README.md 2025-08-06 14:56:24 +03:00
kolo cfdb37330e Update README.ru.md 2025-08-06 14:52:33 +03:00
kolo aef6a9ca38 update .gitignore 2025-07-04 18:31:52 +03:00
kolo c8e0729be8 fix bugs 2025-05-27 14:19:54 +03:00
kolo c2bbc5f15d first steps sor adding metrics tests 2025-05-24 00:39:39 +03:00
kolo 0acbf54e44 first steps sor adding metrics tests 2025-05-23 22:12:12 +03:00
kolo c3d9541330 working 2025-05-23 14:33:13 +03:00
kolo f6561de9b3 wotk 2025-05-22 20:26:48 +03:00
kolo bebd84969b add Enum PossibleValues for bool values as values of possible_values argument in Flag 2025-05-22 12:10:32 +03:00
kolo 365347ea7f some fix 2025-05-21 17:01:35 +03:00
kolo 33cb528b1d some fix 2025-05-21 13:32:45 +03:00
kolo fd287c5da0 fix type hints with ty help, add ability to configure stdout capture when handling input by the router 2025-05-20 22:44:47 +03:00
kolo 45f410e3e8 make pre_cycle_setup faster on 4 sec, start implemtnation disable redirect stdout in router 2025-05-19 10:31:05 +03:00
kolo 8b06e9cd39 add metrics concept 2025-05-12 16:22:29 +03:00
kolo c38fe10006 translate readme on de 2025-05-10 22:07:52 +03:00
kolo 03cbc64f48 translate readme 2025-05-10 21:56:34 +03:00
kolo cbf7d3c578 new warning when triggers or aliases overlap 2025-05-10 21:31:59 +03:00
kolo ea2d068022 change dataclass to enum 2025-05-10 20:13:42 +03:00
kolo 5991851207 Update README.md 2025-05-10 00:29:03 +03:00
kolo f628c3b5b5 v1.0.2 2025-05-10 00:11:57 +03:00
kolo 05379712f4 some fix 2025-05-09 23:33:54 +03:00
kolo ed1cbf0fcf stable version 2025-05-09 23:27:34 +03:00
kolo 471f05369b Merge branch 'dev' of https://github.com/koloideal/Argenta into dev 2025-05-09 23:25:47 +03:00
kolo 13f7e33db1 ruff format 2025-05-09 23:25:21 +03:00
kolo 9a78aa9263 ruff check 2025-05-09 23:24:12 +03:00
kolo 58ccd6b26d Update README.md 2025-05-09 23:20:45 +03:00
kolo 73144f7ba4 Update README.md 2025-05-09 23:19:07 +03:00
kolo 650f4c9036 some fix 2025-05-09 18:23:08 +03:00
kolo 393f5c7d81 fix 2025-05-08 23:43:08 +03:00
kolo 9eb2bb6c46 new imgs 2025-05-08 23:37:42 +03:00
kolo 79b275eac7 some fix 2025-05-08 01:08:11 +03:00
kolo 07ac2af71e some fix 2025-05-07 19:14:19 +03:00
kolo c4b3aa7db8 working 2025-05-07 02:15:42 +03:00
kolo 61ef6a6466 all tests passed 2025-05-06 21:53:53 +03:00
kolo 477f3a7dec starting refactor tests 2025-05-04 16:40:10 +03:00
kolo adf3431388 more beautiful typehints warning 2025-05-04 03:08:54 +03:00
kolo 83955aa046 first beta - adding hints for similar commands, now - feature freezing 2025-05-04 02:13:05 +03:00
kolo 5a17e916eb work on stable major version 2025-04-30 15:48:38 +03:00
kolo 1159dda16e work on Response model 2025-04-30 00:08:49 +03:00
kolo 315508a36e work on Response 2025-04-29 20:40:47 +03:00
kolo 9d6598c4e0 work on Response model 2025-04-29 00:07:32 +03:00
kolo eb43806da6 new model - Response 2025-04-28 02:21:34 +03:00
kolo e076dbf84f new img in docs 2025-04-27 23:43:14 +03:00
kolo 2f090b6b47 new img in docs 2025-04-27 23:42:56 +03:00
kolo c9dbf2bbae fix print framed text with static dividing line 2025-04-27 23:27:08 +03:00
kolo e768c1bd2c fix 2025-04-27 21:29:14 +03:00
kolo 408450ec12 fix 2025-04-27 21:20:44 +03:00
kolo 106ca058be new tests 2025-04-27 14:11:01 +03:00
kolo b5ddfb3b35 new tests 2025-04-27 13:28:11 +03:00
kolo 61e4502e41 work, fix etc. 2025-04-26 22:23:35 +03:00
63 changed files with 1743 additions and 2509 deletions
+31
View File
@@ -0,0 +1,31 @@
name: mypy
on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
permissions:
contents: read
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v3
with:
python-version: "3.13"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install mypy
pip install .
- name: Run type checker
run: mypy -p argenta
+3 -3
View File
@@ -2,9 +2,9 @@ name: ruff
on:
push:
branches: [ "kolo" ]
branches: [ "main" ]
pull_request:
branches: [ "kolo" ]
branches: [ "main" ]
permissions:
contents: read
@@ -27,4 +27,4 @@ jobs:
pip install ruff
- name: Run linter
run: ruff check ./argenta
run: ruff check ./src
+5 -5
View File
@@ -2,9 +2,9 @@ name: tests
on:
push:
branches: [ "kolo" ]
branches: [ "main" ]
pull_request:
branches: [ "kolo" ]
branches: [ "main" ]
permissions:
contents: read
@@ -24,8 +24,8 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install poetry
poetry install
pip install uv
uv sync --group dev
- name: Run tests
run: poetry run python -m unittest discover
run: uv run python -m unittest discover
+3 -2
View File
@@ -1,9 +1,10 @@
.venv
*venv
.idea
.vscode
dist
uv.lock
*__pycache__/
*.hist*
build
source
*cache
+69
View File
@@ -0,0 +1,69 @@
# Argenta
### Bibliothek zum Erstellen modularer CLI-Anwendungen
Mit Argenta können Sie die CLI-Funktionalität in isolierte, abstrahierte Umgebungen einkapseln. Zum Beispiel: Sie erstellen ein Dienstprogramm ähnlich dem Metasploit Framework, bei dem der Benutzer zuerst in einen bestimmten Scoop eintritt (z. B. ein Modul zum Scannen auswählt) und dann auf eine Reihe von Befehlen zugreift, die nur für diesen Kontext spezifisch sind. Argenta bietet eine einfache und prägnante Möglichkeit, eine solche Architektur zu konstruieren.
---
![preview](https://github.com/koloideal/Argenta/blob/main/imgs/mock_app_preview4.png?raw=True)
---
# Installation
```bash
pip install argenta
```
or
```bash
poetry add argenta
```
---
# Schnellstart
Ein Beispiel für eine einfache Anwendung
```python
# routers.py
from argenta.router import Router
from argenta.command import Command
from argenta.response import Response
router = Router()
@router.command(Command("hello"))
def handler(response: Response):
print("Hello, world!")
```
```python
# main.py
from argenta.app import App
from argenta.orchestrator import Orchestrator
from routers import router
app: App = App()
orchestrator: Orchestrator = Orchestrator()
def main() -> None:
app.include_router(router)
orchestrator.start_polling(app)
if __name__ == '__main__':
main()
```
---
# Funktionen in der Entwicklung
- Vollständige Unterstützung für Autocompleter unter Linux
## Vollständige [Dokumentation](https://argenta-docs.vercel.app) | MIT 2025 kolo | made by [kolo](https://t.me/kolo_id)
+12 -1364
View File
File diff suppressed because it is too large Load Diff
+69
View File
@@ -0,0 +1,69 @@
# Argenta
### Библиотека для создания модульных CLI приложeний
Argenta позволяет инкапсулировать CLI фукциональность в изолированные, абстрагированные **среды**. К примеру: вы создаете утилиту, подобную Metasploit Framework, где пользователь сначала входит в определенный скоуп (например, выбирает модуль для сканирования), а затем получает доступ к набору команд, специфичных только для этого контекста. Argenta предоставляет простой и лаконичный способ для построения такой архитектуры.
---
![preview](https://github.com/koloideal/Argenta/blob/main/imgs/mock_app_preview4.png?raw=True)
---
# Установка
```bash
pip install argenta
```
or
```bash
poetry add argenta
```
---
# Быстрый старт
Пример простейшего приложения
```python
# routers.py
from argenta.router import Router
from argenta.command import Command
from argenta.response import Response
router = Router()
@router.command(Command("hello"))
def handler(response: Response):
print("Hello, world!")
```
```python
# main.py
from argenta.app import App
from argenta.orchestrator import Orchestrator
from routers import router
app: App = App()
orchestrator: Orchestrator = Orchestrator()
def main() -> None:
app.include_router(router)
orchestrator.start_polling(app)
if __name__ == '__main__':
main()
```
---
# Фичи в разработке
- Полноценная поддержка автокомплитера на Linux
## Полная [документация](https://argenta-docs.vercel.app) | MIT 2025 kolo | made by [kolo](https://t.me/kolo_id)
Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.9 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 928 KiB

+46
View File
@@ -0,0 +1,46 @@
<?xml version="1.0" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 20010904//EN"
"http://www.w3.org/TR/2001/REC-SVG-20010904/DTD/svg10.dtd">
<svg version="1.0" xmlns="http://www.w3.org/2000/svg"
width="809.000000pt" height="809.000000pt" viewBox="0 0 809.000000 809.000000"
preserveAspectRatio="xMidYMid meet">
<g transform="translate(0.000000,809.000000) scale(0.100000,-0.100000)"
fill="#000000" stroke="none">
<path d="M3845 7600 c-683 -38 -1333 -259 -1875 -639 -702 -492 -1223 -1251
-1434 -2089 -83 -330 -111 -602 -103 -980 6 -273 19 -393 67 -632 167 -829
642 -1593 1321 -2122 512 -399 1133 -656 1794 -745 165 -22 666 -25 825 -5
779 99 1451 397 2020 898 687 603 1095 1401 1201 2344 17 147 17 571 0 715
-59 518 -204 977 -442 1402 -180 322 -355 552 -629 823 -518 515 -1164 850
-1887 979 -254 45 -594 66 -858 51z m655 -234 c545 -76 1005 -248 1445 -541
195 -130 336 -245 506 -415 457 -456 758 -987 909 -1605 134 -547 132 -1119
-5 -1665 -149 -593 -445 -1112 -885 -1550 -624 -623 -1414 -966 -2315 -1006
-412 -18 -935 73 -1339 232 -656 259 -1228 720 -1610 1299 -298 450 -461 890
-538 1451 -28 210 -31 620 -4 819 103 786 437 1477 975 2016 149 149 266 249
417 357 315 225 692 405 1059 506 211 58 342 81 675 120 87 10 601 -3 710 -18z"/>
<path d="M3691 6759 c-231 -17 -522 -67 -660 -114 -227 -77 -354 -211 -381
-400 -14 -105 -13 -628 2 -643 9 -9 158 -12 611 -12 598 0 628 -2 649 -34 14
-21 8 -66 -12 -86 -20 -20 -33 -20 -908 -20 -857 0 -892 -1 -967 -20 -294 -75
-500 -321 -599 -715 -49 -195 -61 -309 -60 -585 0 -221 3 -272 23 -385 44
-251 116 -418 232 -540 78 -82 143 -123 254 -162 78 -27 85 -27 351 -31 l272
-4 16 23 c14 20 16 58 16 259 0 334 20 446 107 613 80 153 268 310 447 374
128 45 138 46 881 52 683 7 713 8 786 28 201 56 353 177 437 348 80 162 86
240 74 905 -11 608 -10 600 -84 743 -80 153 -240 271 -453 332 -252 73 -653
101 -1034 74z m-383 -466 c126 -78 147 -245 44 -355 -144 -154 -396 -54 -395
157 0 97 62 187 154 222 54 20 143 10 197 -24z"/>
<path d="M5451 5491 c-20 -20 -21 -30 -21 -283 0 -276 -7 -351 -42 -458 -81
-251 -281 -454 -523 -534 -149 -48 -151 -48 -890 -56 -678 -6 -703 -7 -780
-28 -139 -38 -219 -84 -315 -181 -61 -61 -95 -105 -117 -151 -65 -134 -64
-129 -75 -780 -12 -665 -10 -694 47 -817 122 -265 419 -429 895 -494 127 -18
592 -18 730 -1 377 48 646 169 785 355 57 75 67 94 96 182 20 64 23 94 27 325
4 238 3 257 -15 278 l-18 23 -551 -1 c-399 0 -559 3 -577 11 -51 23 -56 102
-8 126 9 4 439 10 956 13 1044 7 981 2 1125 75 178 89 305 255 386 502 84 257
111 618 73 973 -39 364 -167 655 -351 801 -63 50 -121 80 -213 110 -64 20 -93
23 -337 27 -260 4 -267 3 -287 -17z m-2104 -1698 c78 -52 531 -413 561 -447
17 -19 33 -48 37 -65 15 -71 3 -84 -323 -356 -275 -229 -310 -255 -342 -255
-78 1 -112 75 -61 134 9 10 86 75 171 144 247 201 355 294 358 311 1 9 -15 28
-38 44 -127 89 -503 393 -512 413 -23 50 20 103 82 104 15 0 45 -12 67 -27z
m1388 -1244 c59 -15 129 -77 151 -134 36 -96 0 -204 -88 -262 -36 -24 -51 -28
-118 -28 -67 0 -82 4 -118 28 -153 101 -124 338 48 392 57 17 71 18 125 4z"/>
</g>
</svg>

After

Width:  |  Height:  |  Size: 2.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 50 KiB

+33
View File
@@ -0,0 +1,33 @@
from argenta.command import Command
from argenta.metrics import get_time_of_pre_cycle_setup
from argenta.response import Response
from argenta.router import Router
from argenta.app import App
def commands_with_two_aliases(num_of_commands: int):
router = Router()
for i in range(num_of_commands):
@router.command(Command(f'cmd{i}', aliases=[f'cdr{i}', f'prt{i}']))
def handler(response: Response): # pyright: ignore[reportUnusedFunction, reportUnusedParameter]
pass
app = App()
app.include_router(router)
return get_time_of_pre_cycle_setup(app)
def commands_with_one_aliases(num_of_commands: int):
router = Router()
for i in range(num_of_commands):
@router.command(Command(f'cmd{i}', aliases=[f'cdr{i}']))
def handler(response: Response): # pyright: ignore[reportUnusedFunction, reportUnusedParameter]
pass
app = App()
app.include_router(router)
return get_time_of_pre_cycle_setup(app)
-7
View File
@@ -1,7 +0,0 @@
from argenta.app import App
from argenta.orchestrator import Orchestrator
app = App(repeat_command_groups=True)
orchestrator = Orchestrator()
orchestrator.start_polling(app)
+4 -12
View File
@@ -1,13 +1,5 @@
from argenta.command.flag.defaults import PredefinedFlags
import argparse
router = Router()
flag = PredefinedFlags.SHORT_HELP
@router.command(Command('test', flags=flag))
def test(args: InputFlags):
print(f'help for {args.get_flag('h').get_name()} flag')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.run_polling()
parser = argparse.ArgumentParser(prog='myprogram')
_ = parser.add_argument('--foo', help='foo of the %(prog)s program')
parser.print_help()
@@ -1,10 +0,0 @@
from rich.console import Console
console = Console()
def help_command():
console.print("[italic bold]The main functionality of the script is to convert an expression from a string "
"to a mathematical one and then calculate this expression. "
"Project GitHub: https://github.com/koloideal/WordMath[/italic bold]")
-22
View File
@@ -1,22 +0,0 @@
from rich.console import Console
from argenta.command import Command
from argenta.router import Router
work_router: Router = Router(title='Work points:')
console = Console()
@work_router.command(Command('get', 'Get Help', aliases=['help', 'Get_help']))
def command_help():
pass
@work_router.command(Command('run', 'Run All'))
def command_start_solving():
pass
+12 -11
View File
@@ -1,22 +1,22 @@
from mock.mock_app.handlers.routers import work_router
from mock.mock_app.routers import work_router
from argenta.app import App
from argenta.app.defaults import PredefinedMessages
from argenta.app.dividing_line import DynamicDividingLine
from argenta.app.autocompleter import AutoCompleter
from argenta.orchestrator import Orchestrator
from argenta.orchestrator.argparser import ArgParser
from argenta.orchestrator.argparser.arguments import BooleanArgument
from argenta import App, Orchestrator
from argenta.app import PredefinedMessages, DynamicDividingLine, AutoCompleter
from argenta.orchestrator import ArgParser
from argenta.orchestrator.argparser import BooleanArgument
arg_parser = ArgParser(processed_args=[BooleanArgument('repeat')])
app: App = App(dividing_line=DynamicDividingLine(),
autocompleter=AutoCompleter('./mock/.hist'))
arg_parser = ArgParser(processed_args=[BooleanArgument("repeat")])
app: App = App(
dividing_line=DynamicDividingLine(),
autocompleter=AutoCompleter(),
)
orchestrator: Orchestrator = Orchestrator(arg_parser)
def main():
app.include_router(work_router)
print(f"\n\n{orchestrator.get_input_args()}")
app.add_message_on_startup(PredefinedMessages.USAGE)
app.add_message_on_startup(PredefinedMessages.AUTOCOMPLETE)
@@ -24,5 +24,6 @@ def main():
orchestrator.start_polling(app)
if __name__ == "__main__":
main()
+25
View File
@@ -0,0 +1,25 @@
from argenta.command import Command, PredefinedFlags, Flags, Flag, PossibleValues
from argenta.response import Response
from argenta import Router
work_router: Router = Router(title="Work points:")
flag = Flag('csdv', possible_values=PossibleValues.NEITHER)
@work_router.command(
Command("get",
description="Get Help",
aliases=["help", "Get_help"],
flags=Flags([PredefinedFlags.PORT,
PredefinedFlags.HOST])))
def command_help(response: Response):
print(response.status)
print(response.input_flags.flags)
@work_router.command("run")
def command_start_solving(response: Response):
print(response.status)
print(response.input_flags.flags)
+12 -9
View File
@@ -1,20 +1,15 @@
[project]
name = "argenta"
version = "1.0.0-alpha1"
description = "Python library for creating TUI"
version = "1.1.1"
description = "Python library for building modular CLI applications"
authors = [{ name = "kolo", email = "kolo.is.main@gmail.com" }]
requires-python = ">=3.11, <4.0"
requires-python = ">=3.11"
readme = "README.md"
license = { text = "MIT" }
dependencies = [
"rich (>=14.0.0,<15.0.0)",
"art (>=6.4,<7.0)",
"pyreadline3 (>=3.5.4,<4.0.0)",
]
[dependency-groups]
dev = [
"pydoc-markdown>=4.8.2,<5",
"pyreadline3>=3.5.4",
]
[tool.ruff]
@@ -31,3 +26,11 @@ exclude = [
requires = ["hatchling"]
build-backend = "hatchling.build"
[dependency-groups]
dev = [
"mypy>=1.14.1",
"pytest>=8.3.2",
"ruff>=0.12.12",
"wemake-python-styleguide>=0.17.0",
]
+10
View File
@@ -0,0 +1,10 @@
__all__ = [
'App',
'Orchestrator',
'Router',
]
from argenta.app import App
from argenta.orchestrator import Orchestrator
from argenta.router import Router
+10 -1
View File
@@ -1,3 +1,12 @@
__all__ = ["App"]
__all__ = [
"App",
"PredefinedMessages",
"DynamicDividingLine",
"StaticDividingLine",
"AutoCompleter"
]
from argenta.app.models import App
from argenta.app.defaults import PredefinedMessages
from argenta.app.dividing_line.models import DynamicDividingLine, StaticDividingLine
from argenta.app.autocompleter.entity import AutoCompleter
+40 -22
View File
@@ -1,36 +1,44 @@
import os
import readline
from typing import Never
class AutoCompleter:
def __init__(self, history_filename: str = False, autocomplete_button: str = 'tab') -> None:
def __init__(
self, history_filename: str | None = None, autocomplete_button: str = "tab"
) -> None:
"""
Public. Configures and implements auto-completion of input command
:param history_filename: the name of the file for saving the history of the autocompleter
:param autocomplete_button: the button for auto-completion
:return: None
"""
self.history_filename = history_filename
self.autocomplete_button = autocomplete_button
self.matches: list[str] = []
self.history_filename: str | None = history_filename
self.autocomplete_button: str = autocomplete_button
def complete(self, text, state) -> str | None:
def _complete(self, text: str, state: int) -> str | None:
"""
Private. Auto-completion function
:param text: part of the command being entered
:param state: the current cursor position is relative to the beginning of the line
:return: the desired candidate as str or None
"""
matches: list[str] = sorted(cmd for cmd in self.get_history_items() if cmd.startswith(text))
matches: list[str] = sorted(
cmd for cmd in _get_history_items() if cmd.startswith(text)
)
if len(matches) > 1:
common_prefix = matches[0]
for match in matches[1:]:
i = 0
while i < len(common_prefix) and i < len(match) and common_prefix[i] == match[i]:
while (
i < len(common_prefix)
and i < len(match)
and common_prefix[i] == match[i]
):
i += 1
common_prefix = common_prefix[:i]
if state == 0:
readline.insert_text(common_prefix[len(text):])
readline.insert_text(common_prefix[len(text) :])
readline.redisplay()
return None
elif len(matches) == 1:
@@ -46,27 +54,37 @@ class AutoCompleter:
"""
if self.history_filename:
if os.path.exists(self.history_filename):
readline.read_history_file(self.history_filename)
readline.read_history_file(self.history_filename)
else:
for line in all_commands:
readline.add_history(line)
readline.add_history(line)
readline.set_completer(self.complete)
readline.set_completer_delims(readline.get_completer_delims().replace(' ', ''))
readline.parse_and_bind(f'{self.autocomplete_button}: complete')
readline.set_completer(self._complete)
readline.set_completer_delims(readline.get_completer_delims().replace(" ", ""))
readline.parse_and_bind(f"{self.autocomplete_button}: complete")
def exit_setup(self) -> None:
def exit_setup(self, all_commands: list[str]) -> None:
"""
Private. Exit setup function
:return: None
"""
if self.history_filename:
readline.write_history_file(self.history_filename)
readline.write_history_file(self.history_filename)
with open(self.history_filename, "r") as history_file:
raw_history = history_file.read()
pretty_history: list[str] = []
for line in set(raw_history.strip().split("\n")):
if line.split()[0] in all_commands:
pretty_history.append(line)
with open(self.history_filename, "w") as history_file:
_ = history_file.write("\n".join(pretty_history))
@staticmethod
def get_history_items() -> list[str] | list:
"""
Private. Returns a list of all commands entered by the user
:return: all commands entered by the user as list[str]
"""
return [readline.get_history_item(i) for i in range(1, readline.get_current_history_length() + 1)]
def _get_history_items() -> list[str] | list[Never]:
"""
Private. Returns a list of all commands entered by the user
:return: all commands entered by the user as list[str] | list[Never]
"""
return [
readline.get_history_item(i)
for i in range(1, readline.get_current_history_length() + 1)
]
+5 -7
View File
@@ -1,12 +1,10 @@
from dataclasses import dataclass
from enum import StrEnum
@dataclass
class PredefinedMessages:
class PredefinedMessages(StrEnum):
"""
Public. A dataclass with predetermined messages for quick use
"""
USAGE = '[b dim]Usage[/b dim]: [i]<command> <[green]flags[/green]>[/i]'
HELP = '[b dim]Help[/b dim]: [i]<command>[/i] [b red]--help[/b red]'
AUTOCOMPLETE = '[b dim]Autocomplete[/b dim]: [i]<part>[/i] [bold]<tab>'
USAGE = "[b dim]Usage[/b dim]: [i]<command> <[green]flags[/green]>[/i]"
HELP = "[b dim]Help[/b dim]: [i]<command>[/i] [b red]--help[/b red]"
AUTOCOMPLETE = "[b dim]Autocomplete[/b dim]: [i]<part>[/i] [bold]<tab>"
+12 -14
View File
@@ -2,13 +2,13 @@ from abc import ABC
class BaseDividingLine(ABC):
def __init__(self, unit_part: str = '-') -> None:
def __init__(self, unit_part: str = "-") -> None:
"""
Private. The basic dividing line
:param unit_part: the single part of the dividing line
:return: None
"""
self._unit_part = unit_part
self._unit_part: str = unit_part
def get_unit_part(self) -> str:
"""
@@ -16,13 +16,13 @@ class BaseDividingLine(ABC):
:return: unit_part of dividing line as str
"""
if len(self._unit_part) == 0:
return ' '
return " "
else:
return self._unit_part[0]
class StaticDividingLine(BaseDividingLine):
def __init__(self, unit_part: str = '-', length: int = 25) -> None:
def __init__(self, unit_part: str = "-", *, length: int = 25) -> None:
"""
Public. The static dividing line
:param unit_part: the single part of the dividing line
@@ -30,22 +30,22 @@ class StaticDividingLine(BaseDividingLine):
:return: None
"""
super().__init__(unit_part)
self.length = length
self.length: int = length
def get_full_static_line(self, is_override: bool) -> str:
def get_full_static_line(self, *, is_override: bool) -> str:
"""
Private. Returns the full line of the dividing line
:param is_override: has the default text layout been redefined
:return: full line of dividing line as str
"""
if is_override:
return f'\n{self.length * self.get_unit_part()}\n'
return f"\n{self.length * self.get_unit_part()}\n"
else:
return f'\n[dim]{self.length * self.get_unit_part()}[/dim]\n'
return f"\n[dim]{self.length * self.get_unit_part()}[/dim]\n"
class DynamicDividingLine(BaseDividingLine):
def __init__(self, unit_part: str = '-') -> None:
def __init__(self, unit_part: str = "-") -> None:
"""
Public. The dynamic dividing line
:param unit_part: the single part of the dividing line
@@ -53,7 +53,7 @@ class DynamicDividingLine(BaseDividingLine):
"""
super().__init__(unit_part)
def get_full_dynamic_line(self, length: int, is_override: bool) -> str:
def get_full_dynamic_line(self, *, length: int, is_override: bool) -> str:
"""
Private. Returns the full line of the dividing line
:param length: the length of the dividing line
@@ -61,8 +61,6 @@ class DynamicDividingLine(BaseDividingLine):
:return: full line of dividing line as str
"""
if is_override:
return f'\n{length * self.get_unit_part()}\n'
return f"\n{length * self.get_unit_part()}\n"
else:
return f'\n[dim]{self.get_unit_part() * length}[/dim]\n'
return f"\n[dim]{self.get_unit_part() * length}[/dim]\n"
-8
View File
@@ -1,8 +0,0 @@
class NoRegisteredHandlersException(Exception):
"""
The router has no registered handlers
"""
def __init__(self, router_name) -> None:
self.router_name = router_name
def __str__(self):
return f"No Registered Handlers Found For '{self.router_name}'"
+258 -180
View File
@@ -1,28 +1,37 @@
from typing import Callable
from rich.console import Console
from rich.markup import escape
from art import text2art
from contextlib import redirect_stdout
import io
import re
from contextlib import redirect_stdout
from typing import Never, TypeAlias
from art import text2art # pyright: ignore[reportMissingTypeStubs, reportUnknownVariableType]
from rich.console import Console
from rich.markup import escape
from argenta.app.autocompleter import AutoCompleter
from argenta.app.dividing_line.models import DynamicDividingLine, StaticDividingLine
from argenta.app.protocols import (
DescriptionMessageGenerator,
EmptyCommandHandler,
NonStandardBehaviorHandler,
Printer,
)
from argenta.app.registered_routers.entity import RegisteredRouters
from argenta.command.exceptions import (
EmptyInputCommandException,
InputCommandException,
RepeatedInputFlagsException,
UnprocessedInputFlagException,
)
from argenta.command.models import Command, InputCommand
from argenta.response import Response
from argenta.router import Router
from argenta.router.defaults import system_router
from argenta.app.autocompleter import AutoCompleter
from argenta.app.dividing_line.models import StaticDividingLine, DynamicDividingLine
from argenta.command.exceptions import (UnprocessedInputFlagException,
RepeatedInputFlagsException,
EmptyInputCommandException,
BaseInputCommandException)
from argenta.app.exceptions import NoRegisteredHandlersException
from argenta.app.registered_routers.entity import RegisteredRouters
Matches: TypeAlias = list[str] | list[Never]
class BaseApp:
def __init__(self,
prompt: str,
def __init__(self, *, prompt: str,
initial_message: str,
farewell_message: str,
exit_command: Command,
@@ -32,89 +41,82 @@ class BaseApp:
repeat_command_groups: bool,
override_system_messages: bool,
autocompleter: AutoCompleter,
print_func: Callable[[str], None]) -> None:
print_func: Printer) -> None:
self._prompt: str = prompt
self._print_func: Printer = print_func
self._exit_command: Command = exit_command
self._system_router_title: str | None = system_router_title
self._dividing_line: StaticDividingLine | DynamicDividingLine = dividing_line
self._ignore_command_register: bool = ignore_command_register
self._repeat_command_groups_description: bool = repeat_command_groups
self._override_system_messages: bool = override_system_messages
self._autocompleter: AutoCompleter = autocompleter
self._prompt = prompt
self._print_func = print_func
self._exit_command = exit_command
self._system_router_title = system_router_title
self._dividing_line = dividing_line
self._ignore_command_register = ignore_command_register
self._repeat_command_groups_description = repeat_command_groups
self._override_system_messages = override_system_messages
self._autocompleter = autocompleter
self._farewell_message: str = farewell_message
self._initial_message: str = initial_message
self._farewell_message = farewell_message
self._initial_message = initial_message
self._description_message_gen: Callable[[str, str], str] = lambda command, description: f'[{command}] *=*=* {description}'
self._description_message_gen: DescriptionMessageGenerator = lambda command, description: f"{command} *=*=* {description}"
self._registered_routers: RegisteredRouters = RegisteredRouters()
self._messages_on_startup: list[str] = []
self._all_registered_triggers_in_lower: list[str] = []
self._all_registered_triggers_in_default_case: list[str] = []
self._matching_lower_triggers_with_routers: dict[str, Router] = {}
self._matching_default_triggers_with_routers: dict[str, Router] = {}
self._invalid_input_flags_handler: Callable[[str], None] = lambda raw_command: print_func(f'Incorrect flag syntax: {raw_command}')
self._repeated_input_flags_handler: Callable[[str], None] = lambda raw_command: print_func(f'Repeated input flags: {raw_command}')
self._empty_input_command_handler: Callable[[], None] = lambda: print_func('Empty input command')
self._unknown_command_handler: Callable[[InputCommand], None] = lambda command: print_func(f"Unknown command: {command.get_trigger()}")
self._exit_command_handler: Callable[[], None] = lambda: print_func(self._farewell_message)
self._current_matching_triggers_with_routers: dict[str, Router] = self._matching_lower_triggers_with_routers if self._ignore_command_register else self._matching_default_triggers_with_routers
self._incorrect_input_syntax_handler: NonStandardBehaviorHandler[str] = lambda _: print_func(f"Incorrect flag syntax: {_}")
self._repeated_input_flags_handler: NonStandardBehaviorHandler[str] = lambda _: print_func(f"Repeated input flags: {_}")
self._empty_input_command_handler: EmptyCommandHandler = lambda: print_func("Empty input command")
self._unknown_command_handler: NonStandardBehaviorHandler[InputCommand] = lambda _: print_func(f"Unknown command: {_.trigger}")
self._exit_command_handler: NonStandardBehaviorHandler[Response] = lambda _: print_func(self._farewell_message)
def set_description_message_pattern(self, pattern: Callable[[str, str], str]) -> None:
def set_description_message_pattern(self, _: DescriptionMessageGenerator, /) -> None:
"""
Public. Sets the output pattern of the available commands
:param pattern: output pattern of the available commands
:param _: output pattern of the available commands
:return: None
"""
self._description_message_gen: Callable[[str, str], str] = pattern
self._description_message_gen = _
def set_invalid_input_flags_handler(self, handler: Callable[[str], None]) -> None:
def set_incorrect_input_syntax_handler(self, _: NonStandardBehaviorHandler[str], /) -> None:
"""
Public. Sets the handler for incorrect flags when entering a command
:param handler: handler for incorrect flags when entering a command
:param _: handler for incorrect flags when entering a command
:return: None
"""
self._invalid_input_flags_handler = handler
self._incorrect_input_syntax_handler = _
def set_repeated_input_flags_handler(self, handler: Callable[[str], None]) -> None:
def set_repeated_input_flags_handler(self, _: NonStandardBehaviorHandler[str], /) -> None:
"""
Public. Sets the handler for repeated flags when entering a command
:param handler: handler for repeated flags when entering a command
:param _: handler for repeated flags when entering a command
:return: None
"""
self._repeated_input_flags_handler = handler
self._repeated_input_flags_handler = _
def set_unknown_command_handler(self, handler: Callable[[str], None]) -> None:
def set_unknown_command_handler(self, _: NonStandardBehaviorHandler[InputCommand], /) -> None:
"""
Public. Sets the handler for unknown commands when entering a command
:param handler: handler for unknown commands when entering a command
:param _: handler for unknown commands when entering a command
:return: None
"""
self._unknown_command_handler = handler
self._unknown_command_handler = _
def set_empty_command_handler(self, handler: Callable[[], None]) -> None:
def set_empty_command_handler(self, _: EmptyCommandHandler, /) -> None:
"""
Public. Sets the handler for empty commands when entering a command
:param handler: handler for empty commands when entering a command
:param _: handler for empty commands when entering a command
:return: None
"""
self._empty_input_command_handler = handler
self._empty_input_command_handler = _
def set_exit_command_handler(self, handler: Callable[[], None]) -> None:
def set_exit_command_handler(self, _: NonStandardBehaviorHandler[Response], /) -> None:
"""
Public. Sets the handler for exit command when entering a command
:param handler: handler for exit command when entering a command
:param _: handler for exit command when entering a command
:return: None
"""
self._exit_command_handler = handler
self._exit_command_handler = _
def _print_command_group_description(self) -> None:
"""
@@ -122,14 +124,17 @@ class BaseApp:
:return: None
"""
for registered_router in self._registered_routers:
if registered_router.get_title():
self._print_func(registered_router.get_title())
for command_handler in registered_router.get_command_handlers():
self._print_func(self._description_message_gen(
command_handler.get_handled_command().get_trigger(),
command_handler.get_handled_command().get_description()))
self._print_func('')
if registered_router.title:
self._print_func(registered_router.title)
for command_handler in registered_router.command_handlers:
handled_command = command_handler.handled_command
self._print_func(
self._description_message_gen(
handled_command.trigger,
handled_command.description,
)
)
self._print_func("")
def _print_framed_text(self, text: str) -> None:
"""
@@ -137,20 +142,40 @@ class BaseApp:
:param text: framed text
:return: None
"""
if isinstance(self._dividing_line, StaticDividingLine):
self._print_func(self._dividing_line.get_full_static_line(self._override_system_messages))
self._print_func(text)
self._print_func(self._dividing_line.get_full_static_line(self._override_system_messages))
if isinstance(self._dividing_line, DynamicDividingLine):
clear_text = re.sub(r"\u001b\[[0-9;]*m", "", text)
max_length_line = max([len(line) for line in clear_text.split("\n")])
max_length_line = (
max_length_line
if 10 <= max_length_line <= 80
else 80
if max_length_line > 80
else 10
)
elif isinstance(self._dividing_line, DynamicDividingLine):
clear_text = re.sub(r'\u001b\[[0-9;]*m', '', text)
max_length_line = max([len(line) for line in clear_text.split('\n')])
max_length_line = max_length_line if 10 <= max_length_line <= 80 else 80 if max_length_line > 80 else 10
self._print_func(self._dividing_line.get_full_dynamic_line(max_length_line, self._override_system_messages))
print(text.strip('\n'))
self._print_func(self._dividing_line.get_full_dynamic_line(max_length_line, self._override_system_messages))
self._print_func(
self._dividing_line.get_full_dynamic_line(
length=max_length_line, is_override=self._override_system_messages
)
)
print(text.strip("\n"))
self._print_func(
self._dividing_line.get_full_dynamic_line(
length=max_length_line, is_override=self._override_system_messages
)
)
elif isinstance(self._dividing_line, StaticDividingLine): # pyright: ignore[reportUnnecessaryIsInstance]
self._print_func(
self._dividing_line.get_full_static_line(is_override=self._override_system_messages)
)
print(text.strip("\n"))
self._print_func(
self._dividing_line.get_full_static_line(is_override=self._override_system_messages)
)
else:
raise NotImplementedError
def _is_exit_command(self, command: InputCommand) -> bool:
"""
@@ -158,143 +183,185 @@ class BaseApp:
:param command: command to check
:return: is it an exit command or not as bool
"""
trigger = command.trigger
exit_trigger = self._exit_command.trigger
if self._ignore_command_register:
if command.get_trigger().lower() == self._exit_command.get_trigger().lower():
if (
trigger.lower() == exit_trigger.lower()
):
return True
elif command.get_trigger().lower() in [x.lower() for x in self._exit_command.get_aliases()]:
elif trigger.lower() in [
x.lower() for x in self._exit_command.aliases
]:
return True
else:
if command.get_trigger() == self._exit_command.get_trigger():
if trigger == exit_trigger:
return True
elif command.get_trigger() in self._exit_command.get_aliases():
elif trigger in self._exit_command.aliases:
return True
return False
def _is_unknown_command(self, command: InputCommand) -> bool:
"""
Private. Checks if the given command is an unknown command
:param command: command to check
:return: is it an unknown command or not as bool
"""
input_command_trigger = command.get_trigger()
input_command_trigger = command.trigger
if self._ignore_command_register:
if input_command_trigger.lower() in self._all_registered_triggers_in_lower:
if input_command_trigger.lower() in list(self._current_matching_triggers_with_routers.keys()):
return False
else:
if input_command_trigger in self._all_registered_triggers_in_default_case:
if input_command_trigger in list(self._current_matching_triggers_with_routers.keys()):
return False
with redirect_stdout(io.StringIO()) as f:
self._unknown_command_handler(command)
res: str = f.getvalue()
self._print_framed_text(res)
return True
def _error_handler(self, error: BaseInputCommandException, raw_command: str) -> None:
def _error_handler(
self, error: InputCommandException, raw_command: str
) -> None:
"""
Private. Handles parsing errors of the entered command
:param error: error being handled
:param raw_command: the raw input command
:return: None
"""
match error:
case UnprocessedInputFlagException():
self._invalid_input_flags_handler(raw_command)
case RepeatedInputFlagsException():
self._repeated_input_flags_handler(raw_command)
case EmptyInputCommandException():
self._empty_input_command_handler()
def _validate_included_routers(self) -> None:
"""
Private. Validates included routers
:return: None
"""
for router in self._registered_routers:
if not router.get_command_handlers():
raise NoRegisteredHandlersException(router.get_name())
if isinstance(error, UnprocessedInputFlagException):
self._incorrect_input_syntax_handler(raw_command)
elif isinstance(error, RepeatedInputFlagsException):
self._repeated_input_flags_handler(raw_command)
elif isinstance(error, EmptyInputCommandException):
self._empty_input_command_handler()
def _setup_system_router(self) -> None:
"""
Private. Sets up system router
:return: None
"""
system_router.set_title(self._system_router_title)
system_router.title = self._system_router_title
@system_router.command(self._exit_command)
def exit_command():
self._exit_command_handler()
def _(response: Response) -> None:
self._exit_command_handler(response)
if system_router not in self._registered_routers.get_registered_routers():
system_router.set_command_register_ignore(self._ignore_command_register)
if system_router not in self._registered_routers.registered_routers:
system_router.command_register_ignore = self._ignore_command_register
self._registered_routers.add_registered_router(system_router)
def _most_similar_command(self, unknown_command: str) -> str | None:
all_commands = list(self._current_matching_triggers_with_routers.keys())
matches_startswith_unknown_command: Matches = sorted(
cmd for cmd in all_commands if cmd.startswith(unknown_command)
)
matches_startswith_cmd: Matches = sorted(
cmd for cmd in all_commands if unknown_command.startswith(cmd)
)
matches: Matches = matches_startswith_unknown_command or matches_startswith_cmd
if len(matches) == 1:
return matches[0]
elif len(matches) > 1:
return sorted(matches, key=lambda cmd: len(cmd))[0]
else:
return None
def _setup_default_view(self) -> None:
"""
Private. Sets up default app view
:return: None
"""
if not self._override_system_messages:
self._initial_message = f'\n[bold red]{text2art(self._initial_message, font="tarty1")}\n\n'
self._farewell_message = (f'[bold red]\n{text2art(f"\n{self._farewell_message}\n", font="chanky")}[/bold red]\n'
f'[red i]github.com/koloideal/Argenta[/red i] | [red bold i]made by kolo[/red bold i]\n')
self._description_message_gen = lambda command, description: (f'[bold red]{escape("[" + command + "]")}[/bold red] '
f'[blue dim]*=*=*[/blue dim] '
f'[bold yellow italic]{escape(description)}')
self._invalid_input_flags_handler = lambda raw_command: self._print_func(f'[red bold]Incorrect flag syntax: {escape(raw_command)}')
self._repeated_input_flags_handler = lambda raw_command: self._print_func(f'[red bold]Repeated input flags: {escape(raw_command)}')
self._empty_input_command_handler = lambda: self._print_func('[red bold]Empty input command')
self._unknown_command_handler = lambda command: self._print_func(f"[red bold]Unknown command: {escape(command.get_trigger())}")
self._prompt = f"[italic dim bold]{self._prompt}"
self._initial_message = ("\n" + f"[bold red]{text2art(self._initial_message, font='tarty1')}" + "\n")
self._farewell_message = (
"[bold red]\n\n" +
str(text2art(self._farewell_message, font="chanky")) + # pyright: ignore[reportUnknownArgumentType]
"\n[/bold red]\n" +
"[red i]github.com/koloideal/Argenta[/red i] | [red bold i]made by kolo[/red bold i]\n"
)
self._description_message_gen = lambda command, description: (
f"[bold red]{escape('[' + command + ']')}[/bold red] "
f"[blue dim]*=*=*[/blue dim] "
f"[bold yellow italic]{escape(description)}"
)
self._incorrect_input_syntax_handler = lambda raw_command: self._print_func(f"[red bold]Incorrect flag syntax: {escape(raw_command)}")
self._repeated_input_flags_handler = lambda raw_command: self._print_func(f"[red bold]Repeated input flags: {escape(raw_command)}")
self._empty_input_command_handler = lambda: self._print_func("[red bold]Empty input command")
def unknown_command_handler(command: InputCommand) -> None:
cmd_trg: str = command.trigger
mst_sim_cmd: str | None = self._most_similar_command(cmd_trg)
first_part_of_text = f"[red]Unknown command:[/red] [blue]{escape(cmd_trg)}[/blue]"
second_part_of_text = (
("[red], most similar:[/red] " + ("[blue]" + mst_sim_cmd + "[/blue]"))
if mst_sim_cmd
else ""
)
self._print_func(first_part_of_text + second_part_of_text)
self._unknown_command_handler = unknown_command_handler
def _pre_cycle_setup(self) -> None:
"""
Private. Configures various aspects of the application before the start of the cycle
:return: None
"""
self._setup_default_view()
self._setup_system_router()
self._validate_included_routers()
for router_entity in self._registered_routers:
self._all_registered_triggers_in_default_case.extend(router_entity.get_triggers())
self._all_registered_triggers_in_default_case.extend(router_entity.get_aliases())
router_triggers = router_entity.triggers
router_aliases = router_entity.aliases
combined = router_triggers + router_aliases
self._all_registered_triggers_in_lower.extend([x.lower() for x in router_entity.get_triggers()])
self._all_registered_triggers_in_lower.extend([x.lower() for x in router_entity.get_aliases()])
for trigger in combined:
self._matching_default_triggers_with_routers[trigger] = router_entity
self._matching_lower_triggers_with_routers[trigger.lower()] = router_entity
self._autocompleter.initial_setup(self._all_registered_triggers_in_lower)
self._autocompleter.initial_setup(list(self._current_matching_triggers_with_routers.keys()))
seen = {}
for item in list(self._current_matching_triggers_with_routers.keys()):
if item in seen:
Console().print(f"\n[b red]WARNING:[/b red] Overlapping trigger or alias: [b blue]{item}[/b blue]")
else:
seen[item] = True
if not self._override_system_messages:
self._setup_default_view()
self._print_func(self._initial_message)
for message in self._messages_on_startup:
self._print_func(message)
if self._messages_on_startup:
print('\n\n')
print("\n")
if not self._repeat_command_groups_description:
self._print_command_group_description()
AVAILABLE_DIVIDING_LINES: TypeAlias = StaticDividingLine | DynamicDividingLine
DEFAULT_DIVIDING_LINE: StaticDividingLine = StaticDividingLine()
DEFAULT_PRINT_FUNC: Printer = Console().print
DEFAULT_AUTOCOMPLETER: AutoCompleter = AutoCompleter()
DEFAULT_EXIT_COMMAND: Command = Command("Q", description="Exit command")
class App(BaseApp):
def __init__(self,
prompt: str = '[italic dim bold]What do you want to do?\n',
initial_message: str = '\nArgenta\n',
farewell_message: str = '\nSee you\n',
exit_command: Command = Command('Q', 'Exit command'),
system_router_title: str | None = 'System points:',
ignore_command_register: bool = True,
dividing_line: StaticDividingLine | DynamicDividingLine = StaticDividingLine(),
repeat_command_groups: bool = True,
override_system_messages: bool = False,
autocompleter: AutoCompleter = AutoCompleter(),
print_func: Callable[[str], None] = Console().print) -> None:
def __init__(
self, *,
prompt: str = "What do you want to do?\n\n",
initial_message: str = "Argenta\n",
farewell_message: str = "\nSee you\n",
exit_command: Command = DEFAULT_EXIT_COMMAND,
system_router_title: str | None = "System points:",
ignore_command_register: bool = True,
dividing_line: AVAILABLE_DIVIDING_LINES = DEFAULT_DIVIDING_LINE,
repeat_command_groups: bool = True,
override_system_messages: bool = False,
autocompleter: AutoCompleter = DEFAULT_AUTOCOMPLETER,
print_func: Printer = DEFAULT_PRINT_FUNC,
) -> None:
"""
Public. The essence of the application itself.
Configures and manages all aspects of the behavior and presentation of the user interacting with the user
@@ -311,18 +378,19 @@ class App(BaseApp):
:param print_func: system messages text output function
:return: None
"""
super().__init__(prompt=prompt,
initial_message=initial_message,
farewell_message=farewell_message,
exit_command=exit_command,
system_router_title=system_router_title,
ignore_command_register=ignore_command_register,
dividing_line=dividing_line,
repeat_command_groups=repeat_command_groups,
override_system_messages=override_system_messages,
autocompleter=autocompleter,
print_func=print_func)
super().__init__(
prompt=prompt,
initial_message=initial_message,
farewell_message=farewell_message,
exit_command=exit_command,
system_router_title=system_router_title,
ignore_command_register=ignore_command_register,
dividing_line=dividing_line,
repeat_command_groups=repeat_command_groups,
override_system_messages=override_system_messages,
autocompleter=autocompleter,
print_func=print_func,
)
def run_polling(self) -> None:
"""
@@ -338,30 +406,43 @@ class App(BaseApp):
try:
input_command: InputCommand = InputCommand.parse(raw_command=raw_command)
except BaseInputCommandException as error:
with redirect_stdout(io.StringIO()) as f:
except InputCommandException as error:
with redirect_stdout(io.StringIO()) as stderr:
self._error_handler(error, raw_command)
res: str = f.getvalue()
self._print_framed_text(res)
stderr_result: str = stderr.getvalue()
self._print_framed_text(stderr_result)
continue
if self._is_exit_command(input_command):
system_router.finds_appropriate_handler(input_command)
self._autocompleter.exit_setup()
self._autocompleter.exit_setup(list(self._current_matching_triggers_with_routers.keys()))
return
if self._is_unknown_command(input_command):
with redirect_stdout(io.StringIO()) as stdout:
self._unknown_command_handler(input_command)
stdout_res: str = stdout.getvalue()
self._print_framed_text(stdout_res)
continue
with redirect_stdout(io.StringIO()) as f:
for registered_router in self._registered_routers:
registered_router.finds_appropriate_handler(input_command)
res: str = f.getvalue()
self._print_framed_text(res)
if not self._repeat_command_groups_description:
self._print_func(self._prompt)
processing_router = self._current_matching_triggers_with_routers[input_command.trigger.lower()]
if processing_router.disable_redirect_stdout:
if isinstance(self._dividing_line, StaticDividingLine):
self._print_func(self._dividing_line.get_full_static_line(is_override=self._override_system_messages))
processing_router.finds_appropriate_handler(input_command)
self._print_func(self._dividing_line.get_full_static_line(is_override=self._override_system_messages))
else:
dividing_line_unit_part: str = self._dividing_line.get_unit_part()
self._print_func(StaticDividingLine(dividing_line_unit_part).get_full_static_line(is_override=self._override_system_messages))
processing_router.finds_appropriate_handler(input_command)
self._print_func(StaticDividingLine(dividing_line_unit_part).get_full_static_line(is_override=self._override_system_messages))
else:
with redirect_stdout(io.StringIO()) as stdout:
processing_router.finds_appropriate_handler(input_command)
stdout_result: str = stdout.getvalue()
if stdout_result:
self._print_framed_text(stdout_result)
def include_router(self, router: Router) -> None:
"""
@@ -369,10 +450,9 @@ class App(BaseApp):
:param router: registered router
:return: None
"""
router.set_command_register_ignore(self._ignore_command_register)
router.command_register_ignore = self._ignore_command_register
self._registered_routers.add_registered_router(router)
def include_routers(self, *routers: Router) -> None:
"""
Public. Registers the routers in the application
@@ -382,7 +462,6 @@ class App(BaseApp):
for router in routers:
self.include_router(router)
def add_message_on_startup(self, message: str) -> None:
"""
Public. Adds a message that will be displayed when the application is launched
@@ -390,4 +469,3 @@ class App(BaseApp):
:return: None
"""
self._messages_on_startup.append(message)
+22
View File
@@ -0,0 +1,22 @@
from typing import Protocol, TypeVar
T = TypeVar('T', contravariant=True) # noqa: WPS111
class NonStandardBehaviorHandler(Protocol[T]):
def __call__(self, __param: T) -> None:
raise NotImplementedError
class EmptyCommandHandler(Protocol):
def __call__(self) -> None:
raise NotImplementedError
class Printer(Protocol):
def __call__(self, __text: str) -> None:
raise NotImplementedError
class DescriptionMessageGenerator(Protocol):
def __call__(self, __first_param: str, __second_param: str) -> str:
raise NotImplementedError
+10 -23
View File
@@ -1,40 +1,27 @@
from typing import Iterator, Optional
from argenta.router import Router
class RegisteredRouters:
def __init__(self, registered_routers: list[Router] = None) -> None:
def __init__(self, registered_routers: Optional[list[Router]] = None) -> None:
"""
Private. Combines registered routers
:param registered_routers: list of the registered routers
:return: None
"""
self._registered_routers = registered_routers if registered_routers else []
self.registered_routers: list[Router] = registered_routers if registered_routers else []
def get_registered_routers(self) -> list[Router]:
"""
Private. Returns the registered routers
:return: registered routers as list[Router]
"""
return self._registered_routers
def add_registered_router(self, router: Router) -> None:
def add_registered_router(self, router: Router, /) -> None:
"""
Private. Adds a new registered router
:param router: registered router
:return: None
"""
self._registered_routers.append(router)
self.registered_routers.append(router)
def add_registered_routers(self, *routers: Router) -> None:
"""
Private. Adds new registered routers
:param routers: registered routers
:return: None
"""
self._registered_routers.extend(routers)
def __iter__(self) -> Iterator[Router]:
return iter(self.registered_routers)
def __iter__(self):
return iter(self._registered_routers)
def __next__(self):
return next(iter(self._registered_routers))
def __next__(self) -> Router:
return next(iter(self.registered_routers))
+11 -2
View File
@@ -1,3 +1,12 @@
__all__ = ["Command"]
__all__ = [
"Command",
"PossibleValues",
"PredefinedFlags",
"InputCommand",
"Flags",
"Flag"
]
from argenta.command.models import Command
from argenta.command.models import Command, InputCommand
from argenta.command.flag import defaults as PredefinedFlags
from argenta.command.flag import (Flag, Flags, PossibleValues)
+26 -12
View File
@@ -1,35 +1,49 @@
from argenta.command.flag.models import InputFlag, Flag
from argenta.command.flag.models import Flag, InputFlag
from abc import ABC, abstractmethod
from typing import override
class BaseInputCommandException(Exception):
class InputCommandException(ABC, Exception):
"""
Private. Base exception class for all exceptions raised when parse input command
"""
pass
@override
@abstractmethod
def __str__(self) -> str:
raise NotImplementedError
class UnprocessedInputFlagException(BaseInputCommandException):
class UnprocessedInputFlagException(InputCommandException):
"""
Private. Raised when an unprocessed input flag is detected
"""
def __str__(self):
@override
def __str__(self) -> str:
return "Unprocessed Input Flags"
class RepeatedInputFlagsException(BaseInputCommandException):
class RepeatedInputFlagsException(InputCommandException):
"""
Private. Raised when repeated input flags are detected
"""
def __init__(self, flag: Flag | InputFlag):
self.flag = flag
def __str__(self):
return ("Repeated Input Flags\n"
f"Duplicate flag was detected in the input: '{self.flag.get_string_entity()}'")
self.flag: Flag | InputFlag = flag
super().__init__()
@override
def __str__(self) -> str:
string_entity: str = self.flag.string_entity
return (
"Repeated Input Flags\n"
f"Duplicate flag was detected in the input: '{string_entity}'"
)
class EmptyInputCommandException(BaseInputCommandException):
class EmptyInputCommandException(InputCommandException):
"""
Private. Raised when an empty input command is detected
"""
def __str__(self):
@override
def __str__(self) -> str:
return "Input Command is empty"
+9 -2
View File
@@ -1,4 +1,11 @@
__all__ = ('InputFlags', 'InputFlag', 'Flag', 'Flags')
__all__ = [
"Flag",
"InputFlag",
"Flags",
"PossibleValues",
"ValidationStatus"
]
from argenta.command.flag.models import InputFlags, InputFlag, Flags, Flag
from argenta.command.flag.models import Flag, InputFlag, PossibleValues, ValidationStatus
from argenta.command.flag.flags.models import Flags
+21 -18
View File
@@ -1,24 +1,27 @@
from dataclasses import dataclass
from argenta.command.flag.models import Flag
import re
from typing import Literal
from argenta.command.flag.models import Flag, PossibleValues
import re
@dataclass
class PredefinedFlags:
"""
Public. A dataclass with predefined flags and most frequently used flags for quick use
"""
HELP = Flag(name='help', possible_values=False)
SHORT_HELP = Flag(name='H', prefix='-', possible_values=False)
DEFAULT_PREFIX: Literal["-", "--", "---"] = "-"
INFO = Flag(name='info', possible_values=False)
SHORT_INFO = Flag(name='I', prefix='-', possible_values=False)
HELP = Flag(name="help", possible_values=PossibleValues.NEITHER)
SHORT_HELP = Flag(name="H", prefix=DEFAULT_PREFIX, possible_values=PossibleValues.NEITHER)
ALL = Flag(name='all', possible_values=False)
SHORT_ALL = Flag(name='A', prefix='-', possible_values=False)
INFO = Flag(name="info", possible_values=PossibleValues.NEITHER) # noqa: WPS110
SHORT_INFO = Flag(name="I", prefix=DEFAULT_PREFIX, possible_values=PossibleValues.NEITHER)
HOST = Flag(name='host', possible_values=re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'))
SHORT_HOST = Flag(name='H', prefix='-', possible_values=re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'))
ALL = Flag(name="all", possible_values=PossibleValues.NEITHER)
SHORT_ALL = Flag(name="A", prefix=DEFAULT_PREFIX, possible_values=PossibleValues.NEITHER)
PORT = Flag(name='port', possible_values=re.compile(r'^\d{1,5}$'))
SHORT_PORT = Flag(name='P', prefix='-', possible_values=re.compile(r'^\d{1,5}$'))
HOST = Flag(
name="host", possible_values=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
)
SHORT_HOST = Flag(
name="H",
prefix=DEFAULT_PREFIX,
possible_values=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"),
)
PORT = Flag(name="port", possible_values=re.compile(r"^\d{1,5}$"))
SHORT_PORT = Flag(name="P", prefix=DEFAULT_PREFIX, possible_values=re.compile(r"^\d{1,5}$"))
@@ -0,0 +1,10 @@
__all__ = [
"Flags",
"InputFlags"
]
from argenta.command.flag.flags.models import (
Flags,
InputFlags
)
+106
View File
@@ -0,0 +1,106 @@
from argenta.command.flag.models import InputFlag, Flag
from typing import Generic, TypeVar, override
from collections.abc import Iterator
FlagType = TypeVar("FlagType")
class BaseFlags(Generic[FlagType]):
def __init__(self, flags: list[FlagType] | None = None) -> None:
"""
Public. A model that combines the registered flags
:param flags: the flags that will be registered
:return: None
"""
self.flags: list[FlagType] = flags if flags else []
def add_flag(self, flag: FlagType) -> None:
"""
Public. Adds a flag to the list of flags
:param flag: flag to add
:return: None
"""
self.flags.append(flag)
def add_flags(self, flags: list[FlagType]) -> None:
"""
Public. Adds a list of flags to the list of flags
:param flags: list of flags to add
:return: None
"""
self.flags.extend(flags)
def __iter__(self) -> Iterator[FlagType]:
return iter(self.flags)
def __next__(self) -> FlagType:
return next(iter(self))
def __getitem__(self, flag_index: int) -> FlagType:
return self.flags[flag_index]
def __bool__(self) -> bool:
return bool(self.flags)
class Flags(BaseFlags[Flag]):
def get_flag_by_name(self, name: str) -> Flag | None:
"""
Public. Returns the flag entity by its name or None if not found
:param name: the name of the flag to get
:return: entity of the flag or None
"""
return next((flag for flag in self.flags if flag.name == name), None)
@override
def __eq__(self, other: object) -> bool:
if not isinstance(other, Flags):
return NotImplemented
if len(self.flags) != len(other.flags):
return False
flag_pairs: zip[tuple[Flag, Flag]] = zip(self.flags, other.flags)
return all(s_flag == o_flag for s_flag, o_flag in flag_pairs)
def __contains__(self, flag_to_check: object) -> bool:
if isinstance(flag_to_check, Flag):
for flag in self.flags:
if flag == flag_to_check:
return True
return False
else:
raise TypeError
class InputFlags(BaseFlags[InputFlag]):
def get_flag_by_name(self, name: str) -> InputFlag | None:
"""
Public. Returns the flag entity by its name or None if not found
:param name: the name of the flag to get
:return: entity of the flag or None
"""
return next((flag for flag in self.flags if flag.name == name), None)
@override
def __eq__(self, other: object) -> bool:
if not isinstance(other, InputFlags):
raise NotImplementedError
if len(self.flags) != len(other.flags):
return False
paired_flags: zip[tuple[InputFlag, InputFlag]] = zip(self.flags, other.flags)
return all(my_flag == other_flag for my_flag, other_flag in paired_flags)
def __contains__(self, ingressable_item: object) -> bool:
if isinstance(ingressable_item, InputFlag):
for flag in self.flags:
if flag == ingressable_item:
return True
return False
else:
raise TypeError
+88 -187
View File
@@ -1,78 +1,25 @@
from typing import Literal, Pattern
from abc import ABC, abstractmethod
from enum import Enum
from re import Pattern
from typing import Literal, override
class BaseFlag(ABC):
def __init__(self, name: str,
prefix: Literal['-', '--', '---'] = '--') -> None:
"""
Private. Base class for flags
:param name: the name of the flag
:param prefix: the prefix of the flag
:return: None
"""
self._name = name
self._prefix = prefix
def get_string_entity(self) -> str:
"""
Public. Returns a string representation of the flag
:return: string representation of the flag as str
"""
string_entity: str = self._prefix + self._name
return string_entity
def get_name(self) -> str:
"""
Public. Returns the name of the flag
:return: the name of the flag as str
"""
return self._name
def get_prefix(self) -> str:
"""
Public. Returns the prefix of the flag
:return: the prefix of the flag as str
"""
return self._prefix
class PossibleValues(Enum):
NEITHER = 'NEITHER'
ALL = 'ALL'
class InputFlag(BaseFlag):
def __init__(self, name: str,
prefix: Literal['-', '--', '---'] = '--',
value: str = None):
"""
Public. The entity of the flag of the entered command
:param name: the name of the input flag
:param prefix: the prefix of the input flag
:param value: the value of the input flag
:return: None
"""
super().__init__(name, prefix)
self._flag_value = value
def get_value(self) -> str | None:
"""
Public. Returns the value of the flag
:return: the value of the flag as str
"""
return self._flag_value
def set_value(self, value):
"""
Private. Sets the value of the flag
:param value: the fag value to set
:return: None
"""
self._flag_value = value
class ValidationStatus(Enum):
VALID = 'VALID'
INVALID = 'INVALID'
UNDEFINED = 'UNDEFINED'
class Flag(BaseFlag):
def __init__(self, name: str,
prefix: Literal['-', '--', '---'] = '--',
possible_values: list[str] | Pattern[str] | False = True) -> None:
class Flag:
def __init__(
self, name: str, *,
prefix: Literal["-", "--", "---"] = "--",
possible_values: list[str] | Pattern[str] | PossibleValues = PossibleValues.ALL,
) -> None:
"""
Public. The entity of the flag being registered for subsequent processing
:param name: The name of the flag
@@ -80,139 +27,93 @@ class Flag(BaseFlag):
:param possible_values: The possible values of the flag, if False then the flag cannot have a value
:return: None
"""
super().__init__(name, prefix)
self.possible_values = possible_values
self.name: str = name
self.prefix: Literal["-", "--", "---"] = prefix
self.possible_values: list[str] | Pattern[str] | PossibleValues = possible_values
def validate_input_flag_value(self, input_flag_value: str | None):
def validate_input_flag_value(self, input_flag_value: str | None) -> bool:
"""
Private. Validates the input flag value
:param input_flag_value: The input flag value to validate
:return: whether the entered flag is valid as bool
"""
if self.possible_values is False:
if input_flag_value is None:
return True
else:
return False
elif isinstance(self.possible_values, Pattern):
if isinstance(input_flag_value, str):
is_valid = bool(self.possible_values.match(input_flag_value))
if bool(is_valid):
return True
else:
return False
else:
return False
if self.possible_values == PossibleValues.NEITHER:
return input_flag_value is None
elif isinstance(self.possible_values, list):
if input_flag_value in self.possible_values:
return True
else:
return False
if isinstance(self.possible_values, Pattern):
return isinstance(input_flag_value, str) and bool(self.possible_values.match(input_flag_value))
if isinstance(self.possible_values, list):
return input_flag_value in self.possible_values
return True
@property
def string_entity(self) -> str:
"""
Public. Returns a string representation of the flag
:return: string representation of the flag as str
"""
string_entity: str = self.prefix + self.name
return string_entity
@override
def __str__(self) -> str:
return self.string_entity
@override
def __repr__(self) -> str:
return f'Flag<name={self.name}, prefix={self.prefix}>'
@override
def __eq__(self, other: object) -> bool:
if isinstance(other, Flag):
return self.string_entity == other.string_entity
else:
return True
raise NotImplementedError
class BaseFlags(ABC):
"""
Private. Base class for groups of flags
"""
__slots__ = ('_flags',)
@abstractmethod
def get_flags(self):
class InputFlag:
def __init__(
self, name: str, *,
prefix: Literal['-', '--', '---'] = '--',
input_value: str | None,
status: ValidationStatus | None
):
"""
Public. Returns a list of flags
:return: list of flags
"""
pass
@abstractmethod
def add_flag(self, flag: Flag | InputFlag):
"""
Public. Adds a flag to the list of flags
:param flag: flag to add
Public. The entity of the flag of the entered command
:param name: the name of the input flag
:param prefix: the prefix of the input flag
:param value: the value of the input flag
:return: None
"""
pass
@abstractmethod
def add_flags(self, flags: list[Flag] | list[InputFlag]):
self.name: str = name
self.prefix: Literal['-', '--', '---'] = prefix
self.input_value: str | None = input_value
self.status: ValidationStatus | None = status
@property
def string_entity(self) -> str:
"""
Public. Adds a list of flags to the list of flags
:param flags: list of flags to add
:return: None
Public. Returns a string representation of the flag
:return: string representation of the flag as str
"""
pass
string_entity: str = self.prefix + self.name
return string_entity
@abstractmethod
def get_flag(self, name: str):
"""
Public. Returns the flag entity by its name or None if not found
:param name: the name of the flag to get
:return: entity of the flag or None
"""
pass
@override
def __str__(self) -> str:
return f'{self.string_entity} {self.input_value}'
@override
def __repr__(self) -> str:
return f'InputFlag<name={self.name}, prefix={self.prefix}, value={self.input_value}, status={self.status}>'
def __iter__(self):
return iter(self._flags)
def __next__(self):
return next(iter(self))
def __getitem__(self, item):
return self._flags[item]
class Flags(BaseFlags, ABC):
def __init__(self, *flags: Flag):
"""
Public. A model that combines the registered flags
:param flags: the flags that will be registered
:return: None
"""
self._flags = flags if flags else []
def get_flags(self) -> list[Flag]:
return self._flags
def add_flag(self, flag: Flag):
self._flags.append(flag)
def add_flags(self, flags: list[Flag]):
self._flags.extend(flags)
def get_flag(self, name: str) -> Flag | None:
if name in [flag.get_name() for flag in self._flags]:
return list(filter(lambda flag: flag.get_name() == name, self._flags))[0]
@override
def __eq__(self, other: object) -> bool:
if isinstance(other, InputFlag):
return (
self.name == other.name
)
else:
return None
class InputFlags(BaseFlags, ABC):
def __init__(self, *flags: InputFlag):
"""
Public. A model that combines the input flags of the input command
:param flags: all input flags
:return: None
"""
self._flags = flags if flags else []
def get_flags(self) -> list[InputFlag]:
return self._flags
def add_flag(self, flag: InputFlag):
self._flags.append(flag)
def add_flags(self, flags: list[InputFlag]):
self._flags.extend(flags)
def get_flag(self, name: str) -> InputFlag | None:
if name in [flag.get_name() for flag in self._flags]:
return list(filter(lambda flag: flag.get_name() == name, self._flags))[0]
else:
return None
raise NotImplementedError
+114 -127
View File
@@ -1,34 +1,30 @@
from argenta.command.flag.models import Flag, InputFlag, Flags, InputFlags
from argenta.command.exceptions import (UnprocessedInputFlagException,
RepeatedInputFlagsException,
EmptyInputCommandException)
from typing import Generic, TypeVar, cast, Literal
from argenta.command.flag.models import Flag, InputFlag, ValidationStatus
from argenta.command.flag.flags.models import InputFlags, Flags
from argenta.command.exceptions import (
UnprocessedInputFlagException,
RepeatedInputFlagsException,
EmptyInputCommandException,
)
from typing import Never, Self, cast, Literal
InputCommandType = TypeVar('InputCommandType')
ParseFlagsResult = tuple[InputFlags, str | None, str | None]
ParseResult = tuple[str, InputFlags]
MIN_FLAG_PREFIX: str = "-"
DEFAULT_WITHOUT_FLAGS: Flags = Flags()
DEFAULT_WITHOUT_INPUT_FLAGS: InputFlags = InputFlags()
class BaseCommand:
def __init__(self, trigger: str) -> None:
"""
Private. Base class for all commands
:param trigger: A string trigger, which, when entered by the user, indicates that the input corresponds to the command
"""
self._trigger = trigger
def get_trigger(self) -> str:
"""
Public. Returns the trigger of the command
:return: the trigger of the command as str
"""
return self._trigger
class Command(BaseCommand):
def __init__(self, trigger: str,
description: str = None,
flags: Flag | Flags = None,
aliases: list[str] = None):
class Command:
def __init__(
self,
trigger: str, *,
description: str | None = None,
flags: Flag | Flags = DEFAULT_WITHOUT_FLAGS,
aliases: list[str] | None = None,
):
"""
Public. The command that can and should be registered in the Router
:param trigger: A string trigger, which, when entered by the user, indicates that the input corresponds to the command
@@ -36,129 +32,120 @@ class Command(BaseCommand):
:param flags: processed commands
:param aliases: string synonyms for the main trigger
"""
super().__init__(trigger)
self._registered_flags: Flags = flags if isinstance(flags, Flags) else Flags(flags) if isinstance(flags, Flag) else Flags()
self._description = f'Description for "{self._trigger}" command' if not description else description
self._aliases = aliases if isinstance(aliases, list) else []
self.registered_flags: Flags = flags if isinstance(flags, Flags) else Flags([flags])
self.trigger: str = trigger
self.description: str = description if description else "Command without description"
self.aliases: list[str] = aliases if aliases else []
def get_registered_flags(self) -> Flags:
"""
Private. Returns the registered flags
:return: the registered flags as Flags
"""
return self._registered_flags
def get_aliases(self) -> list[str] | list:
"""
Public. Returns the aliases of the command
:return: the aliases of the command as list[str] | list
"""
return self._aliases
def validate_input_flag(self, flag: InputFlag) -> bool:
def validate_input_flag(
self, flag: InputFlag
) -> ValidationStatus:
"""
Private. Validates the input flag
:param flag: input flag for validation
:return: is input flag valid as bool
"""
registered_flags: Flags | None = self.get_registered_flags()
if registered_flags:
if isinstance(registered_flags, Flag):
if registered_flags.get_string_entity() == flag.get_string_entity():
is_valid = registered_flags.validate_input_flag_value(flag.get_value())
if is_valid:
return True
else:
for registered_flag in registered_flags:
if registered_flag.get_string_entity() == flag.get_string_entity():
is_valid = registered_flag.validate_input_flag_value(flag.get_value())
if is_valid:
return True
return False
def get_description(self) -> str:
"""
Private. Returns the description of the command
:return: the description of the command as str
"""
return self._description
registered_flags: Flags = self.registered_flags
for registered_flag in registered_flags:
if registered_flag.string_entity == flag.string_entity:
is_valid = registered_flag.validate_input_flag_value(flag.input_value)
if is_valid:
return ValidationStatus.VALID
else:
return ValidationStatus.INVALID
return ValidationStatus.UNDEFINED
class InputCommand(BaseCommand, Generic[InputCommandType]):
def __init__(self, trigger: str,
input_flags: InputFlag | InputFlags = None):
class InputCommand:
def __init__(self, trigger: str, *,
input_flags: InputFlag | InputFlags = DEFAULT_WITHOUT_INPUT_FLAGS):
"""
Private. The model of the input command, after parsing
:param trigger:the trigger of the command
:param input_flags: the input flags
:return: None
"""
super().__init__(trigger)
self._input_flags: InputFlags = input_flags if isinstance(input_flags, InputFlags) else InputFlags(input_flags) if isinstance(input_flags, InputFlag) else InputFlags()
self.trigger: str = trigger
self.input_flags: InputFlags = input_flags if isinstance(input_flags, InputFlags) else InputFlags([input_flags])
def _set_input_flags(self, input_flags: InputFlags) -> None:
"""
Private. Sets the input flags
:param input_flags: the input flags to set
:return: None
"""
self._input_flags = input_flags
def get_input_flags(self) -> InputFlags:
"""
Private. Returns the input flags
:return: the input flags as InputFlags
"""
return self._input_flags
@staticmethod
def parse(raw_command: str) -> InputCommandType:
@classmethod
def parse(cls, raw_command: str) -> Self:
"""
Private. Parse the raw input command
:param raw_command: raw input command
:return: model of the input command, after parsing as InputCommand
"""
if not raw_command:
trigger, input_flags = CommandParser(raw_command).parse_raw_command()
return cls(trigger=trigger, input_flags=input_flags)
class CommandParser:
def __init__(self, raw_command: str) -> None:
self.raw_command: str = raw_command
self._parsed_input_flags: InputFlags = InputFlags()
def parse_raw_command(self) -> ParseResult:
if not self.raw_command:
raise EmptyInputCommandException()
list_of_tokens = raw_command.split()
command = list_of_tokens.pop(0)
input_flags, crnt_flag_name, crnt_flag_val = self._parse_flags(self.raw_command.split()[1:])
input_flags: InputFlags = InputFlags()
current_flag_name, current_flag_value = None, None
for k, _ in enumerate(list_of_tokens):
if _.startswith('-'):
if current_flag_name or len(_) < 2 or len(_[:_.rfind('-')]) > 3:
raise UnprocessedInputFlagException()
current_flag_name = _
else:
if not current_flag_name:
raise UnprocessedInputFlagException()
current_flag_value = _
if current_flag_name:
if not len(list_of_tokens) == k+1:
if not list_of_tokens[k+1].startswith('-'):
continue
input_flag = InputFlag(name=current_flag_name[current_flag_name.rfind('-')+1:],
prefix=cast(Literal['-', '--', '---'],
current_flag_name[:current_flag_name.rfind('-')+1]),
value=current_flag_value)
all_flags = [flag.get_string_entity() for flag in input_flags.get_flags()]
if input_flag.get_string_entity() not in all_flags:
input_flags.add_flag(input_flag)
else:
raise RepeatedInputFlagsException(input_flag)
current_flag_name, current_flag_value = None, None
if any([current_flag_name, current_flag_value]):
if any([crnt_flag_name, crnt_flag_val]):
raise UnprocessedInputFlagException()
else:
return InputCommand(trigger=command, input_flags=input_flags)
return (self.raw_command.split()[0], input_flags)
def _parse_flags(self, _tokens: list[str] | list[Never]) -> ParseFlagsResult:
crnt_flg_name, crnt_flg_val = None, None
for index, token in enumerate(_tokens):
crnt_flg_name, crnt_flg_val = _parse_single_token(token, crnt_flg_name, crnt_flg_val)
if not crnt_flg_name or self._is_next_token_value(index, _tokens):
continue
input_flag = InputFlag(
name=crnt_flg_name[crnt_flg_name.rfind(MIN_FLAG_PREFIX) + 1:],
prefix=cast(
Literal["-", "--", "---"],
crnt_flg_name[:crnt_flg_name.rfind(MIN_FLAG_PREFIX) + 1],
),
input_value=crnt_flg_val,
status=None
)
if input_flag in self._parsed_input_flags:
raise RepeatedInputFlagsException(input_flag)
self._parsed_input_flags.add_flag(input_flag)
crnt_flg_name, crnt_flg_val = None, None
return (self._parsed_input_flags, crnt_flg_name, crnt_flg_val)
def _is_next_token_value(self, current_index: int,
_tokens: list[str] | list[Never]) -> bool:
next_index = current_index + 1
if next_index >= len(_tokens):
return False
next_token = _tokens[next_index]
return not next_token.startswith(MIN_FLAG_PREFIX)
def _parse_single_token(
token: str,
crnt_flag_name: str | None,
crnt_flag_val: str | None
) -> tuple[str | None, str | None]:
if not token.startswith(MIN_FLAG_PREFIX):
if not crnt_flag_name or crnt_flag_val:
raise UnprocessedInputFlagException
return crnt_flag_name, token
prefix = token[:token.rfind(MIN_FLAG_PREFIX)]
if len(token) < 2 or len(prefix) > 2:
raise UnprocessedInputFlagException
new_flag_name = token
new_flag_value = None
return new_flag_name, new_flag_value
+4
View File
@@ -0,0 +1,4 @@
__all__ = ["get_time_of_pre_cycle_setup"]
from argenta.metrics.main import get_time_of_pre_cycle_setup
+18
View File
@@ -0,0 +1,18 @@
import io
from contextlib import redirect_stdout
from time import time
from argenta import App
def get_time_of_pre_cycle_setup(app: App) -> float:
"""
Public. Return time of pre cycle setup
:param app: app instance for testing time of pre cycle setup
:return: time of pre cycle setup as float
"""
start = time()
with redirect_stdout(io.StringIO()):
app._pre_cycle_setup() # pyright: ignore[reportPrivateUsage]
end = time()
return end - start
+5 -1
View File
@@ -1,4 +1,8 @@
__all__ = ["Orchestrator"]
__all__ = [
"Orchestrator",
"ArgParser"
]
from argenta.orchestrator.entity import Orchestrator
from argenta.orchestrator.argparser.entity import ArgParser
+10 -2
View File
@@ -1,4 +1,12 @@
__all__ = ["ArgParser"]
__all__ = [
"ArgParser",
"PositionalArgument",
"OptionalArgument",
"BooleanArgument"
]
from argenta.orchestrator.argparser.entity import ArgParser
from argenta.orchestrator.argparser.entity import ArgParser
from argenta.orchestrator.argparser.arguments import (BooleanArgument,
PositionalArgument,
OptionalArgument)
@@ -1,6 +1,8 @@
__all__ = ["BooleanArgument", "PositionalArgument", "OptionalArgument"]
from argenta.orchestrator.argparser.arguments.models import (BooleanArgument,
PositionalArgument,
OptionalArgument)
from argenta.orchestrator.argparser.arguments.models import (
BooleanArgument,
PositionalArgument,
OptionalArgument,
)
@@ -1,18 +1,19 @@
from abc import ABC, abstractmethod
from typing import Literal
from typing import Literal, override
class BaseArgument(ABC):
"""
Private. Base class for all arguments
"""
@property
@abstractmethod
def get_string_entity(self) -> str:
def string_entity(self) -> str:
"""
Public. Returns the string representation of the argument
:return: the string representation as a str
"""
pass
raise NotImplementedError
class PositionalArgument(BaseArgument):
@@ -21,35 +22,41 @@ class PositionalArgument(BaseArgument):
Public. Required argument at startup
:param name: name of the argument, must not start with minus (-)
"""
self.name = name
self.name: str = name
def get_string_entity(self):
@property
@override
def string_entity(self) -> str:
return self.name
class OptionalArgument(BaseArgument):
def __init__(self, name: str, prefix: Literal['-', '--', '---'] = '--'):
def __init__(self, name: str, prefix: Literal["-", "--", "---"] = "--"):
"""
Public. Optional argument, must have the value
:param name: name of the argument
:param prefix: prefix of the argument
"""
self.name = name
self.prefix = prefix
self.name: str = name
self.prefix: Literal["-", "--", "---"] = prefix
def get_string_entity(self):
@property
@override
def string_entity(self) -> str:
return self.prefix + self.name
class BooleanArgument(BaseArgument):
def __init__(self, name: str, prefix: Literal['-', '--', '---'] = '--'):
def __init__(self, name: str, prefix: Literal["-", "--", "---"] = "--"):
"""
Public. Boolean argument, does not require a value
:param name: name of the argument
:param prefix: prefix of the argument
"""
self.name = name
self.prefix = prefix
self.name: str = name
self.prefix: Literal["-", "--", "---"] = prefix
def get_string_entity(self):
@property
@override
def string_entity(self) -> str:
return self.prefix + self.name
+26 -36
View File
@@ -1,16 +1,20 @@
from argparse import ArgumentParser
from argparse import ArgumentParser, Namespace
from argenta.orchestrator.argparser.arguments.models import (BooleanArgument,
OptionalArgument,
PositionalArgument)
from argenta.orchestrator.argparser.arguments.models import (
BooleanArgument,
OptionalArgument,
PositionalArgument,
)
class ArgParser:
def __init__(self,
processed_args: list[PositionalArgument | OptionalArgument | BooleanArgument],
name: str = 'Argenta',
description: str = 'Argenta available arguments',
epilog: str = 'github.com/koloideal/Argenta | made by kolo') -> None:
def __init__(
self,
processed_args: list[PositionalArgument | OptionalArgument | BooleanArgument], *,
name: str = "Argenta",
description: str = "Argenta available arguments",
epilog: str = "github.com/koloideal/Argenta | made by kolo",
) -> None:
"""
Public. Cmd argument parser and configurator at startup
:param name: the name of the ArgParse instance
@@ -18,32 +22,18 @@ class ArgParser:
:param epilog: the epilog of the ArgParse instance
:param processed_args: registered and processed arguments
"""
self.name = name
self.description = description
self.epilog = epilog
self._name: str = name
self._description: str = description
self._epilog: str = epilog
self.entity: ArgumentParser = ArgumentParser(prog=name, description=description, epilog=epilog)
self.args: list[PositionalArgument | OptionalArgument | BooleanArgument] | None = processed_args
self._entity: ArgumentParser = ArgumentParser(prog=name, description=description, epilog=epilog)
self._processed_args: list[PositionalArgument | OptionalArgument | BooleanArgument] = processed_args
for arg in processed_args:
if isinstance(arg, PositionalArgument) or isinstance(arg, OptionalArgument):
_ = self._entity.add_argument(arg.string_entity)
else:
_ = self._entity.add_argument(arg.string_entity, action="store_true")
def set_args(self, *args: PositionalArgument | OptionalArgument | BooleanArgument) -> None:
"""
Public. Sets the arguments to be processed
:param args: processed arguments
:return: None
"""
self.args.extend(args)
def register_args(self) -> None:
"""
Private. Registers initialized command line arguments
:return: None
"""
if not self.args:
return
for arg in self.args:
if type(arg) is PositionalArgument:
self.entity.add_argument(arg.get_string_entity())
elif type(arg) is OptionalArgument:
self.entity.add_argument(arg.get_string_entity())
elif type(arg) is BooleanArgument:
self.entity.add_argument(arg.get_string_entity(), action='store_true')
def parse_args(self) -> Namespace:
return self._entity.parse_args()
+5 -9
View File
@@ -5,18 +5,15 @@ from argenta.orchestrator.argparser import ArgParser
class Orchestrator:
def __init__(self, arg_parser: ArgParser = False):
def __init__(self, arg_parser: ArgParser | None = None):
"""
Public. An orchestrator and configurator that defines the behavior of an integrated system, one level higher than the App
:param arg_parser: Cmd argument parser and configurator at startup
:return: None
"""
self.arg_parser: ArgParser | False = arg_parser
if arg_parser:
self.arg_parser.register_args()
self._arg_parser: ArgParser | None = arg_parser
@staticmethod
def start_polling(app: App) -> None:
def start_polling(self, app: App) -> None:
"""
Public. Starting the user input processing cycle
:param app: a running application
@@ -29,8 +26,7 @@ class Orchestrator:
Public. Returns the arguments parsed
:return: None
"""
if self.arg_parser:
return self.arg_parser.entity.parse_args()
if self._arg_parser:
return self._arg_parser.parse_args()
else:
return None
+5
View File
@@ -0,0 +1,5 @@
__all__ = ["Response", "ResponseStatus"]
from argenta.response.entity import Response
from argenta.response.status import ResponseStatus
+23
View File
@@ -0,0 +1,23 @@
from typing import Literal
from argenta.command.flag.flags.models import InputFlags
from argenta.response.status import ResponseStatus
EMPTY_INPUT_FLAGS: InputFlags = InputFlags()
class Response:
__slots__: tuple[Literal['status', 'input_flags'], ...] = ("status", "input_flags")
def __init__(
self,
status: ResponseStatus,
input_flags: InputFlags = EMPTY_INPUT_FLAGS,
):
"""
Public. The entity of the user input sent to the handler
:param status: the status of the response
:param input_flags: all input flags
"""
self.status: ResponseStatus = status
self.input_flags: InputFlags = input_flags
+19
View File
@@ -0,0 +1,19 @@
from enum import Enum
class ResponseStatus(Enum):
ALL_FLAGS_VALID = "ALL_FLAGS_VALID"
UNDEFINED_FLAGS = "UNDEFINED_FLAGS"
INVALID_VALUE_FLAGS = "INVALID_VALUE_FLAGS"
UNDEFINED_AND_INVALID_FLAGS = "UNDEFINED_AND_INVALID_FLAGS"
@classmethod
def from_flags(cls, *, has_invalid_value_flags: bool, has_undefined_flags: bool) -> 'ResponseStatus':
key = (has_invalid_value_flags, has_undefined_flags)
status_map: dict[tuple[bool, bool], ResponseStatus] = {
(True, True): cls.UNDEFINED_AND_INVALID_FLAGS,
(True, False): cls.INVALID_VALUE_FLAGS,
(False, True): cls.UNDEFINED_FLAGS,
(False, False): cls.ALL_FLAGS_VALID,
}
return status_map[key]
+3 -3
View File
@@ -1,4 +1,4 @@
from argenta.router.entity import Router
__all__ = ["Router"]
from src.argenta.router.entity import Router
+14 -46
View File
@@ -1,62 +1,38 @@
from collections.abc import Iterator
from typing import Callable
from argenta.command import Command
from argenta.command.flag import InputFlags
from argenta.response import Response
class CommandHandler:
def __init__(self, handler: Callable[[], None] | Callable[[InputFlags], None], handled_command: Command):
def __init__(self, handler_as_func: Callable[[Response], None], handled_command: Command):
"""
Private. Entity of the model linking the handler and the command being processed
:param handler: the handler being called
:param handled_command: the command being processed
"""
self._handler = handler
self._handled_command = handled_command
self.handler_as_func: Callable[[Response], None] = handler_as_func
self.handled_command: Command = handled_command
def handling(self, input_flags: InputFlags = None) -> None:
def handling(self, response: Response) -> None:
"""
Private. Direct processing of an input command
:param input_flags: the flags of the input command
:param response: the entity of response: various groups of flags and status of response
:return: None
"""
if input_flags is not None:
self._handler(input_flags)
else:
self._handler()
def get_handler(self) -> Callable[[], None] | Callable[[InputFlags], None]:
"""
Private. Returns the handler being called
:return: the handler being called as Callable[[], None] or Callable[[InputFlags], None]
"""
return self._handler
def get_handled_command(self) -> Command:
"""
Private. Returns the command being processed
:return: the command being processed as Command
"""
return self._handled_command
self.handler_as_func(response)
class CommandHandlers:
def __init__(self, command_handlers: list[CommandHandler] = None):
def __init__(self, command_handlers: list[CommandHandler] | None = None):
"""
Private. The model that unites all CommandHandler of the routers
:param command_handlers: list of CommandHandlers for register
"""
self.command_handlers = command_handlers if command_handlers else []
self.command_handlers: list[CommandHandler] = command_handlers if command_handlers else []
def get_command_handlers(self) -> list[CommandHandler]:
"""
Private. Returns the list of CommandHandlers
:return: the list of CommandHandlers as list[CommandHandler]
"""
return self.command_handlers
def add_command_handler(self, command_handler: CommandHandler) -> None:
def add_handler(self, command_handler: CommandHandler) -> None:
"""
Private. Adds a CommandHandler to the list of CommandHandlers
:param command_handler: CommandHandler to be added
@@ -64,16 +40,8 @@ class CommandHandlers:
"""
self.command_handlers.append(command_handler)
def add_command_handlers(self, *command_handlers: CommandHandler) -> None:
"""
Private. Extend a many CommandHandler to the list of CommandHandlers
:param command_handlers: many CommandHandler to be added
:return: None
"""
self.command_handlers.extend(command_handlers)
def __iter__(self):
def __iter__(self) -> Iterator[CommandHandler]:
return iter(self.command_handlers)
def __next__(self):
return next(iter(self.command_handlers))
def __next__(self) -> CommandHandler:
return next(iter(self.command_handlers))
+1 -1
View File
@@ -1,4 +1,4 @@
from argenta.router import Router
system_router = Router(title='System points:')
system_router = Router(title="System points:")
+154 -146
View File
@@ -1,57 +1,67 @@
from typing import Callable
from inspect import getfullargspec
from src.argenta.command import Command
from src.argenta.command.models import InputCommand
from src.argenta.router.command_handler.entity import CommandHandlers, CommandHandler
from src.argenta.command.flag.models import Flag, Flags, InputFlags
from src.argenta.router.exceptions import (RepeatedFlagNameException,
TooManyTransferredArgsException,
RequiredArgumentNotPassedException,
TriggerContainSpacesException)
from typing import Callable, TypeAlias
from inspect import getfullargspec, get_annotations, getsourcefile, getsourcelines
from rich.console import Console
from argenta.command import Command, InputCommand
from argenta.command.flag import ValidationStatus
from argenta.response import Response, ResponseStatus
from argenta.router.command_handler.entity import CommandHandlers, CommandHandler
from argenta.command.flag.flags import (
Flags,
InputFlags
)
from argenta.router.exceptions import (
RepeatedFlagNameException,
TooManyTransferredArgsException,
RequiredArgumentNotPassedException,
TriggerContainSpacesException,
)
HandlerFunc: TypeAlias = Callable[[Response], None]
class Router:
def __init__(self,
title: str = None):
def __init__(
self, *, title: str | None = "Default title",
disable_redirect_stdout: bool = False
):
"""
Public. Directly configures and manages handlers
:param title: the title of the router, displayed when displaying the available commands
:param disable_redirect_stdout: Disables stdout forwarding, if the argument value is True,
the StaticDividingLine will be forced to be used as a line separator for this router,
disabled forwarding is needed when there is text output in conjunction with a text input request (for example, input()),
if the argument value is True, the output of the input() prompt is intercepted and not displayed,
which is ambiguous behavior and can lead to unexpected work
:return: None
"""
self._title = title
self.title: str | None = title
self.disable_redirect_stdout: bool = disable_redirect_stdout
self._command_handlers: CommandHandlers = CommandHandlers()
self._ignore_command_register: bool = False
self._not_valid_flag_handler: Callable[[Flag], None] = lambda flag: print(f"Undefined or incorrect input flag: {flag.get_string_entity()}{(' '+flag.get_value()) if flag.get_value() else ''}")
self.command_handlers: CommandHandlers = CommandHandlers()
self.command_register_ignore: bool = False
def command(self, command: Command) -> Callable:
def command(self, command: Command | str) -> Callable[[HandlerFunc], HandlerFunc]:
"""
Public. Registers handler
:param command: Registered command
:return: decorated handler as Callable[[Any], Any]
:return: decorated handler as Callable
"""
self._validate_command(command)
if isinstance(command, str):
redefined_command = Command(command)
else:
redefined_command = command
def command_decorator(func):
Router._validate_func_args(command, func)
self._command_handlers.add_command_handler(CommandHandler(func, command))
_validate_command(redefined_command)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
def decorator(func: HandlerFunc) -> HandlerFunc:
_validate_func_args(func)
self.command_handlers.add_handler(CommandHandler(func, redefined_command))
return command_decorator
def set_invalid_input_flag_handler(self, func: Callable[[Flag], None]) -> None:
"""
Public. Registers handler for invalid input flag
:param func: registered handler
:return: None
"""
self._not_valid_flag_handler = func
return func
return decorator
def finds_appropriate_handler(self, input_command: InputCommand) -> None:
"""
@@ -59,146 +69,144 @@ class Router:
:param input_command: input command as InputCommand
:return: None
"""
input_command_name: str = input_command.get_trigger()
input_command_flags: InputFlags = input_command.get_input_flags()
input_command_name: str = input_command.trigger
input_command_flags: InputFlags = input_command.input_flags
for command_handler in self._command_handlers:
handle_command = command_handler.get_handled_command()
if input_command_name.lower() == handle_command.get_trigger().lower():
for command_handler in self.command_handlers:
handle_command = command_handler.handled_command
if input_command_name.lower() == handle_command.trigger.lower():
self.process_input_command(input_command_flags, command_handler)
if input_command_name.lower() in handle_command.get_aliases():
if input_command_name.lower() in handle_command.aliases:
self.process_input_command(input_command_flags, command_handler)
def process_input_command(self, input_command_flags: InputFlags, command_handler: CommandHandler) -> None:
def process_input_command(
self, input_command_flags: InputFlags, command_handler: CommandHandler
) -> None:
"""
Private. Processes input command with the appropriate handler
:param input_command_flags: input command flags as InputFlags
:param command_handler: command handler for input command as CommandHandler
:return: None
"""
handle_command = command_handler.get_handled_command()
if handle_command.get_registered_flags().get_flags():
if input_command_flags.get_flags():
if self._validate_input_flags(handle_command, input_command_flags):
command_handler.handling(input_command_flags)
return
handle_command = command_handler.handled_command
if handle_command.registered_flags.flags:
if input_command_flags.flags:
response: Response = _structuring_input_flags(handle_command, input_command_flags)
command_handler.handling(response)
else:
command_handler.handling(input_command_flags)
return
response = Response(ResponseStatus.ALL_FLAGS_VALID)
command_handler.handling(response)
else:
if input_command_flags.get_flags():
self._not_valid_flag_handler(input_command_flags[0])
return
if input_command_flags.flags:
undefined_flags = InputFlags()
for input_flag in input_command_flags:
input_flag.status = ValidationStatus.UNDEFINED
undefined_flags.add_flag(input_flag)
response = Response(ResponseStatus.UNDEFINED_FLAGS, input_flags=undefined_flags)
command_handler.handling(response)
else:
command_handler.handling()
return
response = Response(ResponseStatus.ALL_FLAGS_VALID)
command_handler.handling(response)
def _validate_input_flags(self, handled_command: Command, input_flags: InputFlags) -> bool:
"""
Private. Validates flags of input command
:param handled_command: entity of the handled command
:param input_flags:
:return: is flags of input command valid as bool
"""
for flag in input_flags:
is_valid: bool = handled_command.validate_input_flag(flag)
if not is_valid:
self._not_valid_flag_handler(flag)
return False
return True
@staticmethod
def _validate_command(command: Command) -> None:
"""
Private. Validates the command registered in handler
:param command: validated command
:return: None if command is valid else raise exception
"""
command_name: str = command.get_trigger()
if command_name.find(' ') != -1:
raise TriggerContainSpacesException()
flags: Flags = command.get_registered_flags()
if flags:
flags_name: list = [x.get_string_entity().lower() for x in flags]
if len(set(flags_name)) < len(flags_name):
raise RepeatedFlagNameException()
@staticmethod
def _validate_func_args(command: Command, func: Callable) -> None:
"""
Private. Validates the arguments of the handler
:param command: registered command in handler
:param func: entity of the handler func
:return: None if func is valid else raise exception
"""
registered_args = command.get_registered_flags()
transferred_args = getfullargspec(func).args
if registered_args.get_flags() and transferred_args:
if len(transferred_args) != 1:
raise TooManyTransferredArgsException()
elif registered_args.get_flags() and not transferred_args:
raise RequiredArgumentNotPassedException()
elif not registered_args.get_flags() and transferred_args:
raise TooManyTransferredArgsException()
def set_command_register_ignore(self, _: bool) -> None:
"""
Private. Sets the router behavior on the input commands register
:param _: is command register ignore
:return: None
"""
self._ignore_command_register = _
def get_triggers(self) -> list[str]:
@property
def triggers(self) -> list[str]:
"""
Public. Gets registered triggers
:return: registered in router triggers as list[str]
"""
all_triggers: list[str] = []
for command_handler in self._command_handlers:
all_triggers.append(command_handler.get_handled_command().get_trigger())
for command_handler in self.command_handlers:
all_triggers.append(command_handler.handled_command.trigger)
return all_triggers
def get_aliases(self) -> list[str]:
@property
def aliases(self) -> list[str]:
"""
Public. Gets registered aliases
:return: registered in router aliases as list[str]
"""
all_aliases: list[str] = []
for command_handler in self._command_handlers:
if command_handler.get_handled_command().get_aliases():
all_aliases.extend(command_handler.get_handled_command().get_aliases())
for command_handler in self.command_handlers:
if command_handler.handled_command.aliases:
all_aliases.extend(command_handler.handled_command.aliases)
return all_aliases
def get_command_handlers(self) -> CommandHandlers:
"""
Private. Gets registered command handlers
:return: registered command handlers as CommandHandlers
"""
return self._command_handlers
class CommandDecorator:
def __init__(self, router_instance: Router, command: Command):
self.router: Router = router_instance
self.command: Command = command
def __call__(self, handler_func: Callable[[Response], None]) -> Callable[[Response], None]:
_validate_func_args(handler_func)
self.router.command_handlers.add_handler(CommandHandler(handler_func, self.command))
return handler_func
def get_title(self) -> str | None:
"""
Public. Gets title of the router
:return: the title of the router as str or None
"""
return self._title
def _structuring_input_flags(handled_command: Command,
input_flags: InputFlags) -> Response:
"""
Private. Validates flags of input command
:param handled_command: entity of the handled command
:param input_flags:
:return: entity of response as Response
"""
invalid_value_flags, undefined_flags = False, False
for flag in input_flags:
flag_status: ValidationStatus = (handled_command.validate_input_flag(flag))
flag.status = flag_status
if flag_status == ValidationStatus.INVALID:
invalid_value_flags = True
elif flag_status == ValidationStatus.UNDEFINED:
undefined_flags = True
status = ResponseStatus.from_flags(has_invalid_value_flags=invalid_value_flags,
has_undefined_flags=undefined_flags)
return Response(
status=status,
input_flags=input_flags
)
def _validate_func_args(func: Callable[[Response], None]) -> None:
"""
Private. Validates the arguments of the handler
:param func: entity of the handler func
:return: None if func is valid else raise exception
"""
transferred_args = getfullargspec(func).args
if len(transferred_args) > 1:
raise TooManyTransferredArgsException()
elif len(transferred_args) == 0:
raise RequiredArgumentNotPassedException()
transferred_arg: str = transferred_args[0]
func_annotations: dict[str, None] = get_annotations(func)
arg_annotation = func_annotations.get(transferred_arg)
if arg_annotation is not None:
if arg_annotation is not Response:
source_line: int = getsourcelines(func)[1]
Console().print(
f'\nFile "{getsourcefile(func)}", line {source_line}\n[b red]WARNING:[/b red] [i]The typehint ' +
f"of argument([green]{transferred_arg}[/green]) passed to the handler must be [/i][bold blue]{Response}[/bold blue]," +
f" [i]but[/i] [bold blue]{arg_annotation}[/bold blue] [i]is specified[/i]",
highlight=False,
)
def set_title(self, title: str) -> None:
"""
Public. Sets the title of the router
:param title: title that will be setted
:return: None
"""
self._title = title
def _validate_command(command: Command) -> None:
"""
Private. Validates the command registered in handler
:param command: validated command
:return: None if command is valid else raise exception
"""
command_name: str = command.trigger
if command_name.find(" ") != -1:
raise TriggerContainSpacesException()
flags: Flags = command.registered_flags
flags_name: list[str] = [flag.string_entity.lower() for flag in flags]
if len(set(flags_name)) < len(flags_name):
raise RepeatedFlagNameException()
+11 -4
View File
@@ -1,8 +1,12 @@
from typing import override
class RepeatedFlagNameException(Exception):
"""
Private. Raised when a repeated flag name is registered
"""
def __str__(self):
@override
def __str__(self) -> str:
return "Repeated registered flag names in register command"
@@ -10,7 +14,8 @@ class TooManyTransferredArgsException(Exception):
"""
Private. Raised when too many arguments are passed
"""
def __str__(self):
@override
def __str__(self) -> str:
return "Too many transferred arguments"
@@ -18,7 +23,8 @@ class RequiredArgumentNotPassedException(Exception):
"""
Private. Raised when a required argument is not passed
"""
def __str__(self):
@override
def __str__(self) -> str:
return "Required argument not passed"
@@ -26,5 +32,6 @@ class TriggerContainSpacesException(Exception):
"""
Private. Raised when there is a space in the trigger being registered
"""
def __str__(self):
@override
def __str__(self) -> str:
return "Command trigger cannot contain spaces"
@@ -1,32 +1,35 @@
import _io
from unittest.mock import patch, MagicMock
import unittest
from unittest import TestCase
import io
import re
from argenta.app import App
from argenta.command import Command
from argenta.command import Command, PredefinedFlags
from argenta.command.flag.models import ValidationStatus
from argenta.router import Router
from argenta.command.flag.models import Flags, InputFlags
from argenta.command.flag.defaults import PredefinedFlags
from argenta.command.flag.flags.models import Flags
from argenta.orchestrator import Orchestrator
from argenta.response import Response
class TestSystemHandlerNormalWork(unittest.TestCase):
class TestSystemHandlerNormalWork(TestCase):
@patch("builtins.input", side_effect=["help", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_incorrect_command(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test():
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.get_trigger()}'))
app.run_polling()
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}'))
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -37,17 +40,18 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_incorrect_command2(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test():
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
app = App(ignore_command_register=False,
override_system_messages=True,
print_func=print)
app.include_router(router)
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.get_trigger()}'))
app.run_polling()
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}'))
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -58,74 +62,86 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_unregistered_flag(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test():
print(f'test command')
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
undefined_flag = response.input_flags.get_flag_by_name('help')
if undefined_flag and undefined_flag.status == ValidationStatus.UNDEFINED:
print(f'test command with undefined flag: {undefined_flag.string_entity}')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
self.assertIn('\nUndefined or incorrect input flag: --help\n', output)
self.assertIn('\ntest command with undefined flag: --help\n', output)
@patch("builtins.input", side_effect=["test --port 22", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_unregistered_flag2(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test():
print('test command')
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
undefined_flag = response.input_flags.get_flag_by_name("port")
if undefined_flag and undefined_flag.status == ValidationStatus.UNDEFINED:
print(f'test command with undefined flag with value: {undefined_flag.string_entity} {undefined_flag.input_value}')
else:
raise
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
self.assertIn('\nUndefined or incorrect input flag: --port 22\n', output)
self.assertIn('\ntest command with undefined flag with value: --port 22\n', output)
@patch("builtins.input", side_effect=["test --host 192.168.32.1 --port 132", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_one_correct_flag_an_one_incorrect_flag(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
flags = Flags(PredefinedFlags.HOST)
orchestrator = Orchestrator()
flags = Flags([PredefinedFlags.HOST])
@router.command(Command('test', flags=flags))
def test(args: InputFlags):
print(f'connecting to host {args.get_flag('host').get_value()}')
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
undefined_flag = response.input_flags.get_flag_by_name("port")
if undefined_flag and undefined_flag.status == ValidationStatus.UNDEFINED:
print(f'connecting to host with flag: {undefined_flag.string_entity} {undefined_flag.input_value}')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
self.assertIn('\nUndefined or incorrect input flag: --port 132\n', output)
self.assertIn('\nconnecting to host with flag: --port 132\n', output)
@patch("builtins.input", side_effect=["test", "some", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_one_correct_command_and_one_incorrect_command(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test():
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'test command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.get_trigger()}'))
app.run_polling()
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}'))
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -136,20 +152,21 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_two_correct_commands_and_one_incorrect_command(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test():
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'test command')
@router.command(Command('more'))
def test():
def test1(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'more command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.get_trigger()}'))
app.run_polling()
app.set_unknown_command_handler(lambda command: print(f'Unknown command: {command.trigger}'))
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -160,16 +177,17 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_incorrect_flag(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test():
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'test command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.set_invalid_input_flags_handler(lambda command: print(f'Incorrect flag syntax: "{command}"'))
app.run_polling()
app.set_incorrect_input_syntax_handler(lambda command: print(f'Incorrect flag syntax: "{command}"'))
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -180,16 +198,17 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_empty_command(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test():
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'test command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.set_empty_command_handler(lambda: print('Empty input command'))
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -200,17 +219,39 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_repeated_flags(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test', flags=PredefinedFlags.PORT))
def test(args: InputFlags):
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.set_repeated_input_flags_handler(lambda command: print(f'Repeated input flags: "{command}"'))
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
self.assertIn("\nRepeated input flags: \"test --port 22 --port 33\"\n", output)
self.assertIn('Repeated input flags: "test --port 22 --port 33"', output)
@patch("builtins.input", side_effect=["test --help", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_unregistered_flag3(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
undefined_flag = response.input_flags.get_flag_by_name('help')
if undefined_flag and undefined_flag.status == ValidationStatus.UNDEFINED:
print(f'test command with undefined flag: {undefined_flag.string_entity}')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
self.assertIn('\ntest command with undefined flag: --help\n', output)
@@ -1,31 +1,35 @@
import _io
from unittest.mock import patch, MagicMock
import unittest
from unittest import TestCase
import io
import re
from argenta.app import App
from argenta.command.models import Command
from argenta.command import Command, PredefinedFlags
from argenta.command.flag.models import PossibleValues, ValidationStatus
from argenta.response import Response
from argenta.router import Router
from argenta.command.flag.models import Flag, Flags, InputFlags
from argenta.command.flag.defaults import PredefinedFlags
from argenta.orchestrator import Orchestrator
from argenta.command.flag import Flag
from argenta.command.flag.flags import Flags
class TestSystemHandlerNormalWork(unittest.TestCase):
class TestSystemHandlerNormalWork(TestCase):
@patch("builtins.input", side_effect=["test", "q"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test():
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -36,16 +40,17 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command2(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test():
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print('test command')
app = App(ignore_command_register=True,
override_system_messages=True,
print_func=print)
app.include_router(router)
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -56,16 +61,19 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_custom_flag(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
flag = Flag('help', '--', False)
orchestrator = Orchestrator()
flag = Flag('help', prefix='--', possible_values=PossibleValues.NEITHER)
@router.command(Command('test', flags=flag))
def test(args: InputFlags):
print(f'\nhelp for {args.get_flag('help').get_name()} flag\n')
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
valid_flag = response.input_flags.get_flag_by_name('help')
if valid_flag and valid_flag.status == ValidationStatus.VALID:
print(f'\nhelp for {valid_flag.name} flag\n')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -75,16 +83,19 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_custom_flag2(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
flag = Flag('port', '--', re.compile(r'^\d{1,5}$'))
orchestrator = Orchestrator()
flag = Flag('port', prefix='--', possible_values=re.compile(r'^\d{1,5}$'))
@router.command(Command('test', flags=flag))
def test(args: InputFlags):
print(f'flag value for {args.get_flag('port').get_name()} flag : {args.get_flag('port').get_value()}')
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
valid_flag = response.input_flags.get_flag_by_name('port')
if valid_flag and valid_flag.status == ValidationStatus.VALID:
print(f'flag value for {valid_flag.name} flag : {valid_flag.input_value}')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -95,16 +106,19 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_default_flag(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
flag = PredefinedFlags.SHORT_HELP
@router.command(Command('test', flags=flag))
def test(args: InputFlags):
print(f'help for {args.get_flag('H').get_name()} flag')
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
valid_flag = response.input_flags.get_flag_by_name('H')
if valid_flag and valid_flag.status == ValidationStatus.VALID:
print(f'help for {valid_flag.name} flag')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -115,17 +129,19 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_default_flag2(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
flag = PredefinedFlags.INFO
@router.command(Command('test', flags=flag))
def test(args: InputFlags):
if args.get_flag('info'):
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
valid_flag = response.input_flags.get_flag_by_name('info')
if valid_flag and valid_flag.status == ValidationStatus.VALID:
print('info about test command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -136,16 +152,19 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_default_flag3(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
flag = PredefinedFlags.HOST
@router.command(Command('test', flags=flag))
def test(args: InputFlags):
print(f'connecting to host {args[0].get_value()}')
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
valid_flag = response.input_flags.get_flag_by_name('host')
if valid_flag and valid_flag.status == ValidationStatus.VALID:
print(f'connecting to host {valid_flag.input_value}')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -156,16 +175,20 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_correct_command_with_two_flags(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
flags = Flags(PredefinedFlags.HOST, PredefinedFlags.PORT)
orchestrator = Orchestrator()
flags = Flags([PredefinedFlags.HOST, PredefinedFlags.PORT])
@router.command(Command('test', flags=flags))
def test(args: InputFlags):
print(f'connecting to host {args[0].get_value()} and port {args[1].get_value()}')
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
host_flag = response.input_flags.get_flag_by_name('host')
port_flag = response.input_flags.get_flag_by_name('port')
if (host_flag and host_flag.status == ValidationStatus.VALID) and (port_flag and port_flag.status == ValidationStatus.VALID):
print(f'connecting to host {host_flag.input_value} and port {port_flag.input_value}')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -176,19 +199,20 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_two_correct_command(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test():
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'test command')
@router.command(Command('some'))
def test2():
def test2(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'some command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
@@ -199,23 +223,24 @@ class TestSystemHandlerNormalWork(unittest.TestCase):
@patch("sys.stdout", new_callable=io.StringIO)
def test_input_three_correct_command(self, mock_stdout: _io.StringIO, magick_mock: MagicMock):
router = Router()
orchestrator = Orchestrator()
@router.command(Command('test'))
def test():
def test(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'test command')
@router.command(Command('some'))
def test():
def test1(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'some command')
@router.command(Command('more'))
def test():
def test2(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
print(f'more command')
app = App(override_system_messages=True,
print_func=print)
app.include_router(router)
app.run_polling()
orchestrator.start_polling(app)
output = mock_stdout.getvalue()
+75
View File
@@ -0,0 +1,75 @@
from argenta.command.models import InputCommand, Command
from argenta.app import App
import unittest
from argenta.router import Router
class MyTestCase(unittest.TestCase):
def test_is_exit_command1(self):
app = App()
self.assertEqual(app._is_exit_command(InputCommand('q')), True)
def test_is_exit_command5(self):
app = App()
self.assertEqual(app._is_exit_command(InputCommand('Q')), True)
def test_is_exit_command2(self):
app = App(ignore_command_register=False)
self.assertEqual(app._is_exit_command(InputCommand('q')), False)
def test_is_exit_command3(self):
app = App(exit_command=Command('quit'))
self.assertEqual(app._is_exit_command(InputCommand('quit')), True)
def test_is_exit_command4(self):
app = App(exit_command=Command('quit'))
self.assertEqual(app._is_exit_command(InputCommand('qUIt')), True)
def test_is_exit_command6(self):
app = App(ignore_command_register=False,
exit_command=Command('quit'))
self.assertEqual(app._is_exit_command(InputCommand('qUIt')), False)
def test_is_unknown_command1(self):
app = App()
app.set_unknown_command_handler(lambda command: None)
app._current_matching_triggers_with_routers = {'fr': Router(), 'tr': Router(), 'de': Router()}
self.assertEqual(app._is_unknown_command(InputCommand('fr')), False)
def test_is_unknown_command2(self):
app = App()
app.set_unknown_command_handler(lambda command: None)
app._current_matching_triggers_with_routers = {'fr': Router(), 'tr': Router(), 'de': Router()}
self.assertEqual(app._is_unknown_command(InputCommand('cr')), True)
def test_is_unknown_command3(self):
app = App(ignore_command_register=False)
app.set_unknown_command_handler(lambda command: None)
app._current_matching_triggers_with_routers = {'Pr': Router(), 'tW': Router(), 'deQW': Router()}
self.assertEqual(app._is_unknown_command(InputCommand('pr')), True)
def test_is_unknown_command4(self):
app = App(ignore_command_register=False)
app.set_unknown_command_handler(lambda command: None)
app._current_matching_triggers_with_routers = {'Pr': Router(), 'tW': Router(), 'deQW': Router()}
self.assertEqual(app._is_unknown_command(InputCommand('tW')), False)
+39 -8
View File
@@ -1,15 +1,18 @@
from argenta.command.flag import Flag, InputFlag, Flags
from argenta.command.models import InputCommand, Command
from argenta.command.flag import Flag, InputFlag
from argenta.command.flag.flags import Flags
from argenta.command.flag.models import PossibleValues
from argenta.command.models import InputCommand, Command, ValidationStatus
from argenta.command.exceptions import (UnprocessedInputFlagException,
RepeatedInputFlagsException,
EmptyInputCommandException)
import unittest
import re
class TestInputCommand(unittest.TestCase):
def test_parse_correct_raw_command(self):
self.assertEqual(InputCommand.parse('ssh --host 192.168.0.3').get_trigger(), 'ssh')
self.assertEqual(InputCommand.parse('ssh --host 192.168.0.3').trigger, 'ssh')
def test_parse_raw_command_without_flag_name_with_value(self):
with self.assertRaises(UnprocessedInputFlagException):
@@ -23,11 +26,39 @@ class TestInputCommand(unittest.TestCase):
with self.assertRaises(EmptyInputCommandException):
InputCommand.parse('')
def test_validate_correct_input_flag1(self):
def test_validate_valid_input_flag1(self):
command = Command('some', flags=Flag('test'))
self.assertEqual(command.validate_input_flag(InputFlag('test')), True)
self.assertEqual(command.validate_input_flag(InputFlag('test', input_value=None, status=None)), ValidationStatus.VALID)
def test_validate_correct_input_flag2(self):
command = Command('some', flags=Flags(Flag('test'), Flag('more')))
self.assertEqual(command.validate_input_flag(InputFlag('more')), True)
def test_validate_valid_input_flag2(self):
command = Command('some', flags=Flags([Flag('test'), Flag('more')]))
self.assertEqual(command.validate_input_flag(InputFlag('more', input_value=None, status=None)), ValidationStatus.VALID)
def test_validate_undefined_input_flag1(self):
command = Command('some', flags=Flag('test'))
self.assertEqual(command.validate_input_flag(InputFlag('more', input_value=None, status=None)), ValidationStatus.UNDEFINED)
def test_validate_undefined_input_flag2(self):
command = Command('some', flags=Flags([Flag('test'), Flag('more')]))
self.assertEqual(command.validate_input_flag(InputFlag('case', input_value=None, status=None)), ValidationStatus.UNDEFINED)
def test_validate_undefined_input_flag3(self):
command = Command('some')
self.assertEqual(command.validate_input_flag(InputFlag('case', input_value=None, status=None)), ValidationStatus.UNDEFINED)
def test_invalid_input_flag1(self):
command = Command('some', flags=Flag('test', possible_values=PossibleValues.NEITHER))
self.assertEqual(command.validate_input_flag(InputFlag('test', input_value='example', status=None)), ValidationStatus.INVALID)
def test_invalid_input_flag2(self):
command = Command('some', flags=Flag('test', possible_values=['some', 'case']))
self.assertEqual(command.validate_input_flag(InputFlag('test', input_value='slay', status=None)), ValidationStatus.INVALID)
def test_invalid_input_flag3(self):
command = Command('some', flags=Flag('test', possible_values=re.compile(r'^ex\d{, 2}op$')))
self.assertEqual(command.validate_input_flag(InputFlag('test', input_value='example', status=None)), ValidationStatus.INVALID)
def test_isinstance_parse_correct_raw_command(self):
cmd = InputCommand.parse('ssh --host 192.168.0.3')
self.assertIsInstance(cmd, InputCommand)
+2 -2
View File
@@ -6,11 +6,11 @@ import unittest
class TestDividingLine(unittest.TestCase):
def test_get_static_dividing_line_full_line(self):
line = StaticDividingLine('-')
self.assertEqual(line.get_full_static_line(True).count('-'), 25)
self.assertEqual(line.get_full_static_line(is_override=True).count('-'), 25)
def test_get_dynamic_dividing_line_full_line(self):
line = DynamicDividingLine()
self.assertEqual(line.get_full_dynamic_line(20, True).count('-'), 20)
self.assertEqual(line.get_full_dynamic_line(length=20, is_override=True).count('-'), 20)
def test_get_dividing_line_unit_part(self):
line = StaticDividingLine('')
+28 -28
View File
@@ -1,4 +1,5 @@
from argenta.command.flag.models import Flag, InputFlag, InputFlags, Flags
from argenta.command.flag import Flag, InputFlag, PossibleValues
from argenta.command.flag.flags import InputFlags, Flags
import unittest
import re
@@ -6,35 +7,34 @@ import re
class TestFlag(unittest.TestCase):
def test_get_string_entity(self):
self.assertEqual(Flag(name='test').get_string_entity(),
self.assertEqual(Flag(name='test').string_entity,
'--test')
def test_get_string_entity2(self):
self.assertEqual(Flag(name='test',
prefix='---').get_string_entity(),
prefix='---').string_entity,
'---test')
def test_get_flag_name(self):
self.assertEqual(Flag(name='test').get_name(),
self.assertEqual(Flag(name='test').name,
'test')
def test_get_flag_prefix(self):
self.assertEqual(Flag(name='test').get_prefix(),
self.assertEqual(Flag(name='test').prefix,
'--')
def test_get_flag_prefix2(self):
self.assertEqual(Flag(name='test',
prefix='--').get_prefix(),
prefix='--').prefix,
'--')
def test_get_flag_value_without_set(self):
self.assertEqual(InputFlag(name='test').get_value(),
self.assertEqual(InputFlag(name='test', input_value=None, status=None).input_value,
None)
def test_get_flag_value_with_set(self):
flag = InputFlag(name='test')
flag.set_value('example')
self.assertEqual(flag.get_value(), 'example')
flag = InputFlag(name='test', input_value='example', status=None)
self.assertEqual(flag.input_value, 'example')
def test_validate_incorrect_flag_value_with_list_of_possible_flag_values(self):
flag = Flag(name='test', possible_values=['1', '2', '3'])
@@ -53,37 +53,37 @@ class TestFlag(unittest.TestCase):
self.assertEqual(flag.validate_input_flag_value('192.168.9.8'), True)
def test_validate_correct_empty_flag_value_without_possible_flag_values(self):
flag = Flag(name='test', possible_values=False)
flag = Flag(name='test', possible_values=PossibleValues.NEITHER)
self.assertEqual(flag.validate_input_flag_value(None), True)
def test_validate_correct_empty_flag_value_with_possible_flag_values(self):
flag = Flag(name='test', possible_values=True)
flag = Flag(name='test', possible_values=PossibleValues.NEITHER)
self.assertEqual(flag.validate_input_flag_value(None), True)
def test_validate_incorrect_random_flag_value_without_possible_flag_values(self):
flag = Flag(name='test', possible_values=False)
flag = Flag(name='test', possible_values=PossibleValues.NEITHER)
self.assertEqual(flag.validate_input_flag_value('random value'), False)
def test_validate_correct_random_flag_value_with_possible_flag_values(self):
flag = Flag(name='test', possible_values=True)
flag = Flag(name='test', possible_values=PossibleValues.ALL)
self.assertEqual(flag.validate_input_flag_value('random value'), True)
def test_get_input_flag1(self):
flag = InputFlag(name='test')
input_flags = InputFlags(flag)
self.assertEqual(input_flags.get_flag('test'), flag)
flag = InputFlag(name='test', input_value=None, status=None)
input_flags = InputFlags([flag])
self.assertEqual(input_flags.get_flag_by_name('test'), flag)
def test_get_input_flag2(self):
flag = InputFlag(name='test')
flag2 = InputFlag(name='some')
input_flags = InputFlags(flag, flag2)
self.assertEqual(input_flags.get_flag('some'), flag2)
flag = InputFlag(name='test', input_value=None, status=None)
flag2 = InputFlag(name='some', input_value=None, status=None)
input_flags = InputFlags([flag, flag2])
self.assertEqual(input_flags.get_flag_by_name('some'), flag2)
def test_get_undefined_input_flag(self):
flag = InputFlag(name='test')
flag2 = InputFlag(name='some')
input_flags = InputFlags(flag, flag2)
self.assertEqual(input_flags.get_flag('case'), None)
flag = InputFlag(name='test', input_value=None, status=None)
flag2 = InputFlag(name='some', input_value=None, status=None)
input_flags = InputFlags([flag, flag2])
self.assertEqual(input_flags.get_flag_by_name('case'), None)
def test_get_flags(self):
flags = Flags()
@@ -93,18 +93,18 @@ class TestFlag(unittest.TestCase):
Flag('test3'),
]
flags.add_flags(list_of_flags)
self.assertEqual(flags.get_flags(),
self.assertEqual(flags.flags,
list_of_flags)
def test_add_flag(self):
flags = Flags()
flags.add_flag(Flag('test'))
self.assertEqual(len(flags.get_flags()), 1)
self.assertEqual(len(flags.flags), 1)
def test_add_flags(self):
flags = Flags()
flags.add_flags([Flag('test'), Flag('test2')])
self.assertEqual(len(flags.get_flags()), 2)
self.assertEqual(len(flags.flags), 2)
+58 -83
View File
@@ -1,138 +1,113 @@
from argenta.command.flag import InputFlags, InputFlag, Flag, Flags
from src.argenta.router import Router
from src.argenta.command import Command
from src.argenta.router.exceptions import (TriggerContainSpacesException,
RepeatedFlagNameException,
TooManyTransferredArgsException,
RequiredArgumentNotPassedException)
from argenta.command.flag import InputFlag, Flag
from argenta.command.flag.flags import Flags, InputFlags
from argenta.command.flag.models import PossibleValues, ValidationStatus
from argenta.response.entity import Response
from argenta.router import Router
from argenta.command import Command
from argenta.router.entity import _structuring_input_flags, _validate_command, _validate_func_args # pyright: ignore[reportPrivateUsage]
from argenta.router.exceptions import (TriggerContainSpacesException,
RepeatedFlagNameException,
TooManyTransferredArgsException,
RequiredArgumentNotPassedException)
import unittest
import re
class TestRouter(unittest.TestCase):
def test_get_router_title(self):
self.assertEqual(Router(title='test title').get_title(), 'test title')
def test_register_command_with_spaces_in_trigger(self):
router = Router()
with self.assertRaises(TriggerContainSpacesException):
router._validate_command(Command(trigger='command with spaces'))
_validate_command(Command(trigger='command with spaces'))
def test_register_command_with_repeated_flags(self):
router = Router()
with self.assertRaises(RepeatedFlagNameException):
router._validate_command(Command(trigger='command', flags=Flags(Flag('test'), Flag('test'))))
_validate_command(Command(trigger='command', flags=Flags([Flag('test'), Flag('test')])))
def test_validate_incorrect_input_flag1(self):
router = Router()
router.set_invalid_input_flag_handler(lambda flag: None)
self.assertEqual(router._validate_input_flags(Command('cmd'), InputFlags(InputFlag('ssh'))), False)
def test_structuring_input_flags1(self):
cmd = Command('cmd')
input_flags = InputFlags([InputFlag('ssh', input_value=None, status=None)])
self.assertEqual(_structuring_input_flags(cmd, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value=None, status=ValidationStatus.UNDEFINED)]))
def test_validate_incorrect_input_flag2(self):
router = Router()
router.set_invalid_input_flag_handler(lambda flag: None)
self.assertEqual(router._validate_input_flags(Command('cmd'), InputFlags(InputFlag('ssh', value='some'))), False)
def test_structuring_input_flags2(self):
cmd = Command('cmd')
input_flags = InputFlags([InputFlag('ssh', input_value='some', status=None)])
self.assertEqual(_structuring_input_flags(cmd, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='some', status=ValidationStatus.UNDEFINED)]))
def test_validate_incorrect_input_flag3(self):
router = Router()
router.set_invalid_input_flag_handler(lambda flag: None)
command = Command('cmd', flags=Flag('port'))
input_flags = InputFlags(InputFlag('ssh', value='some2'))
self.assertEqual(router._validate_input_flags(command, input_flags), False)
def test_structuring_input_flags3(self):
cmd = Command('cmd', flags=Flag('port'))
input_flags = InputFlags([InputFlag('ssh', input_value='some2', status=None)])
self.assertEqual(_structuring_input_flags(cmd, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='some2', status=ValidationStatus.UNDEFINED)]))
def test_validate_incorrect_input_flag4(self):
router = Router()
router.set_invalid_input_flag_handler(lambda flag: None)
command = Command('cmd', flags=Flag('ssh', possible_values=False))
input_flags = InputFlags(InputFlag('ssh', value='some3'))
self.assertEqual(router._validate_input_flags(command, input_flags), False)
def test_structuring_input_flags4(self):
command = Command('cmd', flags=Flag('ssh', possible_values=PossibleValues.NEITHER))
input_flags = InputFlags([InputFlag('ssh', input_value='some3', status=None)])
self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='some3', status=ValidationStatus.INVALID)]))
def test_validate_incorrect_input_flag5(self):
router = Router()
router.set_invalid_input_flag_handler(lambda flag: None)
def test_structuring_input_flags5(self):
command = Command('cmd', flags=Flag('ssh', possible_values=re.compile(r'some[1-5]$')))
input_flags = InputFlags(InputFlag('ssh', value='some40'))
self.assertEqual(router._validate_input_flags(command, input_flags), False)
input_flags = InputFlags([InputFlag('ssh', input_value='some40', status=None)])
self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='some40', status=ValidationStatus.INVALID)]))
def test_validate_incorrect_input_flag6(self):
router = Router()
router.set_invalid_input_flag_handler(lambda flag: None)
def test_structuring_input_flags6(self):
command = Command('cmd', flags=Flag('ssh', possible_values=['example']))
input_flags = InputFlags(InputFlag('ssh', value='example2'))
self.assertEqual(router._validate_input_flags(command, input_flags), False)
input_flags = InputFlags([InputFlag('ssh', input_value='example2', status=None)])
self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='example2', status=ValidationStatus.INVALID)]))
def test_validate_incorrect_input_flag7(self):
router = Router()
router.set_invalid_input_flag_handler(lambda flag: None)
command = Command('cmd', flags=Flag('ssh', possible_values=['example']))
input_flags = InputFlags(InputFlag('ssh'))
self.assertEqual(router._validate_input_flags(command, input_flags), False)
def test_validate_correct_input_flag1(self):
def test_structuring_input_flags7(self):
command = Command('cmd', flags=Flag('port'))
input_flags = InputFlags(InputFlag('port', value='some2'))
self.assertEqual(Router()._validate_input_flags(command, input_flags), True)
input_flags = InputFlags([InputFlag('port', input_value='some2', status=None)])
self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('port', input_value='some2', status=ValidationStatus.VALID)]))
def test_validate_correct_input_flag2(self):
def test_structuring_input_flags8(self):
command = Command('cmd', flags=Flag('port', possible_values=['some2', 'some3']))
input_flags = InputFlags(InputFlag('port', value='some2'))
self.assertEqual(Router()._validate_input_flags(command, input_flags), True)
input_flags = InputFlags([InputFlag('port', input_value='some2', status=None)])
self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('port', input_value='some2', status=ValidationStatus.VALID)]))
def test_validate_correct_input_flag3(self):
def test_structuring_input_flags9(self):
command = Command('cmd', flags=Flag('ssh', possible_values=re.compile(r'more[1-5]$')))
input_flags = InputFlags(InputFlag('ssh', value='more5'))
self.assertEqual(Router()._validate_input_flags(command, input_flags), True)
input_flags = InputFlags([InputFlag('ssh', input_value='more5', status=None)])
self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value='more5', status=ValidationStatus.VALID)]))
def test_validate_correct_input_flag4(self):
command = Command('cmd', flags=Flag('ssh', possible_values=False))
input_flags = InputFlags(InputFlag('ssh'))
self.assertEqual(Router()._validate_input_flags(command, input_flags), True)
def test_structuring_input_flags10(self):
command = Command('cmd', flags=Flag('ssh', possible_values=PossibleValues.NEITHER))
input_flags = InputFlags([InputFlag('ssh', input_value=None, status=None)])
self.assertEqual(_structuring_input_flags(command, input_flags).input_flags, InputFlags([InputFlag('ssh', input_value=None, status=ValidationStatus.VALID)]))
def test_validate_incorrect_func_args1(self):
command = Command('cmd', flags=Flag('port'))
def handler():
pass
with self.assertRaises(RequiredArgumentNotPassedException):
Router()._validate_func_args(command, handler)
_validate_func_args(handler) # pyright: ignore[reportArgumentType]
def test_validate_incorrect_func_args2(self):
command = Command('cmd', flags=Flag('port'))
def handler(args, kwargs):
def handler(args, kwargs): # pyright: ignore[reportMissingParameterType, reportUnknownParameterType]
pass
with self.assertRaises(TooManyTransferredArgsException):
Router()._validate_func_args(command, handler)
def test_validate_incorrect_func_args3(self):
command = Command('cmd')
def handler(args):
pass
with self.assertRaises(TooManyTransferredArgsException):
Router()._validate_func_args(command, handler)
_validate_func_args(handler) # pyright: ignore[reportArgumentType]
def test_get_router_aliases(self):
router = Router()
@router.command(Command('some', aliases=['test', 'case']))
def handler():
def handler(response: Response) -> None: # pyright: ignore[reportUnusedFunction]
pass
self.assertListEqual(router.get_aliases(), ['test', 'case'])
self.assertListEqual(router.aliases, ['test', 'case'])
def test_get_router_aliases2(self):
router = Router()
@router.command(Command('some', aliases=['test', 'case']))
def handler():
def handler(response: Response): # pyright: ignore[reportUnusedFunction]
pass
@router.command(Command('ext', aliases=['more', 'foo']))
def handler2():
def handler2(response: Response): # pyright: ignore[reportUnusedFunction]
pass
self.assertListEqual(router.get_aliases(), ['test', 'case', 'more', 'foo'])
self.assertListEqual(router.aliases, ['test', 'case', 'more', 'foo'])
def test_get_router_aliases3(self):
router = Router()
@router.command(Command('some'))
def handler():
def handler(response: Response): # pyright: ignore[reportUnusedFunction]
pass
self.assertListEqual(router.get_aliases(), [])
self.assertListEqual(router.aliases, [])