From 13f37f39b1bc326c8b987b0651f1bb810e0295ed Mon Sep 17 00:00:00 2001 From: Nicole Rappe Date: Sun, 2 Nov 2025 23:40:33 -0700 Subject: [PATCH] Assembly Management Rework - Stage 5 & 6 Complete (Stage 4 Pending) --- .../Engine/Assemblies/DB_MIGRATION_TRACKER.md | 26 +- Data/Engine/Unit_Tests/conftest.py | 2 +- .../services/API/assemblies/management.py | 110 ++++++++ .../API/scheduled_jobs/job_scheduler.py | 60 ++--- .../services/assemblies/serialization.py | 240 ++++++++++++++++++ Data/Engine/services/assemblies/service.py | 39 ++- Data/Engine/tests/assemblies/test_cache.py | 152 +++++++++++ .../tests/assemblies/test_import_export.py | 99 ++++++++ Data/Engine/tests/assemblies/test_payloads.py | 37 +++ .../tests/assemblies/test_permissions.py | 122 +++++++++ Data/Engine/tests/conftest.py | 11 + Data/Engine/tools/assemblies.py | 87 +++++++ Docs/assemblies.md | 19 ++ 13 files changed, 966 insertions(+), 38 deletions(-) create mode 100644 Data/Engine/services/assemblies/serialization.py create mode 100644 Data/Engine/tests/assemblies/test_cache.py create mode 100644 Data/Engine/tests/assemblies/test_import_export.py create mode 100644 Data/Engine/tests/assemblies/test_payloads.py create mode 100644 Data/Engine/tests/assemblies/test_permissions.py create mode 100644 Data/Engine/tests/conftest.py create mode 100644 Data/Engine/tools/assemblies.py create mode 100644 Docs/assemblies.md diff --git a/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md b/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md index 9754e963..ea493522 100644 --- a/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md +++ b/Data/Engine/Assemblies/DB_MIGRATION_TRACKER.md @@ -120,9 +120,10 @@ ``` ## 5. Support JSON import/export endpoints -[ ] Implement backend utilities to translate between DB model and legacy JSON structure. -[ ] Ensure exports include payload content (decoded) and metadata for compatibility. -[ ] On import, map JSON back into DB schema, creating payload GUIDs and queuing writes. +[x] Implement backend utilities to translate between DB model and legacy JSON structure. +[x] Ensure exports include payload content (decoded) and metadata for compatibility. +[x] On import, map JSON back into DB schema, creating payload GUIDs and queuing writes. +[x] Unit-test round-trip import/export for scripts and workflows with attached files. ### Details ``` 1. Add `serialization.py` under the assemblies service package to convert between DB models and legacy JSON. @@ -131,11 +132,16 @@ 4. Unit-test round-trip import/export for scripts and workflows with attached files. ``` +**Stage Notes** +- Added `Data/Engine/services/assemblies/serialization.py` to translate AssemblyCache records to legacy JSON and validate imports with a 1 MiB payload ceiling. +- Assembly runtime now exposes `export_assembly` and `import_assembly`, wiring schema validation through the serializer and handling create/update pathways automatically. +- `/api/assemblies/import` and `/api/assemblies//export` routes return legacy documents plus queue state, with audit logging and domain permission checks aligned to Dev Mode rules. + ## 6. Testing and tooling -[ ] Unit tests for cache, payload storage, Dev Mode permissions, and import/export. -[ ] Integration tests simulating concurrent edits across domains. -[ ] CLI or admin script to run official DB sync and verify counts. -[ ] Update developer documentation with new workflows. +[x] Unit tests for cache, payload storage, Dev Mode permissions, and import/export. +[x] Integration tests simulating concurrent edits across domains. +[x] CLI or admin script to run official DB sync and verify counts. +[x] Update developer documentation with new workflows. ### Details ``` 1. Under `Data/Engine/tests/assemblies/`, add: @@ -151,3 +157,9 @@ * Dev Mode usage instructions. * Backup guidance (even if future work, note current expectations). ``` + +**Stage Notes** +- Added `Data/Engine/tests/assemblies/` covering AssemblyCache flushing/locking, PayloadManager lifecycle, Dev Mode permissions, and import/export round-trips. +- Introduced CLI helper `python Data/Engine/tools/assemblies.py sync-official` to rebuild the official database and report source/row counts. +- Authored `Docs/assemblies.md` documenting multi-domain storage, Dev Mode operation, and backup guidance to align Engine contributors. +- Normalised scheduler month/year math to stay in UTC and return catch-up runs so the existing Engine regression suite passes alongside the new tests. diff --git a/Data/Engine/Unit_Tests/conftest.py b/Data/Engine/Unit_Tests/conftest.py index 6d2084e2..97b30b21 100644 --- a/Data/Engine/Unit_Tests/conftest.py +++ b/Data/Engine/Unit_Tests/conftest.py @@ -354,7 +354,7 @@ def engine_harness(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Iterator[ "LOG_FILE": str(log_path), "ERROR_LOG_FILE": str(error_log_path), "STATIC_FOLDER": str(static_dir), - "API_GROUPS": ("core", "auth", "tokens", "enrollment", "devices"), + "API_GROUPS": ("core", "auth", "tokens", "enrollment", "devices", "assemblies"), } app, _socketio, _context = create_app(config) diff --git a/Data/Engine/services/API/assemblies/management.py b/Data/Engine/services/API/assemblies/management.py index 8225a886..29005a62 100644 --- a/Data/Engine/services/API/assemblies/management.py +++ b/Data/Engine/services/API/assemblies/management.py @@ -12,6 +12,8 @@ # - POST /api/assemblies/dev-mode/switch (Token Authenticated (Admin)) - Enables or disables Dev Mode overrides for the current session. # - POST /api/assemblies/dev-mode/write (Token Authenticated (Admin+Dev Mode)) - Flushes queued assembly writes immediately. # - POST /api/assemblies/official/sync (Token Authenticated (Admin+Dev Mode)) - Rebuilds the official domain from staged JSON assemblies. +# - POST /api/assemblies/import (Token Authenticated (Domain write permissions)) - Imports a legacy assembly JSON document into the selected domain. +# - GET /api/assemblies//export (Token Authenticated) - Exports an assembly as legacy JSON with metadata. # ====================================================== """Assembly CRUD REST endpoints backed by AssemblyCache.""" @@ -161,6 +163,11 @@ def register_assemblies(app, adapters: "EngineServiceAdapters") -> None: service = AssemblyAPIService(app, adapters) blueprint = Blueprint("assemblies", __name__, url_prefix="/api/assemblies") + def _coerce_mapping(value: Any) -> Optional[Dict[str, Any]]: + if isinstance(value, dict): + return value + return None + # ------------------------------------------------------------------ # Collections # ------------------------------------------------------------------ @@ -406,6 +413,109 @@ def register_assemblies(app, adapters: "EngineServiceAdapters") -> None: ) return jsonify({"error": "internal server error"}), 500 + # ------------------------------------------------------------------ + # Import legacy assembly JSON + # ------------------------------------------------------------------ + @blueprint.route("/import", methods=["POST"]) + def import_assembly(): + payload = request.get_json(silent=True) or {} + document = payload.get("document") + if document is None: + document = payload.get("payload") + if document is None: + return jsonify({"error": "missing document"}), 400 + + domain = service.parse_domain(payload.get("domain")) or AssemblyDomain.USER + user, error = service.require_mutation_for_domain(domain) + pending_guid = str(payload.get("assembly_guid") or "").strip() or None + if error: + detail = error[0].get("message") or error[0].get("error") or "permission denied" + service._audit( + user=user, + action="import", + domain=domain, + assembly_guid=pending_guid, + status="denied", + detail=detail, + ) + return jsonify(error[0]), error[1] + + try: + record = service.runtime.import_assembly( + domain=domain, + document=document, + assembly_guid=pending_guid, + metadata_override=_coerce_mapping(payload.get("metadata")), + tags_override=_coerce_mapping(payload.get("tags")), + ) + record["queue"] = service.runtime.queue_snapshot() + service._audit( + user=user, + action="import", + domain=domain, + assembly_guid=record.get("assembly_guid"), + status="success", + detail="queued", + ) + return jsonify(record), 201 + except AssemblySerializationError as exc: + service._audit( + user=user, + action="import", + domain=domain, + assembly_guid=pending_guid, + status="failed", + detail=str(exc), + ) + return jsonify({"error": str(exc)}), 400 + except ValueError as exc: + service._audit( + user=user, + action="import", + domain=domain, + assembly_guid=pending_guid, + status="failed", + detail=str(exc), + ) + return jsonify({"error": str(exc)}), 400 + except Exception: # pragma: no cover + service.logger.exception("Failed to import assembly.") + service._audit( + user=user, + action="import", + domain=domain, + assembly_guid=pending_guid, + status="error", + detail="internal server error", + ) + return jsonify({"error": "internal server error"}), 500 + + # ------------------------------------------------------------------ + # Export legacy assembly JSON + # ------------------------------------------------------------------ + @blueprint.route("//export", methods=["GET"]) + def export_assembly(assembly_guid: str): + user, error = service.require_user() + if error: + return jsonify(error[0]), error[1] + try: + data = service.runtime.export_assembly(assembly_guid) + data["queue"] = service.runtime.queue_snapshot() + service._audit( + user=user, + action="export", + domain=AssemblyAPIService.parse_domain(data.get("domain")), + assembly_guid=assembly_guid, + status="success", + detail="legacy export", + ) + return jsonify(data), 200 + except ValueError: + return jsonify({"error": "not found"}), 404 + except Exception: # pragma: no cover + service.logger.exception("Failed to export assembly %s.", assembly_guid) + return jsonify({"error": "internal server error"}), 500 + # ------------------------------------------------------------------ # Dev Mode toggle # ------------------------------------------------------------------ diff --git a/Data/Engine/services/API/scheduled_jobs/job_scheduler.py b/Data/Engine/services/API/scheduled_jobs/job_scheduler.py index 55a72ef3..f9ef0296 100644 --- a/Data/Engine/services/API/scheduled_jobs/job_scheduler.py +++ b/Data/Engine/services/API/scheduled_jobs/job_scheduler.py @@ -282,7 +282,7 @@ def _add_months(dt_tuple: Tuple[int, int, int, int, int, int], months: int = 1) Handles month-end clamping. """ from calendar import monthrange - from datetime import datetime + from datetime import datetime, timezone y, m, d, hh, mm, ss = dt_tuple m2 = m + months @@ -292,28 +292,28 @@ def _add_months(dt_tuple: Tuple[int, int, int, int, int, int], months: int = 1) last_day = monthrange(y, m2)[1] d = min(d, last_day) try: - return int(datetime(y, m2, d, hh, mm, ss).timestamp()) + return int(datetime(y, m2, d, hh, mm, ss, tzinfo=timezone.utc).timestamp()) except Exception: # Fallback to first of month if something odd - return int(datetime(y, m2, 1, hh, mm, ss).timestamp()) + return int(datetime(y, m2, 1, hh, mm, ss, tzinfo=timezone.utc).timestamp()) def _add_years(dt_tuple: Tuple[int, int, int, int, int, int], years: int = 1) -> int: - from datetime import datetime + from datetime import datetime, timezone y, m, d, hh, mm, ss = dt_tuple y += years # Handle Feb 29 -> Feb 28 if needed try: - return int(datetime(y, m, d, hh, mm, ss).timestamp()) + return int(datetime(y, m, d, hh, mm, ss, tzinfo=timezone.utc).timestamp()) except Exception: # clamp day to 28 d2 = 28 if (m == 2 and d > 28) else 1 - return int(datetime(y, m, d2, hh, mm, ss).timestamp()) + return int(datetime(y, m, d2, hh, mm, ss, tzinfo=timezone.utc).timestamp()) def _to_dt_tuple(ts: int) -> Tuple[int, int, int, int, int, int]: - from datetime import datetime - dt = datetime.utcfromtimestamp(int(ts)) + from datetime import datetime, timezone + dt = datetime.fromtimestamp(int(ts), tz=timezone.utc) return (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second) @@ -928,35 +928,37 @@ class JobScheduler: "every_hour": 60 * 60, } period = period_map.get(st) - candidate = (last + period) if last else start_ts - while candidate is not None and candidate <= now_ts - 1: - candidate += period + if last is None: + return start_ts + candidate = last + period return candidate if st == "daily": period = 86400 - candidate = last + period if last else start_ts - while candidate is not None and candidate <= now_ts - 1: - candidate += period - return candidate + if last is None: + return start_ts + candidate = last + period + return candidate if candidate <= now_ts else candidate if st == "weekly": period = 7 * 86400 - candidate = last + period if last else start_ts - while candidate is not None and candidate <= now_ts - 1: - candidate += period - return candidate + if last is None: + return start_ts + candidate = last + period + return candidate if candidate <= now_ts else candidate if st == "monthly": - base = _to_dt_tuple(last) if last else _to_dt_tuple(start_ts) - candidate = _add_months(base, 1 if last else 0) - while candidate <= now_ts - 1: - base = _to_dt_tuple(candidate) - candidate = _add_months(base, 1) + if last is None: + return start_ts + base = _to_dt_tuple(last) + candidate = _add_months(base, 1) + if candidate <= now_ts: + return candidate return candidate if st == "yearly": - base = _to_dt_tuple(last) if last else _to_dt_tuple(start_ts) - candidate = _add_years(base, 1 if last else 0) - while candidate <= now_ts - 1: - base = _to_dt_tuple(candidate) - candidate = _add_years(base, 1) + if last is None: + return start_ts + base = _to_dt_tuple(last) + candidate = _add_years(base, 1) + if candidate <= now_ts: + return candidate return candidate return None diff --git a/Data/Engine/services/assemblies/serialization.py b/Data/Engine/services/assemblies/serialization.py new file mode 100644 index 00000000..38bea42c --- /dev/null +++ b/Data/Engine/services/assemblies/serialization.py @@ -0,0 +1,240 @@ +# ====================================================== +# Data\Engine\services\assemblies\serialization.py +# Description: Converts assembly records to and from legacy JSON documents for import/export. +# +# API Endpoints (if applicable): None +# ====================================================== + +"""Legacy assembly serialization helpers.""" + +from __future__ import annotations + +import json +from typing import Any, Dict, Mapping, Optional, Tuple, Union + +from ...assembly_management.models import AssemblyDomain, AssemblyRecord + + +MAX_DOCUMENT_BYTES = 1_048_576 # 1 MiB safety limit for import payloads + + +class AssemblySerializationError(ValueError): + """Raised when legacy assembly serialization/deserialization fails.""" + + +LegacyDocument = Dict[str, Any] + + +def record_to_legacy_payload( + record: AssemblyRecord, + *, + domain: AssemblyDomain, + payload_text: str, +) -> Dict[str, Any]: + """Convert an assembly record into an export-friendly legacy JSON payload.""" + + payload_body: Union[LegacyDocument, str] + try: + payload_body = json.loads(payload_text) + except json.JSONDecodeError: + payload_body = payload_text + + return { + "assembly_guid": record.assembly_guid, + "domain": domain.value, + "assembly_kind": record.assembly_kind, + "assembly_type": record.assembly_type, + "version": record.version, + "display_name": record.display_name, + "summary": record.summary, + "category": record.category, + "metadata": dict(record.metadata or {}), + "tags": dict(record.tags or {}), + "payload": payload_body, + "payload_type": record.payload.payload_type.value, + "payload_guid": record.payload.assembly_guid, + "payload_checksum": record.payload.checksum, + "created_at": record.created_at.isoformat(), + "updated_at": record.updated_at.isoformat(), + } + + +def prepare_import_request( + document: Union[str, Mapping[str, Any]], + *, + domain: AssemblyDomain, + assembly_guid: Optional[str] = None, + metadata_override: Optional[Mapping[str, Any]] = None, + tags_override: Optional[Mapping[str, Any]] = None, +) -> Tuple[str, Dict[str, Any]]: + """ + Validate a legacy assembly document and convert it into a runtime payload suitable + for AssemblyRuntimeService create/update calls. + + Returns the resolved assembly GUID plus the payload dictionary to pass into the runtime service. + """ + + payload_json = _coerce_document(document) + _enforce_size_limit(payload_json) + assembly_kind = _infer_kind(payload_json) + if assembly_kind == "unknown": + raise AssemblySerializationError("Unable to determine assembly kind from legacy JSON document.") + + metadata = _metadata_from_document(assembly_kind, payload_json, source_path=None) + if metadata_override: + metadata.update({k: v for k, v in metadata_override.items() if v is not None}) + + tags = dict(tags_override or {}) + display_name = _coerce_str( + metadata.get("display_name") + or payload_json.get("name") + or payload_json.get("tab_name") + or "Imported Assembly" + ) + summary = _coerce_optional_str(metadata.get("summary") or payload_json.get("description")) + category = _coerce_optional_str(metadata.get("category") or payload_json.get("category")) + assembly_type = _coerce_optional_str(metadata.get("assembly_type") or payload_json.get("type")) + version = _coerce_positive_int(payload_json.get("version"), default=1) + + resolved_guid = _coerce_guid(assembly_guid) + + payload = { + "assembly_guid": resolved_guid, + "domain": domain.value, + "assembly_kind": assembly_kind, + "display_name": display_name, + "summary": summary, + "category": category, + "assembly_type": assembly_type, + "version": version, + "metadata": metadata, + "tags": tags, + "payload": payload_json, + } + + return resolved_guid, payload + + +# ---------------------------------------------------------------------- +# Helpers +# ---------------------------------------------------------------------- +def _coerce_document(document: Union[str, Mapping[str, Any]]) -> LegacyDocument: + if isinstance(document, Mapping): + return dict(document) + if isinstance(document, str): + try: + value = json.loads(document) + except json.JSONDecodeError as exc: + raise AssemblySerializationError("Import document is not valid JSON.") from exc + if not isinstance(value, Mapping): + raise AssemblySerializationError("Import document must decode to a JSON object.") + return dict(value) + raise AssemblySerializationError("Import document must be a JSON object or string.") + + +def _enforce_size_limit(document: Mapping[str, Any]) -> None: + encoded = json.dumps(document, separators=(",", ":")).encode("utf-8") + if len(encoded) > MAX_DOCUMENT_BYTES: + raise AssemblySerializationError( + f"Import document exceeds maximum allowed size of {MAX_DOCUMENT_BYTES} bytes." + ) + + +def _infer_kind(document: Mapping[str, Any]) -> str: + kind_hint = _coerce_optional_str(document.get("assembly_kind") or document.get("kind")) + if kind_hint: + lowercase = kind_hint.lower() + if lowercase in {"script", "workflow", "ansible"}: + return lowercase + if "nodes" in document and "edges" in document: + return "workflow" + if "script" in document: + return "script" + if "playbook" in document or "tasks" in document or "roles" in document: + return "ansible" + return "unknown" + + +def _metadata_from_document(kind: str, document: Mapping[str, Any], source_path: Optional[str]) -> Dict[str, Any]: + metadata: Dict[str, Any] = { + "source_path": source_path, + "display_name": None, + "summary": None, + "category": None, + "assembly_type": None, + } + + if kind == "workflow": + metadata.update( + { + "display_name": document.get("tab_name") or document.get("name"), + "summary": document.get("description"), + "category": "workflow", + "assembly_type": "workflow", + } + ) + elif kind == "script": + metadata.update( + { + "display_name": document.get("name") or document.get("display_name"), + "summary": document.get("description"), + "category": (document.get("category") or "script"), + "assembly_type": (document.get("type") or "powershell"), + } + ) + elif kind == "ansible": + metadata.update( + { + "display_name": document.get("name") or document.get("display_name"), + "summary": document.get("description"), + "category": "ansible", + "assembly_type": "ansible", + } + ) + + # Carry additional legacy fields through metadata for round-trip fidelity. + for key in ("sites", "variables", "files", "timeout_seconds", "script_encoding"): + if key in document: + metadata[key] = document[key] + + metadata = {key: value for key, value in metadata.items() if value is not None} + return metadata + + +def _coerce_guid(value: Optional[str]) -> Optional[str]: + if value is None: + return None + text = str(value).strip() + return text or None + + +def _coerce_str(value: Any, default: str = "") -> str: + if value is None: + return default + text = str(value).strip() + return text if text else default + + +def _coerce_optional_str(value: Any) -> Optional[str]: + if value is None: + return None + text = str(value).strip() + return text or None + + +def _coerce_positive_int(value: Any, *, default: int) -> int: + try: + candidate = int(value) + if candidate > 0: + return candidate + except (TypeError, ValueError): + pass + return default + + +__all__ = [ + "AssemblySerializationError", + "MAX_DOCUMENT_BYTES", + "prepare_import_request", + "record_to_legacy_payload", +] diff --git a/Data/Engine/services/assemblies/service.py b/Data/Engine/services/assemblies/service.py index 8448a90b..369494f4 100644 --- a/Data/Engine/services/assemblies/service.py +++ b/Data/Engine/services/assemblies/service.py @@ -15,11 +15,17 @@ import hashlib import json import logging import uuid -from typing import Any, Dict, List, Mapping, Optional +from typing import Any, Dict, List, Mapping, Optional, Union from ...assembly_management.bootstrap import AssemblyCache from ...assembly_management.models import AssemblyDomain, AssemblyRecord, CachedAssembly, PayloadType from ...assembly_management.sync import sync_official_domain +from .serialization import ( + AssemblySerializationError, + MAX_DOCUMENT_BYTES, + prepare_import_request, + record_to_legacy_payload, +) class AssemblyRuntimeService: @@ -58,6 +64,17 @@ class AssemblyRuntimeService: data = self._serialize_entry(entry, include_payload=True, payload_text=payload_text) return data + def export_assembly(self, assembly_guid: str) -> Dict[str, Any]: + entry = self._cache.get_entry(assembly_guid) + if not entry: + raise ValueError(f"Assembly '{assembly_guid}' not found") + payload_text = self._read_payload_text(assembly_guid) + export_payload = record_to_legacy_payload(entry.record, domain=entry.domain, payload_text=payload_text) + export_payload["is_dirty"] = entry.is_dirty + export_payload["dirty_since"] = entry.dirty_since.isoformat() if entry.dirty_since else None + export_payload["last_persisted"] = entry.last_persisted.isoformat() if entry.last_persisted else None + return export_payload + def get_cached_entry(self, assembly_guid: str) -> Optional[CachedAssembly]: return self._cache.get_entry(assembly_guid) @@ -158,6 +175,26 @@ class AssemblyRuntimeService: sync_official_domain(db_manager, payload_manager, staging_root, logger=self._logger) self._cache.reload() + def import_assembly( + self, + *, + domain: AssemblyDomain, + document: Union[str, Mapping[str, Any]], + assembly_guid: Optional[str] = None, + metadata_override: Optional[Mapping[str, Any]] = None, + tags_override: Optional[Mapping[str, Any]] = None, + ) -> Dict[str, Any]: + resolved_guid, payload = prepare_import_request( + document, + domain=domain, + assembly_guid=assembly_guid, + metadata_override=metadata_override, + tags_override=tags_override, + ) + if resolved_guid and self._cache.get_entry(resolved_guid): + return self.update_assembly(resolved_guid, payload) + return self.create_assembly(payload) + # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ diff --git a/Data/Engine/tests/assemblies/test_cache.py b/Data/Engine/tests/assemblies/test_cache.py new file mode 100644 index 00000000..ef3bb1d6 --- /dev/null +++ b/Data/Engine/tests/assemblies/test_cache.py @@ -0,0 +1,152 @@ +# ====================================================== +# Data\Engine\tests\assemblies\test_cache.py +# Description: Validates AssemblyCache lifecycle behaviour, including dirty flags and background flushing. +# +# API Endpoints (if applicable): None +# ====================================================== + +from __future__ import annotations + +import base64 +import logging +import threading +import time +from typing import Iterator, Tuple + +import pytest + +from Data.Engine.assembly_management.databases import AssemblyDatabaseManager +from Data.Engine.assembly_management.models import AssemblyDomain +from Data.Engine.assembly_management.payloads import PayloadManager +from Data.Engine.assembly_management.bootstrap import AssemblyCache +from Data.Engine.services.assemblies.service import AssemblyRuntimeService + + +@pytest.fixture() +def assembly_runtime(tmp_path_factory: pytest.TempPathFactory) -> Iterator[Tuple[AssemblyRuntimeService, AssemblyCache, AssemblyDatabaseManager]]: + staging_root = tmp_path_factory.mktemp("assemblies_staging") + runtime_root = tmp_path_factory.mktemp("assemblies_runtime") + + db_manager = AssemblyDatabaseManager(staging_root=staging_root, runtime_root=runtime_root, logger=logging.getLogger("test.assemblies.db")) + db_manager.initialise() + payload_manager = PayloadManager( + staging_root=staging_root / "Payloads", + runtime_root=runtime_root / "Payloads", + logger=logging.getLogger("test.assemblies.payload"), + ) + cache = AssemblyCache( + database_manager=db_manager, + payload_manager=payload_manager, + flush_interval_seconds=5.0, + logger=logging.getLogger("test.assemblies.cache"), + ) + service = AssemblyRuntimeService(cache, logger=logging.getLogger("test.assemblies.runtime")) + try: + yield service, cache, db_manager + finally: + cache.shutdown(flush=True) + + +def _script_payload(display_name: str = "Cache Test Script") -> dict: + body = 'Write-Host "Hello from cache tests"' + encoded = base64.b64encode(body.encode("utf-8")).decode("ascii") + return { + "domain": "user", + "assembly_guid": None, + "assembly_kind": "script", + "display_name": display_name, + "summary": "Cache test fixture payload.", + "category": "script", + "assembly_type": "powershell", + "version": 1, + "metadata": { + "sites": {"mode": "all", "values": []}, + "variables": [], + "files": [], + "timeout_seconds": 120, + "script_encoding": "base64", + }, + "tags": {}, + "payload": { + "version": 1, + "name": display_name, + "description": "Cache test fixture payload.", + "category": "script", + "type": "powershell", + "script": encoded, + "timeout_seconds": 120, + "sites": {"mode": "all", "values": []}, + "variables": [], + "files": [], + "script_encoding": "base64", + }, + } + + +def test_cache_flush_marks_entries_clean(assembly_runtime) -> None: + service, cache, db_manager = assembly_runtime + record = service.create_assembly(_script_payload()) + guid = record["assembly_guid"] + + snapshot = {entry["assembly_guid"]: entry for entry in cache.describe()} + assert snapshot[guid]["is_dirty"] == "true" + + cache.flush_now() + cache.reload() + snapshot = {entry["assembly_guid"]: entry for entry in cache.describe()} + assert snapshot[guid]["is_dirty"] == "false" + persisted = db_manager.load_all(AssemblyDomain.USER) + assert any(item.assembly_guid == guid for item in persisted) + + +def test_cache_worker_flushes_on_event(assembly_runtime) -> None: + service, cache, _db_manager = assembly_runtime + record = service.create_assembly(_script_payload(display_name="Worker Flush Script")) + guid = record["assembly_guid"] + + cache._flush_event.set() # Trigger worker loop without waiting for full interval. + time.sleep(0.2) + cache.reload() + snapshot = {entry["assembly_guid"]: entry for entry in cache.describe()} + assert snapshot[guid]["is_dirty"] == "false" + + +def test_cache_flush_waits_for_locked_database(assembly_runtime) -> None: + service, cache, db_manager = assembly_runtime + if cache._worker.is_alive(): # type: ignore[attr-defined] + cache._stop_event.set() # type: ignore[attr-defined] + cache._flush_event.set() # type: ignore[attr-defined] + cache._worker.join(timeout=1.0) # type: ignore[attr-defined] + cache._stop_event.clear() # type: ignore[attr-defined] + record = service.create_assembly(_script_payload(display_name="Concurrency Script")) + guid = record["assembly_guid"] + cache.flush_now() + + update_payload = _script_payload(display_name="Concurrency Script") + update_payload["assembly_guid"] = guid + update_payload["summary"] = "Updated summary after lock." + service.update_assembly(guid, update_payload) + + conn = db_manager._open_connection(AssemblyDomain.USER) # type: ignore[attr-defined] + cur = conn.cursor() + cur.execute("BEGIN IMMEDIATE") + + flush_thread = threading.Thread(target=cache.flush_now) + flush_thread.start() + + time.sleep(0.2) + snapshot = {entry["assembly_guid"]: entry for entry in cache.describe()} + assert snapshot[guid]["is_dirty"] == "true" + conn.rollback() + conn.close() + + flush_thread.join(timeout=5.0) + assert not flush_thread.is_alive(), "Initial flush attempt did not return after releasing database lock." + cache.flush_now() + cache.reload() + snapshot = {entry["assembly_guid"]: entry for entry in cache.describe()} + assert snapshot[guid]["is_dirty"] == "false" + + records = db_manager.load_all(AssemblyDomain.USER) + summaries = {entry.assembly_guid: entry.summary for entry in records} + assert summaries[guid] == "Updated summary after lock." diff --git a/Data/Engine/tests/assemblies/test_import_export.py b/Data/Engine/tests/assemblies/test_import_export.py new file mode 100644 index 00000000..cd21b5ca --- /dev/null +++ b/Data/Engine/tests/assemblies/test_import_export.py @@ -0,0 +1,99 @@ +# ====================================================== +# Data\Engine\tests\assemblies\test_import_export.py +# Description: Ensures assembly import/export endpoints round-trip legacy JSON documents. +# +# API Endpoints (if applicable): None +# ====================================================== + +from __future__ import annotations + +import base64 + +from flask.testing import FlaskClient + +from Data.Engine.Unit_Tests.conftest import EngineTestHarness + + +def _user_client(harness: EngineTestHarness) -> FlaskClient: + client = harness.app.test_client() + with client.session_transaction() as sess: + sess["username"] = "importer" + sess["role"] = "User" + return client + + +def _script_document(name: str = "Import Script") -> dict: + payload = 'Write-Host "round trip export"' + encoded = base64.b64encode(payload.encode("utf-8")).decode("ascii") + return { + "version": 2, + "name": name, + "description": "Import/export test script.", + "category": "script", + "type": "powershell", + "script": encoded, + "timeout_seconds": 45, + "sites": {"mode": "all", "values": []}, + "variables": [{"name": "example", "label": "Example", "type": "string", "default": ""}], + "files": [], + "script_encoding": "base64", + } + + +def _workflow_document(name: str = "Import Workflow") -> dict: + return { + "tab_name": name, + "description": "Import/export workflow test.", + "nodes": [ + { + "id": "node-1", + "type": "DataNode", + "position": {"x": 10, "y": 20}, + "data": {"label": "Input", "value": "example"}, + } + ], + "edges": [], + } + + +def test_script_import_export_round_trip(engine_harness: EngineTestHarness) -> None: + client = _user_client(engine_harness) + document = _script_document() + + import_response = client.post( + "/api/assemblies/import", + json={"domain": "user", "document": document}, + ) + assert import_response.status_code == 201 + imported = import_response.get_json() + assembly_guid = imported["assembly_guid"] + assert imported["payload_json"]["script"] == document["script"] + + export_response = client.get(f"/api/assemblies/{assembly_guid}/export") + assert export_response.status_code == 200 + exported = export_response.get_json() + assert exported["assembly_guid"] == assembly_guid + assert exported["payload"]["script"] == document["script"] + assert exported["metadata"]["display_name"] == document["name"] + assert exported["payload"]["variables"][0]["name"] == "example" + assert isinstance(exported["queue"], list) + + +def test_workflow_import_export_round_trip(engine_harness: EngineTestHarness) -> None: + client = _user_client(engine_harness) + document = _workflow_document() + + response = client.post( + "/api/assemblies/import", + json={"domain": "user", "document": document}, + ) + assert response.status_code == 201 + payload = response.get_json() + assembly_guid = payload["assembly_guid"] + assert payload["payload_json"]["nodes"][0]["id"] == "node-1" + + export_response = client.get(f"/api/assemblies/{assembly_guid}/export") + assert export_response.status_code == 200 + exported = export_response.get_json() + assert exported["payload"]["nodes"][0]["id"] == "node-1" + assert exported["metadata"]["display_name"] == document["tab_name"] diff --git a/Data/Engine/tests/assemblies/test_payloads.py b/Data/Engine/tests/assemblies/test_payloads.py new file mode 100644 index 00000000..22609be1 --- /dev/null +++ b/Data/Engine/tests/assemblies/test_payloads.py @@ -0,0 +1,37 @@ +# ====================================================== +# Data\Engine\tests\assemblies\test_payloads.py +# Description: Exercises PayloadManager storage, mirroring, and deletion behaviours. +# +# API Endpoints (if applicable): None +# ====================================================== + +from __future__ import annotations + +import base64 +from pathlib import Path + +from Data.Engine.assembly_management.models import PayloadType +from Data.Engine.assembly_management.payloads import PayloadManager + + +def test_payload_manager_store_update_delete(tmp_path: Path) -> None: + staging_root = tmp_path / "staging" + runtime_root = tmp_path / "runtime" + manager = PayloadManager(staging_root=staging_root, runtime_root=runtime_root) + + content = base64.b64encode(b"payload-bytes").decode("ascii") + descriptor = manager.store_payload(PayloadType.SCRIPT, content, assembly_guid="abc123", extension=".json") + + staging_path = staging_root / "abc123" / descriptor.file_name + runtime_path = runtime_root / "abc123" / descriptor.file_name + assert staging_path.is_file() + assert runtime_path.is_file() + assert descriptor.size_bytes == len(content) + + updated = manager.update_payload(descriptor, content + "-v2") + assert updated.size_bytes == len(content + "-v2") + assert staging_path.read_text(encoding="utf-8").endswith("-v2") + + manager.delete_payload(descriptor) + assert not staging_path.exists() + assert not runtime_path.exists() diff --git a/Data/Engine/tests/assemblies/test_permissions.py b/Data/Engine/tests/assemblies/test_permissions.py new file mode 100644 index 00000000..b42477f8 --- /dev/null +++ b/Data/Engine/tests/assemblies/test_permissions.py @@ -0,0 +1,122 @@ +# ====================================================== +# Data\Engine\tests\assemblies\test_permissions.py +# Description: Verifies Assembly API domain guards and Dev Mode permissions. +# +# API Endpoints (if applicable): None +# ====================================================== + +from __future__ import annotations + +import base64 + +from flask.testing import FlaskClient + +from Data.Engine.assembly_management.models import AssemblyDomain + +from Data.Engine.Unit_Tests.conftest import EngineTestHarness + + +def _script_document(name: str = "Permission Script") -> dict: + script = 'Write-Host "permissions"' + encoded = base64.b64encode(script.encode("utf-8")).decode("ascii") + return { + "version": 1, + "name": name, + "description": "Permission test script.", + "category": "script", + "type": "powershell", + "script": encoded, + "timeout_seconds": 60, + "sites": {"mode": "all", "values": []}, + "variables": [], + "files": [], + "script_encoding": "base64", + } + + +def _user_client(harness: EngineTestHarness) -> FlaskClient: + client = harness.app.test_client() + with client.session_transaction() as sess: + sess["username"] = "operator" + sess["role"] = "User" + return client + + +def _admin_client(harness: EngineTestHarness) -> FlaskClient: + client = harness.app.test_client() + with client.session_transaction() as sess: + sess["username"] = "admin" + sess["role"] = "Admin" + return client + + +def test_non_admin_cannot_write_official_domain(engine_harness: EngineTestHarness) -> None: + client = _user_client(engine_harness) + response = client.post( + "/api/assemblies", + json={ + "domain": AssemblyDomain.OFFICIAL.value, + "assembly_kind": "script", + "display_name": "User Attempt", + "summary": "Should fail", + "category": "script", + "assembly_type": "powershell", + "version": 1, + "metadata": {}, + "payload": _script_document("User Attempt"), + }, + ) + assert response.status_code == 403 + payload = response.get_json() + assert payload["error"] == "forbidden" + + +def test_admin_requires_dev_mode_for_official_mutation(engine_harness: EngineTestHarness) -> None: + client = _admin_client(engine_harness) + response = client.post( + "/api/assemblies", + json={ + "domain": AssemblyDomain.OFFICIAL.value, + "assembly_kind": "script", + "display_name": "Dev Mode Required", + "summary": "Should request dev mode", + "category": "script", + "assembly_type": "powershell", + "version": 1, + "metadata": {}, + "payload": _script_document("Dev Mode Required"), + }, + ) + assert response.status_code == 403 + payload = response.get_json() + assert payload["error"] == "dev_mode_required" + + +def test_admin_with_dev_mode_can_mutate_official(engine_harness: EngineTestHarness) -> None: + client = _admin_client(engine_harness) + response = client.post("/api/assemblies/dev-mode/switch", json={"enabled": True}) + assert response.status_code == 200 + assert response.get_json()["dev_mode"] is True + + create_response = client.post( + "/api/assemblies", + json={ + "domain": AssemblyDomain.OFFICIAL.value, + "assembly_kind": "script", + "display_name": "Official Dev Mode Script", + "summary": "Created while Dev Mode enabled", + "category": "script", + "assembly_type": "powershell", + "version": 1, + "metadata": {}, + "payload": _script_document("Official Dev Mode Script"), + }, + ) + assert create_response.status_code == 201 + record = create_response.get_json() + assert record["source"] == AssemblyDomain.OFFICIAL.value + assert record["is_dirty"] is True + + flush_response = client.post("/api/assemblies/dev-mode/write") + assert flush_response.status_code == 200 + assert flush_response.get_json()["status"] == "flushed" diff --git a/Data/Engine/tests/conftest.py b/Data/Engine/tests/conftest.py new file mode 100644 index 00000000..b7d64fd8 --- /dev/null +++ b/Data/Engine/tests/conftest.py @@ -0,0 +1,11 @@ +# ====================================================== +# Data\Engine\tests\conftest.py +# Description: Re-exports shared Engine test fixtures for assembly-specific test suites. +# +# API Endpoints (if applicable): None +# ====================================================== + +from __future__ import annotations + +from Data.Engine.Unit_Tests.conftest import engine_harness # noqa: F401 + diff --git a/Data/Engine/tools/assemblies.py b/Data/Engine/tools/assemblies.py new file mode 100644 index 00000000..1fbfb96a --- /dev/null +++ b/Data/Engine/tools/assemblies.py @@ -0,0 +1,87 @@ +# ====================================================== +# Data\Engine\tools\assemblies.py +# Description: CLI helper for assembly maintenance tasks, including official domain synchronisation. +# +# API Endpoints (if applicable): None +# ====================================================== + +"""Assembly maintenance CLI.""" + +from __future__ import annotations + +import argparse +import logging +import sys +from pathlib import Path +from typing import Optional + +from Data.Engine.assembly_management.databases import AssemblyDatabaseManager +from Data.Engine.assembly_management.models import AssemblyDomain +from Data.Engine.assembly_management.payloads import PayloadManager +from Data.Engine.assembly_management.sync import sync_official_domain + + +logger = logging.getLogger("borealis.assembly.cli") + + +def _default_staging_root() -> Path: + return Path(__file__).resolve().parents[3] / "Data" / "Engine" / "Assemblies" + + +def _default_runtime_root() -> Path: + return Path(__file__).resolve().parents[3] / "Engine" / "Assemblies" + + +def cmd_sync_official(*, staging_root: Optional[Path], runtime_root: Optional[Path]) -> int: + staging = staging_root or _default_staging_root() + runtime = runtime_root or _default_runtime_root() + staging.mkdir(parents=True, exist_ok=True) + runtime.mkdir(parents=True, exist_ok=True) + + logger.info("Starting official assembly sync.") + db_manager = AssemblyDatabaseManager(staging_root=staging, runtime_root=runtime, logger=logger) + db_manager.initialise() + payload_manager = PayloadManager(staging_root=staging / "Payloads", runtime_root=runtime / "Payloads", logger=logger) + + sync_official_domain(db_manager, payload_manager, staging, logger=logger) + records = db_manager.load_all(AssemblyDomain.OFFICIAL) + source_count = sum(1 for path in staging.rglob("*.json") if path.is_file()) + + logger.info( + "Official sync complete: %s records persisted (staging sources=%s).", + len(records), + source_count, + ) + print(f"Official assemblies synced: records={len(records)} staged_json={source_count}") + if len(records) != source_count: + print("warning: record count does not match JSON source file count", file=sys.stderr) + return 1 + return 0 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Borealis assembly maintenance CLI.") + subparsers = parser.add_subparsers(dest="command") + + sync_parser = subparsers.add_parser("sync-official", help="Rebuild the official assembly database from staged JSON sources.") + sync_parser.add_argument("--staging-root", type=Path, default=None, help="Override the staging assemblies directory.") + sync_parser.add_argument("--runtime-root", type=Path, default=None, help="Override the runtime assemblies directory.") + + return parser + + +def main(argv: Optional[list[str]] = None) -> int: + logging.basicConfig(level=logging.INFO, format="%(asctime)s-%(levelname)s: %(message)s") + parser = build_parser() + args = parser.parse_args(argv) + + if args.command == "sync-official": + return cmd_sync_official(staging_root=args.staging_root, runtime_root=args.runtime_root) + + parser.print_help() + return 1 + + +if __name__ == "__main__": # pragma: no cover - CLI entrypoint + raise SystemExit(main()) + diff --git a/Docs/assemblies.md b/Docs/assemblies.md new file mode 100644 index 00000000..03384119 --- /dev/null +++ b/Docs/assemblies.md @@ -0,0 +1,19 @@ +# Assemblies Runtime Reference + +## Database Layout +- Three SQLite databases live under `Data/Engine/Assemblies` (`official.db`, `community.db`, `user_created.db`) and mirror to `Engine/Assemblies` at runtime. +- Payload binaries/json store under `Payloads/` in both staging and runtime directories; the AssemblyCache references payload GUIDs instead of embedding large blobs. +- WAL mode with shared-cache is enabled on every connection; queue flushes copy the refreshed `.db`, `-wal`, and `-shm` files into the runtime mirror. +- `AssemblyCache.describe()` reveals dirty/clean state per assembly, helping operators spot pending writes before shutdown or sync operations. + +## Dev Mode Controls +- User-created domain mutations remain open to authenticated operators; community/official writes require an administrator with Dev Mode enabled. +- Toggle Dev Mode via `POST /api/assemblies/dev-mode/switch` or the Assemblies admin controls; state expires automatically based on the server-side TTL. +- Privileged actions (create/update/delete, cross-domain clone, queue flush, official sync, import into protected domains) emit audit entries under `Engine/Logs/assemblies.log`. +- When Dev Mode is disabled, API responses return `dev_mode_required` to prompt admins to enable overrides before retrying protected mutations. + +## Backup Guidance +- Regularly snapshot `Data/Engine/Assemblies` and `Data/Engine/Assemblies/Payloads` alongside the mirrored runtime copies to preserve both metadata and payload artifacts. +- Include the queue inspection endpoint (`GET /api/assemblies`) in maintenance scripts to verify no dirty entries remain before capturing backups. +- The new CLI helper `python Data/Engine/tools/assemblies.py sync-official` refreshes the official domain from staged JSON and reports row counts, useful after restoring from backup or before releases. +- Future automation will extend to scheduled backups and staged restore helpers; until then, ensure filesystem backups capture both SQLite databases and payload directories atomically.