complete creating run command

This commit is contained in:
2026-02-08 22:37:42 +03:00
parent b732036e87
commit 70f1327a0d
2 changed files with 14 additions and 31 deletions
+10 -28
View File
@@ -1,10 +1,10 @@
__all__ = ["run_handler"]
import importlib
import sys
from pathlib import Path
from typing import Any
from argenta import App, Orchestrator
class ImportFromStringError(Exception):
pass
@@ -20,9 +20,7 @@ def import_from_string(import_str: str) -> Any:
try:
module = importlib.import_module(module_str)
except ModuleNotFoundError as exc:
if exc.name != module_str:
raise exc from None
raise ImportFromStringError(f'Could not import module "{module_str}".')
raise ImportFromStringError(f'Could not import module "{module_str}".') from exc
instance = module
try:
@@ -34,29 +32,13 @@ def import_from_string(import_str: str) -> Any:
return instance
def run_handler(orchestrator_str: str, app_str: str | None = None) -> Any:
orchestrator = import_from_string(orchestrator_str)
def run_handler(entry_point: str) -> None:
if str(Path.cwd()) not in sys.path:
sys.path.insert(0, str(Path.cwd()))
if not isinstance(orchestrator, Orchestrator):
raise TypeError(f"Not an Orchestrator: {type(orchestrator).__name__}")
runner = import_from_string(entry_point)
if app_str is not None:
app = import_from_string(app_str)
else:
module_str = orchestrator_str.partition(":")[0]
module = importlib.import_module(module_str)
if not callable(runner):
raise TypeError(f'"{entry_point}" is not callable')
app = None
for attr_name in dir(module):
attr = getattr(module, attr_name)
if isinstance(attr, App):
app = attr
break
if app is None:
raise ValueError(f'No App instance found in module "{module_str}"')
if not isinstance(app, App):
raise TypeError(f"Not an App: {type(app).__name__}")
return orchestrator.start_polling(app)
runner()
+4 -3
View File
@@ -13,7 +13,7 @@ DEFAULT_ARGPARSER: ArgParser = ArgParser(processed_args=[])
class Orchestrator:
def __init__(
self,
arg_parser: ArgParser = DEFAULT_ARGPARSER,
arg_parser: ArgParser | None = None,
custom_providers: list[Provider] | None = None,
auto_inject_handlers: bool = True,
):
@@ -22,11 +22,12 @@ class Orchestrator:
:param arg_parser: Cmd argument parser and configurator at startup
:return: None
"""
self._arg_parser: ArgParser = arg_parser
self._arg_parser: ArgParser | None = arg_parser
self._custom_providers: list[Provider] = custom_providers or []
self._auto_inject_handlers: bool = auto_inject_handlers
self._arg_parser._parse_args() # pyright: ignore[reportPrivateUsage]
if self._arg_parser is not None:
self._arg_parser._parse_args()
def start_polling(self, app: App) -> None:
"""