From fdd95bad23680d57f4a9ee2a8f76a003857144c3 Mon Sep 17 00:00:00 2001 From: Nicole Rappe Date: Sun, 2 Nov 2025 21:15:56 -0700 Subject: [PATCH] Assembly Management Rework - Stage 3 Complete --- .../Engine/Assemblies/DB_MIGRATION_TRACKER.md | 12 +- Data/Engine/services/API/__init__.py | 28 +- .../services/API/assemblies/management.py | 315 +++++++++++++----- Data/Engine/services/auth/__init__.py | 14 + Data/Engine/services/auth/context.py | 199 +++++++++++ Data/Engine/services/auth/dev_mode.py | 187 +++++++++++ 6 files changed, 672 insertions(+), 83 deletions(-) create mode 100644 Data/Engine/services/auth/__init__.py create mode 100644 Data/Engine/services/auth/context.py create mode 100644 Data/Engine/services/auth/dev_mode.py diff --git a/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md b/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md index 372a8cba..9754e963 100644 --- a/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md +++ b/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md @@ -77,9 +77,9 @@ - Added Dev Mode toggle/flush endpoints plus official-domain sync, all wired to the cache write queue and importer; verified via operator testing of the API list/update/clone paths. ## 3. Implement Dev Mode authorization and UX toggles -[ ] Gate privileged writes behind Admin role + Dev Mode toggle. -[ ] Store Dev Mode state server-side (per-user session or short-lived token) to prevent unauthorized access. -[ ] Ensure Dev Mode audit logging for all privileged operations. +[x] Gate privileged writes behind Admin role + Dev Mode toggle. +[x] Store Dev Mode state server-side (per-user session or short-lived token) to prevent unauthorized access. +[x] Ensure Dev Mode audit logging for all privileged operations. ### Details ``` 1. Extend authentication middleware in `Data/Engine/services/auth/` to include role checks and Dev Mode status. @@ -88,6 +88,12 @@ 4. Update error responses for insufficient privileges to guide admins to enable Dev Mode. ``` +**Stage Notes** +- Added `Data/Engine/services/auth/` with `DevModeManager` and `RequestAuthContext`, storing Dev Mode grants server-side with configurable TTL enforcement. +- `EngineServiceAdapters` now provisions a shared `DevModeManager`, exposing auth helpers to API groups. +- Updated `Data/Engine/services/API/assemblies/management.py` to enforce admin + Dev Mode checks for privileged mutations, centralise error messaging, and route Dev Mode toggles through the server-side manager. +- Privileged assembly endpoints now emit structured audit entries via the Engine service log, covering success and denial paths for create/update/delete/clone, manual flush, official sync, and Dev Mode state changes. + ## 4. Enhance Assembly Editor WebUI [ ] Add “Source” column to AG Grid with domain filter badges. [ ] Display yellow “Queued to Write to DB” pill for assemblies whose cache entry is dirty. diff --git a/Data/Engine/services/API/__init__.py b/Data/Engine/services/API/__init__.py index 44a43ffe..8f05b738 100644 --- a/Data/Engine/services/API/__init__.py +++ b/Data/Engine/services/API/__init__.py @@ -11,6 +11,7 @@ from __future__ import annotations import datetime as _dt import logging +import os import re import sqlite3 import time @@ -28,6 +29,7 @@ from ...database import initialise_engine_database from ...security import signing from ...enrollment import NonceCache from ...integrations import GitHubIntegration +from ..auth import DevModeManager from .enrollment import routes as enrollment_routes from .tokens import routes as token_routes @@ -142,6 +144,7 @@ def _make_db_conn_factory(database_path: str) -> Callable[[], sqlite3.Connection @dataclass class EngineServiceAdapters: context: EngineContext + config: Mapping[str, Any] = field(init=False) db_conn_factory: Callable[[], sqlite3.Connection] = field(init=False) jwt_service: Any = field(init=False) dpop_validator: DPoPValidator = field(init=False) @@ -153,10 +156,12 @@ class EngineServiceAdapters: service_log: Callable[[str, str, Optional[str]], None] = field(init=False) device_auth_manager: DeviceAuthManager = field(init=False) github_integration: GitHubIntegration = field(init=False) + dev_mode_manager: DevModeManager = field(init=False) def __post_init__(self) -> None: self.db_conn_factory = _make_db_conn_factory(self.context.database_path) initialise_engine_database(self.context.database_path, logger=self.context.logger) + self.config = dict(self.context.config or {}) self.jwt_service = jwt_service_module.load_service() self.dpop_validator = DPoPValidator() self.ip_rate_limiter = SlidingWindowRateLimiter() @@ -167,7 +172,7 @@ class EngineServiceAdapters: except Exception: self.script_signer = None - log_file = str(self.context.config.get("log_file") or self.context.config.get("LOG_FILE") or "") + log_file = str(self.config.get("log_file") or self.config.get("LOG_FILE") or "") if log_file: base = Path(log_file).resolve().parent else: @@ -183,7 +188,7 @@ class EngineServiceAdapters: rate_limiter=self.device_rate_limiter, ) - config = self.context.config or {} + config = self.config cache_root_value = config.get("cache_dir") or config.get("CACHE_DIR") if cache_root_value: cache_root = Path(str(cache_root_value)) @@ -209,6 +214,25 @@ class EngineServiceAdapters: default_ttl_seconds=default_ttl_seconds, ) + env_ttl_raw = os.environ.get("BOREALIS_DEV_MODE_TTL_SECONDS") + try: + env_ttl = int(env_ttl_raw) if env_ttl_raw else None + except (TypeError, ValueError): + env_ttl = None + config_ttl_raw = config.get("assemblies_dev_mode_ttl_seconds") + try: + config_ttl = int(config_ttl_raw) if config_ttl_raw is not None else None + except (TypeError, ValueError): + config_ttl = None + + default_ttl = config_ttl or env_ttl or 900 + if default_ttl < 60: + default_ttl = 60 + self.dev_mode_manager = DevModeManager( + logger=self.context.logger, + default_ttl_seconds=default_ttl, + ) + def _register_tokens(app: Flask, adapters: EngineServiceAdapters) -> None: token_routes.register( diff --git a/Data/Engine/services/API/assemblies/management.py b/Data/Engine/services/API/assemblies/management.py index 28ccf9cd..8225a886 100644 --- a/Data/Engine/services/API/assemblies/management.py +++ b/Data/Engine/services/API/assemblies/management.py @@ -19,14 +19,13 @@ from __future__ import annotations import logging -import os from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple -from flask import Blueprint, jsonify, request, session -from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer +from flask import Blueprint, jsonify, request from ....assembly_management.models import AssemblyDomain from ...assemblies.service import AssemblyRuntimeService +from ...auth import RequestAuthContext if TYPE_CHECKING: # pragma: no cover - typing aide from .. import EngineServiceAdapters @@ -43,81 +42,104 @@ class AssemblyAPIService: if cache is None: raise RuntimeError("Assembly cache not initialised; ensure Engine bootstrap executed.") self.runtime = AssemblyRuntimeService(cache, logger=self.logger) + self.service_log = adapters.service_log + self.auth = RequestAuthContext( + app=app, + dev_mode_manager=adapters.dev_mode_manager, + config=adapters.config, + logger=self.logger, + ) # ------------------------------------------------------------------ # Authentication helpers # ------------------------------------------------------------------ def require_user(self) -> Tuple[Optional[Dict[str, Any]], Optional[Tuple[Dict[str, Any], int]]]: - user = self._current_user() - if not user: - return None, ({"error": "unauthorized"}, 401) - return user, None + return self.auth.require_user() - def require_admin(self, *, dev_mode_required: bool = False) -> Optional[Tuple[Dict[str, Any], int]]: - user = self._current_user() - if not user: - return {"error": "unauthorized"}, 401 - if not self._is_admin(user): - return {"error": "admin required"}, 403 - if dev_mode_required and not self._dev_mode_enabled(): - return {"error": "dev mode required"}, 403 - return None + def require_admin( + self, + *, + dev_mode_required: bool = False, + user: Optional[Dict[str, Any]] = None, + ) -> Tuple[Optional[Dict[str, Any]], Optional[Tuple[Dict[str, Any], int]]]: + actor = user + if actor is None: + actor, error = self.require_user() + if error: + detail = error[0].get("message") or error[0].get("error") or "authentication required" + self._audit(user=None, action="admin_check", status="denied", detail=detail) + return actor, error - def require_mutation_for_domain(self, domain: AssemblyDomain) -> Optional[Tuple[Dict[str, Any], int]]: + if not RequestAuthContext.is_admin(actor): + payload = { + "error": "forbidden", + "message": "Administrator permissions are required for this action.", + } + self._audit(user=actor, action="admin_check", status="denied", detail=payload["message"]) + return actor, (payload, 403) + + if dev_mode_required and not self.auth.dev_mode_enabled(user=actor): + payload = { + "error": "dev_mode_required", + "message": "Enable Dev Mode from the Assemblies admin controls to continue.", + } + self._audit(user=actor, action="dev_mode_check", status="denied", detail=payload["message"]) + return actor, (payload, 403) + + return actor, None + + def require_mutation_for_domain( + self, + domain: AssemblyDomain, + ) -> Tuple[Optional[Dict[str, Any]], Optional[Tuple[Dict[str, Any], int]]]: user, error = self.require_user() if error: - return error + detail = error[0].get("message") or error[0].get("error") or "authentication required" + self._audit(user=None, action="mutation_check", domain=domain, status="denied", detail=detail) + return user, error if domain == AssemblyDomain.USER: - return None - if not self._is_admin(user): - return {"error": "admin required for non-user domains"}, 403 - if not self._dev_mode_enabled(): - return {"error": "dev mode required for privileged domains"}, 403 - return None + return user, None + _, admin_error = self.require_admin(dev_mode_required=True, user=user) + if admin_error: + return user, admin_error + return user, None - def _token_serializer(self) -> URLSafeTimedSerializer: - secret = self.app.secret_key or "borealis-dev-secret" - return URLSafeTimedSerializer(secret, salt="borealis-auth") - - def _current_user(self) -> Optional[Dict[str, Any]]: - username = session.get("username") - role = session.get("role") or "User" - if username: - return {"username": username, "role": role} - - token = self._bearer_token() - if not token: - return None - max_age = int(os.environ.get("BOREALIS_TOKEN_TTL_SECONDS", 60 * 60 * 24 * 30)) + # ------------------------------------------------------------------ + # Audit helpers + # ------------------------------------------------------------------ + def _audit( + self, + *, + user: Optional[Dict[str, Any]], + action: str, + domain: Optional[AssemblyDomain] = None, + assembly_guid: Optional[str] = None, + status: str = "success", + detail: Optional[str] = None, + ) -> None: + actor = user or {} + username = (actor.get("username") or "").strip() or "anonymous" + role = (actor.get("role") or "").strip() or "unknown" + domain_value = domain.value if isinstance(domain, AssemblyDomain) else (domain or "n/a") + dev_mode_flag = self.auth.dev_mode_enabled(user=user) if user else False + parts = [ + f"user={username}", + f"role={role}", + f"action={action}", + f"domain={domain_value}", + f"status={status}", + f"dev_mode={'true' if dev_mode_flag else 'false'}", + ] + if assembly_guid: + parts.append(f"assembly={assembly_guid}") + if detail: + parts.append(f"detail={detail}") + message = " ".join(parts) + self.logger.info("Assemblies audit - %s", message) try: - data = self._token_serializer().loads(token, max_age=max_age) - username = data.get("u") - role = data.get("r") or "User" - if username: - return {"username": username, "role": role} - except (BadSignature, SignatureExpired, Exception): - return None - return None - - def _bearer_token(self) -> Optional[str]: - auth_header = request.headers.get("Authorization") or "" - if auth_header.lower().startswith("bearer "): - return auth_header.split(" ", 1)[1].strip() - cookie_token = request.cookies.get("borealis_auth") - if cookie_token: - return cookie_token - return None - - @staticmethod - def _is_admin(user: Dict[str, Any]) -> bool: - role = (user.get("role") or "").strip().lower() - return role == "admin" - - def _dev_mode_enabled(self) -> bool: - return bool(session.get("assemblies_dev_mode", False)) - - def set_dev_mode(self, enabled: bool) -> None: - session["assemblies_dev_mode"] = bool(enabled) + self.service_log("assemblies", message, scope="ADMIN") + except Exception: # pragma: no cover - logging safeguard + self.logger.debug("Failed to write assemblies service log entry.", exc_info=True) # ------------------------------------------------------------------ # Domain helpers @@ -177,16 +199,43 @@ def register_assemblies(app, adapters: "EngineServiceAdapters") -> None: domain = service.parse_domain(payload.get("domain")) if domain is None: return jsonify({"error": "invalid domain"}), 400 - error = service.require_mutation_for_domain(domain) + user, error = service.require_mutation_for_domain(domain) + pending_guid = str(payload.get("assembly_guid") or "").strip() or None if error: + detail = error[0].get("message") or error[0].get("error") or "permission denied" + service._audit(user=user, action="create", domain=domain, assembly_guid=pending_guid, status="denied", detail=detail) return jsonify(error[0]), error[1] try: record = service.runtime.create_assembly(payload) + service._audit( + user=user, + action="create", + domain=domain, + assembly_guid=record.get("assembly_guid"), + status="success", + detail="queued", + ) return jsonify(record), 201 except ValueError as exc: + service._audit( + user=user, + action="create", + domain=domain, + assembly_guid=pending_guid, + status="failed", + detail=str(exc), + ) return jsonify({"error": str(exc)}), 400 except Exception: # pragma: no cover - runtime guard service.logger.exception("Failed to create assembly.") + service._audit( + user=user, + action="create", + domain=domain, + assembly_guid=pending_guid, + status="error", + detail="internal server error", + ) return jsonify({"error": "internal server error"}), 500 # ------------------------------------------------------------------ @@ -198,16 +247,49 @@ def register_assemblies(app, adapters: "EngineServiceAdapters") -> None: existing = service.runtime.get_cached_entry(assembly_guid) if not existing: return jsonify({"error": "not found"}), 404 - error = service.require_mutation_for_domain(existing.domain) + user, error = service.require_mutation_for_domain(existing.domain) if error: + detail = error[0].get("message") or error[0].get("error") or "permission denied" + service._audit( + user=user, + action="update", + domain=existing.domain, + assembly_guid=assembly_guid, + status="denied", + detail=detail, + ) return jsonify(error[0]), error[1] try: record = service.runtime.update_assembly(assembly_guid, payload) + service._audit( + user=user, + action="update", + domain=existing.domain, + assembly_guid=assembly_guid, + status="success", + detail="queued", + ) return jsonify(record), 200 except ValueError as exc: + service._audit( + user=user, + action="update", + domain=existing.domain, + assembly_guid=assembly_guid, + status="failed", + detail=str(exc), + ) return jsonify({"error": str(exc)}), 400 except Exception: # pragma: no cover - runtime guard - service.logger.exception("Failed to update assembly %s.", assembly_id) + service.logger.exception("Failed to update assembly %s.", assembly_guid) + service._audit( + user=user, + action="update", + domain=existing.domain, + assembly_guid=assembly_guid, + status="error", + detail="internal server error", + ) return jsonify({"error": "internal server error"}), 500 # ------------------------------------------------------------------ @@ -218,16 +300,49 @@ def register_assemblies(app, adapters: "EngineServiceAdapters") -> None: existing = service.runtime.get_cached_entry(assembly_guid) if not existing: return jsonify({"error": "not found"}), 404 - error = service.require_mutation_for_domain(existing.domain) + user, error = service.require_mutation_for_domain(existing.domain) if error: + detail = error[0].get("message") or error[0].get("error") or "permission denied" + service._audit( + user=user, + action="delete", + domain=existing.domain, + assembly_guid=assembly_guid, + status="denied", + detail=detail, + ) return jsonify(error[0]), error[1] try: service.runtime.delete_assembly(assembly_guid) + service._audit( + user=user, + action="delete", + domain=existing.domain, + assembly_guid=assembly_guid, + status="success", + detail="queued", + ) return jsonify({"status": "queued"}), 202 except ValueError as exc: + service._audit( + user=user, + action="delete", + domain=existing.domain, + assembly_guid=assembly_guid, + status="failed", + detail=str(exc), + ) return jsonify({"error": str(exc)}), 400 except Exception: # pragma: no cover - service.logger.exception("Failed to delete assembly %s.", assembly_id) + service.logger.exception("Failed to delete assembly %s.", assembly_guid) + service._audit( + user=user, + action="delete", + domain=existing.domain, + assembly_guid=assembly_guid, + status="error", + detail="internal server error", + ) return jsonify({"error": "internal server error"}), 500 # ------------------------------------------------------------------ @@ -240,8 +355,18 @@ def register_assemblies(app, adapters: "EngineServiceAdapters") -> None: domain = service.parse_domain(target_domain_value) if domain is None: return jsonify({"error": "invalid target domain"}), 400 - error = service.require_mutation_for_domain(domain) + user, error = service.require_mutation_for_domain(domain) + pending_guid = str(payload.get("new_assembly_guid") or "").strip() or None if error: + detail = error[0].get("message") or error[0].get("error") or "permission denied" + service._audit( + user=user, + action="clone", + domain=domain, + assembly_guid=pending_guid or assembly_guid, + status="denied", + detail=detail, + ) return jsonify(error[0]), error[1] new_guid = payload.get("new_assembly_guid") try: @@ -250,11 +375,35 @@ def register_assemblies(app, adapters: "EngineServiceAdapters") -> None: target_domain=domain.value, new_assembly_guid=new_guid, ) + service._audit( + user=user, + action="clone", + domain=domain, + assembly_guid=record.get("assembly_guid"), + status="success", + detail=f"source={assembly_guid}", + ) return jsonify(record), 201 except ValueError as exc: + service._audit( + user=user, + action="clone", + domain=domain, + assembly_guid=pending_guid or assembly_guid, + status="failed", + detail=str(exc), + ) return jsonify({"error": str(exc)}), 400 except Exception: # pragma: no cover - service.logger.exception("Failed to clone assembly %s.", assembly_id) + service.logger.exception("Failed to clone assembly %s.", assembly_guid) + service._audit( + user=user, + action="clone", + domain=domain, + assembly_guid=pending_guid or assembly_guid, + status="error", + detail="internal server error", + ) return jsonify({"error": "internal server error"}), 500 # ------------------------------------------------------------------ @@ -262,27 +411,35 @@ def register_assemblies(app, adapters: "EngineServiceAdapters") -> None: # ------------------------------------------------------------------ @blueprint.route("/dev-mode/switch", methods=["POST"]) def switch_dev_mode(): - error = service.require_admin() + user, error = service.require_admin() if error: return jsonify(error[0]), error[1] payload = request.get_json(silent=True) or {} enabled = bool(payload.get("enabled")) - service.set_dev_mode(enabled) - return jsonify({"dev_mode": service._dev_mode_enabled()}), 200 + try: + final_state = service.auth.set_dev_mode(enabled) + except PermissionError: # pragma: no cover - defensive + service._audit(user=user, action="dev_mode_toggle", status="error", detail="state persistence failure") + return jsonify({"error": "unable to update dev mode state"}), 500 + detail = "enabled" if final_state else "disabled" + service._audit(user=user, action="dev_mode_toggle", status="success", detail=detail) + return jsonify({"dev_mode": final_state}), 200 # ------------------------------------------------------------------ # Immediate flush # ------------------------------------------------------------------ @blueprint.route("/dev-mode/write", methods=["POST"]) def flush_assemblies(): - error = service.require_admin(dev_mode_required=True) + user, error = service.require_admin(dev_mode_required=True) if error: return jsonify(error[0]), error[1] try: service.runtime.flush_writes() + service._audit(user=user, action="flush_queue", status="success", detail="manual flush") return jsonify({"status": "flushed"}), 200 except Exception: # pragma: no cover service.logger.exception("Failed to flush assembly queue.") + service._audit(user=user, action="flush_queue", status="error", detail="internal server error") return jsonify({"error": "internal server error"}), 500 # ------------------------------------------------------------------ @@ -290,14 +447,16 @@ def register_assemblies(app, adapters: "EngineServiceAdapters") -> None: # ------------------------------------------------------------------ @blueprint.route("/official/sync", methods=["POST"]) def sync_official(): - error = service.require_admin(dev_mode_required=True) + user, error = service.require_admin(dev_mode_required=True) if error: return jsonify(error[0]), error[1] try: service.runtime.sync_official() + service._audit(user=user, action="sync_official", status="success") return jsonify({"status": "synced"}), 200 except Exception: # pragma: no cover service.logger.exception("Official assembly sync failed.") + service._audit(user=user, action="sync_official", status="error", detail="internal server error") return jsonify({"error": "internal server error"}), 500 app.register_blueprint(blueprint) diff --git a/Data/Engine/services/auth/__init__.py b/Data/Engine/services/auth/__init__.py new file mode 100644 index 00000000..d0c7265a --- /dev/null +++ b/Data/Engine/services/auth/__init__.py @@ -0,0 +1,14 @@ +# ====================================================== +# Data\Engine\services\auth\__init__.py +# Description: Exposes shared authentication helpers for Engine REST services. +# +# API Endpoints (if applicable): None +# ====================================================== + +"""Authentication utilities for Borealis Engine services.""" + +from .context import RequestAuthContext, PermissionResult +from .dev_mode import DevModeEntry, DevModeManager + +__all__ = ["RequestAuthContext", "PermissionResult", "DevModeEntry", "DevModeManager"] + diff --git a/Data/Engine/services/auth/context.py b/Data/Engine/services/auth/context.py new file mode 100644 index 00000000..16af9a57 --- /dev/null +++ b/Data/Engine/services/auth/context.py @@ -0,0 +1,199 @@ +# ====================================================== +# Data\Engine\services\auth\context.py +# Description: Provides request-scoped authentication helpers with Dev Mode state integration. +# +# API Endpoints (if applicable): None +# ====================================================== + +"""Shared authentication helpers for Engine REST services.""" + +from __future__ import annotations + +import logging +import os +import uuid +from typing import Any, Dict, Mapping, Optional, Tuple + +from flask import Request, request, session +from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer + +from .dev_mode import DevModeManager + +PermissionResult = Tuple[Dict[str, Any], int] + + +def _coerce_positive_int(value: Any, default: int) -> int: + try: + candidate = int(value) + if candidate > 0: + return candidate + except (TypeError, ValueError): + pass + return default + + +class RequestAuthContext: + """Resolves the current operator and Dev Mode state for the active request.""" + + def __init__( + self, + *, + app, + dev_mode_manager: DevModeManager, + config: Optional[Mapping[str, Any]] = None, + logger: Optional[logging.Logger] = None, + ) -> None: + self._app = app + self._dev_mode_manager = dev_mode_manager + self._config = config or {} + self._logger = logger or logging.getLogger(__name__) + secret = app.secret_key or "borealis-dev-secret" + self._serializer = URLSafeTimedSerializer(secret, salt="borealis-auth") + default_token_ttl = int(os.environ.get("BOREALIS_TOKEN_TTL_SECONDS", 60 * 60 * 24 * 30)) + self._token_ttl = _coerce_positive_int( + self._config.get("auth_token_ttl_seconds"), default_token_ttl + ) + default_dev_mode_ttl = int(os.environ.get("BOREALIS_DEV_MODE_TTL_SECONDS", 900)) + self._dev_mode_ttl = _coerce_positive_int( + self._config.get("assemblies_dev_mode_ttl_seconds"), default_dev_mode_ttl + ) + + # ------------------------------------------------------------------ + # Session helpers + # ------------------------------------------------------------------ + def ensure_session_token(self) -> str: + """Ensure an opaque session token exists for Dev Mode tracking.""" + + token = session.get("dev_mode_session") + if not token: + token = uuid.uuid4().hex + session["dev_mode_session"] = token + session.modified = True + return token + + def current_user(self) -> Optional[Dict[str, Any]]: + """Return the authenticated operator profile, if any.""" + + username = session.get("username") + role = session.get("role") or "User" + if username: + return {"username": username, "role": role} + + token = self._bearer_token(request) + if not token: + return None + try: + data = self._serializer.loads(token, max_age=self._token_ttl) + except (BadSignature, SignatureExpired): + self._logger.debug("Bearer token invalid or expired.") + return None + except Exception: # pragma: no cover - defensive logging + self._logger.debug("Unexpected error validating bearer token.", exc_info=True) + return None + + username = (data.get("u") or "").strip() + role = (data.get("r") or "User").strip() or "User" + if not username: + return None + return {"username": username, "role": role} + + def dev_mode_enabled(self, *, user: Optional[Dict[str, Any]] = None) -> bool: + """Return whether Dev Mode is currently active for the operator.""" + + profile = user or self.current_user() + if not profile: + return False + token = self.ensure_session_token() + return self._dev_mode_manager.is_enabled(username=profile["username"], session_token=token) + + def set_dev_mode(self, enabled: bool, *, ttl_seconds: Optional[int] = None) -> bool: + """Enable or disable Dev Mode for the active operator session.""" + + profile = self.current_user() + if not profile: + raise PermissionError("Authentication required to modify Dev Mode state.") # type: ignore[return-value] + token = self.ensure_session_token() + if enabled: + ttl = ttl_seconds if ttl_seconds and ttl_seconds > 0 else self._dev_mode_ttl + self._dev_mode_manager.enable( + username=profile["username"], + role=profile.get("role") or "User", + session_token=token, + ttl_seconds=ttl, + ) + else: + self._dev_mode_manager.disable(username=profile["username"], session_token=token) + return self.dev_mode_enabled(user=profile) + + def clear_dev_mode(self) -> None: + """Explicitly disable Dev Mode for the active operator.""" + + profile = self.current_user() + if not profile: + return + token = self.ensure_session_token() + self._dev_mode_manager.disable(username=profile["username"], session_token=token) + + # ------------------------------------------------------------------ + # Permission helpers + # ------------------------------------------------------------------ + def require_user(self) -> Tuple[Optional[Dict[str, Any]], Optional[PermissionResult]]: + user = self.current_user() + if not user: + return None, ( + { + "error": "unauthorized", + "message": "Authentication required. Please sign in and retry.", + }, + 401, + ) + return user, None + + def require_admin(self, *, dev_mode_required: bool = False) -> Optional[PermissionResult]: + user = self.current_user() + if not user: + return ( + { + "error": "unauthorized", + "message": "Authentication required. Please sign in and retry.", + }, + 401, + ) + if not self.is_admin(user): + return ( + { + "error": "forbidden", + "message": "Administrator permissions are required for this action.", + }, + 403, + ) + if dev_mode_required and not self.dev_mode_enabled(user=user): + return ( + { + "error": "dev_mode_required", + "message": "Enable Dev Mode from the Assemblies admin controls to continue.", + }, + 403, + ) + return None + + # ------------------------------------------------------------------ + # Static helpers + # ------------------------------------------------------------------ + @staticmethod + def is_admin(user: Dict[str, Any]) -> bool: + role = (user.get("role") or "").strip().lower() + return role == "admin" + + @staticmethod + def _bearer_token(req: Request) -> Optional[str]: + header = (req.headers.get("Authorization") or "").strip() + if header.lower().startswith("bearer "): + return header.split(" ", 1)[1].strip() + cookie_token = req.cookies.get("borealis_auth") + if cookie_token: + return cookie_token + return None + + +__all__ = ["RequestAuthContext", "PermissionResult"] diff --git a/Data/Engine/services/auth/dev_mode.py b/Data/Engine/services/auth/dev_mode.py new file mode 100644 index 00000000..e1fdd84d --- /dev/null +++ b/Data/Engine/services/auth/dev_mode.py @@ -0,0 +1,187 @@ +# ====================================================== +# Data\Engine\services\auth\dev_mode.py +# Description: Manages in-memory Dev Mode state for operator sessions with TTL enforcement. +# +# API Endpoints (if applicable): None +# ====================================================== + +"""Server-side Dev Mode state tracking for operator sessions.""" + +from __future__ import annotations + +import datetime as _dt +import logging +import threading +from dataclasses import dataclass +from typing import Dict, Optional, Tuple + + +def _utcnow() -> _dt.datetime: + return _dt.datetime.utcnow().replace(microsecond=0) + + +def _normalize_username(value: str) -> str: + return (value or "").strip().lower() + + +@dataclass(frozen=True) +class DevModeEntry: + """Represents a Dev Mode grant for an operator.""" + + username: str + role: str + session_token: str + enabled_at: _dt.datetime + expires_at: _dt.datetime + + def is_expired(self, *, now: Optional[_dt.datetime] = None) -> bool: + reference = now or _utcnow() + return reference >= self.expires_at + + +class DevModeManager: + """Tracks Dev Mode enablement for operator sessions.""" + + def __init__(self, *, logger: Optional[logging.Logger] = None, default_ttl_seconds: int = 900) -> None: + ttl = int(default_ttl_seconds or 0) + if ttl < 60: + ttl = 60 + self._default_ttl = ttl + self._logger = logger or logging.getLogger(__name__) + self._entries: Dict[Tuple[str, str], DevModeEntry] = {} + self._lock = threading.RLock() + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + def enable( + self, + *, + username: str, + role: str, + session_token: str, + ttl_seconds: Optional[int] = None, + ) -> DevModeEntry: + """Enable Dev Mode for the specified operator session.""" + + normalized = _normalize_username(username) + if not normalized: + raise ValueError("username required for Dev Mode enablement") + token = (session_token or "").strip() + if not token: + raise ValueError("session token required for Dev Mode enablement") + + ttl = ttl_seconds if ttl_seconds and ttl_seconds > 0 else self._default_ttl + if ttl < 60: + ttl = 60 + now = _utcnow() + entry = DevModeEntry( + username=normalized, + role=(role or "User").strip() or "User", + session_token=token, + enabled_at=now, + expires_at=now + _dt.timedelta(seconds=ttl), + ) + + with self._lock: + self._prune_locked(now=now) + self._entries[(normalized, token)] = entry + + self._logger.info( + "Dev Mode enabled for user='%s' role='%s' session_suffix='%s' ttl_seconds=%s", + normalized, + entry.role, + token[-6:], + ttl, + ) + return entry + + def disable(self, *, username: str, session_token: str) -> bool: + """Disable Dev Mode for the specified operator session.""" + + normalized = _normalize_username(username) + token = (session_token or "").strip() + if not normalized or not token: + return False + + removed = False + with self._lock: + removed = self._entries.pop((normalized, token), None) is not None + + if removed: + self._logger.info( + "Dev Mode disabled for user='%s' session_suffix='%s'", + normalized, + token[-6:], + ) + return removed + + def is_enabled(self, *, username: str, session_token: str) -> bool: + """Return whether Dev Mode is currently enabled for the operator session.""" + + normalized = _normalize_username(username) + token = (session_token or "").strip() + if not normalized or not token: + return False + + now = _utcnow() + with self._lock: + entry = self._entries.get((normalized, token)) + if not entry: + self._prune_locked(now=now) + return False + + if entry.is_expired(now=now): + self._entries.pop((normalized, token), None) + self._logger.info( + "Dev Mode expired for user='%s' session_suffix='%s'", + normalized, + token[-6:], + ) + return False + + return True + + def describe(self) -> Dict[str, Dict[str, str]]: + """Return a serialisable snapshot of active Dev Mode entries.""" + + snapshot: Dict[str, Dict[str, str]] = {} + now = _utcnow() + with self._lock: + self._prune_locked(now=now) + for entry in self._entries.values(): + key = f"{entry.username}:{entry.session_token[-6:]}" + snapshot[key] = { + "username": entry.username, + "role": entry.role, + "enabled_at": entry.enabled_at.isoformat(), + "expires_at": entry.expires_at.isoformat(), + } + return snapshot + + def clean_expired(self) -> None: + """Remove expired Dev Mode entries.""" + + with self._lock: + self._prune_locked(now=_utcnow()) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + def _prune_locked(self, *, now: Optional[_dt.datetime] = None) -> None: + reference = now or _utcnow() + expired_keys = [ + key for key, entry in self._entries.items() if entry.is_expired(now=reference) + ] + for key in expired_keys: + entry = self._entries.pop(key, None) + if entry: + self._logger.info( + "Dev Mode expired for user='%s' session_suffix='%s' (prune)", + entry.username, + entry.session_token[-6:], + ) + + +__all__ = ["DevModeManager", "DevModeEntry"] +