Files
Borealis-Github-Replica/Data/Engine/services/auth/context.py

200 lines
7.2 KiB
Python

# ======================================================
# Data\Engine\services\auth\context.py
# Description: Provides request-scoped authentication helpers with Dev Mode state integration.
#
# API Endpoints (if applicable): None
# ======================================================
"""Shared authentication helpers for Engine REST services."""
from __future__ import annotations
import logging
import os
import uuid
from typing import Any, Dict, Mapping, Optional, Tuple
from flask import Request, request, session
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
from .dev_mode import DevModeManager
PermissionResult = Tuple[Dict[str, Any], int]
def _coerce_positive_int(value: Any, default: int) -> int:
try:
candidate = int(value)
if candidate > 0:
return candidate
except (TypeError, ValueError):
pass
return default
class RequestAuthContext:
"""Resolves the current operator and Dev Mode state for the active request."""
def __init__(
self,
*,
app,
dev_mode_manager: DevModeManager,
config: Optional[Mapping[str, Any]] = None,
logger: Optional[logging.Logger] = None,
) -> None:
self._app = app
self._dev_mode_manager = dev_mode_manager
self._config = config or {}
self._logger = logger or logging.getLogger(__name__)
secret = app.secret_key or "borealis-dev-secret"
self._serializer = URLSafeTimedSerializer(secret, salt="borealis-auth")
default_token_ttl = int(os.environ.get("BOREALIS_TOKEN_TTL_SECONDS", 60 * 60 * 24 * 30))
self._token_ttl = _coerce_positive_int(
self._config.get("auth_token_ttl_seconds"), default_token_ttl
)
default_dev_mode_ttl = int(os.environ.get("BOREALIS_DEV_MODE_TTL_SECONDS", 900))
self._dev_mode_ttl = _coerce_positive_int(
self._config.get("assemblies_dev_mode_ttl_seconds"), default_dev_mode_ttl
)
# ------------------------------------------------------------------
# Session helpers
# ------------------------------------------------------------------
def ensure_session_token(self) -> str:
"""Ensure an opaque session token exists for Dev Mode tracking."""
token = session.get("dev_mode_session")
if not token:
token = uuid.uuid4().hex
session["dev_mode_session"] = token
session.modified = True
return token
def current_user(self) -> Optional[Dict[str, Any]]:
"""Return the authenticated operator profile, if any."""
username = session.get("username")
role = session.get("role") or "User"
if username:
return {"username": username, "role": role}
token = self._bearer_token(request)
if not token:
return None
try:
data = self._serializer.loads(token, max_age=self._token_ttl)
except (BadSignature, SignatureExpired):
self._logger.debug("Bearer token invalid or expired.")
return None
except Exception: # pragma: no cover - defensive logging
self._logger.debug("Unexpected error validating bearer token.", exc_info=True)
return None
username = (data.get("u") or "").strip()
role = (data.get("r") or "User").strip() or "User"
if not username:
return None
return {"username": username, "role": role}
def dev_mode_enabled(self, *, user: Optional[Dict[str, Any]] = None) -> bool:
"""Return whether Dev Mode is currently active for the operator."""
profile = user or self.current_user()
if not profile:
return False
token = self.ensure_session_token()
return self._dev_mode_manager.is_enabled(username=profile["username"], session_token=token)
def set_dev_mode(self, enabled: bool, *, ttl_seconds: Optional[int] = None) -> bool:
"""Enable or disable Dev Mode for the active operator session."""
profile = self.current_user()
if not profile:
raise PermissionError("Authentication required to modify Dev Mode state.") # type: ignore[return-value]
token = self.ensure_session_token()
if enabled:
ttl = ttl_seconds if ttl_seconds and ttl_seconds > 0 else self._dev_mode_ttl
self._dev_mode_manager.enable(
username=profile["username"],
role=profile.get("role") or "User",
session_token=token,
ttl_seconds=ttl,
)
else:
self._dev_mode_manager.disable(username=profile["username"], session_token=token)
return self.dev_mode_enabled(user=profile)
def clear_dev_mode(self) -> None:
"""Explicitly disable Dev Mode for the active operator."""
profile = self.current_user()
if not profile:
return
token = self.ensure_session_token()
self._dev_mode_manager.disable(username=profile["username"], session_token=token)
# ------------------------------------------------------------------
# Permission helpers
# ------------------------------------------------------------------
def require_user(self) -> Tuple[Optional[Dict[str, Any]], Optional[PermissionResult]]:
user = self.current_user()
if not user:
return None, (
{
"error": "unauthorized",
"message": "Authentication required. Please sign in and retry.",
},
401,
)
return user, None
def require_admin(self, *, dev_mode_required: bool = False) -> Optional[PermissionResult]:
user = self.current_user()
if not user:
return (
{
"error": "unauthorized",
"message": "Authentication required. Please sign in and retry.",
},
401,
)
if not self.is_admin(user):
return (
{
"error": "forbidden",
"message": "Administrator permissions are required for this action.",
},
403,
)
if dev_mode_required and not self.dev_mode_enabled(user=user):
return (
{
"error": "dev_mode_required",
"message": "Enable Dev Mode from the Assemblies admin controls to continue.",
},
403,
)
return None
# ------------------------------------------------------------------
# Static helpers
# ------------------------------------------------------------------
@staticmethod
def is_admin(user: Dict[str, Any]) -> bool:
role = (user.get("role") or "").strip().lower()
return role == "admin"
@staticmethod
def _bearer_token(req: Request) -> Optional[str]:
header = (req.headers.get("Authorization") or "").strip()
if header.lower().startswith("bearer "):
return header.split(" ", 1)[1].strip()
cookie_token = req.cookies.get("borealis_auth")
if cookie_token:
return cookie_token
return None
__all__ = ["RequestAuthContext", "PermissionResult"]