Source code for genro_routes.core.base_router

"""Plugin-free router runtime for Genro Routes.

This module exposes :class:`BaseRouter`, which binds methods on an object
instance, resolves path selectors (using '/' separator), and exposes rich
introspection without any plugin logic. Subclasses add middleware but must
preserve these semantics.

Constructor and slots
---------------------
Constructor signature::

    BaseRouter(owner, name=None, prefix=None, *, description=None,
               default_entry="index", get_default_handler=None, get_kwargs=None,
               branch=False, parent_router=None)

- ``owner`` is required; ``None`` raises ``ValueError`` and any object that is
  not a ``RoutingClass`` instance raises ``TypeError``. Routers are bound to
  this instance and never re-bound.
- ``description``: optional human-readable description of this router's purpose.
  Included in ``nodes()`` output for documentation/introspection.
- ``default_entry``: the fallback entry name (default: "index") used when a path
  cannot be fully resolved. The router returns this entry with any unconsumed
  path segments passed as positional arguments when invoked.
- ``parent_router``: optional parent router. When provided, this router is
  automatically attached as a child using ``name`` as the alias. Requires
  ``name`` to be set; raises ``ValueError`` on name collision.
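- Slots: ``instance``, ``name``, ``prefix``, ``description`` (optional router description),
  ``default_entry``, ``__entries_raw`` (logical name → MethodEntry with handler, exposed
  through the lazily-binding ``_entries`` property), ``_children`` (name → child router),
  ``_get_defaults``, ``_is_branch``, ``_bound``.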

Lazy binding
------------
Routers use lazy binding: methods decorated with ``@route`` are discovered and
registered automatically on first use (node/nodes). No explicit bind() call
is needed.

Marker discovery
----------------
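``_iter_marked_methods`` walks the MRO of ``type(owner)`` from the most derived
class to the base classes (so a derived-class definition wins), scanning each
class ``__dict__`` for plain functions carrying ``_route_decorator_kw`` markers.
Only markers whose ``name`` matches this router's ``name`` are used; a marker
without a ``name`` falls back to the name of the owner's ``default_router``.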

Handler table and wrapping
--------------------------
- ``_register_callable`` creates a ``MethodEntry`` and stores it in ``_entries``.
- ``_rebuild_handlers`` updates each entry's ``handler`` attribute by passing through
  ``_wrap_handler`` (default: passthrough). Subclasses may inject middleware.

Lookup and execution
--------------------
- ``node(path, **kwargs)`` resolves ``path`` using best-match resolution.
  Returns a ``RouterNode`` wrapper that is callable. The RouterNode contains
  metadata about the resolved entry. Unconsumed path segments are passed as
  positional arguments when the node is invoked.

Children (instance hierarchies)
-------------------------------
``include(source, name=...)`` links a Router or RouterNode into this router.
Accepts a Router (child hierarchy) or RouterNode (entry alias).

``detach_instance(child)`` removes all routers belonging to a child instance.

Instance attachment is handled by ``RoutingClass.attach_instance()``, which
sets ``_routing_parent`` and links child routers into parent routers.
Attached child routers inherit plugins via ``_on_attached_to_parent``.

Introspection
-------------
- ``nodes(**kwargs)`` builds a nested dict of routers and entries respecting
  filters. Returns dict with ``entries`` and ``routers`` keys only if non-empty.
  Output includes ``description`` (router's description) and ``owner_doc``
  (owner class docstring) for documentation purposes.

Output modes
------------
- ``nodes(mode="openapi")`` returns flat OpenAPI format with all paths merged.
- ``nodes(mode="h_openapi")`` returns hierarchical OpenAPI format preserving
  the router tree structure with ``description`` and ``owner_doc`` at each level.

Hooks for subclasses
--------------------
- ``_wrap_handler``: override to wrap callables (middleware stack).
- ``_after_entry_registered``: invoked after registering a handler.
- ``_on_attached_to_parent``: invoked when a child router is linked into this router.
- ``_describe_entry_extra``: allow subclasses to extend per-entry description.
"""

from __future__ import annotations

import inspect
import re
from collections.abc import Callable, Iterator
from typing import Any

from genro_toolbox.typeutils import safe_is_instance

from genro_routes.plugins._base_plugin import MethodEntry

from .router_interface import RouterInterface
from .router_node import RouterNode

__all__ = ["BaseRouter"]


class BaseRouter(RouterInterface):
    """Plugin-free router bound to an object instance.

    Responsibilities:

    - Register bound methods/functions with logical names (optionally via markers)
    - Resolve path selectors (using '/' separator) across child routers
    - Expose handler tables and introspection data
    - Provide hooks for subclasses to wrap handlers or filter introspection
    """

    __slots__ = (
        "instance",  # owner object this router is bound to
        "name",  # router name, matched against @route markers
        "prefix",  # function-name prefix stripped when deriving entry names
        "description",  # optional human-readable description
        "default_entry",  # fallback entry name (constructor default: "index")
        "__entries_raw",  # entry table backing the lazily-binding ``_entries`` property
        "_children",  # alias -> child router
        "_get_defaults",  # defaults built from get_kwargs / get_default_handler
        "_is_branch",  # when true, add_entry() refuses to register handlers
        "_bound",  # set once lazy binding has run
    )
def __init__(
    self,
    owner: Any,
    name: str | None = None,
    prefix: str | None = None,
    *,
    description: str | None = None,
    default_entry: str = "index",
    get_default_handler: Callable | None = None,
    get_kwargs: dict[str, Any] | None = None,
    branch: bool = False,
    parent_router: BaseRouter | None = None,
) -> None:
    """Bind a new router to ``owner``.

    Args:
        owner: The RoutingClass instance this router belongs to.
        name: Router name; required when ``parent_router`` is given.
        prefix: Function-name prefix stripped when deriving entry names.
        description: Optional human-readable description.
        default_entry: Fallback entry name.
        get_default_handler: Stored under ``"default_handler"`` in the
            router defaults unless ``get_kwargs`` already provides that key.
        get_kwargs: Initial content of the router defaults.
        branch: When true, the router cannot register handlers.
        parent_router: Router to attach this one to, using ``name`` as alias.

    Raises:
        ValueError: If ``owner`` is None, if ``parent_router`` is given
            without a ``name``, or if the alias is already taken by
            another router in ``parent_router``.
        TypeError: If ``owner`` is not a RoutingClass instance.
    """
    if owner is None:
        raise ValueError("Router requires a parent instance")
    owner_is_routing_class = safe_is_instance(owner, "genro_routes.core.routing.RoutingClass")
    if not owner_is_routing_class:
        owner_type = type(owner).__name__
        raise TypeError(
            f"Router owner must be a RoutingClass instance, got {owner_type}. "
            "Inherit from RoutingClass to use Router."
        )

    # Explicit get_kwargs win over get_default_handler for "default_handler".
    get_defaults: dict[str, Any] = dict(get_kwargs or {})
    if get_default_handler is not None:
        get_defaults.setdefault("default_handler", get_default_handler)

    self.instance = owner
    self.name = name
    self.prefix = prefix or ""
    self.description = description
    self.default_entry = default_entry
    self._is_branch = bool(branch)
    self._bound = False
    self.__entries_raw: dict[str, MethodEntry] = {}
    self._children: dict[str, BaseRouter] = {}
    self._get_defaults: dict[str, Any] = get_defaults

    if parent_router is None:
        # Root router: announce itself to the owner when the owner supports it.
        register = getattr(self.instance, "_register_router", None)
        if callable(register):
            register(self)
        return

    # Child router: link into the parent under its own name.
    if not name:
        raise ValueError("Child router must have a name when using parent_router")
    siblings = parent_router._children
    if name in siblings and siblings[name] is not self:
        raise ValueError(f"Child name collision: {name!r}")
    siblings[name] = self
    self._on_attached_to_parent(parent_router)
# ------------------------------------------------------------------
# Lazy binding property
# ------------------------------------------------------------------
@property
def _entries(self) -> dict[str, MethodEntry]:
    """Return the entry table, running the lazy bind on first access."""
    if self._bound:
        return self.__entries_raw
    self._bind()
    return self.__entries_raw


@property
def current_capabilities(self) -> set[str]:
    """Collect capabilities from the owner and its ``_routing_parent`` chain.

    Returns:
        The union of every non-empty ``capabilities`` attribute found while
        walking from the owner instance up through ``_routing_parent``.
    """
    collected: set[str] = set()
    current = self.instance
    while current is not None:
        # A missing or empty ``capabilities`` attribute contributes nothing.
        collected.update(getattr(current, "capabilities", None) or ())
        current = getattr(current, "_routing_parent", None)
    return collected


def _is_known_plugin(self, prefix: str) -> bool:
    """Tell whether ``prefix`` is the name of a registered plugin."""
    try:
        from genro_routes.core.router import Router  # type: ignore
    except Exception:  # pragma: no cover - import safety
        return False
    else:
        return prefix in Router.available_plugins()


def _get_plugin_default_param(self, plugin_name: str) -> str | None:
    """Return the plugin's ``plugin_default_param``, or None when unavailable."""
    try:
        from genro_routes.core.router import Router  # type: ignore
    except Exception:  # pragma: no cover - import safety
        return None
    plugin_cls = Router.available_plugins().get(plugin_name)
    if plugin_cls is None:
        return None
    return getattr(plugin_cls, "plugin_default_param", None)


# ------------------------------------------------------------------
# Registration helpers
# ------------------------------------------------------------------
def add_entry(
    self,
    target: Any,
    *,
    name: str | None = None,
    metadata: dict[str, Any] | None = None,
    replace: bool = False,
    **options: Any,
) -> BaseRouter:
    """Register handler(s) on this router.

    Note:
        For most use cases, prefer the ``@route`` decorator. Use ``add_entry``
        directly only for dynamic registration (e.g., introspection-based
        mapping of external libraries).

    Args:
        target: Callable, attribute name(s), comma-separated string, or
            wildcard marker (``"*"``, ``"_all_"``, ``"__all__"``).
        name: Logical name override for this entry.
        metadata: Extra metadata stored on the MethodEntry.
        replace: Allow overwriting an existing logical name.
        options: Extra options. ``meta_<key>`` values are grouped under the
            ``"meta"`` metadata key, ``<plugin>_<key>`` values (and a bare
            ``<plugin>`` key when the plugin declares a default parameter)
            become plugin configuration, everything else is merged into the
            entry metadata.

    Returns:
        self (to allow chaining).

    Raises:
        ValueError: on branch routers, or on handler name collision when
            replace is False.
        AttributeError: when resolving missing attributes on owner.
        TypeError: on unsupported target type.
    """
    if self._is_branch:
        raise ValueError("Branch routers cannot register handlers")

    if isinstance(target, str):
        target = target.strip()
        if not target:
            return self
        if "," in target:
            # A comma-separated string is handled like a list of names.
            target = [chunk for chunk in (part.strip() for part in target.split(",")) if chunk]

    if isinstance(target, (list, tuple, set)):
        # Forward the ORIGINAL options: each recursive call re-derives its own
        # core/plugin split, so plugin-scoped options reach every item instead
        # of being dropped during the fan-out.
        for item in target:
            self.add_entry(
                item,
                name=name,
                metadata=dict(metadata or {}),
                replace=replace,
                **options,
            )
        return self

    # Split plugin-scoped options (<plugin>_<key>) and meta_* from core options
    plugin_options: dict[str, dict[str, Any]] = {}
    core_options: dict[str, Any] = {}
    for key, value in options.items():
        if key.startswith("meta_"):
            # meta_* kwargs are grouped under the "meta" key
            core_options.setdefault("meta", {})[key[5:]] = value
            continue
        if "_" in key:
            plugin_name, plug_key = key.split("_", 1)
            if plugin_name and plug_key and self._is_known_plugin(plugin_name):
                plugin_options.setdefault(plugin_name, {})[plug_key] = value
                continue
        elif self._is_known_plugin(key):
            default_param = self._get_plugin_default_param(key)
            if default_param:
                plugin_options.setdefault(key, {})[default_param] = value
                continue
        core_options[key] = value

    if isinstance(target, str):
        if target in {"*", "_all_", "__all__"}:
            self._register_marked(
                name=name,
                metadata=metadata,
                replace=replace,
                extra=core_options,
                plugin_options=plugin_options,
            )
            self._bound = True  # Mark as bound after marker discovery
            return self
        bound = getattr(self.instance, target)
    elif callable(target):
        if inspect.ismethod(target):
            bound = target
        else:
            # Bind plain functions to the owner so they behave like methods.
            bound = target.__get__(self.instance, type(self.instance))
    else:
        raise TypeError(f"Unsupported entry target: {target!r}")

    entry_meta = dict(metadata or {})
    entry_meta.update(core_options)
    self._register_callable(
        bound,
        name=name,
        metadata=entry_meta,
        replace=replace,
        plugin_options=plugin_options,
    )
    return self
def _register_callable(
    self,
    bound: Callable,
    *,
    name: str | None = None,
    metadata: dict[str, Any] | None = None,
    replace: bool = False,
    plugin_options: dict[str, dict[str, Any]] | None = None,
    endpoint_id: str | None = None,
) -> None:
    """Create a MethodEntry and store it in the entries table.

    Args:
        bound: The bound method to register.
        name: Optional name override (otherwise uses func name with prefix stripped).
        metadata: Extra metadata to attach to the entry.
        replace: If True, allow overwriting existing entry.
        plugin_options: Per-plugin configuration from decorator kwargs.
        endpoint_id: Optional globally unique identifier for reverse lookup.

    Raises:
        ValueError: If the logical name is already registered and ``replace``
            is False.
    """
    logical_name = self._resolve_name(bound.__name__, name_override=name)
    if logical_name in self._entries and not replace:
        raise ValueError(f"Handler name collision: {logical_name}")
    entry = MethodEntry(
        name=logical_name,
        func=bound,
        router=self,
        plugins=[],
        metadata=dict(metadata or {}),
        endpoint_id=endpoint_id,
    )
    # Attach plugin-scoped config to metadata for later consumption by
    # plugin-enabled routers.
    if plugin_options:
        entry.metadata["plugin_config"] = plugin_options
    self._entries[logical_name] = entry
    self._after_entry_registered(entry)
    self._rebuild_handlers()


def _register_marked(
    self,
    *,
    name: str | None,
    metadata: dict[str, Any] | None,
    replace: bool,
    extra: dict[str, Any],
    plugin_options: dict[str, dict[str, Any]] | None = None,
) -> None:
    """Discover and register all @route-decorated methods for this router.

    Iterates through methods with _route_decorator_kw markers matching this
    router's name and registers each as an entry.

    Args:
        name: Logical name override applied to every discovered method; when
            None, the marker's ``entry_name`` (if any) is used.
        metadata: Base metadata; marker values and then ``extra`` override it.
        replace: Forwarded to ``_register_callable``.
        extra: Core options supplied by the caller.
        plugin_options: Caller-level plugin configuration, merged with the
            plugin options found in each marker. It is never mutated.
    """
    for func, marker in self._iter_marked_methods():
        entry_override = marker.pop("entry_name", None)
        marker_endpoint_id = marker.pop("endpoint_id", None)
        entry_name = name if name is not None else entry_override

        # Precedence (lowest to highest): metadata, marker payload, extra.
        combined = dict(metadata or {})
        combined.update(marker)
        combined.update(extra)

        # Split plugin-scoped options and meta_* from the combined payload
        marker_plugin_opts: dict[str, dict[str, Any]] = {}
        entry_meta: dict[str, Any] = {}
        for key, value in combined.items():
            if key.startswith("meta_"):
                # meta_* kwargs are grouped under the "meta" key
                entry_meta.setdefault("meta", {})[key[5:]] = value
                continue
            if "_" in key:
                plugin_name, plug_key = key.split("_", 1)
                if plugin_name and plug_key and self._is_known_plugin(plugin_name):
                    marker_plugin_opts.setdefault(plugin_name, {})[plug_key] = value
                    continue
            elif self._is_known_plugin(key):
                default_param = self._get_plugin_default_param(key)
                if default_param:
                    marker_plugin_opts.setdefault(key, {})[default_param] = value
                    continue
            entry_meta[key] = value

        # Copy the inner dicts: merging marker options into the caller's own
        # dicts would leak one method's plugin config into every later method
        # (and into the caller's plugin_options).
        merged_plugin_opts: dict[str, dict[str, Any]] = {
            pname: dict(pdata) for pname, pdata in (plugin_options or {}).items()
        }
        for pname, pdata in marker_plugin_opts.items():
            merged_plugin_opts.setdefault(pname, {}).update(pdata)

        bound = func.__get__(self.instance, type(self.instance))
        self._register_callable(
            bound,
            name=entry_name,
            metadata=entry_meta,
            replace=replace,
            plugin_options=merged_plugin_opts or None,
            endpoint_id=marker_endpoint_id,
        )


def _iter_marked_methods(self) -> Iterator[tuple[Callable, dict[str, Any]]]:
    """Yield (func, marker_dict) for methods decorated with @route for this router.

    Walks the MRO (derived classes first) and scans each class ``__dict__``
    for functions carrying _route_decorator_kw markers. Only yields markers
    whose router name matches this router's name; a marker without a name
    falls back to the name of the owner's ``default_router``. The yielded
    marker dict is a copy without the ``name`` key.
    """
    default_router = getattr(self.instance, "default_router", None)
    default_router_name: str | None = (
        default_router.name if default_router is not None else None
    )

    # seen_names: respect the MRO (the most derived definition wins).
    # seen_funcs: avoid registering an alias (alias = original) twice.
    seen_names: set[str] = set()
    seen_funcs: set[int] = set()
    for base in type(self.instance).__mro__:
        for attr_name, value in vars(base).items():
            if not inspect.isfunction(value) or attr_name in seen_names:
                continue
            seen_names.add(attr_name)
            func_id = id(value)
            if func_id in seen_funcs:
                continue
            seen_funcs.add(func_id)
            for marker in getattr(value, "_route_decorator_kw", None) or ():
                marker_name = marker.get("name")
                if marker_name is None:
                    marker_name = default_router_name
                if marker_name != self.name:
                    continue
                yield value, {k: v for k, v in marker.items() if k != "name"}


def _resolve_name(self, func_name: str, *, name_override: str | None) -> str:
    """Compute the logical entry name from the function name.

    Args:
        func_name: The __name__ of the function being registered.
        name_override: Explicit name to use (bypasses prefix stripping).

    Returns:
        The entry name: either name_override, or func_name with prefix stripped.
    """
    if name_override:
        return name_override
    if self.prefix and func_name.startswith(self.prefix):
        return func_name[len(self.prefix) :]
    return func_name


def _wrap_handler(
    self, entry: MethodEntry, call_next: Callable
) -> Callable:  # pragma: no cover - overridden by plugin routers
    """Wrap a handler callable (hook for subclasses to add middleware).

    Default implementation returns call_next unchanged. Subclasses (Router)
    override this to build a middleware pipeline from attached plugins.

    Args:
        entry: The MethodEntry being wrapped.
        call_next: The next callable in the chain (initially entry.func).

    Returns:
        The wrapped callable to use as entry.handler.
    """
    return call_next


# ------------------------------------------------------------------
# Binding (finalization)
# ------------------------------------------------------------------
def _bind(self) -> None:
    """Finalize router configuration and trigger route discovery.

    Internal method called automatically on first use (lazy binding).
    Discovers @route decorated methods and registers them; branch routers
    are only flagged as bound.
    """
    if self._bound:
        return  # Already bound, no-op
    self._bound = True  # Set BEFORE work to avoid recursion via properties
    if not self._is_branch:
        self.add_entry("*")


def _require_bound(self, operation: str) -> None:
    """Ensure the router is bound, auto-binding if needed.

    Args:
        operation: Name of the operation being attempted (unused, kept for API).
    """
    if not self._bound:
        self._bind()


# ------------------------------------------------------------------
# Handler rebuilding
# ------------------------------------------------------------------
def _rebuild_handlers(self) -> None:
    """Rebuild wrapped handlers for all entries owned by this router."""
    for entry in self.__entries_raw.values():
        if entry.router is not self:
            continue  # alias — handler belongs to the source router
        entry.handler = self._wrap_handler(entry, entry.func)


# ------------------------------------------------------------------
# Children management
# ------------------------------------------------------------------
def include(self, source: Any, *, name: str | None = None) -> None:
    """Include a Router or RouterNode (entry alias) into this router.

    Two kinds of source are accepted:

    - **Router**: handed to ``_include_router``, which links it as a child
      of this router under ``name`` (or ``source.name``).
    - **RouterNode**: handed to ``_include_node``, which registers the
      node's entry in this router under ``name``. The entry object itself
      is shared — no copy is made.

    Args:
        source: A Router instance or a RouterNode from ``node()``.
        name: Alias in this router. For Router sources, defaults to
            ``source.name``. For RouterNode sources, required.

    Raises:
        TypeError: If source is not a Router or RouterNode.
        ValueError: If name is required but not provided.
        ValueError: If alias collision in _children or _entries.

    Examples::

        # Include a router
        self._sys.include(swagger.api, name="swagger")

        # Include an entry as alias
        fatture.api.include(
            pagamenti.api.node("collega_a_fattura"),
            name="collega_pagamento",
        )
    """
    from .router_node import RouterNode

    if isinstance(source, BaseRouter):
        self._include_router(source, name)
        return
    if isinstance(source, RouterNode):
        self._include_node(source, name)
        return
    source_type = type(source).__name__
    raise TypeError(f"include() accepts Router or RouterNode, got {source_type}")
def _include_router(self, source: BaseRouter, name: str | None) -> None:
    """Link a Router as a child of this router.

    When the source router's owner has no ``_routing_parent`` yet, this is a
    primary attachment: ``_on_attached_to_parent`` is invoked on the source
    and ``_routing_parent`` is set on its owner. Otherwise the link is a
    navigational shortcut only: neither the hook nor ``_routing_parent`` is
    touched.

    Raises:
        ValueError: If no alias can be determined, or if the alias already
            points to a different router.
    """
    alias = name or source.name
    if not alias:
        raise ValueError("include() requires a name (source router has no name)")
    children = self._children
    if alias in children and children[alias] is not source:
        raise ValueError(f"Child name collision: {alias}")
    children[alias] = source

    owner = source.instance
    if owner is None or getattr(owner, "_routing_parent", None) is not None:
        return  # secondary link: nothing else to do
    source._on_attached_to_parent(self)
    object.__setattr__(owner, "_routing_parent", self.instance)


def _include_node(self, source: Any, name: str | None) -> None:
    """Create an entry alias from a RouterNode.

    Raises:
        ValueError: If ``name`` is None, if the node carries no entry, or if
            ``name`` is already used by a different entry.
    """
    if name is None:
        raise ValueError("include() requires name when including a RouterNode")
    aliased = source._entry
    if aliased is None:
        raise ValueError(
            f"Cannot include RouterNode: no entry resolved (error={source.error})"
        )
    # Going through _entries triggers lazy binding, so the collision check
    # works even before the router has been accessed.
    existing = self._entries
    if name in existing and existing[name] is not aliased:
        raise ValueError(f"Entry name collision: {name}")
    self._BaseRouter__entries_raw[name] = aliased  # type: ignore[attr-defined]
def detach_instance(self, routing_child: Any) -> BaseRouter:
    """Detach all routers belonging to a RoutingClass instance.

    Every child router whose ``instance`` is ``routing_child`` is removed
    from this router. If ``routing_child._routing_parent`` is this router's
    owner, it is reset to None. References held in ``_plugin_children``
    (when that attribute exists) are dropped as well.

    Args:
        routing_child: The RoutingClass instance to detach.

    Returns:
        ``routing_child`` itself, i.e. the detached instance (not this
        router, despite the annotation).

    Raises:
        TypeError: If ``routing_child`` is not a RoutingClass instance.
    """
    if not safe_is_instance(routing_child, "genro_routes.core.routing.RoutingClass"):
        raise TypeError("detach_instance() requires a RoutingClass instance")

    # Iterate over a snapshot because matching aliases are removed on the fly.
    for alias, router in list(self._children.items()):
        if router.instance is not routing_child:
            continue
        self._children.pop(alias, None)
        if getattr(routing_child, "_routing_parent", None) is self.instance:
            object.__setattr__(routing_child, "_routing_parent", None)

    # Clean up plugin children references to avoid memory leaks
    plugin_children = getattr(self, "_plugin_children", None)
    if plugin_children is not None:
        for plugin_name, children_list in list(plugin_children.items()):
            plugin_children[plugin_name] = [
                r for r in children_list if r.instance is not routing_child
            ]

    # No hard error if nothing was removed; detach is best-effort.
    return routing_child  # type: ignore[no-any-return]
# ------------------------------------------------------------------ # URL building # ------------------------------------------------------------------
def get_url(self, path: str, **kwargs: Any) -> str:
    """Build a URL path for a handler, appending positional parameters.

    ``path`` is resolved with ``_find_candidate_node``, so it may be a plain
    path (e.g., ``"users/detail"``) or an endpoint_id with ``@`` prefix
    (e.g., ``"@invoice.detail"``). Keyword arguments whose names match
    positional parameters of the handler are appended as path segments in
    declaration order.

    Args:
        path: Handler path or ``"@endpoint_id"``.
        **kwargs: Parameter values to append to the path. Only
            positional-only and positional-or-keyword parameters are
            appended; anything else is ignored.

    Returns:
        The full URL path string.

    Raises:
        ValueError: If the path does not resolve to a valid handler.

    Example::

        # Given: @route("api", endpoint_id="invoice.detail")
        #        def detail(self, invoice_id): ...
        # Mounted at: billing/detail

        router.get_url("@invoice.detail", invoice_id=123)
        # → "billing/detail/123"

        router.get_url("billing/detail", invoice_id=123)
        # → "billing/detail/123"
    """
    resolved = self._find_candidate_node(path)
    handler_entry = resolved._entry
    if handler_entry is None:
        raise ValueError(f"get_url: path '{path}' does not resolve to a handler")

    base: str = resolved.path or ""
    if not kwargs:
        return base

    # Walk the handler signature so that segments follow declaration order.
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    segments: list[str] = []
    for param in inspect.signature(handler_entry.func).parameters.values():
        if param.name == "self" or param.kind not in positional_kinds:
            continue
        if param.name in kwargs:
            segments.append(str(kwargs[param.name]))

    if not segments:
        return base
    return f"{base}/{'/'.join(segments)}"
# ------------------------------------------------------------------
# Node resolution
# ------------------------------------------------------------------
def _find_candidate_node(self, path: str) -> RouterNode:
    """Resolve path to a candidate RouterNode without permission checks.

    Pure path resolution: each segment is first looked up among the current
    router's entries, then among its children. The first segment that
    matches neither stops the walk, and it is handed over together with the
    remaining segments as ``partial``.

    A path starting with ``@`` (after stripping slashes) is treated as an
    endpoint_id and resolved through ``_find_by_endpoint_id`` instead.

    Args:
        path: Path to resolve (e.g., "entry", "child/entry/arg1/arg2",
            or "@endpoint_id" for reverse lookup).

    Returns:
        A RouterNode for the matched entry, for the router reached when the
        path is exhausted, or for the last router reached plus the
        unconsumed segments.
    """
    stripped = path.strip("/")
    if not stripped:
        return RouterNode(self, path="")
    if stripped.startswith("@"):
        return self._find_by_endpoint_id(stripped[1:])

    segments = stripped.split("/")
    current: BaseRouter = self
    for index, segment in enumerate(segments):
        if segment in current._entries:
            return RouterNode(
                current,
                entry_name=segment,
                partial=segments[index + 1 :],
                path="/".join(segments[: index + 1]),
            )
        child = current._children.get(segment)
        if not child:
            # Neither an entry nor a child: stop here and hand over the rest.
            return RouterNode(
                current,
                partial=segments[index:],
                path="/".join(segments[:index]),
            )
        current = child
    # Every segment named a child router.
    return RouterNode(current, path="/".join(segments))


def _find_by_endpoint_id(self, endpoint_id: str) -> RouterNode:
    """Resolve an endpoint_id to a RouterNode by recursive search.

    Args:
        endpoint_id: The endpoint identifier to search for.

    Returns:
        RouterNode pointing to the matched entry, or an entry-less node on
        this router whose path is ``"@<endpoint_id>"``.
    """
    found = self._search_endpoint_id(endpoint_id, [])
    if found is None:
        return RouterNode(self, path=f"@{endpoint_id}")
    owning_router, entry_name, segments = found
    return RouterNode(owning_router, entry_name=entry_name, path="/".join(segments))


def _search_endpoint_id(
    self, endpoint_id: str, path_parts: list[str]
) -> tuple[BaseRouter, str, list[str]] | None:
    """Recursively search for endpoint_id in entries and children.

    This router's own entries are checked before descending into children.

    Args:
        endpoint_id: The endpoint identifier to find.
        path_parts: Accumulated path segments for building the full path.

    Returns:
        Tuple of (router, entry_name, full_path_parts) or None.
    """
    for entry_name, entry in self._entries.items():
        if entry.endpoint_id == endpoint_id:
            return self, entry_name, path_parts + [entry_name]
    for alias, child in self._children.items():
        match = child._search_endpoint_id(endpoint_id, path_parts + [alias])
        if match is not None:
            return match
    return None


# ------------------------------------------------------------------
# Introspection helpers
# ------------------------------------------------------------------
def router_at_path(self, path: str) -> BaseRouter | None:
    """Find the router at the given path.

    Args:
        path: Path to navigate (e.g., "child/grandchild"). Leading and
            trailing slashes and empty segments are ignored.

    Returns:
        The router at the path (this router for an empty path), or None if
        a segment does not name a child.
    """
    current: BaseRouter | None = self
    for segment in filter(None, path.strip("/").split("/")):
        if not current:
            break
        current = current._children.get(segment)
    return current
def nodes(
    self,
    basepath: str | None = None,
    lazy: bool = False,
    mode: str | None = None,
    pattern: str | None = None,
    forbidden: bool = False,
    **kwargs: Any,
) -> dict[str, Any]:
    """Return a tree of routers/entries/metadata respecting filters.

    Args:
        basepath: Optional path to start from (e.g., "child/grandchild").
            When given, the tree is built from the router found at that
            path; an unknown path yields an empty dict.
        lazy: If True, child routers are returned as router references
            instead of being expanded recursively.
        mode: Output format. ``None`` returns the standard introspection
            dict; any other value selects ``OpenAPITranslator.translate_<mode>``
            (e.g., "openapi", "h_openapi").
        pattern: Optional regex; only entries whose name matches (via
            ``re.search``) are considered.
        forbidden: If True, entries rejected by ``_entry_invalid_reason`` are
            kept and carry the reason in a ``forbidden`` field, and empty
            child routers are kept as well.
        **kwargs: Filter arguments forwarded to ``_entry_invalid_reason``
            and to child routers.

    Returns:
        An empty dict when there is nothing to report, otherwise a dict with
        ``name``, ``description``, ``owner_doc``, ``router``, ``instance``,
        ``plugin_info`` and — only when non-empty — ``entries`` and
        ``routers``. When ``mode`` is set, the translated output is returned.

    Raises:
        ValueError: If ``mode`` names no translator.
    """
    if basepath:
        start = self.router_at_path(basepath)
        if not start:
            return {}
        # Collect untranslated nodes first; translation happens once, here.
        subtree = start.nodes(
            lazy=lazy, mode=None, pattern=pattern, forbidden=forbidden, **kwargs
        )
        if not (mode and subtree):
            return subtree
        from genro_routes.plugins.openapi import OpenAPITranslator

        translate = getattr(OpenAPITranslator, f"translate_{mode}", None)
        if translate is None:
            raise ValueError(f"Unknown mode: {mode}")
        # Prepend basepath to paths so they are absolute from root
        basepath_result: dict[str, Any] = translate(
            subtree, lazy=lazy, path_prefix="/" + basepath.strip("/")
        )
        return basepath_result

    # Compile pattern once if provided
    matcher = re.compile(pattern) if pattern else None
    capabilities = self.current_capabilities

    entries: dict[str, Any] = {}
    for entry in self._entries.values():
        if matcher is not None and matcher.search(entry.name) is None:
            continue
        reason = self._entry_invalid_reason(
            entry, env_router_capabilities=capabilities, **kwargs
        )
        denied = reason != ""
        if denied and not forbidden:
            continue
        info = self._entry_node_info(entry)
        if denied:
            info["forbidden"] = reason
        entries[entry.name] = info

    routers: dict[str, Any] = {}
    if lazy:
        # In lazy mode, just return the router references - use basepath to expand
        routers.update(self._children)
    else:
        for child_name, child in self._children.items():
            child_tree = child.nodes(pattern=pattern, forbidden=forbidden, **kwargs)
            # Empty child routers are only kept when forbidden=True
            if child_tree or forbidden:
                routers[child_name] = child_tree

    if not (entries or routers):
        return {}

    result: dict[str, Any] = {
        "name": self.name,
        "description": self.description,
        "owner_doc": self.instance.__class__.__doc__,
        "router": self,
        "instance": self.instance,
        "plugin_info": self._get_plugin_info(),
    }
    if entries:
        result["entries"] = entries
    if routers:
        result["routers"] = routers

    if not mode:
        return result

    # Translate to requested format
    from genro_routes.plugins.openapi import OpenAPITranslator

    translate = getattr(OpenAPITranslator, f"translate_{mode}", None)
    if translate is None:
        raise ValueError(f"Unknown mode: {mode}")
    translated: dict[str, Any] = translate(result, lazy=lazy)
    return translated
def node(
    self,
    path: str,
    errors: dict[str, type[Exception]] | None = None,
    openapi: bool = False,
    **kwargs: Any,
) -> RouterNode:
    """Return info about a single node (router or entry) at the given path.

    Unlike nodes(), which returns the full subtree, this method resolves one
    node without recursion. Resolution is delegated to
    ``_find_candidate_node``; the node's ``error`` is then set from
    ``_entry_invalid_reason`` of the router that owns the candidate.

    Args:
        path: Path to the node (e.g., "entry_name" or "child/grandchild/entry").
        errors: Optional dict mapping error codes to custom exception
            classes, passed to the node's ``set_custom_exceptions``.

            Example::

                node = router.node("handler", errors={
                    'not_found': HTTPNotFound,
                    'not_authorized': HTTPForbidden,
                })

        openapi: If True and an entry was resolved, populate the node's
            ``openapi`` attribute via ``OpenAPITranslator.entry_to_openapi``.
        **kwargs: Filter kwargs forwarded to ``_entry_invalid_reason``
            (e.g., auth_tags="x").

    Returns:
        The resolved RouterNode. Its ``error`` is None when the entry is
        allowed, otherwise the reason string (e.g., "not_found").
    """
    # Pure path resolution first; permission checks come afterwards.
    resolved = self._find_candidate_node(path)
    resolved.set_custom_exceptions(errors)

    # An empty reason means "allowed" and is normalised to None.
    reason = resolved._router._entry_invalid_reason(resolved._entry, **kwargs)
    resolved.error = reason if reason else None

    if openapi and resolved._entry is not None:
        from genro_routes.plugins.openapi import OpenAPITranslator

        resolved.openapi = OpenAPITranslator.entry_to_openapi(resolved._entry)
    return resolved
def _entry_node_info(self, entry: MethodEntry) -> dict[str, Any]:
    """Build info dict for a single entry.

    The dict holds ``name``, ``callable``, ``metadata`` and ``doc``, plus
    whatever ``_describe_entry_extra`` contributes.
    """
    func = entry.func
    description: dict[str, Any] = {
        "name": entry.name,
        "callable": func,
        "metadata": entry.metadata,
        "doc": inspect.getdoc(func) or func.__doc__ or "",
    }
    additions = self._describe_entry_extra(entry, description)
    if additions:
        description.update(additions)
    return description


def _get_plugin_info(self) -> dict[str, Any]:
    """Build plugin_info dict from _plugin_info store.

    Each slot is reduced to shallow copies of its ``config`` and ``locals``
    mappings; a missing or empty ``_plugin_info`` yields an empty dict.
    """
    store = getattr(self, "_plugin_info", {}) or {}
    snapshot: dict[str, Any] = {}
    for plugin_name, slots in store.items():
        per_plugin: dict[str, Any] = {}
        for slot_key, slot in slots.items():
            per_plugin[slot_key] = {
                "config": dict(slot.get("config", {})),
                "locals": dict(slot.get("locals", {})),
            }
        snapshot[plugin_name] = per_plugin
    return snapshot


# ------------------------------------------------------------------
# Plugin hooks (no-op for BaseRouter)
# ------------------------------------------------------------------
def iter_plugins(self) -> list[Any]:  # pragma: no cover - base router has no plugins
    """Return the plugins attached to this router (none on the base router)."""
    no_plugins: list[Any] = []
    return no_plugins
def _on_attached_to_parent(
    self, parent: BaseRouter
) -> None:  # pragma: no cover - hook for subclasses
    """Hook called when this router is attached to ``parent`` (no-op here)."""


def _after_entry_registered(
    self, entry: MethodEntry
) -> None:  # pragma: no cover - hook for subclasses
    """Hook called after ``entry`` has been registered (no-op here)."""


def _describe_entry_extra(
    self, entry: MethodEntry, base_description: dict[str, Any]
) -> dict[str, Any]:  # pragma: no cover - overridden when plugins present
    """Hook letting subclasses add fields to an entry description.

    Returns:
        Extra key/value pairs to merge into the description; empty here.
    """
    return dict()


def _entry_invalid_reason(self, entry: MethodEntry | None, **filters: Any) -> str:
    """Hook used by subclasses to decide if an entry is exposed.

    The base implementation only reports a missing entry; subclasses may
    return further reasons such as "not_authenticated" (401) or
    "not_authorized" (403).

    Args:
        entry: The entry to check (None if not found).
        **filters: Filter kwargs (ignored by the base implementation).

    Returns:
        "" when the entry is allowed, "not_found" when ``entry`` is None.
    """
    return "not_found" if entry is None else ""