mirror of
https://github.com/koloideal/Argenta.git
synced 2026-06-10 10:05:28 +03:00
benchs
This commit is contained in:
@@ -1,9 +1,11 @@
|
||||
__all__ = ['EntrypointResolver', 'EntryPointAsApp', 'CallableEntryPoint']
|
||||
__all__ = ["EntrypointResolver", "EntryPointAsApp", "CallableEntryPoint"]
|
||||
|
||||
import importlib
|
||||
import inspect
|
||||
import re
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
import sys
|
||||
from typing import Callable, Protocol, cast, get_args
|
||||
|
||||
from argenta.app.models import App
|
||||
@@ -35,12 +37,19 @@ class EntryPointAsApp:
|
||||
instance_object: App
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class ResolvedEntrypoint:
|
||||
resolved_source_path: str
|
||||
instance: Callable[[], None] | App
|
||||
|
||||
|
||||
class EntrypointResolver[T: (CallableEntryPoint, EntryPointAsApp)]:
|
||||
def __init__(self, path_to_entrypoint: str):
|
||||
self._path_to_entrypoint = path_to_entrypoint
|
||||
|
||||
def parse_entrypoint_with_type(
|
||||
self, entrypoint_object_name: str,
|
||||
self,
|
||||
entrypoint_object_name: str,
|
||||
) -> T:
|
||||
entrypoint_type: type[T] = get_args(self.__orig_class__)[0] # pyright: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
|
||||
if entrypoint_type is CallableEntryPoint:
|
||||
@@ -71,32 +80,46 @@ class EntrypointResolver[T: (CallableEntryPoint, EntryPointAsApp)]:
|
||||
|
||||
return EntryPointAsApp(raw_path=resolved_entrypoint[0], instance_object=instance_object)
|
||||
|
||||
def _resolve_from_string(self, entrypoint_object_name: str) -> tuple[str, object]:
|
||||
abs_path = Path(self._path_to_entrypoint).resolve()
|
||||
if not abs_path.exists():
|
||||
raise ResolveFromStringError(f'File "{self._path_to_entrypoint}" not found')
|
||||
def _resolve_from_string(self, entrypoint_object_name: str) -> ResolvedEntrypoint:
|
||||
raw_path = self._path_to_entrypoint
|
||||
is_file_path = bool(re.search(r"[\/\\]|\.py$", raw_path))
|
||||
|
||||
package_root = abs_path.parent
|
||||
while (package_root / "__init__.py").exists():
|
||||
package_root = package_root.parent
|
||||
if is_file_path:
|
||||
abs_path = Path(raw_path).resolve()
|
||||
if not abs_path.exists():
|
||||
raise ResolveFromStringError(f'File "{raw_path}" not found')
|
||||
|
||||
pkg_root_str = str(package_root)
|
||||
if pkg_root_str not in sys.path:
|
||||
sys.path.insert(0, pkg_root_str)
|
||||
package_root = abs_path.parent
|
||||
while (package_root / "__init__.py").exists():
|
||||
package_root = package_root.parent
|
||||
|
||||
module_name = ".".join(abs_path.relative_to(package_root).with_suffix("").parts)
|
||||
pkg_root_str = str(package_root)
|
||||
if pkg_root_str not in sys.path:
|
||||
sys.path.insert(0, pkg_root_str)
|
||||
|
||||
module_name = ".".join(abs_path.relative_to(package_root).with_suffix("").parts)
|
||||
resolved_source_path = str(abs_path)
|
||||
|
||||
else:
|
||||
module_name = raw_path
|
||||
|
||||
cwd_str = str(Path.cwd())
|
||||
if cwd_str not in sys.path:
|
||||
sys.path.insert(0, cwd_str)
|
||||
|
||||
resolved_source_path = module_name
|
||||
|
||||
try:
|
||||
module = importlib.import_module(module_name)
|
||||
except ImportError as e:
|
||||
raise ResolveFromStringError(f'Cannot import module "{module_name}": {e}')
|
||||
|
||||
if not is_file_path:
|
||||
resolved_source_path = getattr(module, "__file__", resolved_source_path)
|
||||
|
||||
try:
|
||||
instance = getattr(module, entrypoint_object_name)
|
||||
except AttributeError:
|
||||
raise ResolveFromStringError(
|
||||
f'"{entrypoint_object_name}" not found in "{self._path_to_entrypoint}"'
|
||||
)
|
||||
|
||||
return str(abs_path), instance
|
||||
raise ResolveFromStringError(f'"{entrypoint_object_name}" not found in "{raw_path}"')
|
||||
|
||||
return ResolvedEntrypoint(resolved_source_path, instance)
|
||||
|
||||
Reference in New Issue
Block a user