# Copyright 2025 Softwell S.r.l.
# Licensed under the Apache License, Version 2.0
"""EnvPlugin - Environment capability-based access control plugin.
This plugin provides capability-based filtering for router entries. It evaluates
capability requirements defined on endpoints against system capabilities.
Capabilities can come from two sources:
1. **Request capabilities**: Passed explicitly via ``env_capabilities`` parameter
2. **Instance capabilities**: Declared on RoutingClass instances via a
``CapabilitiesSet`` subclass
When traversing a router hierarchy, capabilities are **accumulated** from all
RoutingClass instances along the path. This allows child services to inherit
capabilities from their parents while adding their own.
Usage::
from genro_routes import Router, RoutingClass, route
from genro_routes.plugins.env import CapabilitiesSet, capability
class MyCapabilities(CapabilitiesSet):
@capability
def redis(self) -> bool:
return True # Check if redis is available
@capability
def pyjwt(self) -> bool:
return "pyjwt" in sys.modules
class MyAPI(RoutingClass):
def __init__(self):
self.api = Router(self, name="api").plug("env")
self.capabilities = MyCapabilities()
@route("api", env_requires="pyjwt&redis")
def create_jwt(self):
return "jwt created"
@route("api", env_requires="paypal|stripe")
def process_payment(self):
return "payment processed"
# Query with additional request capabilities
obj = MyAPI()
obj.api.node("create_jwt", env_capabilities="pyjwt") # OK: pyjwt from request + redis from instance
obj.api.node("create_jwt") # not_available: only redis from instance, missing pyjwt
# Dynamic capabilities via CapabilitiesSet
class PaymentCapabilities(CapabilitiesSet):
def __init__(self, service):
self._service = service
@capability
def stripe(self) -> bool:
return self._service._stripe_configured
@capability
def paypal(self) -> bool:
return self._service._paypal_configured
class PaymentService(RoutingClass):
def __init__(self):
self.api = Router(self, name="api").plug("env")
self._stripe_configured = True
self._paypal_configured = False
self.capabilities = PaymentCapabilities(self)
Rule syntax (on entry env_requires):
- ``|`` : OR (system must have at least one)
- ``&`` : AND (system must have all)
- ``!`` : NOT (system must not have)
- ``()`` : grouping
NOTE: Comma is NOT allowed in env_requires. Use ``|`` for OR, ``&`` for AND.
Comma in env_capabilities means the system has multiple capabilities.
Example: env_requires="pyjwt&redis" means "system must have pyjwt AND redis"
Example: env_requires="paypal|stripe" means "system must have paypal OR stripe"
"""
from __future__ import annotations
from typing import Any
from genro_toolbox import tags_match
from genro_routes.core.router import Router
from genro_routes.plugins._base_plugin import BasePlugin, MethodEntry
__all__ = ["EnvPlugin", "CapabilitiesSet", "capability"]
[docs]
class EnvPlugin(BasePlugin):
"""Environment capability-based access control plugin.
Controls access to router entries based on system capabilities. Capabilities
represent runtime features (installed modules, configured services, etc.)
that may or may not be available.
Capability sources (combined with OR):
1. **Instance capabilities**: Declared on RoutingClass via ``CapabilitiesSet``
2. **Request capabilities**: Passed via ``env_capabilities`` parameter
3. **Accumulated capabilities**: Inherited from parent RoutingClass instances
Rule syntax (on entry via ``env_requires``):
- ``|`` : OR (system must have at least one capability)
- ``&`` : AND (system must have all capabilities)
- ``!`` : NOT (system must not have capability)
- ``()`` : grouping for complex expressions
Attributes:
plugin_code: "env" - used for registration and config prefix.
plugin_description: Human-readable description.
Example:
Entry definition::
@route("api", env_requires="redis&pyjwt") # requires both
def create_session(self): ...
@route("api", env_requires="stripe|paypal") # requires one
def process_payment(self): ...
Dynamic capabilities via CapabilitiesSet::
class ServerCaps(CapabilitiesSet):
@capability
def redis(self) -> bool:
return self._redis_client is not None
class MyService(RoutingClass):
def __init__(self):
self.api = Router(self, name="api").plug("env")
self.capabilities = ServerCaps()
Query with additional request capabilities::
# Adds pyjwt to instance capabilities for this request
router.node("create_session", env_capabilities="pyjwt")
"""
plugin_code = "env"
plugin_description = "Environment capability-based access control plugin"
plugin_default_param = "requires"
[docs]
def deny_reason(self, entry: MethodEntry, **filters: Any) -> str:
"""Filter entries based on capability requirements.
Capabilities are accumulated from:
1. Router capabilities (``router_capabilities`` if pre-computed, else from router)
2. Request capabilities (``capabilities`` parameter)
The combined set is checked against the entry's ``env_requires``.
Args:
entry: MethodEntry being checked.
**filters: May contain ``router_capabilities`` (pre-computed) and/or
``capabilities`` (from request).
Returns:
"": Access allowed (no reason to deny).
"not_available": Entry requires capabilities but none available,
or capabilities don't match rule.
"""
config = self.configuration(entry.name)
entry_rule = config.get("requires", "")
if not entry_rule:
return ""
# Use pre-computed router capabilities or compute them
router_caps = filters.get("router_capabilities")
if router_caps is None:
router_caps = self._router.current_capabilities
# Parse request capabilities
request_caps_str = filters.get("capabilities")
request_caps: set[str] = set()
if request_caps_str:
request_caps = {v.strip() for v in request_caps_str.split(",") if v.strip()}
# Combine all capabilities
all_caps = router_caps | request_caps
if not all_caps:
return "not_available"
if tags_match(entry_rule, all_caps):
return ""
return "not_available"
# ---------------------------------------------------------------------------
# CapabilitiesSet - Dynamic capability container
# ---------------------------------------------------------------------------
[docs]
def capability(func):
"""Mark a method as a capability checker.
The decorated method should return a bool indicating whether the capability
is currently active.
Usage::
class ServerCapabilities(CapabilitiesSet):
@capability
def jwt(self) -> bool:
return "jwt" in sys.modules
@capability
def redis(self) -> bool:
return self._redis_client is not None
"""
func._is_capability = True
return func
[docs]
class CapabilitiesSet:
"""Base class for dynamic capability sets.
Subclasses define capabilities as methods decorated with ``@capability``.
The class behaves like a set: supports ``in``, ``len``, and iteration.
Capabilities are evaluated dynamically on each access, allowing for
runtime conditions (e.g., time of day, module availability, configuration).
Usage::
from genro_routes.plugins.env import CapabilitiesSet, capability
class ServerCapabilities(CapabilitiesSet):
@capability
def jwt(self) -> bool:
return "pyjwt" in sys.modules
@capability
def send_mail(self) -> bool:
hour = datetime.now().hour
return 8 <= hour <= 20
caps = ServerCapabilities()
"jwt" in caps # True if pyjwt is installed
list(caps) # ["jwt", "send_mail"] (only active ones)
len(caps) # number of active capabilities
Integration with RoutingClass::
class MyService(RoutingClass):
def __init__(self):
self.api = Router(self, name="api").plug("env")
self.capabilities = ServerCapabilities()
"""
[docs]
def __iter__(self):
"""Yield names of currently active capabilities."""
for name in dir(self):
if name.startswith("_"):
continue
method = getattr(self, name)
if callable(method) and getattr(method, "_is_capability", False) and method():
yield name
[docs]
def __contains__(self, item: str) -> bool:
"""Check if a capability is currently active."""
method = getattr(self, item, None)
if method and getattr(method, "_is_capability", False):
return bool(method())
return False
[docs]
def __len__(self) -> int:
"""Return the number of currently active capabilities."""
return sum(1 for _ in self)
Router.register_plugin(EnvPlugin)